Flume源码分析(三)实例化组件对象

 2017-05-06 19:23:02     Flume  Java   683


导读: Flume源码分析学习系列三,实例化组件对象,并启动各组件服务。详细内容,请阅读全文。

在分析之前,先看一下三大组件的大体类图。

Flume三大组件类图

三大组件的实例化分别在loadSourcesloadChannelsloadSinks方法中完成。

实例化Source组件


loadSources

AgentConfiguration类中获取配置信息,通过DefaultSourceFactory来完成实例对象的创建。

public Class<? extends Source> getClass(String type) throws FlumeException {
    String sourceClassName = type;
    SourceType srcType = SourceType.OTHER;
    try {
        srcType = SourceType.valueOf(type.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException ex) {
        logger.debug("Source type {} is a custom type", type);
    }
    if (!srcType.equals(SourceType.OTHER)) {
        sourceClassName = srcType.getSourceClassName();
    }
    try {
        return (Class<? extends Source>) Class.forName(sourceClassName);
    } catch (Exception ex) {
        throw new FlumeException("Unable to load source type: " + type
                + ", class: " + sourceClassName, ex);
    }
}

根据配置文件中a1.sources.src-1.type的配置,匹配枚举类中的org.apache.flume.source.SpoolDirectorySource类,进行初始化。


Source实现类需要引用ChannelProcessorChannelProcessor有两个作用:

作用一:选择器ChannelSelector

用来控制Event put 到Channel的操作,这个控制具体实现是由ChannelSelector完成的,根据配置文件中a1.sources.r1.selector.type的配置,由ChannelSelectorFactory创建ChannelSelector

目前,有两种Selector,如果没有配置,默认为REPLICATING

REPLICATING -> org.apache.flume.channel.ReplicatingChannelSelector
MULTIPLEXING -> org.apache.flume.channel.MultiplexingChannelSelector

REPLICATING作用:Event 发送到每一个对应的 channel,每个 channel 都有完整的一份。
MULTIPLEXING作用: 根据Event的header参数选择一个channel(类似hash算法),把 Event 发送到设置的映射的 channel

作用二:拦截链InterceptorChain

ChannelProcessor还有一个功能,那就是对Event拦截,在ChannelProcessor中定义了拦截器链InterceptorChain

如果在配置文件中定义了拦截,ChannelProcessor在将Event put到 Channel 前,会先调用拦截器,经过拦截处理后,再将符合要求的Event put到 Channel

例如,我们根据正则表达式过滤敏感单词。

a1.sources.src-1.interceptors = regex
a1.sources.src-1.interceptors.regex.type = REGEX_FILTER
a1.sources.src-1.interceptors.regex.regex= (word1)|(word2)
a1.sources.src-1.interceptors.regex.excludeEvents = false

当然,也可以自定义拦截器来实现拦截功能。

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type =  com.interceptor.A
a1.sources.r1.interceptors.i2.type = com.interceptor.B

那么,到底是怎么启动 Source 的呢?

启动Source组件


从上面的类图中可以看出,SourceRunner中调用了ChannelProcessor,而SourceRunnerEventDrivenSourceRunnerPollableSourceRunner两个子类。

EventDrivenSourceRunner的代码来看调用方式。

public void start() {
    Source source = getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();
    lifecycleState = LifecycleState.START;
}

那么,又是在哪个地方调用了 SourceRunner 的start方法了。

这时候,我们需要回头看一下Flume源码分析(一)中提到的Application类的startAllComponents方法。

private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
    logger.info("Starting new configuration:{}", materializedConfiguration);

    this.materializedConfiguration = materializedConfiguration;

    for (Entry<String, Channel> entry :
            materializedConfiguration.getChannels().entrySet()) {
        try {
            logger.info("Starting Channel " + entry.getKey());
            supervisor.supervise(entry.getValue(),
                    new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
        } catch (Exception e) {
            logger.error("Error while starting {}", entry.getValue(), e);
        }
    }

/*
 * Wait for all channels to start.
 */
    for (Channel ch : materializedConfiguration.getChannels().values()) {
        while (ch.getLifecycleState() != LifecycleState.START
                && !supervisor.isComponentInErrorState(ch)) {
            try {
                logger.info("Waiting for channel: " + ch.getName() +
                        " to start. Sleeping for 500 ms");
                Thread.sleep(500);
            } catch (InterruptedException e) {
                logger.error("Interrupted while waiting for channel to start.", e);
                Throwables.propagate(e);
            }
        }
    }

    for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
        try {
            logger.info("Starting Sink " + entry.getKey());
            supervisor.supervise(entry.getValue(),
                    new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
        } catch (Exception e) {
            logger.error("Error while starting {}", entry.getValue(), e);
        }
    }

    for (Entry<String, SourceRunner> entry :
            materializedConfiguration.getSourceRunners().entrySet()) {
        try {
            logger.info("Starting Source " + entry.getKey());
            supervisor.supervise(entry.getValue(),
                    new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
        } catch (Exception e) {
            logger.error("Error while starting {}", entry.getValue(), e);
        }
    }

    this.loadMonitoring();
}

从代码中我们可以看出:

首先,三大组件的启动顺序是:channel、sink、source。

其次,各组件的Runner类负责启动各组件。

具体看下面这行代码。

supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START)

SourceRunner实例传给了supervise方法。

MonitorRunnable monitorRunnable = new MonitorRunnable();
monitorRunnable.lifecycleAware = lifecycleAware;//各组件Runner类的实例
monitorRunnable.supervisoree = process;
monitorRunnable.monitorService = monitorService;

supervisedProcesses.put(lifecycleAware, process);

ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
        monitorRunnable, 0, 3, TimeUnit.SECONDS);

MonitorRunnable的run方法中,真正完成了组件的start。

switch (supervisoree.status.desiredState) {
    case START:
        try {
            lifecycleAware.start();//调用各组件Runner类实例的start方法
        } catch (Throwable e) {
            ...
            supervisoree.status.failures++;
        }
        break;
    case STOP:
        try {
            lifecycleAware.stop();
        } catch (Throwable e) {
            ...
            supervisoree.status.failures++;
        }
        break;
    default:
        logger.warn("I refuse to acknowledge {} as a desired state",
                supervisoree.status.desiredState);
}

至于source的start方法具体做了什么,根据业务,每个实现类都是不同的。

启动总结


回顾以下的分析,可以将agent的启动简单归为以下几步:

  • 1、Application解析启动命令;
  • 2、PropertiesFileConfigurationProvider完成配置文件的解析、组件实例化和组件Runner类实例化;
  • 3、MonitorRunnable负责各组件的启动调用;
  • 4、ApplicationPropertiesFileConfigurationProviderMonitorRunnable的调用,完成agent服务启动。

loadChannels和loadSinks的实现,与loadSources的逻辑大体相同,不再详细分析。