Flume源码分析(一)启动

 2017-04-23 20:23:32     Flume  Java   914


导读: Flume源码分析学习系列一,服务启动整体分析。详细内容,请阅读全文。

编译


先从Github上下载源码到本地,这里我用的1.8版本,源码地址为:

https://github.com/apache/flume

导入idea,通过maven进行编译。

mvn clean
mvn package -DskipTests

分析


分析之前,先看一下各module的作用:

module 说明
flume-ng-core 核心模块
flume-ng-sources source组件的实现
flume-ng-channels channel组件的实现
flume-ng-sinks sink组件的实现
flume-ng-tools 工具模块
flume-ng-node agent节点启动入口
flume-ng-configuration 配置文件处理模块
flume-ng-auth 安全认证模块
flume-ng-clients 客户端模块,目前只有日志功能
flume-ng-dist 利用maven-assembly-plugin组织项目结构
flume-ng-embedded-agent 应用程序嵌入flume agent所需要的API
flume-checkstyle Java代码规范检查
flume-ng-sdk Flume开发API
flume-ng-tests 集成测试
flume-shared 共享工具

先从最普通的agent开始,查看Flume的结构。

启动命令:

bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

启动脚本:

FLUME_AGENT_CLASS="org.apache.flume.node.Application"
FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"

....

run_flume() {
  local FLUME_APPLICATION_CLASS

  if [ "$#" -gt 0 ]; then
    FLUME_APPLICATION_CLASS=$1
    shift
  else
    error "Must specify flume application class" 1
  fi

  if [ ${CLEAN_FLAG} -ne 0 ]; then
    set -x
  fi
  $EXEC $JAVA_HOME/bin/java $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp "$FLUME_CLASSPATH" \
      -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
}

run_flume函数可以看出,org.apache.flume.node.Application为启动入口类。

我们先来看整体的一个类图,从结构上做一个大体的理解。

Flume启动类图

Application类

下图为Application的类描述。

Flume启动类

从上面的整体类图可以看出,最主要的是PollingPropertiesFileConfigurationProviderPropertiesFileConfigurationProvider类,它们同时继承了AbstractConfigurationProvider类。

PropertiesFileConfigurationProvider 负责加载配置文件信息,获取FlumeConfiguration对象。

PollingPropertiesFileConfigurationProvider 继承了PropertiesFileConfigurationProvider 类,本身职责是自动加载配置文件。当修改了配置文件后,不需要手动重启agent,Flume会自动重新加载配置文件。

AbstractConfigurationProvider 主要负责完成三大组件的加载和初始化。

首先,通过commons-cli解析命令行参数,并获取配置文件路径。

根据命令行参数no-reload-conf判断是否需要开启自动加载功能,不加该参数,默认为开启自动加载。

Options options = new Options();

Option option = new Option("n", "name", true, "the name of this agent");
option.setRequired(true);
options.addOption(option);

...

CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);

if (commandLine.hasOption('h')) {
    new HelpFormatter().printHelp("flume-ng agent", options, true);
    return;
}

String agentName = commandLine.getOptionValue('n');
boolean reload = !commandLine.hasOption("no-reload-conf");

PollingPropertiesFileConfigurationProvider

在初始化PollingPropertiesFileConfigurationProvider 时。用到了Guava EventBus的发布订阅功能。将Application 注册到EventBus

if (reload) {
    EventBus eventBus = new EventBus(agentName + "-event-bus");
    PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(
                    agentName, configurationFile, eventBus, 30);
    components.add(configurationProvider);
    application = new Application(components);
    eventBus.register(application);
} else {
    PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(agentName, configurationFile);
    application = new Application();
    application.handleConfigurationEvent(configurationProvider.getConfiguration());
}

PollingPropertiesFileConfigurationProvider 类中,创建了一个FileWatcherRunnable内部类线程,该线程每隔30秒会检测配置文件是否发生变化,如果发生变化,会将新的配置信息发布到EventBus

public void run() {
    LOGGER.debug("Checking file:{} for changes", file);

    counterGroup.incrementAndGet("file.checks");

    long lastModified = file.lastModified();

    if (lastModified > lastChange) {
        LOGGER.info("Reloading configuration file:{}", file);

        counterGroup.incrementAndGet("file.loads");

        lastChange = lastModified;

        try {
            eventBus.post(getConfiguration());
        } catch (Exception e) {
            LOGGER.error("Failed to load configuration data. Exception follows.",
                    e);
        } catch (NoClassDefFoundError e) {
            LOGGER.error("Failed to start agent because dependencies were not " +
                    "found in classpath. Error follows.", e);
        } catch (Throwable t) {
            // caught because the caller does not handle or log Throwables
            LOGGER.error("Unhandled error", t);
        }
    }
}
  • 注意
    1、这里PollingPropertiesFileConfigurationProvider类调用了父类AbstractConfigurationProvider的getConfiguration方法,完成配置文件的加载和组件的实例化。这个方法至关重要。
    2、如果不是no-reload-conf自动加载方式,则是PropertiesFileConfigurationProvider类调用父类AbstractConfigurationProvider的getConfiguration方法。

Application 类中,通过注解的方式订阅了EventBus中的事件。当FileWatcherRunnable发布新的消息后,会触发订阅方法,完成服务的重启。

@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    stopAllComponents();
    startAllComponents(conf);
}

MonitorService

Application 类中,还定义了监控服务对象,用于监控agent,该服务也是根据配置文件信息进行初始化的。如果不配置,则没有监控信息。Flume官方建议使用GangliaServer类,通过Ganglia进行监控。

关闭服务

Flume并没有stop这样的命令来停止服务,而是需要手动kill。而手动kill会带来一些问题,就是怎么处理运行中的数据(例如内存中的数据)。

Flume用到了线程的addShutdownHook方法,该方法的作用是在jvm中放一个hook,在退出程序时,触发该hook。经常用来清理内存,资源回收操作。我们来看看代码。

注意:

  • 退出是指ctrl+c或者kill -15,但如果用kill -9,就无法触发hook。
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
    @Override
    public void run() {
        appReference.stop();
    }
});

这行代码的作用是在关闭jvm前,执行agent的stop,至于stop方法里面做了什么,后期再分析。


参考