Flume监控
最近在对服务做压力测试时,发现当 QPS 达到很高的一个值时,Flume 会出现报错。为了排查原因,一边结合 Telegraf 的数据收集,给 Flume 加上监控和报警;一边查看 Flume 代码。
异常排查
当 QPS 压到3w时,Flume出现一下错误信息。
09 Jun 2017 12:06:30,703 ERROR [pool-3-thread-1] (org.apache.flume.source.ExecSource$ExecRunnable.run:352) - Failed while running command: tail -F /work/app/nginx-app/data/logs/log_192.168.1.1.log org.apache.flume.ChannelException: Cannot commit transaction. Byte capacity allocated to store event body 1200000.0reached. Please increase heap space/byte capacity allocated to the channel as the sinks may not be keeping up with the sources at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:120) at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151) at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194) at org.apache.flume.source.ExecSource$ExecRunnable.flushEventBatch(ExecSource.java:381) at org.apache.flume.source.ExecSource$ExecRunnable.run(ExecSource.java:341) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 09 Jun 2017 12:06:30,724 INFO [pool-3-thread-1] (org.apache.flume.source.ExecSource$ExecRunnable.run:375) - Command [tail -F /work/app/nginx-app/data/logs/log_192.168.1.1.log] exited with 141
根据错误信息描述,是 source
接收端的容量达到上线,无法再接收数据,直接 kill
掉了 tail
命令。
在排查问题之前,先了解一下 fluem
配置信息,以及 event
在三大组件中的数据流程。
flume
的配置。
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /work/app/logs/applog.log a1.sources.s1.batchSize = 1000 a1.sources.s1.batchTimeout = 3000 a1.sources.s1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000 a1.sinks.k1.channel = c1 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = test a1.sinks.k1.kafka.bootstrap.servers = kafka02:9092,kafka02:9092,kafka03:9092 a1.sinks.k1.kafka.flumeBatchSize = 100 a1.sinks.k1.kafka.producer.acks = 1
主要参数说明:
属性名称 | 默认值 | 含义 |
---|---|---|
capacity | 100 | 存储在channel中的最大event数量,即 queue 的大小 |
transactionCapacity | 100 | source 将 event put 到 channel,或者 sink 从 channel take event 的每次最大事物数。 putList和takeList的大小。 transactionCapacity 必须不大于 capacity |
keep-alive | 3 | 添加或移除 event 的超时时间 |
byteCapacityBufferPercentage | 20 | 所有 event 的 header 字节数占 byteCapacity 的百分比 |
byteCapacity | JVM的80% | channel 中允许所有 event 字节数之和的最大值(只计算 event 的 body 字节数,不包含 header)。 defaultByteCapacity等于JVM可用最大内存的80%(-Xmx的80%)。 如果定义了byteCapacity,则 实际byteCapacity = 定义的byteCapacity * (1- Event header百分比) / byteCapacitySlotSize 。byteCapacitySlotSize 默认等于100 |
flumeBatchSize | 100 | KafkaSink 一次事物发送的 event 数 |
event
的大体数据流程。
source --doPut()--> putList --doCommit()--> queue --doTake()--> takeList / sink
source
执行 channel
的 doPut
方法,将接收到的 event
先放入 putList
临时缓存起来,当达到一定量和时间时,会执行 doCommit
方法,将 putlist
中的 Event
放入 channel
的 queue
。
sink
执行 process
方法时,会调用 channel
的 doTake
方法,将取出的 event
放入 takeList
,同时,将 event
发送到下游。(这里并不是先把 event
放入 takeList
,再从 takeList
取数据发送到下游。 而是为了事物回滚,当 event
发送到下游失败时,可以将 takeList
中的数据全部放回到 queue
。)
channel
在执行 doCommit
时,会对存储容量进行判断。
int remainingChange = takeList.size() - putList.size(); if (remainingChange < 0) { if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) { throw new ChannelException("Cannot commit transaction. Byte capacity " + "allocated to store event body " + byteCapacity * byteCapacitySlotSize + "reached. Please increase heap space/byte capacity allocated to " + "the channel as the sinks may not be keeping up with the sources"); } if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) { bytesRemaining.release(putByteCounter); throw new ChannelFullException("Space for commit to queue couldn't be acquired." + " Sinks are likely not keeping up with sources, or the buffer size is too tight"); } }
当 takeList.size() - putList.size() < 0
,表示放入的 event
比取出的多,如果在 keepAlive
的时间内, putList
中 所有 event
的 body
字节总数在 byteCapacity
的容量内没有可用的的空间时,则抛出上面的异常。
以上内容最直接的解释就是,channel
中 event
占用太多内存,空间不够用。
通过以上分析,知道了错误原因。只需要调整一下配置参数应该就可以解决这个问题。
1、先调整 flume-env.sh
中 agent
默认初始化的内存。
export JAVA_OPTS="-Xms1024m -Xmx2048m -Xss256k -Xmn1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Xloggc:/work/app/flume/logs/server-gc.log.$(date +%F) -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=1 -XX:GCLogFileSize=200M"
2、修改 agent 的配置参数。
a1.sources = s1 a1.channels = c1 a1.sinks = k1 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /work/app/logs/applog.log a1.sources.s1.batchSize = 1000 #每次doput的容量,或者source读取多少event时,发送到channel a1.sources.s1.batchTimeout = 3000 #每隔多长时间,批量将event发送到channel a1.sources.s1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000000 //增大queue容量 a1.channels.c1.transactionCapacity = 100000 //putlist和takelist的容量 a1.channels.c1.byteCapacityBufferPercentage = 10 //减少header占用jvm的比例 #a1.channels.c1.byteCapacity = 800000 注释掉,采用flume-env.sh中的配置 a1.sinks.k1.channel = c1 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = test a1.sinks.k1.kafka.bootstrap.servers = kafka02:9092,kafka02:9092,kafka03:9092 a1.sinks.k1.kafka.flumeBatchSize = 2000 //增大sink批量操作的大小,提高处理速度 a1.sinks.k1.kafka.producer.acks = 1
重新压测,查看日志,一切正常。
监控
问题虽然解决了,但是并不能保证以后不会再出问题。所以,必须加上监控,并根据监控增加应对措施。
先看看 flume
支持的监控方式。
JMX Reporting
可以通过使用 flume-env.sh
在 JAVA_OPTS
环境变量中指定 JMX
参数来启用JMX报告。
例如:
JAVA_OPTS = "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.rmi.port=5446 -Djava.rmi.server.hostname=192.168.16.214"
可以监控 Flume
内存、线程、类、CPU等使用情况。
Ganglia Reporting
Flume
可以将监控指标报告发送给 Ganglia 3
或 Ganglia 3.1 metanodes
。要将 metrics
报告到 Ganglia
,必须在启动的时候就支持 Flume Agent
。
bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455
JSON Reporting
Flume
也可以以 JSON
格式报告指标。要使用 JSON
格式报告,Flume
会在可配置端口上托管 Web
服务器。
使用方式:
bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
服务以下列 JSON
格式报告指标:
{ "typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"}, "typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"} }
自定义报告
自定义的监控需要实现 org.apache.flume.instrumentation.MonitorService
接口。这个不是本次研究的重点,就不详细说明了。
通过比较,决定先用 JSON
的方式,因为很容易和 Telegraf
进行结合使用。当然,JVM
方式也可以,打算后期再加上。
Flume + Telegraf
启动 agent
时,加上监控参数。
./bin/flume-ng agent -n a1 -c conf -f conf/flume-test.properties -Dflume.root.logger=INFO,DAILY -Dflume.log.dir=/work/app/flume/logs -Dflume.log.file=flume-test.log -DappFlag=flume-test -Dflume.monitoring.type=http -Dflume.monitoring.port=1100 &
查看监控数据。通过 curl http://localhost:1100/metrics
得到以下数据。
{ "SOURCE.s1": { "StopTime": "0", //source组件停止时间 "AppendBatchReceivedCount": "0", //source端刚刚追加的批量的数量,比如一批100,该度量为2,就是source端收到了200个events "EventReceivedCount": "4913189", //source端累计收到的event数量 "AppendBatchAcceptedCount": "0", //source端追加到channel的数量 "Type": "SOURCE", "EventAcceptedCount": "4912519", //source端累计成功放入channel的event数量 "AppendReceivedCount": "0", //source端刚刚追加的且目前已收到的event数量 "StartTime": "1496839492821", //source组件启动的时间 "AppendAcceptedCount": "0", //source端刚刚追加放入channel的event数量 "OpenConnectionCount": "0" //当前有效的连接数 }, "CHANNEL.c1": { "StopTime": "0", //channel组件停止时间 "EventPutSuccessCount": "4912519", //成功放入channel的event数量 "ChannelCapacity": "200000", //channel容量 "ChannelFillPercentage": "5.0E-4", //channel使用比例 "Type": "CHANNEL", "ChannelSize": "1", //目前在channel中的event数量 "EventTakeSuccessCount": "4912589", //从channel中成功取出event的数量 "EventTakeAttemptCount": "4912661", //正在从channel中取event的数量 "StartTime": "1496839492318", //channel组件启动时间 "EventPutAttemptCount": "4912519" //正在放进channel的event数量 }, "SINK.k1": { "StopTime": "0", //sink组件停止时间 "KafkaEventSendTimer": "403088", //从channel批量取event,并成功发送到kafka的耗时,单位:毫微秒 "EventDrainSuccessCount": "4912589", //sink成功发送出的event数量 "RollbackCount": "0", //失败回滚的event数量 "Type": "SINK", "ConnectionCreatedCount": "0", //连接被创建的数量 "BatchCompleteCount": "0", //成功完成输出的批量事件个数 "BatchEmptyCount": "62", //批量取空的数量 "EventDrainAttemptCount": "0", //试图从channel消耗的事件数量 "StartTime": "1496839492829", //sink组件开始时间 "BatchUnderflowCount": "8", //正处于批量处理的batch数 "ConnectionFailedCount": "0", //连接失败数 "ConnectionClosedCount": "0" //连接关闭数 } }
Telegraf
可以支持很多形式的输入收集。根据目前的情况,比较合适httpjson
和 exec
。
Telegraf 的 httpjson 方式
Telegraf
支持 httpjson
方式的数据收集。即 http
服务返回 json
格式的数据,就可以将 json
数据发送到存储目标。
在 Telegraf
的配置文件中修改以下参数:
[[inputs.httpjson]] name = "flume_monitoring" servers = [ "http://localhost:1100/metrics", ] response_timeout = "10s" method = "GET"
重启 Telegraf
后,结果不行,报以下错误。
Error in plugin [inputs.exec]: Metric cannot be made without any fields
看了一下,发现是 JSON
格式问题,key/value
中,必须存在 value
为数字类型的 key
。因为 InfluxDB
的 field
是用来统计计算的,需要用数字类型。
这种问题,可以修改源码来解决。
Telegraf 的 exec 方式
Telegraf
支持 调用脚本的方式收集数据,只要返回的数据格式符合要求即可。
在 Telegraf
的配置文件中修改以下参数:
[[inputs.exec]] commands = [ "/tmp/flume_monitor.sh" ] timeout = "5s" data_format = "json"
shell
脚本内容如下。
json=$(curl http://localhost:1100/metrics) #echo $(echo $json | jq '.[] | length') jsonstr=$(echo $json | jq 'del(.["SOURCE.s1"].Type) | del(.["CHANNEL.c1"].Type) | del(.["SINK.k1"].Type)') newjsonstr=$(echo $jsonstr | sed 's/: "/: /g') newjsonstr=$(echo $newjsonstr | sed 's/",/,/g') newjsonstr=$(echo $newjsonstr | sed 's/" / /g') #echo $newjsonstr echo $newjsonstr | jq '.'
重启 Telegraf
服务,在 InfluxDB
中可以查看到监控数据。
但是这样有一个问题,JSON
的所有内容都存入了 InfluxDB
,虽然这是一个无关痛痒的问题,但是总感觉不够简洁。
如是,我又做了一些调整,将 JSON
格式改为 InfluxDB Line Protocol
。
[[inputs.exec]] commands = [ "/tmp/flume_monitor_influxdb.sh flume-test 192.168.1.1" ] timeout = "10s" data_format = "influx"//修改处
脚本内容如下:
AGENT=$1 HOST=$2 STR="" json=$(curl http://localhost:1100/metrics) function outputSource() { startTime=$(echo $json | jq -r '.["SOURCE.s1"].StartTime') stopTime=$(echo $json | jq -r '.["SOURCE.s1"].StopTime') eventReceivedCount=$(echo $json | jq -r '.["SOURCE.s1"].EventReceivedCount') eventAcceptedCount=$(echo $json | jq -r '.["SOURCE.s1"].EventAcceptedCount') appendBatchReceivedCount=$(echo $json | jq -r '.["SOURCE.s1"].AppendBatchReceivedCount') appendBatchAcceptedCount=$(echo $json | jq -r '.["SOURCE.s1"].AppendBatchAcceptedCount') STR="${STR}flume_monitor_source,agent=${AGENT},host=${HOST},name=s1 startTime=${startTime},stopTime=${stopTime},eventAcceptedCount=${eventAcceptedCount},eventReceivedCount=${eventReceivedCount},appendBatchReceivedCount=${appendBatchReceivedCount},appendBatchAcceptedCount=${appendBatchAcceptedCount}\n" } function outputChannel() { startTime=$(echo $json | jq -r '.["CHANNEL.c1"].StartTime') stopTime=$(echo $json | jq -r '.["CHANNEL.c1"].StopTime') eventPutSuccessCount=$(echo $json | jq -r '.["CHANNEL.c1"].EventPutSuccessCount') eventPutAttemptCount=$(echo $json | jq -r '.["CHANNEL.c1"].EventPutAttemptCount') eventTakeAttemptCount=$(echo $json | jq -r '.["CHANNEL.c1"].EventTakeAttemptCount') eventTakeSuccessCount=$(echo $json | jq -r '.["CHANNEL.c1"].EventTakeSuccessCount') channelSize=$(echo $json | jq -r '.["CHANNEL.c1"].ChannelSize') channelFillPercentage=$(echo $json | jq -r '.["CHANNEL.c1"].ChannelFillPercentage') STR="${STR}flume_monitor_channel,agent=${AGENT},host=${HOST},name=c1 startTime=${startTime},stopTime=${stopTime},eventPutSuccessCount=${eventPutSuccessCount},eventPutAttemptCount=${eventPutAttemptCount},eventTakeAttemptCount=${eventTakeAttemptCount},eventTakeSuccessCount=${eventTakeSuccessCount},channelSize=${channelSize},channelFillPercentage=${channelFillPercentage}\n" } function outputSink() { startTime=$(echo $json | jq -r '.["SINK.k1"].StartTime') stopTime=$(echo $json | jq -r '.["SINK.k1"].StopTime') eventDrainAttemptCount=$(echo $json | jq -r '.["SINK.k1"].EventDrainAttemptCount') eventDrainSuccessCount=$(echo $json | jq -r '.["SINK.k1"].EventDrainSuccessCount') batchUnderflowCount=$(echo $json | jq -r '.["SINK.k1"].BatchUnderflowCount') connectionFailedCount=$(echo $json | jq -r '.["SINK.k1"].ConnectionFailedCount') rollbackCount=$(echo $json | jq -r '.["SINK.k1"].RollbackCount') STR="${STR}flume_monitor_sink,agent=${AGENT},host=${HOST},name=k1 startTime=${startTime},stopTime=${stopTime},eventDrainAttemptCount=${eventDrainAttemptCount},eventDrainSuccessCount=${eventDrainSuccessCount},batchUnderflowCount=${batchUnderflowCount},connectionFailedCount= ${connectionFailedCount},rollbackCount=${rollbackCount}" } function main() { outputSource outputChannel outputSink } main echo -e "$STR"
输出格式如下:
flume_monitor_source,agent=flume-test,host=192.168.1.1,name=s1, startTime=,stopTime=,eventReceivedCount=,eventAcceptedCount=,eventReceivedCount=,appendBatchReceivedCount=,appendBatchAcceptedCount= flume_monitor_channel,agent=flume-test,host=192.168.1.1,name=c1, startTime=,stopTime=,eventPutSuccessCount=,eventPutAttemptCount=,eventTakeAttemptCount=,eventTakeSuccessCount=,channelSize=,channelFillPercentage= flume_monitor_sink,agent=flume-test,host=192.168.1.1,name=k1, startTime=,stopTime=,eventDrainAttemptCount=,eventDrainSuccessCount=,batchUnderflowCount=,connectionFailedCount=,rollbackCount=
这样,就完成了 Flume 的监控数据收集。
监控展示
这里的展示,要和 Grafana
结合使用(Grafana使用)。
由于监控数据中记录的是每次的事件总量,而我这里想展示增量的波动。
查看 InfluxDB
的函数,发现 DIFFERENCE
能很好的解决这个问题(InfluxDB使用)。
select DIFFERENCE(eventAcceptedCount) from flume_monitor_source where agent='flume-test' and time>=1497442340000000000 group by host;
Grafana
的配置。

最终监控结果展示。


再加上报警,监控就完成了。
但是,还有一个问题需要注意,就是在采用 ExecSource
时,有时 flume
出现异常时,会 kill
掉 tail
命令。如果不注意到这点,会导致不能及时发现监控问题。
这种情况,可以通过给 channel
的队列容量设置一个阈值,也可以通过 shell
脚本监控进程。
ps -ef | grep "tail -F /work/apps/nginx/data/logs/flume-test.log" | grep -v grep if [ $? -ne 0 ]; then 报警 else info "progress is ok." fi
参考:
I’d always want to be update on new blog posts on this site, saved to bookmarks!