自动化部署Flume+Kafka+Storm
由于业务需求特殊,一般只固定在一周的某几天晚上会有大量数据需要进行实时计算,而且只需要使用4到5个小时。平时只需要小规模的集群进行开发、测试和线上使用。 针对这种情况,为了减少服务器的费用,决定利用阿里云的 API 和 ECS 服务器,自动独立部署一套 Flume+Kafka+Storm 实时计算集群服务,以便灵活应对各种压力下的计算需求。
场景分析
根据业务分析,整理出三种使用场景:
1、平时或用户数据量较小时,直接用固定的小型集群。不需要做任何改变。

2、用户量一般,压力主要集中在数据收集服务器(或压力在 Flume
),Kafka + Storm
没有压力。只需要动态扩展 Flume
集群。

3、用户量较大,固定的 Kafka + Storm
集群有压力或可能出现延迟。

第3中场景根据压力可以再细分,但是,都属于动态扩展范畴,所以,归纳为一类。可以在部署时,进行动态选择。
方案说明
首先,阿里云的 ECS
提供了 API
(以下简称 API
),可以通过各种语言进行调用申请,并能获取到服务器的实例名称、IP等信息。有了这些功能,为实现自动化部署提供了基础。
其次,该方案是建立在能大体预估用户量的基础上的,如果低估了用户量,或需要临时调整时,需要重新进行一次自动化部署(重新部署更大的集群),重新部署过程大概需要 5~10 分钟。为了防止这种情况发生,建议在预估时,尽量给预估量预留一定的增长空间。
整个方案流程其实很简单,没有太多技术含量。
步骤 | 操作 | 说明 |
---|---|---|
1 | 制定方案配置 | 一个 JSON 配置,指定各种方案下,集群的规模和服务器配置。将该 JSON 存入 Redis |
2 | 申请 ECS | 选择第一步中的一个方案名称,获取到集群规模和服务器配置, 通过 API 申请 ECS (版本为方案中事先做好的镜像),将申请到的服务器实例ID、IP信息以 JSON 方式存入 Redis |
3 | 调用自动部署脚本 | 通过 API 启动所有服务器,服务器启动后,自动调用事先准备好的部署脚本,该部署脚本会先从 Redis 中获取服务器实例ID、IP信息,然后通过IP判断来修改 Flume、Kafka、Storm的配置文件和服务启动 |
4 | 调整负载均衡 | 通过 API 和选择的方案,对负载均衡进行调整 |
5 | 释放 ECS | 通过 API 设置集群释放的时间,达到时间后,集群自动释放 |
步骤1: 需要事先指定好,然后存入 Redis
,生命周期为永久。一般情况下,不用再做任何调整,除非有新增方案或调整方案中集群的参数。
JSON
格式如下:
{ "large": {//方案名称的标识 "id": 1, //方案序列号,可有可无 "name": "方案名称,可有可无 "options": [//该方案下各服务的集群配置 { "flume": { "mirror": "镜像名称",//服务镜像名称 "name": "flume",// 服务名称 "servers": [//集群的详细配置,这里配置为多个选择,因为经常会出现某种CPU内核的服务器数量不够,导致申请失败。多个选择相当于备选 { "cpu": 4, //cpu内核 "disk": 100, //硬盘大小 "memory": 16, //内存 "number": 6, //服务器数量 "tag": "cdn-flume-cluster" //服务器的名称标识 }, { "cpu": 4, "disk": 100, "memory": 8, "name": "", "number": 10, "tag": "cdn-flume-cluster" }, ... ] }, "kafka": { "mirror": "storm-cluster_20170822", "name": "kafka", "servers": [ { "cpu": 8, "disk": 100, "memory": 32, "name": "", "number": 5, "tag": "cdn-kafka-cluster" }, ... ] }, "storm": { "mirror": "storm-cluster_20170822", "name": "storm", "servers": [ { "cpu": 8, "disk": 100, "memory": 32, "name": "", "number": 7, "tag": "storm-cluster" }, ... ] } } ] }, ... }
步骤2:通过 API
,按照选择方案申请 ECS
。这里需要事先准备好镜像。制作镜像的过程是,先申请一台干净的ECS,然后编写部署脚本,启动对应的服务。
这里用到了 Redis
中申请到的服务器实例信息。格式如下:
{ "create_time": ,//创建时间 "scheme": 3, //方案 "clusters": { //集群信息 "flume": { //flume集群 "id": [ //实例ID "i-2ze1o6sxxxxxxxx","i-2ze1o6sxxxxxxxx","i-2ze1o6sxxxxxxxx" ], "ip": [ //实例IP "10.27.81.1","10.27.81.2","10.27.81.3" ] } }, "status": 0 }
这里分为两个镜像,一个 Flume
的镜像,一个 Kafka + Storm
的镜像。也可以只做一个镜像,在部署脚本中做区分判断,还可以做三个镜像,每个服务作为一个镜像。各有各的好和坏。
以 Kafka + Storm
镜像为例。
第一步:先获取本地IP和集群的实例ID、IP信息
function get_cluster() { LOCAL_IP_INTER=$(/sbin/ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6 | awk '{print $2}' | tr -d 'addr:' | awk 'NR==1') LOCAL_IP_OUTER=$(/sbin/ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6 | awk '{print $2}' | tr -d 'addr:' | awk 'NR==2') info "inter ip ${LOCAL_IP_INTER}" info "outer ip ${LOCAL_IP_OUTER}" str=$(python /opt/apps/ssports-data/scripts/cluster_auto.py 'get') if [ $? -eq 0 ]; then echo "$str" > /tmp/cluster.conf #获取zookeeper集群服务器的IP,用来配置HOST zk_len=$(cat /tmp/cluster.conf | jq '.clusters.storm.ip | length') #选择3台作为zookeeper服务器,用来配置HOST #获取storm集群服务器的IP,用来配置HOST storm_len=$(cat /tmp/cluster.conf | jq '.clusters.storm.ip | length') #获取kafka集群服务器的IP,用来配置HOST kafka_len=$(cat /tmp/cluster.conf | jq '.clusters.kafka.ip | length') return 0 else #报警 return 1 fi }
第二步:配置 HOST
。
function set_host() { info "start setting up host." echo "::1 localhost localhost.localdomain localhost6 localhost6.localdomain6 > /etc/hosts echo "127.0.0.1 localhost" >> /etc/hosts echo "" >> /etc/hosts echo "# start add cluster host of storm" >> /etc/hosts # storm num=${#CLUSTER_IP_INTER_STORM[@]} for ((i=0; i<${num};i++)); do line=${CLUSTER_IP_INTER_STORM[$i]}" storm-cluster-$[$i+1]-${day} storm-cluster-$[$i+1] "${CLUSTER_ID_STORM[$i]} echo ${line} >> /etc/hosts done echo "" >> /etc/hosts # kafka num=${#CLUSTER_IP_INTER_KAFKA[@]} for ((i=0; i<${num};i++)); do line=${CLUSTER_IP_INTER_KAFKA[$i]}" kafka-$[$i+1]-${day} kafka-$[$i+1] "${CLUSTER_ID_KAFKA[$i]} echo ${line} >> /etc/hosts done echo "" >> /etc/hosts # zookeeper num=${#CLUSTER_IP_INTER_ZOOKEEPER[@]} for ((i=0; i<${num};i++)); do line=${CLUSTER_IP_INTER_ZOOKEEPER[$i]}" zk-$[$i+1]" echo ${line} >> /etc/hosts done echo "" >> /etc/hosts echo "# end add cluster host of storm" >> /etc/hosts }
第三步:修改主机名称。注意,Ubuntu
和 CentOS
是有区别的。
function set_hostname() { hostname="${1}-${day}" sed -i "s/elastic/${hostname}/g" /etc/sysconfig/network #echo "${hostname}" > /etc/hostname hostname ${hostname} }
第四步:修改服务的配置文件。由于Flume
、Kafka
、Stor
m、Zookeeper
的集群配置文件都很类同,一般只需修改 1~2 各参数,所以,可以先把配置文件模板化,然后,根据 IP
判断修改配置。
#配置zookeeper配置文件 function set_zookeeper() { info "start setting up zookeeper." num=${#CLUSTER_IP_INTER_ZOOKEEPER[@]} for ((i=0; i<${num};i++)); do if [ ${CLUSTER_IP_INTER_ZOOKEEPER[$i]} = ${LOCAL_IP_INTER} ]; then info "input $[$i+1] to /tmp/zookeeper/data/myid" echo "$[$i+1]" > /tmp/zookeeper/data/myid chown hadoop:hadoop /tmp/zookeeper/data/myid SERVICE_ZOOKEEPER=1 fi done } #配置storm配置文件 function set_storm() { info "start setting up storm." echo "0" > /tmp/storm/status num=${#CLUSTER_IP_INTER_STORM[@]} for ((i=0; i<${num};i++)); do if [ ${CLUSTER_IP_INTER_STORM[$i]} = ${LOCAL_IP_INTER} ]; then SERVICE_STORM=1 if [ ${i} -le 1 ]; then info "${LOCAL_IP_INTER} is nimbus." SERVICE_STORM_NIMBUS=$[$i+1] SERVICE_HOST_NAME="storm" fi set_hostname "storm-cluster-$[$i+1]" fi done } #配置kafka配置文件 function set_kafka() { info "start setting up kafka." num=${#CLUSTER_IP_INTER_KAFKA[@]} for ((i=0; i<${num};i++)); do if [ ${CLUSTER_IP_INTER_KAFKA[$i]} = ${LOCAL_IP_INTER} ]; then SERVICE_KAFKA=1 SERVICE_HOST_NAME="kafka" cat /opt/apps/kafka/config/server.properties.template > /opt/apps/kafka/config/server.properties sed -i "s/KAFKA_PARTITION/${num}/g" /opt/apps/kafka/config/server.properties sed -i "s/KAFKA_IP_INTER/${LOCAL_IP_INTER}/g" /opt/apps/kafka/config/server.properties sed -i "s/KAFKA_INDEX/${i}/g" /opt/apps/kafka/config/server.properties sed -i "s/KAFKA_HOST_NAME/kafka-$[$i+1]/g" /opt/apps/kafka/config/server.properties SERVICE_KAFKA_MASTER=$[$i+1] set_hostname "kafka-$[$i+1]" fi done }
第五步:配置监控服务。这里加了 Telegraf
监控。
# 配置Telegraf监控,args:storm/kafka ip function set_telegraf() { info "start config telegraf" TELEGRAF_HOSTNAME="${1}_${2}" if [ ${SERVICE_STORM_NIMBUS} -eq 1 ]; then cat /etc/telegraf/telegraf_storm.conf > /etc/telegraf/telegraf.conf elif [ ${SERVICE_KAFKA} -eq 1 ]; then cat /etc/telegraf/telegraf_kafka.conf > /etc/telegraf/telegraf.conf fi sed -i "s/TELEGRAF_HOSTNAME/${TELEGRAF_HOSTNAME}/g" /etc/telegraf/telegraf.conf }
第六步:启动各服务。
# 启动zookeeper function start_zookeeper() { if [ ${SERVICE_ZOOKEEPER} -eq 1 ]; then info "start zookeeper service." su hadoop <<EOF cd /opt/apps/zookeeper sh ./bin/zkServer.sh start exit 0 EOF fi } # 启动storm function start_storm() { if [ ${SERVICE_STORM} -eq 1 ]; then info "start storm service." if [ ${SERVICE_STORM_NIMBUS} -eq 1 -o ${SERVICE_STORM_NIMBUS} -eq 2 ]; then su hadoop <<EOF cd /opt/apps/storm/ sh ./bin/storm nimbus & sh ./bin/storm ui & exit 0 EOF else su hadoop <<EOF cd /opt/apps/storm/ sh ./bin/storm supervisor & exit 0 EOF fi fi } #启动Kafka function start_kafka() { if [ ${SERVICE_KAFKA} -eq 1 ]; then info "start kafka service." echo "${SERVICE_KAFKA_MASTER}" > /tmp/storm/kafka num=${#CLUSTER_IP_INTER_KAFKA[@]} echo "$[$num+$num]" > /tmp/storm/workers chown -R hadoop:hadoop /tmp/storm/kafka chown -R hadoop:hadoop /tmp/storm/workers su hadoop <<EOF cd /opt/apps/kafka sh ./bin/kafka-server-start.sh ./config/server.properties >/dev/null 2>&1 & sleep 10 #MASTER=$(cat /tmp/storm/kafka) #WORKERS=`cat /tmp/storm/workers` echo "master is $(cat /tmp/storm/kafka), workers is $(cat /tmp/storm/workers)" if [ $(cat /tmp/storm/kafka) -eq 1 ]; then if [ $(cat /tmp/storm/workers) -eq 3 ]; then sh ./bin/kafka-topics.sh --create --zookeeper zk-1:2181,zk-2:2181,zk-3:2181/kafka --partitions 3 --replication-factor 1 --topic cdn-link >/dev/null 2>&1 & elif [ $(cat /tmp/storm/workers) -eq 5 ]; then sh ./bin/kafka-topics.sh --create --zookeeper zk-1:2181,zk-2:2181,zk-3:2181/kafka --partitions 5 --replication-factor 1 --topic cdn-link >/dev/null 2>&1 & else echo "workers is $(cat /tmp/storm/workers), not match partitions, create partitions failed." fi fi exit 0 EOF fi }
第七步:启动应用。
# 启动topology function start_topology() { if [ ${SERVICE_STORM_NIMBUS} -eq 1 ]; then sleep 40 echo "1" > /tmp/storm/status num=${#CLUSTER_IP_INTER_KAFKA[@]} echo "${num}" > /tmp/storm/workers su hadoop <<EOF cd /opt/apps/storm/ cp /tmp/topology.jar /home/storm/ workers=$(cat /tmp/storm/workers) ./bin/storm jar /home/storm/topology.jar com.xx.xx.Topology Topology Name${workers} exit 0 EOF fi }
最后,将部署脚本加入到服务器自启动服务中,就完成了自动化部署脚本。
将该 ECS
做一个镜像,就完成了。
步骤3:申请的服务器启动后,自动调用部署脚本,完成服务的部署和启动。当然,还需要有对服务进行各种检查,防止服务部署失败或运行异常等情况。
步骤4:通过 API
和选择的方案,对负载均衡进行调整。
步骤5:自动释放服务器。