Flume源码分析(二)解析配置文件

 2017-04-23 20:23:32     Flume  Java   683


导读: Flume源码分析学习系列二,解析配置文件。详细内容,请阅读全文。

Flume的一个配置文件对应一个agentName。配置文件每行内容中,等号前的内容为key,等号后的内容value

key的组成格式为以下两种:

  • 1、agentName.组件类型。例如:a1.channels
  • 2、agentName.组件类型.组件名称.属性。例如:a1.sources.src-1.type

假设我们用Flume监控本地一个目录,将新增的文件内容写入HDFS上,配置文件内容如下:

a1.channels = ch-1
a1.sources = src-1
a1.sinks = sin-1

# Describe/configure the source
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true

# Use a channel which buffers events in memory
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 1000
a1.channels.ch-1.transactionCapacity = 100

# Describe the sink
a1.sinks.sin-1.type = hdfs
a1.sinks.sin-1.channel = ch-1
a1.sinks.sin-1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.sin-1.hdfs.filePrefix = events-
a1.sinks.sin-1.hdfs.round = true
a1.sinks.sin-1.hdfs.roundValue = 10
a1.sinks.sin-1.hdfs.roundUnit = minute

按照以上配置,我们看一下Flume是怎么对配置文件进行处理的。

从上一章的Flume源码分析(一)启动加载中知道,AbstractConfigurationProvider的getConfiguration方法至关重要。它完成了两件事情。

一是解析配置文件,最终的实现,都是在flume-ng-configuration module完成的。

二是根据配置文件内容,创建三大组件的实例对象。

public MaterializedConfiguration getConfiguration() {
    MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
    FlumeConfiguration fconfig = getFlumeConfiguration();//解析配置文件
    AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
    if (agentConf != null) {
        Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
        Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
        Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
        try {
            loadChannels(agentConf, channelComponentMap);//实例化channel
            loadSources(agentConf, channelComponentMap, sourceRunnerMap);//实例化source
            loadSinks(agentConf, channelComponentMap, sinkRunnerMap);//实例化sink
            ...
        } catch (InstantiationException ex) {
            LOGGER.error("Failed to instantiate component", ex);
        } finally {
            channelComponentMap.clear();
            sourceRunnerMap.clear();
            sinkRunnerMap.clear();
        }
    } else {
        LOGGER.warn("No configuration found for this host:{}", getAgentName());
    }
    return conf;
}

先看一下配置文件解析的类图。

Flume三大组件类图

解析配置


AbstractConfigurationProvider的getFlumeConfiguration抽象方法负责,最终实现由PropertiesFileConfigurationProvider完成。从Flume源码分析(一)启动加载中的整体类图可以看出来。

PropertiesFileConfigurationProvider

解析配置文件的目的,是将配置信息中的agentName、source、channel、sink、sinkGroup转换对应的实例,然后,再将这些实例封装成FlumeConfiguration对象。

操作步骤如下:

  • 1、将配置文件转成Properties对象
  • 2、将Properties转成Map<String, String>对象
  • 3、将Map<String, String>转成Map<String, AgentConfiguration>,即FlumeConfiguration对象

以上三个步骤,最重要的是将Map<String, String>转成Map<String, AgentConfiguration>。是在FlumeConfiguration类中完成的。

public FlumeConfiguration(Map<String, String> properties) {
    agentConfigMap = new HashMap<String, AgentConfiguration>();
    errors = new LinkedList<FlumeConfigurationError>();
    // Construct the in-memory component hierarchy
    for (String name : properties.keySet()) {
        String value = properties.get(name);

        if (!addRawProperty(name, value)) {
            logger.warn("Configuration property ignored: " + name + " = " + value);
        }
    }
    // Now iterate thru the agentContext and create agent configs and add them
    // to agentConfigMap

    // validate and remove improperly configured components
    validateConfiguration();
}

具体操作由方法addRawProperty完成,addRawProperty完成了agentName的解析赋值,其他的解析最终由其内部类AgentConfigurationaddProperty方法实现。

AgentConfiguration

AgentConfiguration有以下属性。

String: agentName、sources、sinks、channels、sinkgroups;
Map<String, ComponentConfiguration>: sourceConfigMap、sinkConfigMap、channelConfigMap、sinkgroupConfigMap;
Map<String, Context>: sourceContextMap、sinkContextMap、channelContextMap、sinkGroupContextMap;
Set<String>: sinkSet、sourceSet、channelSet、sinkgroupSet。

agentName

FlumeConfiguration类中的addRawProperty方法中,直接对key进行了字符串截取,得到agentName等于a1。

sources

key去掉agentName和分割符后,剩余内容直接进行判断,如果等于CONFIG_SOURCES,则将value赋值给属性sources变量。如果value是多个组件的话(例如:a1.channels = c1,c2,c3),同样也是直接赋值,不在此处做分开处理。

if (key.equals(BasicConfigurationConstants.CONFIG_SOURCES)) {
    if (sources == null) {
        sources = value;
        return true;
    } else {
        logger
                .warn("Duplicate source list specified for agent: " + agentName);
        errorList.add(new FlumeConfigurationError(agentName,
                BasicConfigurationConstants.CONFIG_SOURCES,
                FlumeConfigurationErrorType.DUPLICATE_PROPERTY,
                ErrorOrWarning.ERROR));
        return false;
    }
}

sinks、channels、sinkgroups的处理同sources。

sourceContextMap

sourceConfigMapMap<String, ComponentConfiguration>类型,主要描述各组件的属性信息。

只要key格式符合<agentName>.sources.<sourceName>.<component-name>.<config-key>的,都会记录到ComponentNameAndConfigKey类中。ComponentNameAndConfigKey类并没有记录属性对应的值。

private ComponentNameAndConfigKey parseConfigKey(String key, String prefix) {
    // key must start with prefix
    if (!key.startsWith(prefix)) {
        return null;
    }

    // key must have a component name part after the prefix of the format:
    // <prefix><component-name>.<config-key>
    int index = key.indexOf('.', prefix.length() + 1);

    if (index == -1) {
        return null;
    }

    String name = key.substring(prefix.length(), index);
    String configKey = key.substring(prefix.length() + name.length() + 1);

    // name and config key must be non-empty
    if (name.length() == 0 || configKey.length() == 0) {
        return null;
    }

    return new ComponentNameAndConfigKey(name, configKey);
}

生成了ComponentNameAndConfigKey实例后,再生成Context实例,将组件的属性和属性值赋值到Context

最后,将组件的名称和Context一起,赋值到Map<String, Context>,形成sourceContextMap

从中可以看出,Map<String, Context>的key存放的是组件的名称,value为该组件所有的属性和对应值。

if (cnck != null) {
    // it is a source
    String name = cnck.getComponentName();
    Context srcConf = sourceContextMap.get(name);

    if (srcConf == null) {
        srcConf = new Context();
        sourceContextMap.put(name, srcConf);
    }

    srcConf.put(cnck.getConfigKey(), value);
    return true;
}

sinkContextMap、channelContextMap、sinkGroupContextMap的处理同sourceContextMap。

sourceConfigMap

完成Map<String, Context>属性的赋值后,FlumeConfiguration会调用validateConfiguration方法来验证配置信息是否合法。在验证的过程中,合法的配置,会将信息赋值到SourceConfiguration

SourceConfiguration继承了ComponentConfigurationComponentConfiguration是所有组件配置的父类,它包含了两个基本属性:组件名称componentName和组件类型type。这也是所有组件都必须具备的属性。

各组件独特的配置,由子类进行扩展。

所以,sourceConfigMap中,key为source的名称,value为ComponentConfiguration的子类SourceConfiguration

sinkConfigMap、channelConfigMap、sinkgroupConfigMap的处理同sourceConfigMap;

sourceSet

在验证的过程中,会对sourceConfigMapsourceContextMap的所有key进行遍历,重新获取各组件的名称,然后生成sourceSet

所以,sourceSet为配置文件中所有source的集合。并会将集合重新生成字符串赋值给sources

所以,如果a1.sources的配置不正确,但后面其他相关source配置正确,程序则会纠正过来。(从程序上看是这样的,实际没有验证。)

sinkSet、channelSet、sinkgroupSet处理同sourceSet。

获取到了配置文件的内容后,就可以进行实例化对象了。