Flume源码分析(四)Event数据流程图

 2017-09-30 18:23:02     Flume  Java  开源框架   1041


导读: 刚开始梳理这篇内容时,赶上比较忙的业务,一直就闲搁了,最近才有时间,既然花了时间,再放弃或遗忘,就浪费了。如是便重新整理,以便以后用到时可以快速回顾。

FlumeEvent 是各组件传输数据的载体。通过分析 Event 的数据流,可以更详细的分析 Flume 的各个组件。

先看一下 Event 的大体的流程图。

Event数据流程图

大体流程说明:

  • 启动 Flume Agent,加载配置,初始化实例。
  • Source 接收外部数据源,并将其转化为 Event 对象。
  • SourceEvent 传递给 ChannelProcessor,在 ChannelProcessor 中完成 Event 的拦截处理,Channel 的选择和将 Event put到 Channel中。
  • Sinkprocess 方法中,完成从 Chanenel 中取出 Event,并将其发送到目标组件。

这里只以 ExecSourceMemoryChannelKafkaSink 分析为例,其它的应该大同小异。

ExecSource


启动时,创建单例线程池 ExecutorService,线程池中的线程为其子类 ExecRunnableExecRunnable 初始化时,会根据初始化配置,获取 ChannelProcessor 的实例。

executor = Executors.newSingleThreadExecutor();
//shell、command等参数,为flume配置文件中的属性值
//例如,command对应的是agentName.sources.sourceName.command = tail -F /path/xx.log
runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter, restart,
        restartThrottle, logStderr, bufferCount, batchTimeout, charset);

线程 ExecRunnable 的执行过程如下。

首先,初始化一个 ScheduledExecutorService,将 ExecRunnable 的实例化线程加入其中。保证 ExecRunnable 可以定时执行。

final List<Event> eventList = new ArrayList<Event>();

timedFlushService = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryBuilder().setNameFormat(
                "timedFlushExecService" + Thread.currentThread().getId() + "-%d").build());

然后,初始化 final 类型的 ArrayList<Event>(eventList实例),用来存放 Source 获取到的event,此处创建的 Event 都是 SimpleEvent 类型,headers为空。如果需要根据headers进行业务标识,需要自定义实现 Event

ExecSource 中,Source 获取外部数据是通过 ProcessBuilder 实现的。ProcessBuilder 使 Java 平台可以执行操作系统的Shell命令,并把Shell的输出数据流,输入到JVM,从而在 Java 平台可以获取到本地文件的数据。

if (shell != null) {
    String[] commandArgs = formulateShellCommand(shell, command);
    process = Runtime.getRuntime().exec(commandArgs);
} else {
    String[] commandArgs = command.split("\\s+");
    process = new ProcessBuilder(commandArgs).start();
}
reader = new BufferedReader(
        new InputStreamReader(process.getInputStream(), charset));

do something...

while ((line = reader.readLine()) != null) {
    sourceCounter.incrementEventReceivedCount();
    synchronized (eventList) {
        //此处创建Event
        eventList.add(EventBuilder.withBody(line.getBytes(charset)));
        //bufferCount和timeout为source的batchSize、batchTimeout配置
        if (eventList.size() >= bufferCount || timeout()) {
            flushEventBatch(eventList);
        }
    }
}

最后,将当前eventList实例传递到 ChannelProcessor(执行 flushEventBatch 方法),并清空eventList。由 ChannelProcessor 负责将eventList发送给 Channel

在满足以下三种条件的任何一种,便会执行 flushEventBatch 方法

  • 1、达到 ScheduledExecutorService 的定时时间
  • 2、达到 Flume 配置文件中 SourcebatchSizebatchTimeout
  • 3、以上代码执行完,eventList仍不为空

ChannelProcessor


ChannelProcessor 将 eventList 发送到 Channel 的过程如下:

第一步:将 Event 经过拦截链处理。

第二步:获取对应的 channel 实例,并以 key-value 方式将 Channel 和 eventList置入 Map(由 ReplicatingMultiplexing 的配置决定)。从 Channel 实例中 new 一个 Transaction 子类。

第三步:执行 Channel 的事务操作,将eventList发送到 Channel

Transaction tx = channel.getTransaction();
try {
    tx.begin();

    List<Event> batch = optChannelQueue.get(optChannel);

    for (Event event : batch) {
        optChannel.put(event);
    }

    tx.commit();
} catch (Throwable t) {
    tx.rollback();
    LOG.error("Unable to put batch on optional channel: " + optChannel, t);
    if (t instanceof Error) {
        throw (Error) t;
    }
} finally {
    if (tx != null) {
        tx.close();
    }
}

MemoryChannel


ChannelProcessor 只是调用了 ChanneldoPutdoCommit 方法,数据传输的具体操作是在Channel 完成的。

在每次实例化事务时,会初始化 putListtakeList 两个双向并发阻塞队列。

@Override
protected BasicTransactionSemantics createTransaction() {
    return new MemoryTransaction(transCapacity, channelCounter);
}

public MemoryTransaction(int transCapacity, ChannelCounter counter) {
    putList = new LinkedBlockingDeque<Event>(transCapacity);
    takeList = new LinkedBlockingDeque<Event>(transCapacity);

    channelCounter = counter;
}

doPut 是对 Channelput 方法具体实现。作用就是将 ChannelProcessor 发送过来的 Event 临时粗放到 putList

putList 的初始化是指定了队列大小的(transactionCapacity属性),所以,当出现以下异常时,需要将此处值调大一点。建议将此参数适当调大一点。

ChannelException: Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity or increasing thread count

doTake 是对 Channeltake 方法具体实现。作用就是从 queue 中消费一个 event,并将该 event 放入 takeList,用作回滚时使用。

doCommit 是对 Transactioncommit 方法具体实现。在执行 doCommit 时,会判断当前 Channel 的所有 putList 中body字节是否超出指定大小和 queue 空间是否富裕。否则,就会抛出异常。

int remainingChange = takeList.size() - putList.size();
if (remainingChange < 0) {
    if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
        throw new ChannelException("Cannot commit transaction. Byte capacity " +
                "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
                "reached. Please increase heap space/byte capacity allocated to " +
                "the channel as the sinks may not be keeping up with the sources");
    }
    if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
        bytesRemaining.release(putByteCounter);
        throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
                " Sinks are likely not keeping up with sources, or the buffer size is too tight");
    }
}

对应的配置参数如下:

capacity: queue队列的大小
byteCapacityBufferPercentage: 所有event中header占用byteCapacity的比例
byteCapacity: 当前channel中所有putList的body允许占用的最大字节数,该值与byteCapacityBufferPercentage和byteCapacity的设置有关

如果不设置 byteCapacity,则默认为JVM字节数的0.8。但是,它会被重新计算赋值:

byteCapacity = (int) ((context.getLong("byteCapacity", defaultByteCapacity).longValue() * (1 - byteCapacityBufferPercentage * .01)) / byteCapacitySlotSize);

举例说明:

假设当前 byteCapacity 设置为100000000,byteCapacityBufferPercentage 按照默认设置为20,则最终的 byteCapacity 等于800000:

(100000000 * (1 - 20 * 0.01))/100 = 800000

所以,当所用的 event 中,header没有数据,或数据内容较小时,可以考虑将 byteCapacityBufferPercentage 的值调小。

当满足空间时,会将 putList 中的 event 放入 queue,同时从 putList 中移除 event

MemoryChannel 中,doCommit 是被 ChannelProcessorSink 公用的,只不过在 Sink 调用时,只用来清空实例,并未做其它操作。

doRollback 是对 Transactionrollback 方法具体实现。用作 commit失败时,回滚 event

MemoryChannel 中,doRollback 只对 Sink 组件 commit 失败时有效。

synchronized (queueLock) {
    Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
            "Not enough space in memory channel " +
                    "queue to rollback takes. This should never happen, please report");
    while (!takeList.isEmpty()) {
        queue.addFirst(takeList.removeLast());
    }
    putList.clear();
}

KafkaSink


通过Flume源码分析(三)实例化组件对象分析,知道是由 SinkRunner 来完成具体 Sink 实现类的的启动的。在 SinkRunner 中,调用了 SinkProcessorprocess 方法,而在 SinkProcessor 的实现类中,会调用 Sinkprocess 方法。

KafkaSink 在启动时,就会事先创建好 kafka 的生产者。在执行 process 时,选获取对应的 Channel,根据Channel创建一个事务,在该事务中完成从Channel中 take event,然后,将event批量发送到kafka`,最后提交完成事务。

以下为简化代码:

Channel channel = getChannel();
Transaction transaction = null;

try {
    transaction = channel.getTransaction();
    transaction.begin();

    #batchSize为配置文件中的flumeBatchSize属性
    for (; processedEvents < batchSize; processedEvents += 1) {
        event = channel.take();
        ProducerRecord<String, byte[]> record;
        if (partitionId != null) {
            record = new ProducerRecord<String, byte[]>(eventTopic, partitionId, eventKey,
            serializeEvent(event, useAvroEventFormat));
        } else {
            record = new ProducerRecord<String, byte[]>(eventTopic, eventKey,
                    serializeEvent(event, useAvroEventFormat));
        }
        kafkaFutures.add(producer.send(record, new SinkCallback(startTime)));
    }
} catch (Exception ex) {
    ...
} finally {
    transaction.rollback();
    transaction.close();
}

这样,event在一个 agent 的整个流程就完成。


参考