2017-04-23 20:23:32 Flume Java 914
导读:
Flume源码分析学习系列一,服务启动整体分析。详细内容,请阅读全文。
先从Github上下载源码到本地,这里我用的1.8版本,源码地址为:
https://github.com/apache/flume
导入idea,通过maven进行编译。
mvn clean
mvn package -DskipTests
分析之前,先看一下各module
的作用:
module | 说明 |
---|---|
flume-ng-core | 核心模块 |
flume-ng-sources | source组件的实现 |
flume-ng-channels | channel组件的实现 |
flume-ng-sinks | sink组件的实现 |
flume-ng-tools | 工具模块 |
flume-ng-node | agent节点启动入口 |
flume-ng-configuration | 配置文件处理模块 |
flume-ng-auth | 安全认证模块 |
flume-ng-clients | 客户端模块,目前只有日志功能 |
flume-ng-dist | 利用maven-assembly-plugin 组织项目结构 |
flume-ng-embedded-agent | 应用程序嵌入flume agent所需要的API |
flume-checkstyle | Java代码规范检查 |
flume-ng-sdk | Flume开发API |
flume-ng-tests | 集成测试 |
flume-shared | 共享工具 |
先从最普通的agent开始,查看Flume的结构。
启动命令:
bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
启动脚本:
FLUME_AGENT_CLASS="org.apache.flume.node.Application"
FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"
....
run_flume() {
local FLUME_APPLICATION_CLASS
if [ "$#" -gt 0 ]; then
FLUME_APPLICATION_CLASS=$1
shift
else
error "Must specify flume application class" 1
fi
if [ ${CLEAN_FLAG} -ne 0 ]; then
set -x
fi
$EXEC $JAVA_HOME/bin/java $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp "$FLUME_CLASSPATH" \
-Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
}
从run_flume
函数可以看出,org.apache.flume.node.Application
为启动入口类。
我们先来看整体的一个类图,从结构上做一个大体的理解。
Application类
下图为Application
的类描述。
从上面的整体类图可以看出,最主要的是PollingPropertiesFileConfigurationProvider
和PropertiesFileConfigurationProvider
类,它们同时继承了AbstractConfigurationProvider
类。
PropertiesFileConfigurationProvider
负责加载配置文件信息,获取FlumeConfiguration
对象。
PollingPropertiesFileConfigurationProvider
继承了PropertiesFileConfigurationProvider
类,本身职责是自动加载配置文件。当修改了配置文件后,不需要手动重启agent,Flume会自动重新加载配置文件。
AbstractConfigurationProvider
主要负责完成三大组件的加载和初始化。
首先,通过commons-cli
解析命令行参数,并获取配置文件路径。
根据命令行参数no-reload-conf
判断是否需要开启自动加载功能,不加该参数,默认为开启自动加载。
Options options = new Options();
Option option = new Option("n", "name", true, "the name of this agent");
option.setRequired(true);
options.addOption(option);
...
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);
if (commandLine.hasOption('h')) {
new HelpFormatter().printHelp("flume-ng agent", options, true);
return;
}
String agentName = commandLine.getOptionValue('n');
boolean reload = !commandLine.hasOption("no-reload-conf");
PollingPropertiesFileConfigurationProvider类
在初始化PollingPropertiesFileConfigurationProvider
时。用到了Guava EventBus
的发布订阅功能。将Application
注册到EventBus
。
if (reload) {
EventBus eventBus = new EventBus(agentName + "-event-bus");
PollingPropertiesFileConfigurationProvider configurationProvider =
new PollingPropertiesFileConfigurationProvider(
agentName, configurationFile, eventBus, 30);
components.add(configurationProvider);
application = new Application(components);
eventBus.register(application);
} else {
PropertiesFileConfigurationProvider configurationProvider =
new PropertiesFileConfigurationProvider(agentName, configurationFile);
application = new Application();
application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
在PollingPropertiesFileConfigurationProvider
类中,创建了一个FileWatcherRunnable
内部类线程,该线程每隔30秒会检测配置文件是否发生变化,如果发生变化,会将新的配置信息发布到EventBus
。
public void run() {
LOGGER.debug("Checking file:{} for changes", file);
counterGroup.incrementAndGet("file.checks");
long lastModified = file.lastModified();
if (lastModified > lastChange) {
LOGGER.info("Reloading configuration file:{}", file);
counterGroup.incrementAndGet("file.loads");
lastChange = lastModified;
try {
eventBus.post(getConfiguration());
} catch (Exception e) {
LOGGER.error("Failed to load configuration data. Exception follows.",
e);
} catch (NoClassDefFoundError e) {
LOGGER.error("Failed to start agent because dependencies were not " +
"found in classpath. Error follows.", e);
} catch (Throwable t) {
// caught because the caller does not handle or log Throwables
LOGGER.error("Unhandled error", t);
}
}
}
- 注意
1、这里PollingPropertiesFileConfigurationProvider
类调用了父类AbstractConfigurationProvider
的getConfiguration方法,完成配置文件的加载和组件的实例化。这个方法至关重要。
2、如果不是no-reload-conf
自动加载方式,则是PropertiesFileConfigurationProvider
类调用父类AbstractConfigurationProvider
的getConfiguration方法。
在Application
类中,通过注解的方式订阅了EventBus
中的事件。当FileWatcherRunnable
发布新的消息后,会触发订阅方法,完成服务的重启。
@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
stopAllComponents();
startAllComponents(conf);
}
MonitorService
在Application
类中,还定义了监控服务对象,用于监控agent,该服务也是根据配置文件信息进行初始化的。如果不配置,则没有监控信息。Flume官方建议使用GangliaServer
类,通过Ganglia
进行监控。
关闭服务
Flume并没有stop这样的命令来停止服务,而是需要手动kill。而手动kill会带来一些问题,就是怎么处理运行中的数据(例如内存中的数据)。
Flume用到了线程的addShutdownHook方法,该方法的作用是在jvm中放一个hook,在退出程序时,触发该hook。经常用来清理内存,资源回收操作。我们来看看代码。
注意:
- 退出是指ctrl+c或者kill -15,但如果用kill -9,就无法触发hook。
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
@Override
public void run() {
appReference.stop();
}
});
这行代码的作用是在关闭jvm前,执行agent的stop,至于stop方法里面做了什么,后期再分析。
参考