自动化部署Flume+Kafka+Storm

 2017-12-03 21:48:23     阿里云  Flume  Kafka  Storm  自动化部署   513


导读: 由于业务需求特殊,一般只固定在一周的某几天晚上会有大量数据需要进行实时计算,而且只需要使用4到5个小时。平时只需要小规模的集群进行开发、测试和线上使用。 针对这种情况,为了减少服务器的费用,决定利用阿里云的 API 和 ECS 服务器,自动独立部署一套 Flume+Kafka+Storm 实时计算集群服务,以便灵活应对各种压力下的计算需求。

场景分析


根据业务分析,整理出三种使用场景:

1、平时或用户数据量较小时,直接用固定的小型集群。不需要做任何改变。

场景1

2、用户量一般,压力主要集中在数据收集服务器(或压力在 Flume),Kafka + Storm 没有压力。只需要动态扩展 Flume 集群。

场景2

3、用户量较大,固定的 Kafka + Storm 集群有压力或可能出现延迟。

场景3

第3中场景根据压力可以再细分,但是,都属于动态扩展范畴,所以,归纳为一类。可以在部署时,进行动态选择。

方案说明


首先,阿里云的 ECS 提供了 API (以下简称 API),可以通过各种语言进行调用申请,并能获取到服务器的实例名称、IP等信息。有了这些功能,为实现自动化部署提供了基础。

其次,该方案是建立在能大体预估用户量的基础上的,如果低估了用户量,或需要临时调整时,需要重新进行一次自动化部署(重新部署更大的集群),重新部署过程大概需要 5~10 分钟。为了防止这种情况发生,建议在预估时,尽量给预估量预留一定的增长空间。

整个方案流程其实很简单,没有太多技术含量。

步骤 操作 说明
1 制定方案配置 一个 JSON 配置,指定各种方案下,集群的规模和服务器配置。
将该 JSON 存入 Redis
2 申请 ECS 选择第一步中的一个方案名称,获取到集群规模和服务器配置,
通过 API 申请 ECS(版本为方案中事先做好的镜像),
将申请到的服务器实例ID、IP信息以 JSON 方式存入 Redis
3 调用自动部署脚本 通过 API 启动所有服务器,服务器启动后,自动调用事先准备好的部署脚本,
该部署脚本会先从 Redis 中获取服务器实例ID、IP信息,
然后通过IP判断来修改 Flume、Kafka、Storm的配置文件和服务启动
4 调整负载均衡 通过 API 和选择的方案,对负载均衡进行调整
5 释放 ECS 通过 API 设置集群释放的时间,达到时间后,集群自动释放

步骤1: 需要事先指定好,然后存入 Redis,生命周期为永久。一般情况下,不用再做任何调整,除非有新增方案或调整方案中集群的参数。

JSON 格式如下:

{
  "large": {//方案名称的标识
    "id": 1, //方案序列号,可有可无
    "name": "方案名称,可有可无
    "options": [//该方案下各服务的集群配置
      {
        "flume": {
          "mirror": "镜像名称",//服务镜像名称
          "name": "flume",// 服务名称
          "servers": [//集群的详细配置,这里配置为多个选择,因为经常会出现某种CPU内核的服务器数量不够,导致申请失败。多个选择相当于备选
            {
              "cpu": 4,      //cpu内核
              "disk": 100,   //硬盘大小
              "memory": 16,  //内存
              "number": 6,   //服务器数量
              "tag": "cdn-flume-cluster" //服务器的名称标识
            },
            {
              "cpu": 4,
              "disk": 100,
              "memory": 8,
              "name": "",
              "number": 10,
              "tag": "cdn-flume-cluster"
            },
            ...
          ]
        },
        "kafka": {
          "mirror": "storm-cluster_20170822",
          "name": "kafka",
          "servers": [
            {
              "cpu": 8,
              "disk": 100,
              "memory": 32,
              "name": "",
              "number": 5,
              "tag": "cdn-kafka-cluster"
            },
            ...
          ]
        },
        "storm": {
          "mirror": "storm-cluster_20170822",
          "name": "storm",
          "servers": [
            {
              "cpu": 8,
              "disk": 100,
              "memory": 32,
              "name": "",
              "number": 7,
              "tag": "storm-cluster"
            },
            ...
          ]
        }
      }
    ]
  },
  ...
}

步骤2:通过 API,按照选择方案申请 ECS。这里需要事先准备好镜像。制作镜像的过程是,先申请一台干净的ECS,然后编写部署脚本,启动对应的服务。

这里用到了 Redis 中申请到的服务器实例信息。格式如下:

{
  "create_time": ,//创建时间
  "scheme": 3,    //方案
  "clusters": {   //集群信息
    "flume": {    //flume集群
      "id": [     //实例ID
        "i-2ze1o6sxxxxxxxx","i-2ze1o6sxxxxxxxx","i-2ze1o6sxxxxxxxx"
      ],
      "ip": [     //实例IP
        "10.27.81.1","10.27.81.2","10.27.81.3"
      ]
    }
  },
  "status": 0
}

这里分为两个镜像,一个 Flume 的镜像,一个 Kafka + Storm 的镜像。也可以只做一个镜像,在部署脚本中做区分判断,还可以做三个镜像,每个服务作为一个镜像。各有各的好和坏。

Kafka + Storm 镜像为例。

第一步:先获取本地IP和集群的实例ID、IP信息

function get_cluster() {
    LOCAL_IP_INTER=$(/sbin/ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6 | awk '{print $2}' | tr -d 'addr:' | awk 'NR==1')
    LOCAL_IP_OUTER=$(/sbin/ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6 | awk '{print $2}' | tr -d 'addr:' | awk 'NR==2')

    info "inter ip ${LOCAL_IP_INTER}"
    info "outer ip ${LOCAL_IP_OUTER}"

    str=$(python /opt/apps/ssports-data/scripts/cluster_auto.py 'get')

    if [ $? -eq 0 ]; then
        echo "$str" > /tmp/cluster.conf

        #获取zookeeper集群服务器的IP,用来配置HOST
        zk_len=$(cat /tmp/cluster.conf | jq '.clusters.storm.ip | length')
        #选择3台作为zookeeper服务器,用来配置HOST

        #获取storm集群服务器的IP,用来配置HOST
        storm_len=$(cat /tmp/cluster.conf | jq '.clusters.storm.ip | length')

        #获取kafka集群服务器的IP,用来配置HOST
        kafka_len=$(cat /tmp/cluster.conf | jq '.clusters.kafka.ip | length')

        return 0
    else
        #报警
        return 1
    fi
}

第二步:配置 HOST

function set_host() {
    info "start setting up host."
    echo "::1           localhost localhost.localdomain localhost6 localhost6.localdomain6 > /etc/hosts
    echo "127.0.0.1     localhost" >> /etc/hosts
    echo "" >> /etc/hosts
    echo "# start add cluster host of storm" >> /etc/hosts
    # storm
    num=${#CLUSTER_IP_INTER_STORM[@]}
    for ((i=0; i<${num};i++));
    do
        line=${CLUSTER_IP_INTER_STORM[$i]}"  storm-cluster-$[$i+1]-${day}  storm-cluster-$[$i+1]  "${CLUSTER_ID_STORM[$i]}
        echo ${line} >> /etc/hosts
    done
    echo "" >> /etc/hosts

    # kafka
    num=${#CLUSTER_IP_INTER_KAFKA[@]}
    for ((i=0; i<${num};i++));
    do
        line=${CLUSTER_IP_INTER_KAFKA[$i]}"  kafka-$[$i+1]-${day}  kafka-$[$i+1]   "${CLUSTER_ID_KAFKA[$i]}
        echo ${line} >> /etc/hosts
    done
    echo "" >> /etc/hosts

    # zookeeper
    num=${#CLUSTER_IP_INTER_ZOOKEEPER[@]}
    for ((i=0; i<${num};i++));
    do
        line=${CLUSTER_IP_INTER_ZOOKEEPER[$i]}"  zk-$[$i+1]"
        echo ${line} >> /etc/hosts
    done
    echo "" >> /etc/hosts

    echo "# end add cluster host of storm" >> /etc/hosts
}

第三步:修改主机名称。注意,UbuntuCentOS 是有区别的。

function set_hostname() {
    hostname="${1}-${day}"
    sed -i "s/elastic/${hostname}/g" /etc/sysconfig/network
    #echo "${hostname}" > /etc/hostname
    hostname ${hostname}
}

第四步:修改服务的配置文件。由于FlumeKafkaStorm、Zookeeper 的集群配置文件都很类同,一般只需修改 1~2 各参数,所以,可以先把配置文件模板化,然后,根据 IP 判断修改配置。

#配置zookeeper配置文件
function set_zookeeper() {
    info "start setting up zookeeper."

    num=${#CLUSTER_IP_INTER_ZOOKEEPER[@]}
    for ((i=0; i<${num};i++));
    do
        if [ ${CLUSTER_IP_INTER_ZOOKEEPER[$i]} = ${LOCAL_IP_INTER} ]; then
            info "input $[$i+1] to /tmp/zookeeper/data/myid"
            echo "$[$i+1]" > /tmp/zookeeper/data/myid
            chown hadoop:hadoop /tmp/zookeeper/data/myid
            SERVICE_ZOOKEEPER=1
        fi
    done
}

#配置storm配置文件
function set_storm() {
    info "start setting up storm."
    echo "0" > /tmp/storm/status
    num=${#CLUSTER_IP_INTER_STORM[@]}
    for ((i=0; i<${num};i++));
    do
        if [ ${CLUSTER_IP_INTER_STORM[$i]} = ${LOCAL_IP_INTER} ]; then
            SERVICE_STORM=1
            if [ ${i} -le 1 ]; then
                info "${LOCAL_IP_INTER} is nimbus."
                SERVICE_STORM_NIMBUS=$[$i+1]
                SERVICE_HOST_NAME="storm"
            fi
            set_hostname "storm-cluster-$[$i+1]"
        fi
    done
}


#配置kafka配置文件
function set_kafka() {
    info "start setting up kafka."

    num=${#CLUSTER_IP_INTER_KAFKA[@]}
    for ((i=0; i<${num};i++));
    do
        if [ ${CLUSTER_IP_INTER_KAFKA[$i]} = ${LOCAL_IP_INTER} ]; then
            SERVICE_KAFKA=1
            SERVICE_HOST_NAME="kafka"
            cat /opt/apps/kafka/config/server.properties.template > /opt/apps/kafka/config/server.properties
            sed -i "s/KAFKA_PARTITION/${num}/g" /opt/apps/kafka/config/server.properties
            sed -i "s/KAFKA_IP_INTER/${LOCAL_IP_INTER}/g" /opt/apps/kafka/config/server.properties
            sed -i "s/KAFKA_INDEX/${i}/g" /opt/apps/kafka/config/server.properties
            sed -i "s/KAFKA_HOST_NAME/kafka-$[$i+1]/g" /opt/apps/kafka/config/server.properties

            SERVICE_KAFKA_MASTER=$[$i+1]
            set_hostname "kafka-$[$i+1]"
        fi
    done
}

第五步:配置监控服务。这里加了 Telegraf 监控。

# 配置Telegraf监控,args:storm/kafka  ip
function set_telegraf() {
    info "start config telegraf"
    TELEGRAF_HOSTNAME="${1}_${2}"
    if [ ${SERVICE_STORM_NIMBUS} -eq 1 ]; then
        cat /etc/telegraf/telegraf_storm.conf > /etc/telegraf/telegraf.conf
    elif [ ${SERVICE_KAFKA} -eq 1 ]; then
        cat /etc/telegraf/telegraf_kafka.conf > /etc/telegraf/telegraf.conf
    fi

    sed -i "s/TELEGRAF_HOSTNAME/${TELEGRAF_HOSTNAME}/g" /etc/telegraf/telegraf.conf
}

第六步:启动各服务。

# 启动zookeeper
function start_zookeeper() {
    if [ ${SERVICE_ZOOKEEPER} -eq 1 ]; then
        info "start zookeeper service."
        su hadoop <<EOF
            cd /opt/apps/zookeeper
            sh ./bin/zkServer.sh start
            exit 0
EOF
    fi
}

# 启动storm
function start_storm() {
    if [ ${SERVICE_STORM} -eq 1 ]; then
        info "start storm service."
        if [ ${SERVICE_STORM_NIMBUS} -eq 1 -o ${SERVICE_STORM_NIMBUS} -eq 2 ]; then
            su hadoop <<EOF
                cd /opt/apps/storm/
                    sh ./bin/storm nimbus &
                    sh ./bin/storm ui &
                exit 0
EOF
        else
            su hadoop <<EOF
                cd /opt/apps/storm/
                    sh ./bin/storm supervisor &
                exit 0
EOF
        fi
    fi
}

#启动Kafka
function start_kafka() {
    if [ ${SERVICE_KAFKA} -eq 1 ]; then
        info "start kafka service."
        echo "${SERVICE_KAFKA_MASTER}" > /tmp/storm/kafka
        num=${#CLUSTER_IP_INTER_KAFKA[@]}
        echo "$[$num+$num]" > /tmp/storm/workers
        chown -R hadoop:hadoop /tmp/storm/kafka
        chown -R hadoop:hadoop /tmp/storm/workers

        su hadoop <<EOF
            cd /opt/apps/kafka
            sh ./bin/kafka-server-start.sh ./config/server.properties >/dev/null 2>&1 &
        sleep 10
            #MASTER=$(cat /tmp/storm/kafka)
            #WORKERS=`cat /tmp/storm/workers`
        echo "master is $(cat /tmp/storm/kafka), workers is $(cat /tmp/storm/workers)"
            if [ $(cat /tmp/storm/kafka) -eq 1 ]; then
                if [ $(cat /tmp/storm/workers) -eq 3 ]; then
                    sh ./bin/kafka-topics.sh --create --zookeeper zk-1:2181,zk-2:2181,zk-3:2181/kafka --partitions 3 --replication-factor 1 --topic cdn-link >/dev/null  2>&1  &
                elif [ $(cat /tmp/storm/workers) -eq 5 ]; then
                    sh ./bin/kafka-topics.sh --create --zookeeper zk-1:2181,zk-2:2181,zk-3:2181/kafka --partitions 5 --replication-factor 1 --topic cdn-link >/dev/null  2>&1  &
                else
                    echo "workers is $(cat /tmp/storm/workers), not match partitions, create partitions failed."
                fi
            fi
            exit 0
EOF
    fi
}

第七步:启动应用。

# 启动topology
function start_topology() {
    if [ ${SERVICE_STORM_NIMBUS} -eq 1 ]; then
    sleep 40
        echo "1" > /tmp/storm/status
        num=${#CLUSTER_IP_INTER_KAFKA[@]}
        echo "${num}" > /tmp/storm/workers
        su hadoop <<EOF
            cd /opt/apps/storm/
            cp /tmp/topology.jar /home/storm/
            workers=$(cat /tmp/storm/workers)
            ./bin/storm jar /home/storm/topology.jar com.xx.xx.Topology  Topology Name${workers}
            exit 0
EOF
    fi
}

最后,将部署脚本加入到服务器自启动服务中,就完成了自动化部署脚本。

将该 ECS 做一个镜像,就完成了。

步骤3:申请的服务器启动后,自动调用部署脚本,完成服务的部署和启动。当然,还需要有对服务进行各种检查,防止服务部署失败或运行异常等情况。

步骤4:通过 API 和选择的方案,对负载均衡进行调整。

步骤5:自动释放服务器。


参考: