Automated Monitoring and Alerting for Storm

 2017-05-14 15:19:32     Storm  Monitoring  Telegraf  Grafana


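Introduction: Storm ships with its own UI for checking status, but nobody can be expected to watch it around the clock. To meet my own monitoring needs I went through the official documentation and found the Storm UI REST API, which exposes a rich set of endpoints; the Storm UI itself is built on this API. With it we can build monitoring of our own.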

Requirements


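We have no unified monitoring and alerting platform, so we have to build monitoring ourselves around each service. The requirements are simple:

  • 1. If a specified Topology becomes abnormal, an alert must be raised promptly.
  • 2. If the load (capacity) of a bolt in a Topology exceeds the threshold (0.8), a warning must be raised so the issue can be followed up in time.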

Storm UI REST API


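The Storm UI REST API documentation describes everything in detail. The endpoints fall into two groups, GET requests and POST requests.

GET endpoints retrieve information about the cluster or a Topology.

POST endpoints change the state of the cluster or a Topology.

Usage: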

http://<ui-host>:<ui-port>/api/v1/...

Assume our ui-host is 192.168.1.23 and the port is 9999.
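Since every endpoint shares the same prefix, a small helper can keep the host and port in one place. The following is only a sketch: the function names api_url and api_get are made up for illustration, and the curl options (silent mode, fail on HTTP errors, a 10-second timeout) are one reasonable choice, not something the API requires.

```shell
#!/bin/bash
UI_HOST="192.168.1.23"   # example host from the text
UI_PORT="9999"           # example port from the text

# Build the full URL for an API path such as "cluster/summary".
# (api_url is a hypothetical helper name.)
api_url() {
    printf 'http://%s:%s/api/v1/%s' "$UI_HOST" "$UI_PORT" "$1"
}

# Fetch an API path. -s hides the progress meter, -f turns HTTP errors into a
# non-zero exit status, --max-time bounds the whole request to 10 seconds.
# (api_get is a hypothetical helper name.)
api_get() {
    curl -sf --max-time 10 "$(api_url "$1")"
}

api_url "cluster/summary"; echo
# api_get "cluster/summary" | jq .   # needs a reachable Storm UI and jq
```

Only api_url is actually called here, so the snippet runs without a cluster; the commented last line shows how a real request would look.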

Get the cluster configuration

http://192.168.1.23:9999/api/v1/cluster/configuration
{
"dev.zookeeper.path": "/tmp/dev-storm-zookeeper",
"topology.tick.tuple.freq.secs": null,
"topology.builtin.metrics.bucket.size.secs": 60,
"topology.fall.back.on.java.serialization": true,
"topology.max.error.report.per.interval": 5,
"zmq.linger.millis": 5000,
"topology.skip.missing.kryo.registrations": false,
"storm.messaging.netty.client_worker_threads": 1,
"ui.childopts": "-Xmx768m",
"storm.zookeeper.session.timeout": 20000,
"nimbus.reassign": true,
"topology.trident.batch.emit.interval.millis": 500,
"storm.messaging.netty.flush.check.interval.ms": 10,
"nimbus.monitor.freq.secs": 10,
"logviewer.childopts": "-Xmx128m",
"java.library.path": "/usr/local/lib:/opt/local/lib:/usr/lib",
"topology.executor.send.buffer.size": 1024,
}

Get the cluster summary

http://192.168.1.23:9999/api/v1/cluster/summary
{
"stormVersion": "0.9.2-incubating-SNAPSHOT",
"supervisors": 1,
"slotsTotal": 4,
"slotsUsed": 3,
"slotsFree": 1,
"executorsTotal": 28,
"tasksTotal": 28
}
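As a small illustration of how this summary could feed a check, the sketch below parses a copy of the response with jq and computes slot utilisation. The JSON is the sample output shown above; the variable names and the 90% threshold are assumptions for illustration and not part of the original setup.

```shell
#!/bin/bash
# Sample response copied from the cluster summary above. In practice it would come from:
#   curl -s "http://192.168.1.23:9999/api/v1/cluster/summary"
summary='{"stormVersion":"0.9.2-incubating-SNAPSHOT","supervisors":1,"slotsTotal":4,"slotsUsed":3,"slotsFree":1,"executorsTotal":28,"tasksTotal":28}'

# Integer percentage of used slots (floor drops any fraction).
slot_usage=$(printf '%s' "$summary" | jq '(.slotsUsed * 100 / .slotsTotal) | floor')
slots_free=$(printf '%s' "$summary" | jq '.slotsFree')

# Hypothetical threshold: report when 90% or more of the slots are taken.
if [ "$slot_usage" -ge 90 ]; then
    echo "slot usage ${slot_usage}% (free: ${slots_free}), cluster is nearly full"
else
    echo "slot usage ${slot_usage}% (free: ${slots_free})"
fi
```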

Get the supervisor summary

http://192.168.1.23:9999/api/v1/supervisor/summary

Get the nimbus summary

http://192.168.1.23:9999/api/v1/nimbus/summary

Get the history summary

http://192.168.1.23:9999/api/v1/history/summary

Get the topology summary

http://192.168.1.23:9999/api/v1/topology/summary
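The topology summary is the endpoint the monitoring relies on most, so it helps to see how its fields can be queried. A minimal sketch follows, assuming a trimmed response that contains only the id, name and status fields; the ids and statuses in the sample are invented.

```shell
#!/bin/bash
# Hypothetical, trimmed sample of a topology summary response
# (real responses carry more fields per topology).
summary='{"topologies":[
  {"id":"TopologyA-1-1494746372","name":"TopologyA","status":"ACTIVE"},
  {"id":"TopologyB-2-1494746400","name":"TopologyB","status":"INACTIVE"}]}'

# Names of all topologies whose status is not ACTIVE, one per line.
not_active=$(printf '%s' "$summary" \
    | jq -r '.topologies[] | select(.status != "ACTIVE") | .name')

# Look up the id for a given name; --arg hands the shell value to jq as a string variable.
topology_id=$(printf '%s' "$summary" \
    | jq -r --arg name "TopologyA" '.topologies[] | select(.name == $name) | .id')

echo "not active: ${not_active}"
echo "id of TopologyA: ${topology_id}"
```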

Get a topology's worker information

http://192.168.1.23:9999/api/v1/topology-workers/:id

Get information about a component of a topology

http://192.168.1.23:9999/api/v1/topology/:id/component/:component

Monitoring script


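Assume we monitor Storm from server A, which can reach ui-host over the network.

Because the script manipulates JSON from the shell, jq needs to be installed; it is a handy command-line JSON processor.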

apt-get install jq
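Three jq behaviours matter for this kind of monitoring: default output is JSON-encoded, -r prints raw strings, and a missing key evaluates to null. The quick check below demonstrates them on a made-up document; the variable names are illustrative only.

```shell
#!/bin/bash
doc='{"name":"TopologyA","capacity":"0.912"}'

quoted=$(printf '%s' "$doc" | jq '.name')     # JSON-encoded, quotes included
raw=$(printf '%s' "$doc" | jq -r '.name')     # raw string, no quotes
missing=$(printf '%s' "$doc" | jq '.error')   # absent key evaluates to null

echo "quoted=${quoted} raw=${raw} missing=${missing}"
```

The last case is why a check can compare the result of `jq ".error"` with the string null to decide whether a response carries an error.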

Write the monitoring script monitor-storm.sh

#!/bin/bash

# Resolve the parent of the directory that contains this script
CURRENT_DIR=$(cd "$(dirname "$0")/.." && pwd)
source "$CURRENT_DIR/bin/env.sh"

topologys=(TopologyA TopologyB)
url="http://192.168.1.23:9999"
topologyid=1

# Look up the topology id by topology name (reads /tmp/topologys.json)
function getTopologyid() {
    info "topology name is $1"
    error=$(cat /tmp/topologys.json | jq ".error")
    if [ "$error" = "null" ]; then
        # --arg passes the name to jq as a string variable instead of splicing it into the filter
        stat=$(cat /tmp/topologys.json | jq -r --arg name "$1" '.topologies[] | select(.name == $name) | .status')
        if [ "$stat" = "ACTIVE" ]; then
            topologyid=$(cat /tmp/topologys.json | jq -r --arg name "$1" '.topologies[] | select(.name == $name) | .id')
            return 0
        else
            content="storm service warning. topology name '${1}' status is not ACTIVE, please check!"
            warn "${content}"
            sendMail "${content}" "data-node" 1
            return 1
        fi
    else
        content="storm service warning. api error with '/api/v1/topology/summary', please check!"
        warn "${content}"
        sendMail "${content}" "data-node" 1
        return 1
    fi
}

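# Check the capacity of every bolt under the given topology id and alert when it is at or above the threshold
function checkBoltCapacity() {
    info "topology id is $1"
    curl -s "${url}/api/v1/topology/${1}" > /tmp/topology.json
    error=$(cat /tmp/topology.json | jq ".error")
    if [ "$error" = "null" ]; then
        # Compare numerically; tonumber handles capacity whether it arrives as a string or a number
        bolts=$(cat /tmp/topology.json | jq -r '.bolts[] | select((.capacity | tonumber) >= 0.8) | .boltId' | xargs)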
        if [ -n "$bolts" ]; then
            content="storm service warning. the bolts '${bolts}' capacity of '${1}' gt 0.8, please check!"
            warn "${content}"
            sendMail "${content}" "data-node" 1
        else
            info "the bolts of '${1}' is ok"
        fi
        return 0
    else
        content="storm service warning. api error with '/api/v1/topology/${1}', please check!"
        sendMail "$content" "data-node" 1
        return 1
    fi
}

# Iterate over all configured topologies
function monitorTopology() {
    for topology in "${topologys[@]}";
    do
        curl -s "${url}/api/v1/topology/summary" > /tmp/topologys.json
        topologyid=1
        getTopologyid "$topology"
        if [[ $? -eq 0 ]]; then
            checkBoltCapacity "${topologyid}"
        fi
    done
}

monitorTopology
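To try the capacity check without a running cluster, the jq filter can be exercised against a saved response. This is a minimal sketch: the sample JSON and bolt names are made up, the threshold is passed in as a variable, and the comparison is done numerically via tonumber, which is one way to make the check independent of whether the API returns capacity as a string or as a number.

```shell
#!/bin/bash
# Hypothetical, trimmed topology response; capacity is given as a string here.
topology='{"bolts":[
  {"boltId":"parse-bolt","capacity":"0.912"},
  {"boltId":"store-bolt","capacity":"0.105"},
  {"boltId":"join-bolt","capacity":"1.250"}]}'

threshold=0.8

# --argjson passes the threshold to jq as a number; tonumber converts the
# capacity value; xargs joins the matching ids onto a single line.
hot_bolts=$(printf '%s' "$topology" \
    | jq -r --argjson t "$threshold" '.bolts[] | select((.capacity | tonumber) >= $t) | .boltId' \
    | xargs)

echo "bolts at or above ${threshold}: ${hot_bolts}"
```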

A few custom helper functions used here live in the script env.sh.

#!/bin/bash

source ~/.bashrc

JAVA_HOME=/usr/local/jdk1.8.0_101
JAVA_OPTS="-Xms100m -Xmx1024m"
CLEAN_FLAG=1

function info() {
  if [ ${CLEAN_FLAG} -ne 0 ]; then
    local msg=$1
    echo "Info: $msg" >&2
  fi
}

function warn() {
  if [ ${CLEAN_FLAG} -ne 0 ]; then
    local msg=$1
    echo "Warning: $msg" >&2
  fi
}

function error() {
  local msg=$1
  local exit_code=$2

  echo "Error: $msg" >&2

  if [ -n "$exit_code" ] ; then
    exit $exit_code
  fi
}


function sendMail() {
    info "start send email. tag: $2"
    # --data-urlencode escapes spaces, quotes and '&' in the message before it is posted
    curl -H "content-type: application/x-www-form-urlencoded; charset=UTF-8" --data-urlencode "content=$1" "http://192.168.10.45/monitor-alarm/alarm/send?tag=$2&valid=$3"
    info "email send over."
}

The sendMail function calls a Java alerting service and posts the alert message to it.

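With that, the script covers the requirements. Schedule it with crontab and the checks run continuously (cron's finest granularity is one minute).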
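A crontab entry for this might look like the one built below. The install path and log file are assumptions, since the original does not say where the script lives; the snippet only prints the entry, and the line that would actually install it is left commented out.

```shell
#!/bin/bash
# Hypothetical install location of the monitoring script and of its log file.
script_path="/opt/storm-monitor/bin/monitor-storm.sh"
log_path="/tmp/monitor-storm.log"

# Run every minute, appending stdout and stderr to the log.
cron_line="* * * * * ${script_path} >> ${log_path} 2>&1"
echo "$cron_line"

# To install it for the current user (commented out so the sketch has no side effects):
# (crontab -l 2>/dev/null; echo "$cron_line") | crontab -
```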

To monitor a bolt's failed count or other attributes, adjust the script along the same lines.
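For instance, a check on failed tuples could reuse the same select pattern. The sketch below runs against an invented sample response and treats any non-zero failed count as worth reporting, which is an assumption; a real check would more likely compare against a rate or a delta between runs.

```shell
#!/bin/bash
# Hypothetical, trimmed topology response with per-bolt failed counts.
topology='{"bolts":[
  {"boltId":"parse-bolt","failed":0},
  {"boltId":"store-bolt","failed":12}]}'

# List every bolt with at least one failed tuple as "boltId=count".
failing=$(printf '%s' "$topology" \
    | jq -r '.bolts[] | select((.failed | tonumber) > 0) | "\(.boltId)=\(.failed)"' \
    | xargs)

if [ -n "$failing" ]; then
    echo "bolts with failures: ${failing}"
fi
```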


Update, 2017.06.27

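After adopting Telegraf I found that a lot of monitoring becomes much simpler, especially for services that ship their own monitoring API. Add InfluxDB for storage and Grafana for dashboards and alerting, and the whole setup works very nicely. Storm fits this approach as well.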

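Design the storage format of the metrics in InfluxDB according to what you want to monitor. Suppose you want to track spout throughput: you can use the spout's acked value.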

My storage format:

monitor_storm,topology=TestTopology,component=spout,id=kafka-spout,host=def emitted=1512280,acked=2442240,transferred=1512280,failed=0
monitor_storm,topology=TestTopology,component=bolt,id=parse-bolt,host=def emitted=0,acked=117557860,transferred=0,failed=0

The function that extracts the values from the JSON:

function monitor_spout_telegraf() {
    spoutId=$(cat /tmp/topology.json | jq -r '.spouts[0].spoutId')
    spout_emitted=$(cat /tmp/topology.json | jq '.spouts[0].emitted')
    spout_acked=$(cat /tmp/topology.json | jq '.spouts[0].acked')
    spout_transferred=$(cat /tmp/topology.json | jq '.spouts[0].transferred')
    spout_failed=$(cat /tmp/topology.json | jq '.spouts[0].failed')

    boltsId=$(cat /tmp/topology.json | jq -r '.bolts[0].boltId')
    bolt_emitted=$(cat /tmp/topology.json | jq '.bolts[0].emitted')
    bolt_acked=$(cat /tmp/topology.json | jq '.bolts[0].acked')
    bolt_transferred=$(cat /tmp/topology.json | jq '.bolts[0].transferred')
    bolt_failed=$(cat /tmp/topology.json | jq '.bolts[0].failed')

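    # Emit only when a spout was actually found (jq -r prints "null" for a missing value).
    # Note that only the first spout and the first bolt are read.
    if [ -n "$spoutId" ] && [ "$spoutId" != "null" ]; then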
        STR="${STR}monitor_storm,topology=${1},component=spout,id=${spoutId},host=tmp emitted=${spout_emitted},acked=${spout_acked},transferred=${spout_transferred},failed=${spout_failed}\n"
        STR="${STR}monitor_storm,topology=${1},component=bolt,id=${boltsId},host=tmp emitted=${bolt_emitted},acked=${bolt_acked},transferred=${bolt_transferred},failed=${bolt_failed}\n"
    fi
}
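The function appends a literal backslash-n to STR rather than a real newline, so the buffer has to be expanded when it is printed. The original does not show that step; the sketch below is one way to do it, using printf with %b, and the STR contents are copied from the storage format example above. How the output reaches Telegraf is also not stated; something like Telegraf's exec input reading this output in the influx data format would be one option, which is an assumption here.

```shell
#!/bin/bash
# Stand-in for the buffer built by monitor_spout_telegraf; inside double
# quotes "\n" stays a literal backslash followed by n.
STR=""
STR="${STR}monitor_storm,topology=TestTopology,component=spout,id=kafka-spout,host=def emitted=1512280,acked=2442240,transferred=1512280,failed=0\n"
STR="${STR}monitor_storm,topology=TestTopology,component=bolt,id=parse-bolt,host=def emitted=0,acked=117557860,transferred=0,failed=0\n"

# %b makes printf interpret the backslash escapes, giving one line per measurement.
output=$(printf '%b' "$STR")
printf '%s\n' "$output"

# Count the emitted lines (tr strips the padding some wc implementations add).
line_count=$(printf '%s\n' "$output" | wc -l | tr -d ' ')
```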

The result as displayed in Grafana:

[Image: Storm monitoring]

For detailed usage, see the Flume monitoring post.


References:
Storm UI REST API
An introduction to jq, the command-line JSON processor
jq official documentation