Flume监控

最近在对服务做压力测试时,发现当 QPS 达到很高的一个值时,Flume 会出现报错。为了排查原因,一边结合 Telegraf 的数据收集,给 Flume 加上监控和报警;一边查看 Flume 代码。

异常排查

当 QPS 压到3w时,Flume出现一下错误信息。

09 Jun 2017 12:06:30,703 ERROR [pool-3-thread-1] (org.apache.flume.source.ExecSource$ExecRunnable.run:352)  - Failed while running command: tail -F /work/app/nginx-app/data/logs/log_192.168.1.1.log
org.apache.flume.ChannelException: Cannot commit transaction. Byte capacity allocated to store event body 1200000.0reached. Please increase heap space/byte capacity allocated to the channel as the sinks may not be keeping up with the sources
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:120)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
    at org.apache.flume.source.ExecSource$ExecRunnable.flushEventBatch(ExecSource.java:381)
    at org.apache.flume.source.ExecSource$ExecRunnable.run(ExecSource.java:341)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
09 Jun 2017 12:06:30,724 INFO  [pool-3-thread-1] (org.apache.flume.source.ExecSource$ExecRunnable.run:375)  - Command [tail -F /work/app/nginx-app/data/logs/log_192.168.1.1.log] exited with 141

根据错误信息描述,是 source 接收端的容量达到上线,无法再接收数据,直接 kill 掉了 tail 命令。

在排查问题之前,先了解一下 fluem 配置信息,以及 event 在三大组件中的数据流程。

flume 的配置。

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /work/app/logs/applog.log
a1.sources.s1.batchSize = 1000
a1.sources.s1.batchTimeout = 3000
a1.sources.s1.channels = c1


a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = kafka02:9092,kafka02:9092,kafka03:9092
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1

主要参数说明:

属性名称默认值含义
capacity100存储在channel中的最大event数量,即 queue 的大小
transactionCapacity100source 将 event put 到 channel,或者 sink 从 channel take event 的每次最大事物数。
putList和takeList的大小。
transactionCapacity 必须不大于 capacity
keep-alive3添加或移除 event 的超时时间
byteCapacityBufferPercentage20所有 event 的 header 字节数占 byteCapacity 的百分比
byteCapacityJVM的80%channel 中允许所有 event 字节数之和的最大值(只计算 event 的 body 字节数,不包含 header)。
defaultByteCapacity等于JVM可用最大内存的80%(-Xmx的80%)。
如果定义了byteCapacity,则 实际byteCapacity = 定义的byteCapacity * (1- Event header百分比) / byteCapacitySlotSize 。byteCapacitySlotSize 默认等于100
flumeBatchSize100KafkaSink 一次事物发送的 event 数

event 的大体数据流程。

source --doPut()--> putList --doCommit()--> queue --doTake()--> takeList / sink

source 执行 channel 的 doPut 方法,将接收到的 event 先放入 putList 临时缓存起来,当达到一定量和时间时,会执行 doCommit 方法,将 putlist 中的 Event 放入 channel 的 queue

sink 执行 process 方法时,会调用 channel 的 doTake 方法,将取出的 event 放入 takeList,同时,将 event 发送到下游。(这里并不是先把 event 放入 takeList,再从 takeList 取数据发送到下游。 而是为了事物回滚,当 event 发送到下游失败时,可以将 takeList 中的数据全部放回到 queue 。)

channel 在执行 doCommit 时,会对存储容量进行判断。

int remainingChange = takeList.size() - putList.size();
if (remainingChange < 0) {
    if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
        throw new ChannelException("Cannot commit transaction. Byte capacity " +
                "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
                "reached. Please increase heap space/byte capacity allocated to " +
                "the channel as the sinks may not be keeping up with the sources");
    }
    if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
        bytesRemaining.release(putByteCounter);
        throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
                " Sinks are likely not keeping up with sources, or the buffer size is too tight");
    }
}

当 takeList.size() - putList.size() < 0,表示放入的 event 比取出的多,如果在 keepAlive 的时间内, putList 中 所有 event 的 body 字节总数在 byteCapacity 的容量内没有可用的的空间时,则抛出上面的异常。

以上内容最直接的解释就是,channel 中 event 占用太多内存,空间不够用。

通过以上分析,知道了错误原因。只需要调整一下配置参数应该就可以解决这个问题。

1、先调整 flume-env.sh 中 agent 默认初始化的内存。

export JAVA_OPTS="-Xms1024m -Xmx2048m -Xss256k -Xmn1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Xloggc:/work/app/flume/logs/server-gc.log.$(date +%F) -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=1 -XX:GCLogFileSize=200M"

2、修改 agent 的配置参数。

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /work/app/logs/applog.log
a1.sources.s1.batchSize = 1000 #每次doput的容量,或者source读取多少event时,发送到channel
a1.sources.s1.batchTimeout = 3000 #每隔多长时间,批量将event发送到channel
a1.sources.s1.channels = c1


a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000  //增大queue容量
a1.channels.c1.transactionCapacity = 100000 //putlist和takelist的容量
a1.channels.c1.byteCapacityBufferPercentage = 10 //减少header占用jvm的比例
#a1.channels.c1.byteCapacity = 800000 注释掉,采用flume-env.sh中的配置

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = kafka02:9092,kafka02:9092,kafka03:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000 //增大sink批量操作的大小,提高处理速度
a1.sinks.k1.kafka.producer.acks = 1

重新压测,查看日志,一切正常。

监控

问题虽然解决了,但是并不能保证以后不会再出问题。所以,必须加上监控,并根据监控增加应对措施。

先看看 flume 支持的监控方式。

JMX Reporting

可以通过使用 flume-env.sh 在 JAVA_OPTS 环境变量中指定 JMX 参数来启用JMX报告。

例如:

JAVA_OPTS = "-Dcom.sun.management.jmxremote 
    -Dcom.sun.management.jmxremote.authenticate=false 
    -Dcom.sun.management.jmxremote.ssl=false
    -Dcom.sun.management.jmxremote.port=5445 
    -Dcom.sun.management.jmxremote.rmi.port=5446
    -Djava.rmi.server.hostname=192.168.16.214"

可以监控 Flume 内存、线程、类、CPU等使用情况。

Ganglia Reporting

Flume 可以将监控指标报告发送给 Ganglia 3 或 Ganglia 3.1 metanodes。要将 metrics 报告到 Ganglia,必须在启动的时候就支持 Flume Agent

 bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455

JSON Reporting

Flume 也可以以 JSON 格式报告指标。要使用 JSON 格式报告,Flume 会在可配置端口上托管 Web 服务器。

使用方式:

bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

服务以下列 JSON 格式报告指标:

{
"typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"},
"typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"}
}

自定义报告

自定义的监控需要实现 org.apache.flume.instrumentation.MonitorService 接口。这个不是本次研究的重点,就不详细说明了。

通过比较,决定先用 JSON 的方式,因为很容易和 Telegraf 进行结合使用。当然,JVM 方式也可以,打算后期再加上。

Flume + Telegraf

启动 agent 时,加上监控参数。

./bin/flume-ng agent -n a1 -c conf -f conf/flume-test.properties -Dflume.root.logger=INFO,DAILY -Dflume.log.dir=/work/app/flume/logs -Dflume.log.file=flume-test.log -DappFlag=flume-test -Dflume.monitoring.type=http -Dflume.monitoring.port=1100 &

查看监控数据。通过 curl http://localhost:1100/metrics 得到以下数据。

 {
  "SOURCE.s1": {
    "StopTime": "0",                  //source组件停止时间
    "AppendBatchReceivedCount": "0",  //source端刚刚追加的批量的数量,比如一批100,该度量为2,就是source端收到了200个events
    "EventReceivedCount": "4913189",  //source端累计收到的event数量
    "AppendBatchAcceptedCount": "0",  //source端追加到channel的数量
    "Type": "SOURCE",
    "EventAcceptedCount": "4912519",  //source端累计成功放入channel的event数量
    "AppendReceivedCount": "0",       //source端刚刚追加的且目前已收到的event数量
    "StartTime": "1496839492821",     //source组件启动的时间
    "AppendAcceptedCount": "0",       //source端刚刚追加放入channel的event数量
    "OpenConnectionCount": "0"        //当前有效的连接数
  },
  "CHANNEL.c1": {
    "StopTime": "0",                       //channel组件停止时间
    "EventPutSuccessCount": "4912519",     //成功放入channel的event数量
    "ChannelCapacity": "200000",           //channel容量
    "ChannelFillPercentage": "5.0E-4",     //channel使用比例
    "Type": "CHANNEL",
    "ChannelSize": "1",                    //目前在channel中的event数量
    "EventTakeSuccessCount": "4912589",    //从channel中成功取出event的数量
    "EventTakeAttemptCount": "4912661",    //正在从channel中取event的数量
    "StartTime": "1496839492318",          //channel组件启动时间
    "EventPutAttemptCount": "4912519"      //正在放进channel的event数量
  },
  "SINK.k1": {
    "StopTime": "0",                       //sink组件停止时间
    "KafkaEventSendTimer": "403088",       //从channel批量取event,并成功发送到kafka的耗时,单位:毫微秒
    "EventDrainSuccessCount": "4912589",   //sink成功发送出的event数量
    "RollbackCount": "0",                  //失败回滚的event数量
    "Type": "SINK",
    "ConnectionCreatedCount": "0",         //连接被创建的数量
    "BatchCompleteCount": "0",             //成功完成输出的批量事件个数
    "BatchEmptyCount": "62",               //批量取空的数量
    "EventDrainAttemptCount": "0",         //试图从channel消耗的事件数量
    "StartTime": "1496839492829",          //sink组件开始时间
    "BatchUnderflowCount": "8",            //正处于批量处理的batch数
    "ConnectionFailedCount": "0",          //连接失败数
    "ConnectionClosedCount": "0"           //连接关闭数
  }
}

Telegraf 可以支持很多形式的输入收集。根据目前的情况,比较合适httpjson 和 exec

Telegraf 的 httpjson 方式

Telegraf 支持 httpjson 方式的数据收集。即 http 服务返回 json 格式的数据,就可以将 json 数据发送到存储目标。

在 Telegraf 的配置文件中修改以下参数:

[[inputs.httpjson]]
    name = "flume_monitoring"
    servers = [
        "http://localhost:1100/metrics",
    ]

    response_timeout = "10s"
    method = "GET"

重启 Telegraf 后,结果不行,报以下错误。

Error in plugin [inputs.exec]: Metric cannot be made without any fields

看了一下,发现是 JSON 格式问题,key/value 中,必须存在 value 为数字类型的 key。因为 InfluxDB 的 field 是用来统计计算的,需要用数字类型。

这种问题,可以修改源码来解决。

Telegraf 的 exec 方式

Telegraf 支持 调用脚本的方式收集数据,只要返回的数据格式符合要求即可。

在 Telegraf 的配置文件中修改以下参数:

[[inputs.exec]]
    commands = [
        "/tmp/flume_monitor.sh"
    ]

    timeout = "5s"
    data_format = "json"

shell 脚本内容如下。

json=$(curl http://localhost:1100/metrics)

#echo $(echo $json | jq '.[] | length')

jsonstr=$(echo $json | jq 'del(.["SOURCE.s1"].Type) | del(.["CHANNEL.c1"].Type) | del(.["SINK.k1"].Type)')
newjsonstr=$(echo $jsonstr | sed 's/: "/: /g')
newjsonstr=$(echo $newjsonstr | sed 's/",/,/g')
newjsonstr=$(echo $newjsonstr | sed 's/" / /g')
#echo $newjsonstr

echo $newjsonstr | jq '.'

重启 Telegraf 服务,在 InfluxDB 中可以查看到监控数据。

但是这样有一个问题,JSON 的所有内容都存入了 InfluxDB,虽然这是一个无关痛痒的问题,但是总感觉不够简洁。

如是,我又做了一些调整,将 JSON 格式改为 InfluxDB Line Protocol

[[inputs.exec]]
    commands = [
        "/tmp/flume_monitor_influxdb.sh flume-test 192.168.1.1"
    ]

    timeout = "10s"
    data_format = "influx"//修改处

脚本内容如下:

AGENT=$1
HOST=$2
STR=""

json=$(curl http://localhost:1100/metrics)

function outputSource() {
    startTime=$(echo $json | jq -r '.["SOURCE.s1"].StartTime')
    stopTime=$(echo $json | jq -r '.["SOURCE.s1"].StopTime')
    eventReceivedCount=$(echo $json | jq -r '.["SOURCE.s1"].EventReceivedCount')
    eventAcceptedCount=$(echo $json | jq -r '.["SOURCE.s1"].EventAcceptedCount')
    appendBatchReceivedCount=$(echo $json | jq -r '.["SOURCE.s1"].AppendBatchReceivedCount')
    appendBatchAcceptedCount=$(echo $json | jq -r '.["SOURCE.s1"].AppendBatchAcceptedCount')

    STR="${STR}flume_monitor_source,agent=${AGENT},host=${HOST},name=s1 startTime=${startTime},stopTime=${stopTime},eventAcceptedCount=${eventAcceptedCount},eventReceivedCount=${eventReceivedCount},appendBatchReceivedCount=${appendBatchReceivedCount},appendBatchAcceptedCount=${appendBatchAcceptedCount}\n"
}

function outputChannel() {
    startTime=$(echo $json | jq -r '.["CHANNEL.c1"].StartTime')
    stopTime=$(echo $json | jq -r '.["CHANNEL.c1"].StopTime')
    eventPutSuccessCount=$(echo $json | jq -r '.["CHANNEL.c1"].EventPutSuccessCount')
    eventPutAttemptCount=$(echo $json | jq -r '.["CHANNEL.c1"].EventPutAttemptCount')
    eventTakeAttemptCount=$(echo $json | jq -r '.["CHANNEL.c1"].EventTakeAttemptCount')
    eventTakeSuccessCount=$(echo $json | jq -r '.["CHANNEL.c1"].EventTakeSuccessCount')
    channelSize=$(echo $json | jq -r '.["CHANNEL.c1"].ChannelSize')
    channelFillPercentage=$(echo $json | jq -r '.["CHANNEL.c1"].ChannelFillPercentage')

    STR="${STR}flume_monitor_channel,agent=${AGENT},host=${HOST},name=c1 startTime=${startTime},stopTime=${stopTime},eventPutSuccessCount=${eventPutSuccessCount},eventPutAttemptCount=${eventPutAttemptCount},eventTakeAttemptCount=${eventTakeAttemptCount},eventTakeSuccessCount=${eventTakeSuccessCount},channelSize=${channelSize},channelFillPercentage=${channelFillPercentage}\n"
}

function outputSink() {
    startTime=$(echo $json | jq -r '.["SINK.k1"].StartTime')
    stopTime=$(echo $json | jq -r '.["SINK.k1"].StopTime')
    eventDrainAttemptCount=$(echo $json | jq -r '.["SINK.k1"].EventDrainAttemptCount')
    eventDrainSuccessCount=$(echo $json | jq -r '.["SINK.k1"].EventDrainSuccessCount')
    batchUnderflowCount=$(echo $json | jq -r '.["SINK.k1"].BatchUnderflowCount')
    connectionFailedCount=$(echo $json | jq -r '.["SINK.k1"].ConnectionFailedCount')
    rollbackCount=$(echo $json | jq -r '.["SINK.k1"].RollbackCount')

    STR="${STR}flume_monitor_sink,agent=${AGENT},host=${HOST},name=k1 startTime=${startTime},stopTime=${stopTime},eventDrainAttemptCount=${eventDrainAttemptCount},eventDrainSuccessCount=${eventDrainSuccessCount},batchUnderflowCount=${batchUnderflowCount},connectionFailedCount=
    ${connectionFailedCount},rollbackCount=${rollbackCount}"
}

function main() {
    outputSource
    outputChannel
    outputSink
}

main

echo -e "$STR"

输出格式如下:

flume_monitor_source,agent=flume-test,host=192.168.1.1,name=s1, startTime=,stopTime=,eventReceivedCount=,eventAcceptedCount=,eventReceivedCount=,appendBatchReceivedCount=,appendBatchAcceptedCount=

flume_monitor_channel,agent=flume-test,host=192.168.1.1,name=c1, startTime=,stopTime=,eventPutSuccessCount=,eventPutAttemptCount=,eventTakeAttemptCount=,eventTakeSuccessCount=,channelSize=,channelFillPercentage=

flume_monitor_sink,agent=flume-test,host=192.168.1.1,name=k1, startTime=,stopTime=,eventDrainAttemptCount=,eventDrainSuccessCount=,batchUnderflowCount=,connectionFailedCount=,rollbackCount=

这样,就完成了 Flume 的监控数据收集。

监控展示

这里的展示,要和 Grafana 结合使用(Grafana使用)。

由于监控数据中记录的是每次的事件总量,而我这里想展示增量的波动。

查看 InfluxDB 的函数,发现 DIFFERENCE 能很好的解决这个问题(InfluxDB使用)。

select DIFFERENCE(eventAcceptedCount) from flume_monitor_source where agent='flume-test' and time>=1497442340000000000 group by host;

Grafana 的配置。

最终监控结果展示。

再加上报警,监控就完成了。

但是,还有一个问题需要注意,就是在采用 ExecSource 时,有时 flume 出现异常时,会 kill 掉 tail 命令。如果不注意到这点,会导致不能及时发现监控问题。

这种情况,可以通过给 channel 的队列容量设置一个阈值,也可以通过 shell 脚本监控进程。

ps -ef | grep "tail -F /work/apps/nginx/data/logs/flume-test.log" | grep -v grep
if [ $? -ne 0 ]; then
    报警
else
    info "progress is ok."
fi

参考:

One Reply to “Flume监控”

发表评论

邮箱地址不会被公开。