Kafka是一种高吞吐量的分布式发布订阅的消息队列系统,Kafka对消息进行保存时是通过topic进行分组的。
macOS
1
2
3
4
5
|
# 安装kafka前默认安装了zookeeper
brew install kafka
# brew install zookeeper
vim /usr/local/etc/kafka/server.properties
|
1
2
|
# 找到 listeners=PLAINTEXT://:9092 把注释取消掉并修改。
listeners=PLAINTEXT://localhost:9092
|
如果只是临时启动,可以:
1
2
3
4
5
6
7
8
9
10
11
12
|
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
# nohup 关闭终端还是可以用 命令后面加上& 实现后台运行
nohup zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties &
# 那么在缺省情况下该作业的所有输出都被重定向到当前目录一个名为nohup.out的文件中,除非另外指定了输出文件:
nohup command > /usr/local/var/log/kafka.out 2>&1 &
zookeeper-server-stop
kafka-server-stop
# 使用配置 /usr/local/etc/zookeeper/zoo.cfg
zkServer start
kafka-server-start /usr/local/etc/kafka/server.properties
|
如果想以服务的方式启动,那么可以:
1
2
|
brew services start kafka
brew services start zookeeper
|
创建 Topic
1
2
3
4
5
6
7
8
9
|
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
# 数据默认存储在 log.dirs=/usr/local/var/lib/kafka-logs
# 这里--replication-factor 不大于集群的 brokers 会有 test-0 test-1文件夹
kafka-topics --zookeeper localhost:2181 --list
kafka-topics --zookeeper localhost:2181 --describe
kafka-topics --zookeeper localhost:2181 --describe -topic test
kafka-topics --zookeeper localhost:2181 --alter --topic test --partitions 6
kafka-topics --zookeeper localhost:2181 --delete -topic test
|
产生消息
1
2
3
4
5
6
|
# 新建一个 SHELL
kafka-console-producer --broker-list localhost:9092 --topic test
>HELLO Kafka
# 关闭
Control+C
|
消费
简单方式:
1
2
3
|
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
# 关闭
Control+C
|
如果使用消费组:
1
|
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --group test-consumer1 --from-beginning
|
可视化 UI
1
|
brew install kafka-tool --cask
|
安装与配置
Kafka 官方地址
1
2
3
4
|
cd ~/Apps/kafka
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.1/kafka_2.12-2.1.1.tgz
tar -zxvf kafka_2.12-2.1.1.tgz -C ~/Apps/kafka
cd /opt/kafka_2.12-2.1.1
|
1
2
3
4
|
# 在Kafka启动之前需要先启动ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# 之后启动Kafka Server
bin/kafka-server-start.sh config/server.properties
|
1
2
3
4
|
# 停止kafka 先
bin/kafka-server-stop.sh
# 停止zookeeper
bin/zookeeper-server-stop.sh
|
1
2
3
4
|
# create a topic named "test" with a single partition and only one replica
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# list topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
|
测试
单机测试
1
2
3
4
|
# 运行producer
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
>This is a message
>This is another message
|
1
2
3
4
|
# 运行consumer 新开一个shell
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
|
在producer端输入字符串并回车,查看consumer端是否显示。
分布式连通性测试
- 192.168.2.10 Zookeeper Server, Kafka Server, Producer
- 192.168.2.20 Consumer
1
2
3
|
bin/kafka-console-producer.sh --broker-list 192.168.1.10:9092 --topic test1
bin/kafka-console-consumer.sh --zookeeper 192.168.1.10:2181 --topic test1 --from-beginning
|
问题出在broker的配置文件server.properties上: host名称没有指定,就是127.0.0.1,consumer去broker拿数据就有问题。设置为192.168.1.10,重启服务就好了。
集群
1
2
3
4
5
6
7
|
cp /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/server1.properties
cp /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/server2.properties
cp /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/server3.properties
mkdir -p /usr/local/var/lib/kafka-logs1
mkdir -p /usr/local/var/lib/kafka-logs2
mkdir -p /usr/local/var/lib/kafka-logs3
|
broker.id=0
listeners=PLAINTEXT://localhost:9093
log.dirs=/usr/local/var/lib/kafka-logs1
zookeeper.connect=127.0.0.1:2181
broker.id=1
listeners=PLAINTEXT://localhost:9094
log.dirs=/usr/local/var/lib/kafka-logs2
zookeeper.connect=127.0.0.1:2181
broker.id=2
listeners=PLAINTEXT://localhost:9095
log.dirs=/usr/local/var/lib/kafka-logs3
zookeeper.connect=127.0.0.1:2181
1
2
3
4
5
6
7
8
|
kafka-server-start /usr/local/etc/kafka/server1.properties &
kafka-server-start /usr/local/etc/kafka/server2.properties &
kafka-server-start /usr/local/etc/kafka/server3.properties &
kafka-server-stop /usr/local/etc/kafka/server1.properties
kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test-cluster-1
|
server.properties 修改
同样的先来了解里面的参数含义吧
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
# 在kafka这个集群中的唯一标识,且只能是正整数
broker.id=1
# 该服务监听的端口
port=9091
# broker 绑定的主机名称(IP) 如果不设置将绑定所有的接口。
host.name=192.168.1.130
# broker服务将通知消费者和生产者 换言之,就是消费者和生产者就是通过这个主机(IP)来进行通信的。如果没有设置就默认采用host.name。
advertised.host.name=192.168.1.130
# broker处理消息的最大线程数,一般情况是CPU的核数
num.network.threads=21
# broker处理IO的线程数 一般是num.network.threads的两倍
num.io.threads=81
# socket发送的缓冲区。socket调优参数SO_SNDBUFF
socket.send.buffer.bytes=10485761
# socket接收的缓冲区 socket的调优参数SO_RCVBUF
socket.receive.buffer.bytes=10485761
# socket请求的最大数量,防止serverOOM。
socket.request.max.bytes=1048576001
# kafka数据的存放地址,多个地址的话用逗号隔开。多个目录分布在不同的磁盘上可以提高读写性能
log.dirs=\logs1
# 每个tipic的默认分区个数,在创建topic时可以重新制定
num.partitions=21
# 数据文件的保留时间 log.retention.minutes也是一个道理。
log.retention.hours=1681
# topic中的最大文件的大小 -1表示没有文件大小限制 log.segment.bytes 和log.retention.minutes 任意一个
达到要求 都会删除该文件 在创建topic时可以重新制定。若没有.则选取该默认值
log.segment.bytes=5368709121
# 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=600001
# 是否开启日志清理
log.cleaner.enable=false1
# Zookeeper集群
zookeeper.connect=192.168.1.130:num1,192.168.1.130:num2,192.168.1.130:num31
# Zookeeper集群链接时间超时
zookeeper.connection.timeout.ms=10000001
|
同样的我们每次赋值kafka服务我们只需该配置文件里的 broker.id
port
这两个属性就行了。
命令工具
kafka-topics
Docker 方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
docker run -d --name kafka --publish 9092:9092 \
--link zookeeper \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=172.27.3.56 \
--env KAFKA_ADVERTISED_PORT=9092 \
wurstmeister/kafka
# Local Test
sudo docker exec -it kafka /opt/kafka/bin/kafka-console-producer.sh --topic=test --broker-list localhost:9092
# 打开另一个shell
sudo docker exec -it kafka1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
# 其他主机测试
# 下载 kafka 安装包:http://kafka.apache.org/downloads 下载完后解压缩,进入目录中。
# 客户端需要java 环境 yum -y install java-1.8.0-openjdk
cd /opt
wget http://mirrors.shu.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
tar -xzf kafka_2.11-2.0.0.tgz
cd kafka_2.11-2.0.0
# 生产端发消息,命令行演示如下:
bin/kafka-console-producer.sh --broker-list 172.27.3.56:9092 --topic test1
# 消息端收消息,命令行演示如下:
cd /opt/kafka_2.11-2.0.0
bin/kafka-console-consumer.sh --bootstrap-server 172.27.3.56:9092 --topic test1 --from-beginning
|
多 broker 集群
broker.id
是集群中每个节点的唯一且永久的名称,我们修改端口和日志目录是因为我们现在在同一台机器上运行,我们要防止broker在同一端口上注册和覆盖对方的数据。
1
2
|
cp config/server.properties config/server-1.properties
vim config/server-1.properties
|
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
1
2
|
cp config/server.properties config/server-2.properties
vim config/server-2.properties
|
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
1
2
|
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
|
创建一个新topic,把备份设置为:3
1
|
`> bin``/kafka-topics``.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic`
|
每个集群在做什么呢?运行命令 describe topics
1
|
`> bin``/kafka-topics``.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic``Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:`` ``Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0`
|
Here is an explanation of output. The first line gives a summary of all the partitions, each additional line gives information about one partition. Since we have only one partition for this topic there is only one line.
- “leader” is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
- “replicas” is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
- “isr” is the set of “in-sync” replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
输出解释:第一行是所有分区的摘要,其次,每一行提供一个分区信息,因为我们只有一个分区,所以只有一行。
- “leader”:该节点负责该分区的所有的读和写,每个节点的
leader
都是随机选择的。
- “replicas”:备份的节点列表,无论该节点是否是leader或者目前是否还活着,只是显示。
- “isr”:“同步备份”的节点列表,也就是活着的节点并且正在同步leader。
Kafka 集群
hostname |
ip |
service |
node01 |
192.168.2.10 |
zookeeper、kafka broker |
node02 |
192.168.2.20 |
zookeeper、kafka broker |
node03 |
192.168.2.30 |
zookeeper、kafka broker |
配置zookeeper集群
1 修改zookeeper的配置文件,在$ZOOKEEPER_HOME/conf/下的zoo.cfg.(每台机器都需要添加)
在文件末尾添加:
server.0=xx.101.139.1:2888:3888
server.1=xx.101.139.2:2888:3888
server.2=xx.101.139.3:2888:3888
这里简单说明一下:server.A=B:C:D
A是一个数字,表示这个是第几号服务器,B是这个服务器的ip地址
C第一个端口用来集群成员的信息交换,表示的是这个服务器与集群中的Leader服务器交换信息的端口
D是在leader挂掉时专门用来进行选举leader所用
2 创建server id标识
在zoo.cfg下同时还需要配置(单机版时就已经设定,在配置集群时就可不必在设置)
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper/data
dataLogDir/tmp/zookeeper/log
clientPort=2181
在上面可以看到有个dataDir的配置,在配置集群时需要在dataDir配置的/tmp/zookeeper/data下创建一个名称为”myid”的文件,文件里填上serverid号。
1
2
3
4
|
echo 1 > myid
vim /tmp/zookeeper/data/myid
之后在其中输入0 (根据上一步的配置,xx.101.139.1配置为0,xx.101.139.2配置为1,依次类推对应起来)
输入:wq保存退出123
|
3 启动各个节点的zookeeper: bin/zkServer.sh start
4 通过bin/zkServer.sh status查看各个节点的状态。
配置kafka集群
1 需要修改$KAFKA_HOME/config下的server.properties文件
修改broker.id的值,比如xx.101.139.1设置为0,xx.101.139.2设置为1,依次类推。(每台kafka broker server必须唯一)
最好将host.name设置为本机的ip地址而不是默认的localhost.
修改zookeeper.connect为zookeeper集群的ip,本例中可以这样设置:
zookeeper.connect=xx.101.139.1:2181,xx.101.139.2:2181,xx.101.139.3:2181
2 在每台机器上启动kafka
bin/kafka-server-start.sh config/server.properties
附录
https://kafka.apache.org/documentation/#producerapi