自动化部署Flume+Kafka+Storm

由于业务需求特殊,一般只固定在一周的某几天晚上会有大量数据需要进行实时计算,而且只需要使用4到5个小时。平时只需要小规模的集群进行开发、测试和线上使用。 针对这种情况,为了减少服务器的费用,决定利用阿里云的 API 和 ECS 服务器,自动独立部署一套 Flume+Kafka+Storm 实时计算集群服务,以便灵活应对各种压力下的计算需求。

场景分析

根据业务分析,整理出三种使用场景:

1、平时或用户数据量较小时,直接用固定的小型集群。不需要做任何改变。

2、用户量一般,压力主要集中在数据收集服务器(或压力在 Flume),Kafka + Storm 没有压力。只需要动态扩展 Flume 集群。

3、用户量较大,固定的 Kafka + Storm 集群有压力或可能出现延迟。

第3中场景根据压力可以再细分,但是,都属于动态扩展范畴,所以,归纳为一类。可以在部署时,进行动态选择。

方案说明

首先,阿里云的 ECS 提供了 API (以下简称 API),可以通过各种语言进行调用申请,并能获取到服务器的实例名称、IP等信息。有了这些功能,为实现自动化部署提供了基础。

其次,该方案是建立在能大体预估用户量的基础上的,如果低估了用户量,或需要临时调整时,需要重新进行一次自动化部署(重新部署更大的集群),重新部署过程大概需要 5~10 分钟。为了防止这种情况发生,建议在预估时,尽量给预估量预留一定的增长空间。

整个方案流程其实很简单,没有太多技术含量。

步骤操作说明
1制定方案配置一个 JSON 配置,指定各种方案下,集群的规模和服务器配置。
将该 JSON 存入 Redis
2申请 ECS选择第一步中的一个方案名称,获取到集群规模和服务器配置,
通过 API 申请 ECS(版本为方案中事先做好的镜像),
将申请到的服务器实例ID、IP信息以 JSON 方式存入 Redis
3调用自动部署脚本通过 API 启动所有服务器,服务器启动后,自动调用事先准备好的部署脚本,
该部署脚本会先从 Redis 中获取服务器实例ID、IP信息,
然后通过IP判断来修改 Flume、Kafka、Storm的配置文件和服务启动
4调整负载均衡通过 API 和选择的方案,对负载均衡进行调整
5释放 ECS通过 API 设置集群释放的时间,达到时间后,集群自动释放

步骤1: 需要事先指定好,然后存入 Redis,生命周期为永久。一般情况下,不用再做任何调整,除非有新增方案或调整方案中集群的参数。

JSON 格式如下:

{
  "large": {//方案名称的标识
    "id": 1, //方案序列号,可有可无
    "name": "方案名称,可有可无
    "options": [//该方案下各服务的集群配置
      {
        "flume": {
          "mirror": "镜像名称",//服务镜像名称
          "name": "flume",// 服务名称
          "servers": [//集群的详细配置,这里配置为多个选择,因为经常会出现某种CPU内核的服务器数量不够,导致申请失败。多个选择相当于备选
            {
              "cpu": 4,      //cpu内核
              "disk": 100,   //硬盘大小
              "memory": 16,  //内存
              "number": 6,   //服务器数量
              "tag": "cdn-flume-cluster" //服务器的名称标识
            },
            {
              "cpu": 4,
              "disk": 100,
              "memory": 8,
              "name": "",
              "number": 10,
              "tag": "cdn-flume-cluster"
            },
            ...
          ]
        },
        "kafka": {
          "mirror": "storm-cluster_20170822",
          "name": "kafka",
          "servers": [
            {
              "cpu": 8,
              "disk": 100,
              "memory": 32,
              "name": "",
              "number": 5,
              "tag": "cdn-kafka-cluster"
            },
            ...
          ]
        },
        "storm": {
          "mirror": "storm-cluster_20170822",
          "name": "storm",
          "servers": [
            {
              "cpu": 8,
              "disk": 100,
              "memory": 32,
              "name": "",
              "number": 7,
              "tag": "storm-cluster"
            },
            ...
          ]
        }
      }
    ]
  },
  ...
}

步骤2:通过 API,按照选择方案申请 ECS。这里需要事先准备好镜像。制作镜像的过程是,先申请一台干净的ECS,然后编写部署脚本,启动对应的服务。

这里用到了 Redis 中申请到的服务器实例信息。格式如下:

{
  "create_time": ,//创建时间
  "scheme": 3,    //方案
  "clusters": {   //集群信息
    "flume": {    //flume集群
      "id": [     //实例ID
        "i-2ze1o6sxxxxxxxx","i-2ze1o6sxxxxxxxx","i-2ze1o6sxxxxxxxx"
      ],
      "ip": [     //实例IP
        "10.27.81.1","10.27.81.2","10.27.81.3"
      ]
    }
  },
  "status": 0
}

这里分为两个镜像,一个 Flume 的镜像,一个 Kafka + Storm 的镜像。也可以只做一个镜像,在部署脚本中做区分判断,还可以做三个镜像,每个服务作为一个镜像。各有各的好和坏。

以 Kafka + Storm 镜像为例。

第一步:先获取本地IP和集群的实例ID、IP信息

function get_cluster() {
    LOCAL_IP_INTER=$(/sbin/ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6 | awk '{print $2}' | tr -d 'addr:' | awk 'NR==1')
    LOCAL_IP_OUTER=$(/sbin/ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6 | awk '{print $2}' | tr -d 'addr:' | awk 'NR==2')

    info "inter ip ${LOCAL_IP_INTER}"
    info "outer ip ${LOCAL_IP_OUTER}"

    str=$(python /opt/apps/ssports-data/scripts/cluster_auto.py 'get')

    if [ $? -eq 0 ]; then
        echo "$str" > /tmp/cluster.conf

        #获取zookeeper集群服务器的IP,用来配置HOST
        zk_len=$(cat /tmp/cluster.conf | jq '.clusters.storm.ip | length')
        #选择3台作为zookeeper服务器,用来配置HOST

        #获取storm集群服务器的IP,用来配置HOST
        storm_len=$(cat /tmp/cluster.conf | jq '.clusters.storm.ip | length')

        #获取kafka集群服务器的IP,用来配置HOST
        kafka_len=$(cat /tmp/cluster.conf | jq '.clusters.kafka.ip | length')

        return 0
    else
        #报警
        return 1
    fi
}

第二步:配置 HOST

function set_host() {
    info "start setting up host."
    echo "::1           localhost localhost.localdomain localhost6 localhost6.localdomain6 > /etc/hosts
    echo "127.0.0.1     localhost" >> /etc/hosts
    echo "" >> /etc/hosts
    echo "# start add cluster host of storm" >> /etc/hosts
    # storm
    num=${#CLUSTER_IP_INTER_STORM[@]}
    for ((i=0; i<${num};i++));
    do
        line=${CLUSTER_IP_INTER_STORM[$i]}"  storm-cluster-$[$i+1]-${day}  storm-cluster-$[$i+1]  "${CLUSTER_ID_STORM[$i]}
        echo ${line} >> /etc/hosts
    done
    echo "" >> /etc/hosts

    # kafka
    num=${#CLUSTER_IP_INTER_KAFKA[@]}
    for ((i=0; i<${num};i++));
    do
        line=${CLUSTER_IP_INTER_KAFKA[$i]}"  kafka-$[$i+1]-${day}  kafka-$[$i+1]   "${CLUSTER_ID_KAFKA[$i]}
        echo ${line} >> /etc/hosts
    done
    echo "" >> /etc/hosts

    # zookeeper
    num=${#CLUSTER_IP_INTER_ZOOKEEPER[@]}
    for ((i=0; i<${num};i++));
    do
        line=${CLUSTER_IP_INTER_ZOOKEEPER[$i]}"  zk-$[$i+1]"
        echo ${line} >> /etc/hosts
    done
    echo "" >> /etc/hosts

    echo "# end add cluster host of storm" >> /etc/hosts
}

第三步:修改主机名称。注意,Ubuntu 和 CentOS 是有区别的。

function set_hostname() {
    hostname="${1}-${day}"
    sed -i "s/elastic/${hostname}/g" /etc/sysconfig/network
    #echo "${hostname}" > /etc/hostname
    hostname ${hostname}
}

第四步:修改服务的配置文件。由于FlumeKafkaStorm、Zookeeper 的集群配置文件都很类同,一般只需修改 1~2 各参数,所以,可以先把配置文件模板化,然后,根据 IP 判断修改配置。

#配置zookeeper配置文件
function set_zookeeper() {
    info "start setting up zookeeper."

    num=${#CLUSTER_IP_INTER_ZOOKEEPER[@]}
    for ((i=0; i<${num};i++));
    do
        if [ ${CLUSTER_IP_INTER_ZOOKEEPER[$i]} = ${LOCAL_IP_INTER} ]; then
            info "input $[$i+1] to /tmp/zookeeper/data/myid"
            echo "$[$i+1]" > /tmp/zookeeper/data/myid
            chown hadoop:hadoop /tmp/zookeeper/data/myid
            SERVICE_ZOOKEEPER=1
        fi
    done
}

#配置storm配置文件
function set_storm() {
    info "start setting up storm."
    echo "0" > /tmp/storm/status
    num=${#CLUSTER_IP_INTER_STORM[@]}
    for ((i=0; i<${num};i++));
    do
        if [ ${CLUSTER_IP_INTER_STORM[$i]} = ${LOCAL_IP_INTER} ]; then
            SERVICE_STORM=1
            if [ ${i} -le 1 ]; then
                info "${LOCAL_IP_INTER} is nimbus."
                SERVICE_STORM_NIMBUS=$[$i+1]
                SERVICE_HOST_NAME="storm"
            fi
            set_hostname "storm-cluster-$[$i+1]"
        fi
    done
}


#配置kafka配置文件
function set_kafka() {
    info "start setting up kafka."

    num=${#CLUSTER_IP_INTER_KAFKA[@]}
    for ((i=0; i<${num};i++));
    do
        if [ ${CLUSTER_IP_INTER_KAFKA[$i]} = ${LOCAL_IP_INTER} ]; then
            SERVICE_KAFKA=1
            SERVICE_HOST_NAME="kafka"
            cat /opt/apps/kafka/config/server.properties.template > /opt/apps/kafka/config/server.properties
            sed -i "s/KAFKA_PARTITION/${num}/g" /opt/apps/kafka/config/server.properties
            sed -i "s/KAFKA_IP_INTER/${LOCAL_IP_INTER}/g" /opt/apps/kafka/config/server.properties
            sed -i "s/KAFKA_INDEX/${i}/g" /opt/apps/kafka/config/server.properties
            sed -i "s/KAFKA_HOST_NAME/kafka-$[$i+1]/g" /opt/apps/kafka/config/server.properties

            SERVICE_KAFKA_MASTER=$[$i+1]
            set_hostname "kafka-$[$i+1]"
        fi
    done
}

第五步:配置监控服务。这里加了 Telegraf 监控。

# 配置Telegraf监控,args:storm/kafka  ip
function set_telegraf() {
    info "start config telegraf"
    TELEGRAF_HOSTNAME="${1}_${2}"
    if [ ${SERVICE_STORM_NIMBUS} -eq 1 ]; then
        cat /etc/telegraf/telegraf_storm.conf > /etc/telegraf/telegraf.conf
    elif [ ${SERVICE_KAFKA} -eq 1 ]; then
        cat /etc/telegraf/telegraf_kafka.conf > /etc/telegraf/telegraf.conf
    fi

    sed -i "s/TELEGRAF_HOSTNAME/${TELEGRAF_HOSTNAME}/g" /etc/telegraf/telegraf.conf
}

第六步:启动各服务。

# 启动zookeeper
function start_zookeeper() {
    if [ ${SERVICE_ZOOKEEPER} -eq 1 ]; then
        info "start zookeeper service."
        su hadoop <<EOF
            cd /opt/apps/zookeeper
            sh ./bin/zkServer.sh start
            exit 0
EOF
    fi
}

# 启动storm
function start_storm() {
    if [ ${SERVICE_STORM} -eq 1 ]; then
        info "start storm service."
        if [ ${SERVICE_STORM_NIMBUS} -eq 1 -o ${SERVICE_STORM_NIMBUS} -eq 2 ]; then
            su hadoop <<EOF
                cd /opt/apps/storm/
                    sh ./bin/storm nimbus &
                    sh ./bin/storm ui &
                exit 0
EOF
        else
            su hadoop <<EOF
                cd /opt/apps/storm/
                    sh ./bin/storm supervisor &
                exit 0
EOF
        fi
    fi
}

#启动Kafka
function start_kafka() {
    if [ ${SERVICE_KAFKA} -eq 1 ]; then
        info "start kafka service."
        echo "${SERVICE_KAFKA_MASTER}" > /tmp/storm/kafka
        num=${#CLUSTER_IP_INTER_KAFKA[@]}
        echo "$[$num+$num]" > /tmp/storm/workers
        chown -R hadoop:hadoop /tmp/storm/kafka
        chown -R hadoop:hadoop /tmp/storm/workers

        su hadoop <<EOF
            cd /opt/apps/kafka
            sh ./bin/kafka-server-start.sh ./config/server.properties >/dev/null 2>&1 &
        sleep 10
            #MASTER=$(cat /tmp/storm/kafka)
            #WORKERS=`cat /tmp/storm/workers`
        echo "master is $(cat /tmp/storm/kafka), workers is $(cat /tmp/storm/workers)"
            if [ $(cat /tmp/storm/kafka) -eq 1 ]; then
                if [ $(cat /tmp/storm/workers) -eq 3 ]; then
                    sh ./bin/kafka-topics.sh --create --zookeeper zk-1:2181,zk-2:2181,zk-3:2181/kafka --partitions 3 --replication-factor 1 --topic cdn-link >/dev/null  2>&1  &
                elif [ $(cat /tmp/storm/workers) -eq 5 ]; then
                    sh ./bin/kafka-topics.sh --create --zookeeper zk-1:2181,zk-2:2181,zk-3:2181/kafka --partitions 5 --replication-factor 1 --topic cdn-link >/dev/null  2>&1  &
                else
                    echo "workers is $(cat /tmp/storm/workers), not match partitions, create partitions failed."
                fi
            fi
            exit 0
EOF
    fi
}

第七步:启动应用。

# 启动topology
function start_topology() {
    if [ ${SERVICE_STORM_NIMBUS} -eq 1 ]; then
    sleep 40
        echo "1" > /tmp/storm/status
        num=${#CLUSTER_IP_INTER_KAFKA[@]}
        echo "${num}" > /tmp/storm/workers
        su hadoop <<EOF
            cd /opt/apps/storm/
            cp /tmp/topology.jar /home/storm/
            workers=$(cat /tmp/storm/workers)
            ./bin/storm jar /home/storm/topology.jar com.xx.xx.Topology  Topology Name${workers}
            exit 0
EOF
    fi
}

最后,将部署脚本加入到服务器自启动服务中,就完成了自动化部署脚本。

将该 ECS 做一个镜像,就完成了。

步骤3:申请的服务器启动后,自动调用部署脚本,完成服务的部署和启动。当然,还需要有对服务进行各种检查,防止服务部署失败或运行异常等情况。

步骤4:通过 API 和选择的方案,对负载均衡进行调整。

步骤5:自动释放服务器。