Flink1.4安装

Apache Flink 是一个开源的分布式流处理和批处理系统。Flink 的核心是在数据流上提供数据分发、通信、具备容错的分布式计算。同时,Flink 在流处理引擎上构建了批处理引擎,原生支持了迭代计算、内存管理和程序优化。

1、概念

关于Flink的详细说明,可以看简单之美Apache Flink:特性、概念、组件栈、架构及原理分析

2、安装

这里分为 Standalone Cluster JobManager High Availability 两种安装方式。

2.1、准备工作

要求:

  • Linux、Mac OS X 和 Cygwin (for Windows)
  • Java 1.8.x 或更高
  • 能ssh登录到服务器,集群之间必须ssh免密钥互通

这里以 Linux 安装为例。

首先,需要在主节点和所有工作节点上设置 JAVA_HOME 环境变量,并指向 Java 安装的目录。

如果没有配置,也可以解压完 Flink 后,在 conf/flink-conf.yaml 配置文件中设置变量。

1
env.java.home = /usr/local/java

然后,配置集群的 HOST

1
2
3
4
192.168.1.1  emr-header-1
192.168.1.2 emr-worker-1
192.168.1.4 emr-worker-2
192.168.1.5 emr-worker-3

最后,在Flink 下载页面选择与 Hadoop 版本对应的 Flink版本,如果没有 Hadoop,可以随意选择一个版本。

1
2
3
4
5
wget http://mirrors.shu.edu.cn/apache/flink/flink-1.4.0/flink-1.4.0-bin-hadoop27-scala_2.11.tgz

tar xzf flink-1.4.0-bin-hadoop27-scala_2.11.tgz
ln -s /opt/apps/flink-1.4.0/ /opt/apps/flink
cd flink

2.2、 Standalone Cluster(独立的集群部署)

Standalone Cluster 的部署方式比较简单,只需要对 conf/flink-conf.yamlconf/slaves 关键参数进行配置即可。

conf/flink-conf.yaml 的主要配置如下:

1
2
3
4
5
6
7
8
# 设置flink的主节点
jobmanager.rpc.address: emr-header-1

# 设置每个JobManager的可用内存量,单位:M
jobmanager.heap.mb: 1024

# 设置每个TaskManager的可用内存量,单位:M
taskmanager.heap.mb: 1024

配置详细说明:Flink Configuration

conf/slaves 设置所有工作节点。

1
2
3
emr-worker-1
emr-worker-2
emr-worker-3

Flink 安装文件 scp 到集群的其它机器上。

1
2
3
scp -r ./flink hadoop@emr-worker-1:/opt/apps/ 
scp -r ./flink hadoop@emr-worker-2:/opt/apps/
scp -r ./flink hadoop@emr-worker-3:/opt/apps/

emr-header-1 上集群启动。

1
bin/start-cluster.sh

以上就完成了 Flink 的独立部署了。

也可以,单独启动 JobManagerTaskManager

1
2
3
bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all

bin/taskmanager.sh start|start-foreground|stop|stop-all

2.3、JobManager High Availability(JobManager的高可用部署)

JobManager 协调每个 Flink 部署。它负责调度和资源管理。

默认情况下,每个 Flink 集群只有一个 JobManager 实例。这会出现单点故障(SPOF):如果 JobManager 崩溃,则不能提交新程序,并且运行程序失败。

使用 JobManager 高可用性,可以从 JobManager 故障中恢复,从而消除SPOF。可以为独立群集和YARN 群集配置高可用性。

  • 前提:完成前面Standalone Cluster的安装部署。

首先,需要设置主备JobManager 。在 conf/masters 中配置。

1
emr-header-1:8081emr-worker-1:8081

高可用模式下,需要用 Zookeeper 记录 JobManager 的状态。在 conf/flink-conf.yaml 中配置高可用模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
# 启用高可用模式(必须)
high-availability: zookeeper
# 设置zookeeper的服务器和端口(必须)
high-availability.zookeeper.quorum: zk01:2181,zk02:2181,zk03:2181
# 设置JobManager元数据保存在文件系统的路径 (必须)
# Zookeeper中只保存JobManager的状态
# storageDir存储用于恢复JobManager失败所需的所有元数据high-availability.zookeeper.storageDir: hdfs:///flink/recovery
# 设置flink在zookeeper的根节点(推荐)
high-availability.zookeeper.path.root: /flink
# 设置zookeeper的集群id编号(推荐)
# 如果Flink运行在Yarn集群上,不需要配置该参数,会被覆盖
# 如果是独立部署的,且运行多个Flink HA集群,必须配置该参数作为群集标识
high-availability.cluster-id: /cluster_one

这里的zk01、zk02、zk03为已经部署完成的 Zookeeper 集群。

如果没有 Zookeeper 集群服务,可以使用 Flink自带的 Zookeeper,需要修改配置文件 conf/zoo.cfg。参数配置与独立安装 Zookeeper 一样。需要注意的是,这种方式下,Zookeeper的服务启动脚本是在 Flink 的bin目录下。

1
# 使用Flink的Zookeeperbin/start-zookeeper-quorum.sh

启动 Flink 集群。

1
bin/start-cluster.sh

查看 Flink Web Dashboard UI来确认,在浏览器打开链接:http://emr-header-1:8081/,如下图:

flink ui

通过 UI 可以查看集群的资源。

通过Hadoop命令,可以查看到在Hadoop上创建的元数据存储路径。

3、简单实践

简单实践,从查看 Flink 源码中提供的例子开始。

一种,直接从GitHub上下载源码。然后导入 IDEA,用 maven 编译。

1
2
3
4
mvn archetype:generate \      
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.0.0

另一种,使用以下命令

1
curl https://flink.apache.org/q/quickstart.sh | bash

下载完官方代码后,先清空以前build的东西, 再重新build Flink包。

1
mvn clean install -DskipTests

上面两个命令,都需要手动输入自己项目的artifactId、groupId等相关信息。

flink-demo

当然,也可以创建一个新的 Flink Maven项目。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<?xml version="1.0" encoding="utf-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tomato.flink</groupId>
<artifactId>flink-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>flink-demo</name>
<url>http://maven.apache.org</url>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.4.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
</project>

Flink 的官方统计单词的例子拷贝过来,改成从Hdfs读文件,将结果再写入Hdfs。

编译打包,将程序部署到集群上。

1
./bin/flink run --class com.tomato.flink.HdfsWordCount /home/flink/flink-demo-1.0-SNAPSHOT.jar hdfs:///tmp/test/flink-demo-wordcount.txt hdfs:///tmp/test/flink-demo-wordcount-result.txt

执行过程如下:

flink-demo-wordcount

查看 flink-demo-wordcount-result.txt,可以看到计算的结果。

参考:

文章作者: OneRain
文章链接: https://kiswo.com/2018/03/04/stream-processing/flink/flink-install/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 OneRain's Blog