阿里云ECS上搭建Kafka集群

 2017-04-07 18:24:45     Kafka  阿里云   2034


导读: Kafka是一个分布式的、可分区的、基于备份的、基于commit-log存储的服务。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。

Kafka消息是根据Topic进行归类,发送消息者成为Producer,消息接收者成为Consumer;此外Kafka集群有多个kafka实例组成,每个实例(server)称为broker。

无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性以及保存一些meta信息。

术语说明


Topic
Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic)。

Producer
发布消息的对象称之为主题生产者(Kafka topic producer)。

Consumer
订阅消息并处理发布的消息的种子的对象称之为主题消费者(consumers)。

Broker
已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

集群搭建


安装前,建议选择高内存的服务器,因为Kafka并不是把数据立即落地到磁盘,而是根据一定策略将数据批量flush到磁盘上,所以,需要一定的内存空间来存储高并发的数据。一般建议选择32G及以上的内存,对CPU没有太高的要求。

如果服务器只部署Kafka的话,不用考虑多硬盘,否则,建议为Kafka的存储独立出一块或多块硬盘,减少硬盘读写之间的影响。在网上看到过,根据partition划分多个磁盘,以提供性能的用法,暂时不知道具体的实现。

带宽一定要够用,防止因为带宽的限制而限制了Kafka的吞吐量。

partition的数量一定要合理,并不是越大越好,越大带来的性能开销越大。一般要是broker的N倍,且大于或等于Consumer Group的并发消费量。

replicated的数量根据业务而定,如果对数据的缺失不是很敏感的话,replicated可以不设置或设置为1。如果对数据的缺失很敏感,可以设置为2或3。由于备份需要将数据调度到其他broker上,所以,越多的备份,意味着越多的数据拷贝和转移。

由于Kafka需要用到Zookeeper,而我们的EMR集群上,已经自带了Zookeeper,所以,不需要再安装。但是,需要开通EMR内网的2181、2888、3888端口。Kafka集群机器之间,需要开通内网9092端口。特别要注意这点,否则会无法通信。

我们这里是用了3台进行部署。

创建用户

groupadd  kafka
useradd kafka -g kafka
passwd kafka

安装

cd /opt/apps/

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.1.0/kafka_2.11-0.10.1.0.tgz

tar -zxvf kafka_2.11-0.10.1.0.tgz

ln -s /opt/apps/kafka_2.11-0.10.1.0 /opt/apps/kafka


chown -R kafka.kafka kafka_2.11-0.10.1.0
chown -R kafka.kafka kafka

配置hosts

#Kafka host
ip1  kafka01
ip2  kafka02
ip3  kafka03

#zookeeper host
ip4  zk01
ip5  zk02
ip6  zk03

修改Kafka配置文件server.properties

broker.id=0
#此处为kafka返回给客户端的通信地址,建议配置成域名,便于内外网访问(kafka客户端机器需要配置host,将kafka01映射到IP)
advertised.host.name=kafka01
advertised.port=9092

#内外网同时访问时的设置
listeners=PLAINTEXT://0.0.0.0:9092
#只限制内网访问时设置
#listeners=PLAINTEXT://内网IP:9092

delete.topic.enable=true

log.dirs=/home/kafka/kafka-logs

num.partitions=3

#设置所有Topic数据的默认保存时间
#log.cleanup.policy=delete
log.retention.hours=72
log.cleaner.enable=true
log.roll.hours=24
#log.segment.delete.delay.ms=60000
#log.retention.check.interval.ms=300000

zookeeper.connect=zk01:2181,zk02:2181,zk03:2181/kafka

将Kafka分发到其他安装机器上

scp -r /opt/apps/kafka_2.11-0.10.1.0 root@ip2:/opt/apps/
scp -r /opt/apps/kafka_2.11-0.10.1.0 root@ip3:/opt/apps/

修改其他安装机器上的配置文件server.properties,将 broker.id=0 修改为
broker.id=1broker.id=2

启动Kafka

./bin/kafka-server-start.sh config/server.properties &

创建Topic

./bin/kafka-topics.sh --create --zookeeper zk01:2181,zk02:2181,zk03:2181/kafka --partitions 3 --replication-factor 1 --topic test

查看Topic

./bin/kafka-topics.sh --list --zookeeper zk01:2181,zk02:2181,zk03:2181/kafka


./bin/kafka-topics.sh --describe --topic test --zookeeper zk01:2181,zk02:2181,zk03:2181/kafka

生产数据

./bin/kafka-console-producer.sh --broker-list kafka01:9092,kafka02:9092,kafka03:9092 --topic test

在控制台输入内容:

my test message 1
my test message 2

消费数据

./bin/kafka-console-consumer.sh --bootstrap-server kafka01:9092,kafka02:9092,kafka03:9092 --topic test --from-beginning

指定partition消费

./bin/kafka-console-consumer.sh --bootstrap-server kafka01:9092,kafka02:9092,kafka03:9092 --topic test --from-beginning --partition 0

查看topic的offset最小值

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka01:9092,kafka02:9092,kafka03:9092 --topic test --time -2

查看topic的offset最大值

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka01:9092,kafka02:9092,kafka03:9092 --topic test --time -1

删除Topic

删除Topic数据时,需要同时删除Zookeeper中对应的记录。

首先,我们删除Kakfka的数据,在每台机器上执行相同的操作。

rm /home/kafka/test-logs

然后,进入Zookeeper命令窗口,删除对应记录。

./bin/zkCli.sh -server zk01:2181,zk02:2181,zk03:2181

#查看目录
ls /kafka/brokers

#查看信息
get /kafka/brokers/partition_0

#删除
rmr /kafka/brokers/topics/test