Flink1.4部署实践
Apache Flink 是一个开源的分布式流处理和批处理系统。Flink 的核心是在数据流上提供数据分发、通信、具备容错的分布式计算。同时,Flink 在流处理引擎上构建了批处理引擎,原生支持了迭代计算、内存管理和程序优化。
概念
关于Flink的详细说明,可以看 简单之美 的 Apache Flink:特性、概念、组件栈、架构及原理分析。
安装
这里分为 Standalone Cluster
和 JobManager High Availability
两种安装方式。
准备工作
要求:
- Linux、Mac OS X 和 Cygwin (for Windows)
- Java 1.8.x 或更高
- 能ssh登录到服务器,集群之间必须ssh免密钥互通
这里以 Linux
安装为例。
首先,需要在主节点和所有工作节点上设置 JAVA_HOME
环境变量,并指向 Java
安装的目录。
如果没有配置,也可以解压完 Flink
后,在 conf/flink-conf.yaml
配置文件中设置变量。
env.java.home = /usr/local/java
然后,配置集群的 HOST
。
192.168.1.1 emr-header-1 192.168.1.2 emr-worker-1 192.168.1.4 emr-worker-2 192.168.1.5 emr-worker-3
最后,在Flink 下载页面选择与 Hadoop
版本对应的 Flink
版本,如果没有 Hadoop
,可以随意选择一个版本。
wget http://mirrors.shu.edu.cn/apache/flink/flink-1.4.0/flink-1.4.0-bin-hadoop27-scala_2.11.tgz tar xzf flink-1.4.0-bin-hadoop27-scala_2.11.tgz ln -s /opt/apps/flink-1.4.0/ /opt/apps/flink cd flink
Standalone Cluster(独立的集群部署)
Standalone Cluster
的部署方式比较简单,只需要对 conf/flink-conf.yaml
和 conf/slaves
关键参数进行配置即可。
conf/flink-conf.yaml
的主要配置如下:
# 设置flink的主节点 jobmanager.rpc.address: emr-header-1 # 设置每个JobManager的可用内存量,单位:M jobmanager.heap.mb: 1024 # 设置每个TaskManager的可用内存量,单位:M taskmanager.heap.mb: 1024
配置详细说明:Flink Configuration。
在 conf/slaves
设置所有工作节点。
emr-worker-1 emr-worker-2 emr-worker-3
将 Flink
安装文件 scp
到集群的其它机器上。
scp -r ./flink hadoop@emr-worker-1:/opt/apps/ scp -r ./flink hadoop@emr-worker-2:/opt/apps/ scp -r ./flink hadoop@emr-worker-3:/opt/apps/
在 emr-header-1
上集群启动。
bin/start-cluster.sh
以上就完成了 Flink
的独立部署了。
也可以,单独启动 JobManager
和 TaskManager
。
bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all bin/taskmanager.sh start|start-foreground|stop|stop-all
JobManager High Availability(JobManager的高可用部署)
JobManager
协调每个 Flink
部署。它负责调度和资源管理。
默认情况下,每个 Flink
集群只有一个 JobManager
实例。这会出现单点故障(SPOF):如果 JobManager
崩溃,则不能提交新程序,并且运行程序失败。
使用 JobManager
高可用性,可以从 JobManager
故障中恢复,从而消除SPOF。可以为独立群集和YARN
群集配置高可用性。
- 前提:完成前面Standalone Cluster的安装部署。
首先,需要设置主备JobManager
。在 conf/masters
中配置。
emr-header-1:8081 emr-worker-1:8081
高可用模式下,需要用 Zookeeper
记录 JobManager
的状态。在 conf/flink-conf.yaml
中配置高可用模式。
# 启用高可用模式(必须) high-availability: zookeeper # 设置zookeeper的服务器和端口(必须) high-availability.zookeeper.quorum: zk01:2181,zk02:2181,zk03:2181 # 设置JobManager元数据保存在文件系统的路径 (必须) # Zookeeper中只保存JobManager的状态 # storageDir存储用于恢复JobManager失败所需的所有元数据 high-availability.zookeeper.storageDir: hdfs:///flink/recovery # 设置flink在zookeeper的根节点(推荐) high-availability.zookeeper.path.root: /flink # 设置zookeeper的集群id编号(推荐) # 如果Flink运行在Yarn集群上,不需要配置该参数,会被覆盖 # 如果是独立部署的,且运行多个Flink HA集群,必须配置该参数作为群集标识 high-availability.cluster-id: /cluster_one
这里的zk01、zk02、zk03为已经部署完成的 Zookeeper
集群。
如果没有 Zookeeper
集群服务,可以使用 Flink
自带的 Zookeeper
,需要修改配置文件 conf/zoo.cfg
。参数配置与独立安装 Zookeeper
一样。需要注意的是,这种方式下,Zookeeper
的服务启动脚本是在 Flink
的bin目录下。
# 使用Flink的Zookeeper bin/start-zookeeper-quorum.sh
启动 Flink
集群。
bin/start-cluster.sh
查看 Flink Web Dashboard UI来确认,在浏览器打开链接:http://emr-header-1:8081/,如下图:

通过 UI 可以查看集群的资源。
通过Hadoop命令,可以查看到在Hadoop上创建的元数据存储路径。
$ hadoop fs -ls /flink/ha SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/apps/ecm/service/hadoop/2.7.2-1.2.11/package/hadoop-2.7.2-1.2.11/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/apps/ecm/service/tez/0.8.4/package/tez-0.8.4/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] Found 1 items drwxr-xr-x - hadoop hadoop 0 2018-02-06 16:30 /flink/ha/cluster_one
简单实践
简单实践,从查看 Flink
源码中提供的例子开始。
一种,直接从GitHub上下载源码。然后导入 IDEA
,用 maven
编译。
mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion=1.0.0
另一种,使用以下命令
curl https://flink.apache.org/q/quickstart.sh | bash
下载完官方代码后,先清空以前build的东西, 再重新build Flink包。
mvn clean install -DskipTests
上面两个命令,都需要手动输入自己项目的artifactId、groupId等相关信息。

当然,也可以创建一个新的 Flink
Maven项目。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tomato.flink</groupId> <artifactId>flink-demo</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>flink-demo</name> <url>http://maven.apache.org</url> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.4.0</flink.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency> </dependencies> </project>
把 Flink
的官方统计单词的例子拷贝过来,改成从Hdfs读文件,将结果再写入Hdfs。
public class HdfsWordCount { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("USAGE:\ncom.tomato.flink.HdfsWordCount <inputFile> <outputFile>"); return; } String inputFile = args[0]; String outputFile = args[1]; // set up the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data DataStream<String> text = env.readTextFile(inputFile); DataStream<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new LineSplitter()) // group by the tuple field "0" and sum up tuple field "1" .keyBy(0) .sum(1); //counts.print(); counts.map(new LineString()).writeAsText(outputFile); // execute program env.execute("WordCount from ReadTextFile Example"); } public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { // normalize and split the line String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } } public static final class LineString implements MapFunction<Tuple2<String, Integer>, String> { @Override public String map(Tuple2<String, Integer> value) throws Exception { return value.getField(0) + "\t" + value.getField(1); } } }
编译打包,将程序部署到集群上。
./bin/flink run --class com.tomato.flink.HdfsWordCount /home/flink/flink-demo-1.0-SNAPSHOT.jar hdfs:///tmp/test/flink-demo-wordcount.txt hdfs:///tmp/test/flink-demo-wordcount-result.txt
执行过程如下:

查看 flink-demo-wordcount-result.txt
,可以看到计算的结果。
参考
Very nice post. I simply stumbled upon your weblog and wished to mention that I’ve truly enjoyed surfing around your blog posts.