Storm消费Kafka监控

 2017-04-18 16:32:32     Storm  Kafka  Zookeeper  监控   2963


导读: Flume+Kafka+Storm是目前比较常用的一种实时处理组合。组合的框架越多,可能出问题的地方就越多,相对应的问题排查也就越麻烦。这时,监控就显得尤为重要,有效的监控,可以帮助我们快速定位问题的环节,减少排查的时间。

Storm服务自带UI,可以查看服务状态,也可以直接调用其提供的rest api,实现监控报警。

Kafka自带的有消费命令脚本,可以查看各topic和partition对应的offset信息。但是由于版本和消费方的使用规则不同,无法做到统一,不一定好用。

Flume监控,这里先不介绍,后期会单独介绍。

Storm消费Kafka监控


由于各项目的升级,所对应的一些细节可能会有所变化,这里先对各版本做一下说明。

项目 版本 说明
Flume 1.7 Flume1.7要求Kafka版本为0.9+
Kafka 0.10.1.0 Kafka从0.9版本开始,提供了新的KafkaConsumer API
Storm 1.0.1 Storm从1.0开始,整个项目包结构进行了调整,使用上没有太大区别

Storm消费Kafka监控分为Kafka生产者的当前offset(endOffset)和Storm当前消费的offset(useOffset),两者的差值即为当前的延迟量lag(lag=endOffset-useOffset)。

endOffset的记录没有存在Zookeeper中,需要调用Kafka的KafkaConsumer API获取。

useOffset的记录记录在Zookeeper中,如果Storm项目中使用了其自带的KafkaSpout,useOffset等信息不会存在consumers目录下,KafkaOffsetMonitor和Kafka自带的命令脚本都将无法使用。

查找了相关资料,发现要实现简单的监控,并非难事。

生产者记录


通过KafkaConsumer API,很容易获得topic在各partition的信息。

首先,maven引用kafka-clients

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.0</version>
    <exclusions>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

然后,创建一个KafkaConsumer实例。

public static KafkaConsumer<String, String> createConsumer() {
    Properties properties = new Properties();
    //设置brokerServer(kafka)ip地址
    String[] services = ConfigUtil.getAllConfig().getStringArray(Keys.KAFKA_BOOTSTRAP_SERVERS);
    String service = "";
    for (String str : services) {
        service = service + str + ",";
    }
    properties.put("bootstrap.servers", service.substring(0, service.length() - 1));
    //设置consumer group name
    //properties.put("group.id", "flume");
    //设置自动提交偏移量(offset),由auto.commit.interval.ms控制提交频率
    //props.put("enable.auto.commit", "true");
    //偏移量(offset)提交频率
    // props.put("auto.commit.interval.ms", "1000");
    //设置使用最开始的offset偏移量为该group.id的最早。如果不设置,则会是latest即该topic最新一个消息的offset
    //如果采用latest,消费者只能得道其启动后,生产者生产的消息
    //props.put("auto.offset.reset", "earliest");
    //

    properties.put("session.timeout.ms", ConfigUtil.getAllConfig().getInt(Keys.KAFKA_SESSION_TIMEOUT));
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    kafkaConsumer = new KafkaConsumer<String ,String>(properties);
}

最后,获取topic对应的partition的信息。

public static Map<TopicPartition, Long> getLastOffset(KafkaConsumer<String, String> consumer, String topic, int partitionNumber) {
    List<TopicPartition> list = Lists.newArrayList();
    for (int index = 0; index < partitionNumber; index++) {
        TopicPartition partition = new TopicPartition(topic, index);
        list.add(partition);
    }

    /*Map<TopicPartition, Long> map = consumer.endOffsets(list);
    for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
        TopicPartition topicPartition = entry.getKey();
        Long offset = entry.getValue();
        LOG.info("partition: " + topicPartition.partition() + ", topic: " + topicPartition.topic() + ", last offset: " + offset);
    }*/
    return consumer.endOffsets(list);
}

TopicPartition对象中有offfset记录。这样就获取了endOffset记录。

消费者记录


一般情况下,Kafka的消费信息都在Zookeeper的consumers目录下。通过Kafka命令就可以获取。

sh +x ./bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic test --group group-test

这个命令可以直接获取生成者的offset和消费者的offset。

前面我们已经提过,Storm自带的KafkaSpout比较特殊。

在介绍获取Storm消费Kafka的offset前,我们先简单介绍一下Kafka在Zookeeper的存储结构。

Kafka在Zookeeper的存储结构

从下图就可以查出大体的结构情况。

Kafka在Zookeeper的存储结构

说明: 该图片转自apache kafka系列之在zookeeper中存储结构,详细内容请点击链接。

假设Kafka在Zookeeper的根目录为/kafka,kafka的topic为test,partition数量为3。

查看Kafka的所有topic信息:

ls /kafka/brokers/topics

查看test在partition=0的信息:

[zk: localhost:2181(CONNECTED) 12] get /kafka/brokers/topics/test/partitions/0/state
{"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2]}
cZxid = 0x200144880
ctime = Mon Dec 26 20:47:18 CST 2016
mZxid = 0x200144880
mtime = Mon Dec 26 20:47:18 CST 2016
pZxid = 0x200144880
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0

查看Broker的注册信息
每个broker的配置文件中都需要指定一个数字类型的id(全局不可重复),此节点为临时znode(EPHEMERAL)

[zk: localhost:2181(CONNECTED) 13] get /kafka/brokers/ids/0
{"jmx_port":-1,"timestamp":"1481877114183","endpoints":["PLAINTEXT://10.25.X.X:9092"],"host":"10.25.X.X","version":3,"port":9092}
cZxid = 0x100162527
ctime = Fri Dec 16 16:31:54 CST 2016
mZxid = 0x100162527
mtime = Fri Dec 16 16:31:54 CST 2016
pZxid = 0x100162527
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x158768c1755000a
dataLength = 135
numChildren = 0

在创建Storm的KafkaSpout实例时,我们需要指定一个clientId

SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/kafka", "storm-live-test");

查看Zookeeper会看到,在/kafka根目录下,存在对应的storm-live-test。这是KafkaSpout生成的目录。

[zk: localhost:2181(CONNECTED) 15] ls /kafka 
[cluster, controller, controller_epoch, brokers, storm-live-test, admin, isr_change_notification, consumers, config]

查看该目录下的信息,你会找到Storm消费的offset:

[zk: localhost:2181(CONNECTED) 16] get /kafka/storm-live-test/partition_0
{"topology":{"id":"LiveTopology-29-1484213098","name":"LiveTopology"},"offset":21457045,"partition":0,"broker":{"host":"10.25.X.X","port":9092},"topic":"test"}
cZxid = 0x100162995
ctime = Fri Dec 16 16:56:38 CST 2016
mZxid = 0x2005bed2e
mtime = Mon Jan 16 15:32:06 CST 2017
pZxid = 0x100162995
cversion = 0
dataVersion = 821383
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 169
numChildren = 0

知道了Zookeeper中的存储路径,获取offset就容易了许多。

获取Storm消费Kafka的offset

首先,我们需要创建一个Zookeeper连接。

public static ZooKeeper getConnect() throws Exception {
    if (zk == null) {
        String[] hosts = ConfigUtil.getAllConfig().getStringArray(Keys.ZK_HOSTS);
        String host = "";
        for (String str : hosts) {
            host = host + str + ",";
        }
        zk = new ZooKeeper(host.substring(0, host.length() - 1),
                ConfigUtil.getAllConfig().getInt(Keys.ZK_SESSION_TIMEOUT), new ConnectionWatcher());
    }
    return zk;
}

然后,根据ZooKeeper实例,就可以获取指定目录的信息。

#获取topic的partition梳理
List<String> list = ZKConnection.getConnect().getChildren("/kafka/storm-live-test", false);

#获取指定目录的信息
String info = new String(zk.getData("/kafka/storm-live-test/partition_0", false, null));
JSONObject json = JSON.parseObject(info);
long offset = json.getLong("offset");

获取到Storm的消费offset,延迟量就知道了。再加上定时功能,定时获取endOffset和useOffset,一个简单的延迟监控就完成了。

可以根据需要,再加上报警功能,这样就完美了。

以下为监控截图:

Storm消费延迟监控图

修改消费者offset

一般情况下,不建议直接修改Zookeeper中的offset。以免对正在运行的项目造成影响。

假设Storm的Kafka消费出现了严重延迟,需要手动修改offset。

首先,先停掉Storm的Topology服务。

然后,修改每个partition的offset。

set kafka/storm-live-test/partition_0 {"topology":{"id":"LiveTopology-29-1484213098","name":"LiveTopology"},"offset":21458000,"partition":0,"broker":{"host":"10.25.X.X","port":9092},"topic":"test"}

最后,启动Storm的Topology服务。

Storm监控


通过Storm UI,可以查看Storm的各种实时信息。

Storm自身提供了Rest API,Storm UI就是在调用API的基础上做了图形展示。

例如:

  • Storm集群配置参数信息:
http://<ui-host>:port/api/v1/cluster/configuration
  • Storm集群summary信息:
http://<ui-host>:port/api/v1/cluster/summary

http://<ui-host>:port/api/v1/supervisor/summary

http://<ui-host>:port/api/v1/nimbus/summary

http://<ui-host>:port/api/v1/history/summary

http://<ui-host>:port/api/v1/topology/summary
  • Topology信息:
http://<ui-host>:port/api/v1/topology/LiveTopology-29-1484213098

参考: