Flume监控

 2017-07-05 14:23:51     Flume  监控  InfluxDB  Telegraf  Grafana   2713


导读: 最近在对服务做压力测试时,发现当 QPS 达到很高的一个值时,Flume 会出现报错。为了排查原因,一边结合 Telegraf 的数据收集,给 Flume 加上监控和报警;一边查看 Flume 代码。

异常排查


当 QPS 压到3w时,Flume出现一下错误信息。

09 Jun 2017 12:06:30,703 ERROR [pool-3-thread-1] (org.apache.flume.source.ExecSource$ExecRunnable.run:352)  - Failed while running command: tail -F /work/app/nginx-app/data/logs/log_192.168.1.1.log
org.apache.flume.ChannelException: Cannot commit transaction. Byte capacity allocated to store event body 1200000.0reached. Please increase heap space/byte capacity allocated to the channel as the sinks may not be keeping up with the sources
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:120)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
    at org.apache.flume.source.ExecSource$ExecRunnable.flushEventBatch(ExecSource.java:381)
    at org.apache.flume.source.ExecSource$ExecRunnable.run(ExecSource.java:341)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
09 Jun 2017 12:06:30,724 INFO  [pool-3-thread-1] (org.apache.flume.source.ExecSource$ExecRunnable.run:375)  - Command [tail -F /work/app/nginx-app/data/logs/log_192.168.1.1.log] exited with 141

根据错误信息描述,是 source 接收端的容量达到上线,无法再接收数据,直接 kill 掉了 tail 命令。


在排查问题之前,先了解一下 fluem 配置信息,以及 event 在三大组件中的数据流程。

flume 的配置。

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /work/app/logs/applog.log
a1.sources.s1.batchSize = 1000
a1.sources.s1.batchTimeout = 3000
a1.sources.s1.channels = c1


a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = kafka02:9092,kafka02:9092,kafka03:9092
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1

主要参数说明:

属性名称 默认值 含义
capacity 100 存储在channel中的最大event数量,即 queue 的大小
transactionCapacity 100 source 将 event put 到 channel,或者 sink 从 channel take event 的每次最大事物数。
putList和takeList的大小。
transactionCapacity 必须不大于 capacity
keep-alive 3 添加或移除 event 的超时时间
byteCapacityBufferPercentage 20 所有 event 的 header 字节数占 byteCapacity 的百分比
byteCapacity JVM的80% channel 中允许所有 event 字节数之和的最大值(只计算 event 的 body 字节数,不包含 header)。
defaultByteCapacity等于JVM可用最大内存的80%(-Xmx的80%)。
如果定义了byteCapacity,则 实际byteCapacity = 定义的byteCapacity * (1- Event header百分比) / byteCapacitySlotSize 。byteCapacitySlotSize 默认等于100
flumeBatchSize 100 KafkaSink 一次事物发送的 event

event 的大体数据流程。

source --doPut()--> putList --doCommit()--> queue --doTake()--> takeList / sink

source 执行 channeldoPut 方法,将接收到的 event 先放入 putList 临时缓存起来,当达到一定量和时间时,会执行 doCommit 方法,将 putlist 中的 Event 放入 channelqueue

sink 执行 process 方法时,会调用 channeldoTake 方法,将取出的 event 放入 takeList,同时,将 event 发送到下游。(这里并不是先把 event 放入 takeList,再从 takeList 取数据发送到下游。 而是为了事物回滚,当 event 发送到下游失败时,可以将 takeList 中的数据全部放回到 queue 。)

channel 在执行 doCommit 时,会对存储容量进行判断。

int remainingChange = takeList.size() - putList.size();
if (remainingChange < 0) {
    if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
        throw new ChannelException("Cannot commit transaction. Byte capacity " +
                "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
                "reached. Please increase heap space/byte capacity allocated to " +
                "the channel as the sinks may not be keeping up with the sources");
    }
    if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
        bytesRemaining.release(putByteCounter);
        throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
                " Sinks are likely not keeping up with sources, or the buffer size is too tight");
    }
}

takeList.size() - putList.size() < 0,表示放入的 event 比取出的多,如果在 keepAlive 的时间内, putList 中 所有 eventbody 字节总数在 byteCapacity 的容量内没有可用的的空间时,则抛出上面的异常。

以上内容最直接的解释就是,channelevent 占用太多内存,空间不够用。


通过以上分析,知道了错误原因。只需要调整一下配置参数应该就可以解决这个问题。

1、先调整 flume-env.shagent 默认初始化的内存。

export JAVA_OPTS="-Xms1024m -Xmx2048m -Xss256k -Xmn1024m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Xloggc:/work/app/flume/logs/server-gc.log.$(date +%F) -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=1 -XX:GCLogFileSize=200M"

2、修改 agent 的配置参数。

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /work/app/logs/applog.log
a1.sources.s1.batchSize = 1000 #每次doput的容量,或者source读取多少event时,发送到channel
a1.sources.s1.batchTimeout = 3000 #每隔多长时间,批量将event发送到channel
a1.sources.s1.channels = c1


a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000  //增大queue容量
a1.channels.c1.transactionCapacity = 100000 //putlist和takelist的容量
a1.channels.c1.byteCapacityBufferPercentage = 10 //减少header占用jvm的比例
#a1.channels.c1.byteCapacity = 800000 注释掉,采用flume-env.sh中的配置

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = kafka02:9092,kafka02:9092,kafka03:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000 //增大sink批量操作的大小,提高处理速度
a1.sinks.k1.kafka.producer.acks = 1

重新压测,查看日志,一切正常。

监控


问题虽然解决了,但是并不能保证以后不会再出问题。所以,必须加上监控,并根据监控增加应对措施。

先看看 flume 支持的监控方式。

JMX Reporting

可以通过使用 flume-env.shJAVA_OPTS 环境变量中指定 JMX 参数来启用JMX报告。

例如:

JAVA_OPTS = "-Dcom.sun.management.jmxremote 
    -Dcom.sun.management.jmxremote.authenticate=false 
    -Dcom.sun.management.jmxremote.ssl=false
    -Dcom.sun.management.jmxremote.port=5445 
    -Dcom.sun.management.jmxremote.rmi.port=5446
    -Djava.rmi.server.hostname=192.168.16.214"

可以监控 Flume 内存、线程、类、CPU等使用情况。

Ganglia Reporting

Flume 可以将监控指标报告发送给 Ganglia 3Ganglia 3.1 metanodes。要将 metrics 报告到 Ganglia,必须在启动的时候就支持 Flume Agent

 bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455

JSON Reporting

Flume 也可以以 JSON 格式报告指标。要使用 JSON 格式报告,Flume 会在可配置端口上托管 Web 服务器。

使用方式:

bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

服务以下列 JSON 格式报告指标:

{
"typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"},
"typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"}
}

自定义报告

自定义的监控需要实现 org.apache.flume.instrumentation.MonitorService 接口。这个不是本次研究的重点,就不详细说明了。

通过比较,决定先用 JSON 的方式,因为很容易和 Telegraf 进行结合使用(Telegraf使用)。当然,JVM 方式也可以,打算后期再加上。

Flume + Telegraf

启动 agent 时,加上监控参数。

./bin/flume-ng agent -n a1 -c conf -f conf/flume-test.properties -Dflume.root.logger=INFO,DAILY -Dflume.log.dir=/work/app/flume/logs -Dflume.log.file=flume-test.log -DappFlag=flume-test -Dflume.monitoring.type=http -Dflume.monitoring.port=1100 &

查看监控数据。通过 curl http://localhost:1100/metrics 得到以下数据。

 {
  "SOURCE.s1": {
    "StopTime": "0",                  //source组件停止时间
    "AppendBatchReceivedCount": "0",  //source端刚刚追加的批量的数量,比如一批100,该度量为2,就是source端收到了200个events
    "EventReceivedCount": "4913189",  //source端累计收到的event数量
    "AppendBatchAcceptedCount": "0",  //source端追加到channel的数量
    "Type": "SOURCE",
    "EventAcceptedCount": "4912519",  //source端累计成功放入channel的event数量
    "AppendReceivedCount": "0",       //source端刚刚追加的且目前已收到的event数量
    "StartTime": "1496839492821",     //source组件启动的时间
    "AppendAcceptedCount": "0",       //source端刚刚追加放入channel的event数量
    "OpenConnectionCount": "0"        //当前有效的连接数
  },
  "CHANNEL.c1": {
    "StopTime": "0",                       //channel组件停止时间
    "EventPutSuccessCount": "4912519",     //成功放入channel的event数量
    "ChannelCapacity": "200000",           //channel容量
    "ChannelFillPercentage": "5.0E-4",     //channel使用比例
    "Type": "CHANNEL",
    "ChannelSize": "1",                    //目前在channel中的event数量
    "EventTakeSuccessCount": "4912589",    //从channel中成功取出event的数量
    "EventTakeAttemptCount": "4912661",    //正在从channel中取event的数量
    "StartTime": "1496839492318",          //channel组件启动时间
    "EventPutAttemptCount": "4912519"      //正在放进channel的event数量
  },
  "SINK.k1": {
    "StopTime": "0",                       //sink组件停止时间
    "KafkaEventSendTimer": "403088",       //从channel批量取event,并成功发送到kafka的耗时,单位:毫微秒
    "EventDrainSuccessCount": "4912589",   //sink成功发送出的event数量
    "RollbackCount": "0",                  //失败回滚的event数量
    "Type": "SINK",
    "ConnectionCreatedCount": "0",         //连接被创建的数量
    "BatchCompleteCount": "0",             //成功完成输出的批量事件个数
    "BatchEmptyCount": "62",               //批量取空的数量
    "EventDrainAttemptCount": "0",         //试图从channel消耗的事件数量
    "StartTime": "1496839492829",          //sink组件开始时间
    "BatchUnderflowCount": "8",            //正处于批量处理的batch数
    "ConnectionFailedCount": "0",          //连接失败数
    "ConnectionClosedCount": "0"           //连接关闭数
  }
}

Telegraf 可以支持很多形式的输入收集。根据目前的情况,比较合适httpjsonexec

Telegraf 的 httpjson 方式

Telegraf 支持 httpjson 方式的数据收集。即 http 服务返回 json 格式的数据,就可以将 json 数据发送到存储目标。

Telegraf 的配置文件中修改以下参数:

[[inputs.httpjson]]
    name = "flume_monitoring"
    servers = [
        "http://localhost:1100/metrics",
    ]

    response_timeout = "10s"
    method = "GET"

重启 Telegraf 后,结果不行,报以下错误。

Error in plugin [inputs.exec]: Metric cannot be made without any fields

看了一下,发现是 JSON 格式问题,key/value 中,必须存在 value 为数字类型的 key。因为 InfluxDBfield 是用来统计计算的,需要用数字类型。

这种问题,可以修改源码来解决。

Telegraf 的 exec 方式

Telegraf 支持 调用脚本的方式收集数据,只要返回的数据格式符合要求即可。

Telegraf 的配置文件中修改以下参数:

[[inputs.exec]]
    commands = [
        "/tmp/flume_monitor.sh"
    ]

    timeout = "5s"
    data_format = "json"

shell 脚本内容如下。

json=$(curl http://localhost:1100/metrics)

#echo $(echo $json | jq '.[] | length')

jsonstr=$(echo $json | jq 'del(.["SOURCE.s1"].Type) | del(.["CHANNEL.c1"].Type) | del(.["SINK.k1"].Type)')
newjsonstr=$(echo $jsonstr | sed 's/: "/: /g')
newjsonstr=$(echo $newjsonstr | sed 's/",/,/g')
newjsonstr=$(echo $newjsonstr | sed 's/" / /g')
#echo $newjsonstr

echo $newjsonstr | jq '.'

重启 Telegraf 服务,在 InfluxDB 中可以查看到监控数据。

但是这样有一个问题,JSON 的所有内容都存入了 InfluxDB,虽然这是一个无关痛痒的问题,但是总感觉不够简洁。

如是,我又做了一些调整,将 JSON 格式改为 InfluxDB Line Protocol

[[inputs.exec]]
    commands = [
        "/tmp/flume_monitor_influxdb.sh flume-test 192.168.1.1"
    ]

    timeout = "10s"
    data_format = "influx"//修改处

脚本内容如下:

AGENT=$1
HOST=$2
STR=""

json=$(curl http://localhost:1100/metrics)

function outputSource() {
    startTime=$(echo $json | jq -r '.["SOURCE.s1"].StartTime')
    stopTime=$(echo $json | jq -r '.["SOURCE.s1"].StopTime')
    eventReceivedCount=$(echo $json | jq -r '.["SOURCE.s1"].EventReceivedCount')
    eventAcceptedCount=$(echo $json | jq -r '.["SOURCE.s1"].EventAcceptedCount')
    appendBatchReceivedCount=$(echo $json | jq -r '.["SOURCE.s1"].AppendBatchReceivedCount')
    appendBatchAcceptedCount=$(echo $json | jq -r '.["SOURCE.s1"].AppendBatchAcceptedCount')

    STR="${STR}flume_monitor_source,agent=${AGENT},host=${HOST},name=s1 startTime=${startTime},stopTime=${stopTime},eventAcceptedCount=${eventAcceptedCount},eventReceivedCount=${eventReceivedCount},appendBatchReceivedCount=${appendBatchReceivedCount},appendBatchAcceptedCount=${appendBatchAcceptedCount}\n"
}

function outputChannel() {
    startTime=$(echo $json | jq -r '.["CHANNEL.c1"].StartTime')
    stopTime=$(echo $json | jq -r '.["CHANNEL.c1"].StopTime')
    eventPutSuccessCount=$(echo $json | jq -r '.["CHANNEL.c1"].EventPutSuccessCount')
    eventPutAttemptCount=$(echo $json | jq -r '.["CHANNEL.c1"].EventPutAttemptCount')
    eventTakeAttemptCount=$(echo $json | jq -r '.["CHANNEL.c1"].EventTakeAttemptCount')
    eventTakeSuccessCount=$(echo $json | jq -r '.["CHANNEL.c1"].EventTakeSuccessCount')
    channelSize=$(echo $json | jq -r '.["CHANNEL.c1"].ChannelSize')
    channelFillPercentage=$(echo $json | jq -r '.["CHANNEL.c1"].ChannelFillPercentage')

    STR="${STR}flume_monitor_channel,agent=${AGENT},host=${HOST},name=c1 startTime=${startTime},stopTime=${stopTime},eventPutSuccessCount=${eventPutSuccessCount},eventPutAttemptCount=${eventPutAttemptCount},eventTakeAttemptCount=${eventTakeAttemptCount},eventTakeSuccessCount=${eventTakeSuccessCount},channelSize=${channelSize},channelFillPercentage=${channelFillPercentage}\n"
}

function outputSink() {
    startTime=$(echo $json | jq -r '.["SINK.k1"].StartTime')
    stopTime=$(echo $json | jq -r '.["SINK.k1"].StopTime')
    eventDrainAttemptCount=$(echo $json | jq -r '.["SINK.k1"].EventDrainAttemptCount')
    eventDrainSuccessCount=$(echo $json | jq -r '.["SINK.k1"].EventDrainSuccessCount')
    batchUnderflowCount=$(echo $json | jq -r '.["SINK.k1"].BatchUnderflowCount')
    connectionFailedCount=$(echo $json | jq -r '.["SINK.k1"].ConnectionFailedCount')
    rollbackCount=$(echo $json | jq -r '.["SINK.k1"].RollbackCount')

    STR="${STR}flume_monitor_sink,agent=${AGENT},host=${HOST},name=k1 startTime=${startTime},stopTime=${stopTime},eventDrainAttemptCount=${eventDrainAttemptCount},eventDrainSuccessCount=${eventDrainSuccessCount},batchUnderflowCount=${batchUnderflowCount},connectionFailedCount=
    ${connectionFailedCount},rollbackCount=${rollbackCount}"
}

function main() {
    outputSource
    outputChannel
    outputSink
}

main

echo -e "$STR"

输出格式如下:

flume_monitor_source,agent=flume-test,host=192.168.1.1,name=s1, startTime=,stopTime=,eventReceivedCount=,eventAcceptedCount=,eventReceivedCount=,appendBatchReceivedCount=,appendBatchAcceptedCount=

flume_monitor_channel,agent=flume-test,host=192.168.1.1,name=c1, startTime=,stopTime=,eventPutSuccessCount=,eventPutAttemptCount=,eventTakeAttemptCount=,eventTakeSuccessCount=,channelSize=,channelFillPercentage=

flume_monitor_sink,agent=flume-test,host=192.168.1.1,name=k1, startTime=,stopTime=,eventDrainAttemptCount=,eventDrainSuccessCount=,batchUnderflowCount=,connectionFailedCount=,rollbackCount=

这样,就完成了 Flume 的监控数据收集。

监控展示


这里的展示,要和 Grafana 结合使用(Grafana使用)。

由于监控数据中记录的是每次的事件总量,而我这里想展示增量的波动。

查看 InfluxDB 的函数,发现 DIFFERENCE 能很好的解决这个问题(InfluxDB使用)。

select DIFFERENCE(eventAcceptedCount) from flume_monitor_source where agent='flume-test' and time>=1497442340000000000 group by host;

Grafana 的配置。

监控配置

最终监控结果展示。

监控展现1

监控展现2

再加上报警,监控就完成了。

但是,还有一个问题需要注意,就是在采用 ExecSource 时,有时 flume 出现异常时,会 killtail 命令。如果不注意到这点,会导致不能及时发现监控问题。

这种情况,可以通过给 channel 的队列容量设置一个阈值,也可以通过 shell 脚本监控进程。

ps -ef | grep "tail -F /work/apps/nginx/data/logs/flume-test.log" | grep -v grep
if [ $? -ne 0 ]; then
    报警
else
    info "progress is ok."
fi

参考: