Flink1.4部署实践

 2018-02-07 16:02:52     Flink  流处理框架  开源框架   1168


导读: Apache Flink 是一个开源的分布式流处理和批处理系统。Flink 的核心是在数据流上提供数据分发、通信、具备容错的分布式计算。同时,Flink 在流处理引擎上构建了批处理引擎,原生支持了迭代计算、内存管理和程序优化。

概念


关于Flink的详细说明,可以看简单之美Apache Flink:特性、概念、组件栈、架构及原理分析

安装


这里分为 Standalone ClusterJobManager High Availability 两种安装方式。

准备工作


要求:

  • Linux、Mac OS X 和 Cygwin (for Windows)
  • Java 1.8.x 或更高
  • 能ssh登录到服务器,集群之间必须ssh免密钥互通

这里以 Linux 安装为例。

首先,需要在主节点和所有工作节点上设置 JAVA_HOME 环境变量,并指向 Java 安装的目录。

如果没有配置,也可以解压完 Flink 后,在 conf/flink-conf.yaml 配置文件中设置变量。

env.java.home = /usr/local/java

然后,配置集群的 HOST

192.168.1.1  emr-header-1
192.168.1.2  emr-worker-1
192.168.1.4  emr-worker-2
192.168.1.5  emr-worker-3

最后,在Flink 下载页面选择与 Hadoop 版本对应的 Flink版本,如果没有 Hadoop,可以随意选择一个版本。

wget http://mirrors.shu.edu.cn/apache/flink/flink-1.4.0/flink-1.4.0-bin-hadoop27-scala_2.11.tgz

tar xzf flink-1.4.0-bin-hadoop27-scala_2.11.tgz
ln -s /opt/apps/flink-1.4.0/ /opt/apps/flink
cd flink

Standalone Cluster(独立的集群部署)


Standalone Cluster 的部署方式比较简单,只需要对 conf/flink-conf.yamlconf/slaves 关键参数进行配置即可。

conf/flink-conf.yaml 的主要配置如下:

# 设置flink的主节点
jobmanager.rpc.address: emr-header-1

# 设置每个JobManager的可用内存量,单位:M
jobmanager.heap.mb: 1024

# 设置每个TaskManager的可用内存量,单位:M
taskmanager.heap.mb: 1024

配置详细说明:Flink Configuration

conf/slaves 设置所有工作节点。

emr-worker-1
emr-worker-2
emr-worker-3

Flink 安装文件 scp 到集群的其它机器上。

scp -r ./flink hadoop@emr-worker-1:/opt/apps/ 
scp -r ./flink hadoop@emr-worker-2:/opt/apps/ 
scp -r ./flink hadoop@emr-worker-3:/opt/apps/

emr-header-1 上集群启动。

bin/start-cluster.sh

以上就完成了 Flink 的独立部署了。

也可以,单独启动 JobManagerTaskManager

bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all

bin/taskmanager.sh start|start-foreground|stop|stop-all

JobManager High Availability(JobManager的高可用部署)


JobManager 协调每个 Flink 部署。它负责调度和资源管理。

默认情况下,每个 Flink 集群只有一个 JobManager 实例。这会出现单点故障(SPOF):如果 JobManager 崩溃,则不能提交新程序,并且运行程序失败。

使用 JobManager 高可用性,可以从 JobManager 故障中恢复,从而消除SPOF。可以为独立群集和YARN 群集配置高可用性。

  • 前提:完成前面Standalone Cluster的安装部署。

首先,需要设置主备JobManager 。在 conf/masters 中配置。

emr-header-1:8081
emr-worker-1:8081

高可用模式下,需要用 Zookeeper 记录 JobManager 的状态。在 conf/flink-conf.yaml 中配置高可用模式。

# 启用高可用模式(必须)
high-availability: zookeeper

# 设置zookeeper的服务器和端口(必须)
high-availability.zookeeper.quorum: zk01:2181,zk02:2181,zk03:2181

# 设置JobManager元数据保存在文件系统的路径 (必须)
# Zookeeper中只保存JobManager的状态
# storageDir存储用于恢复JobManager失败所需的所有元数据
high-availability.zookeeper.storageDir: hdfs:///flink/recovery

# 设置flink在zookeeper的根节点(推荐)
high-availability.zookeeper.path.root: /flink

# 设置zookeeper的集群id编号(推荐)
# 如果Flink运行在Yarn集群上,不需要配置该参数,会被覆盖
# 如果是独立部署的,且运行多个Flink HA集群,必须配置该参数作为群集标识
high-availability.cluster-id: /cluster_one

这里的zk01、zk02、zk03为已经部署完成的 Zookeeper 集群。

如果没有 Zookeeper 集群服务,可以使用 Flink自带的 Zookeeper,需要修改配置文件 conf/zoo.cfg。参数配置与独立安装 Zookeeper 一样。需要注意的是,这种方式下,Zookeeper的服务启动脚本是在 Flink 的bin目录下。

# 使用Flink的Zookeeper
bin/start-zookeeper-quorum.sh

启动 Flink 集群。

bin/start-cluster.sh

查看 Flink Web Dashboard UI来确认,在浏览器打开链接:http://emr-header-1:8081/,如下图:

flink ui

通过 UI 可以查看集群的资源。

通过Hadoop命令,可以查看到在Hadoop上创建的元数据存储路径。

$ hadoop fs -ls /flink/ha
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/apps/ecm/service/hadoop/2.7.2-1.2.11/package/hadoop-2.7.2-1.2.11/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/apps/ecm/service/tez/0.8.4/package/tez-0.8.4/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Found 1 items
drwxr-xr-x   - hadoop hadoop          0 2018-02-06 16:30 /flink/ha/cluster_one

简单实践


简单实践,从查看 Flink 源码中提供的例子开始。

一种,直接从GitHub上下载源码。然后导入 IDEA,用 maven 编译。

mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink            \
      -DarchetypeArtifactId=flink-quickstart-java    \
      -DarchetypeVersion=1.0.0

另一种,使用以下命令

curl https://flink.apache.org/q/quickstart.sh | bash

下载完官方代码后,先清空以前build的东西, 再重新build Flink包。

mvn clean install -DskipTests

上面两个命令,都需要手动输入自己项目的artifactId、groupId等相关信息。

flink-demo

当然,也可以创建一个新的 Flink Maven项目。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tomato.flink</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>flink-demo</name>
    <url>http://maven.apache.org</url>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.4.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>

Flink 的官方统计单词的例子拷贝过来,改成从Hdfs读文件,将结果再写入Hdfs。

public class HdfsWordCount {

    public static void main(String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("USAGE:\ncom.tomato.flink.HdfsWordCount <inputFile> <outputFile>");
            return;
        }

        String inputFile = args[0];
        String outputFile = args[1];

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data
        DataStream<String> text = env.readTextFile(inputFile);

        DataStream<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new LineSplitter())
                // group by the tuple field "0" and sum up tuple field "1"
                .keyBy(0)
                .sum(1);

        //counts.print();
        counts.map(new LineString()).writeAsText(outputFile);
        // execute program
        env.execute("WordCount from ReadTextFile Example");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

    public static final class LineString implements MapFunction<Tuple2<String, Integer>, String> {

        @Override
        public String map(Tuple2<String, Integer> value) throws Exception {
            return value.getField(0) + "\t" + value.getField(1);
        }
    }
}

编译打包,将程序部署到集群上。

./bin/flink run --class com.tomato.flink.HdfsWordCount /home/flink/flink-demo-1.0-SNAPSHOT.jar hdfs:///tmp/test/flink-demo-wordcount.txt hdfs:///tmp/test/flink-demo-wordcount-result.txt

执行过程如下:

flink-demo-wordcount

查看 flink-demo-wordcount-result.txt,可以看到计算的结果。


参考