Apache Flink 是一个开源的分布式流处理和批处理系统。Flink 的核心是在数据流上提供数据分发、通信、具备容错的分布式计算。同时,Flink 在流处理引擎上构建了批处理引擎,原生支持了迭代计算、内存管理和程序优化。
1、概念
关于Flink的详细说明,可以看简单之美的Apache Flink:特性、概念、组件栈、架构及原理分析。
2、安装
这里分为 Standalone Cluster
和 JobManager High Availability
两种安装方式。
2.1、准备工作
要求:
- Linux、Mac OS X 和 Cygwin (for Windows)
- Java 1.8.x 或更高
- 能ssh登录到服务器,集群之间必须ssh免密钥互通
这里以 Linux
安装为例。
首先,需要在主节点和所有工作节点上设置 JAVA_HOME
环境变量,并指向 Java
安装的目录。
如果没有配置,也可以解压完 Flink
后,在 conf/flink-conf.yaml
配置文件中设置变量。
1 | env.java.home = /usr/local/java |
然后,配置集群的 HOST
。
1 | 192.168.1.1 emr-header-1 |
最后,在Flink 下载页面选择与 Hadoop
版本对应的 Flink
版本,如果没有 Hadoop
,可以随意选择一个版本。
1 | wget http://mirrors.shu.edu.cn/apache/flink/flink-1.4.0/flink-1.4.0-bin-hadoop27-scala_2.11.tgz |
2.2、 Standalone Cluster(独立的集群部署)
Standalone Cluster
的部署方式比较简单,只需要对 conf/flink-conf.yaml
和 conf/slaves
关键参数进行配置即可。
conf/flink-conf.yaml
的主要配置如下:
1 | # 设置flink的主节点 |
配置详细说明:Flink Configuration。
在 conf/slaves
设置所有工作节点。
1 | emr-worker-1 |
将 Flink
安装文件 scp
到集群的其它机器上。
1 | scp -r ./flink hadoop@emr-worker-1:/opt/apps/ |
在 emr-header-1
上集群启动。
1 | bin/start-cluster.sh |
以上就完成了 Flink
的独立部署了。
也可以,单独启动 JobManager
和 TaskManager
。
1 | bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all |
2.3、JobManager High Availability(JobManager的高可用部署)
JobManager
协调每个 Flink
部署。它负责调度和资源管理。
默认情况下,每个 Flink
集群只有一个 JobManager
实例。这会出现单点故障(SPOF):如果 JobManager
崩溃,则不能提交新程序,并且运行程序失败。
使用 JobManager
高可用性,可以从 JobManager
故障中恢复,从而消除SPOF。可以为独立群集和YARN
群集配置高可用性。
- 前提:完成前面Standalone Cluster的安装部署。
首先,需要设置主备JobManager
。在 conf/masters
中配置。
1 | emr-header-1:8081emr-worker-1:8081 |
高可用模式下,需要用 Zookeeper
记录 JobManager
的状态。在 conf/flink-conf.yaml
中配置高可用模式。
1 | # 启用高可用模式(必须) |
这里的zk01、zk02、zk03为已经部署完成的 Zookeeper
集群。
如果没有 Zookeeper
集群服务,可以使用 Flink
自带的 Zookeeper
,需要修改配置文件 conf/zoo.cfg
。参数配置与独立安装 Zookeeper
一样。需要注意的是,这种方式下,Zookeeper
的服务启动脚本是在 Flink
的bin目录下。
1 | # 使用Flink的Zookeeperbin/start-zookeeper-quorum.sh |
启动 Flink
集群。
1 | bin/start-cluster.sh |
查看 Flink Web Dashboard UI来确认,在浏览器打开链接:http://emr-header-1:8081/,如下图:
通过 UI 可以查看集群的资源。
通过Hadoop命令,可以查看到在Hadoop上创建的元数据存储路径。
3、简单实践
简单实践,从查看 Flink
源码中提供的例子开始。
一种,直接从GitHub上下载源码。然后导入 IDEA
,用 maven
编译。
1 | mvn archetype:generate \ |
另一种,使用以下命令
1 | curl https://flink.apache.org/q/quickstart.sh | bash |
下载完官方代码后,先清空以前build的东西, 再重新build Flink包。
1 | mvn clean install -DskipTests |
上面两个命令,都需要手动输入自己项目的artifactId、groupId等相关信息。
当然,也可以创建一个新的 Flink
Maven项目。
1 |
|
把 Flink
的官方统计单词的例子拷贝过来,改成从Hdfs读文件,将结果再写入Hdfs。
编译打包,将程序部署到集群上。
1 | ./bin/flink run --class com.tomato.flink.HdfsWordCount /home/flink/flink-demo-1.0-SNAPSHOT.jar hdfs:///tmp/test/flink-demo-wordcount.txt hdfs:///tmp/test/flink-demo-wordcount-result.txt |
执行过程如下:
查看 flink-demo-wordcount-result.txt
,可以看到计算的结果。
参考: