Environment

Servers:
test11 192.168.37.11
test12 192.168.37.12
test13 192.168.37.13
OS version:
CentOS 7.6

Installation

1. Download the package: wget http://mirror-hk.koddos.net/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
2. Extract it and enter the directory:
tar zxf kafka_2.12-2.3.0.tgz -C /usr/local/
cd /usr/local/kafka_2.12-2.3.0/
3. Only a few settings in config/server.properties need to be changed on each node, as follows:

test11:
vim config/server.properties
broker.id=1
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=192.168.37.11:2181,192.168.37.12:2181,192.168.37.13:2181

test12:
vim config/server.properties
broker.id=2
log.dirs=/tmp/kafka-logs-2
zookeeper.connect=192.168.37.11:2181,192.168.37.12:2181,192.168.37.13:2181

test13:
vim config/server.properties
broker.id=3
log.dirs=/tmp/kafka-logs-3
zookeeper.connect=192.168.37.11:2181,192.168.37.12:2181,192.168.37.13:2181
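The three fragments above differ only in broker.id and log.dirs; a quick way to sanity-check them before starting the brokers is to parse the key=value lines and assert that every broker.id is unique while zookeeper.connect is identical everywhere. A minimal sketch in Python (the inline config strings simply restate the fragments above):

```python
# Sanity-check the per-broker server.properties fragments shown above:
# broker.id must be unique per node; zookeeper.connect must be identical.
configs = {
    "test11": "broker.id=1\nlog.dirs=/tmp/kafka-logs-1\nzookeeper.connect=192.168.37.11:2181,192.168.37.12:2181,192.168.37.13:2181",
    "test12": "broker.id=2\nlog.dirs=/tmp/kafka-logs-2\nzookeeper.connect=192.168.37.11:2181,192.168.37.12:2181,192.168.37.13:2181",
    "test13": "broker.id=3\nlog.dirs=/tmp/kafka-logs-3\nzookeeper.connect=192.168.37.11:2181,192.168.37.12:2181,192.168.37.13:2181",
}

def parse(props: str) -> dict:
    """Parse key=value lines into a dict; blank lines and comments are ignored."""
    out = {}
    for line in props.splitlines():
        line = line.strip()
        if line and not line.startswith("#"):
            key, _, value = line.partition("=")
            out[key] = value
    return out

parsed = {host: parse(text) for host, text in configs.items()}
broker_ids = [p["broker.id"] for p in parsed.values()]
assert len(set(broker_ids)) == len(broker_ids), "broker.id values must be unique"
assert len({p["zookeeper.connect"] for p in parsed.values()}) == 1, \
    "zookeeper.connect must match on every node"
print("config fragments look consistent")
```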

4. Edit the hosts file on all three servers so that each hostname resolves to its IP, as follows:

192.168.37.11 test11
192.168.37.12 test12
192.168.37.13 test13

5. Start Kafka on test11, test12, and test13 with the following command:
nohup ./bin/kafka-server-start.sh ./config/server.properties >/dev/null 2>&1 &
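The nohup approach works for a quick test, but the broker stays tied to that shell job and won't restart on failure. On CentOS 7 a systemd unit is more robust; a minimal sketch, assuming the install path used above (the unit name kafka.service is my choice, not from the original setup):

```ini
# /etc/systemd/system/kafka.service -- illustrative unit, paths assumed
[Unit]
Description=Apache Kafka broker
After=network.target

[Service]
Type=simple
ExecStart=/usr/local/kafka_2.12-2.3.0/bin/kafka-server-start.sh /usr/local/kafka_2.12-2.3.0/config/server.properties
ExecStop=/usr/local/kafka_2.12-2.3.0/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

After placing the file, run systemctl daemon-reload and systemctl enable --now kafka on each node.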
6. Create a topic, for example:
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Of course, you can point --bootstrap-server at any broker in the cluster, for example:
./bin/kafka-topics.sh --create --bootstrap-server test12:9092 --replication-factor 3 --partitions 1 --topic test22
7. Check the status of the topic you created, as follows:

[root@test11 kafka_2.12-2.3.0]# ./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test22
Topic:test22    PartitionCount:1        ReplicationFactor:3     Configs:segment.bytes=1073741824
        Topic: test22   Partition: 0    Leader: 2       Replicas: 2,1,3 Isr: 2,1,3

# The following command works as well
[root@test11 kafka_2.12-2.3.0]# ./bin/kafka-topics.sh --describe --zookeeper test11:2181 --topic test22

leader: the node responsible for all reads and writes for the given partition. Each node is the leader for a share of the partitions; in the output above, node 2 is the leader.
replicas: the list of nodes that replicate this partition's log, whether or not they are the leader, and even if they are not currently alive.
isr: the set of "in-sync" replicas, i.e. the subset of the replica list that is currently alive and caught up with the leader.
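These three fields obey two invariants that can be checked mechanically: isr is always a subset of replicas, and the leader is always in the ISR. A small Python sketch that parses the describe output line shown above:

```python
import re

# One partition line from `kafka-topics.sh --describe`, copied from above.
line = "Topic: test22   Partition: 0    Leader: 2       Replicas: 2,1,3 Isr: 2,1,3"

def parse_partition(line: str) -> dict:
    """Extract the leader, replica list, and ISR from a describe output line."""
    leader = int(re.search(r"Leader: (\d+)", line).group(1))
    replicas = [int(x) for x in re.search(r"Replicas: ([\d,]+)", line).group(1).split(",")]
    isr = [int(x) for x in re.search(r"Isr: ([\d,]+)", line).group(1).split(",")]
    return {"leader": leader, "replicas": replicas, "isr": isr}

info = parse_partition(line)
assert set(info["isr"]) <= set(info["replicas"])  # ISR is a subset of replicas
assert info["leader"] in info["isr"]              # the leader is always in sync
print(info)  # {'leader': 2, 'replicas': [2, 1, 3], 'isr': [2, 1, 3]}
```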

8. Now let's verify the cluster by sending messages to the new topic:

[root@test11 kafka_2.12-2.3.0]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test22
>I send info in node 1

# Consume the messages on test13
[root@test13 kafka_2.12-2.3.0]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test22
I send info in node 1

9. Since we know from above that node 2 is the leader, let's kill the Kafka process on node 2 and see whether the cluster is affected:

[root@test12 kafka_2.12-2.3.0]# ps aux|grep kafka
root      5724  1.4 16.6 3657172 313304 pts/0  Sl   Jul04  15:41 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Xloggc:/usr/local/kafka_2.12-2.3.0/bin/.
......
[root@test12 kafka_2.12-2.3.0]# kill -9 5724
[root@test12 kafka_2.12-2.3.0]# ps aux|grep kafka
root     15860  0.0  0.0 112724   984 pts/0    R+   14:37   0:00 grep --color=auto kafka
[1]+  Killed                  nohup ./bin/kafka-server-start.sh ./config/server.properties > /dev/null 2>&1
[root@test12 kafka_2.12-2.3.0]# ./bin/kafka-topics.sh --describe --bootstrap-server test13:9092 --topic test22 
Topic:test22    PartitionCount:1        ReplicationFactor:3     Configs:segment.bytes=1073741824
        Topic: test22   Partition: 0    Leader: 1       Replicas: 2,1,3 Isr: 1,3
# Because the original leader (node 2) has been killed, Kafka elected node 1 as the new leader, and node 2 is no longer in the ISR.
# Let's check whether sending messages on topic test22 is affected
Produce from test11:
>I send info after leader 2 is kill
>[2019-07-05 14:38:32,519] WARN [Producer clientId=console-producer] Connection to node 2 (test12/192.168.37.12:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Consume on test13:
[2019-07-05 14:37:44,032] WARN [Consumer clientId=consumer-1, groupId=console-consumer-19650] Connection to node 2 (test12/192.168.37.12:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
I send info after leader 2 is kill

# As shown above, messages can still be produced and consumed; the clients just log warnings that node 2 is unavailable.
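The failover we just observed can be modeled simply: when a leader dies, Kafka promotes the first broker in the partition's replica list that is still alive and in the ISR. The toy sketch below reproduces the Leader: 2 to Leader: 1 transition from the describe outputs above; it is a simplified model, not Kafka's actual controller logic:

```python
def elect_leader(replicas, isr, dead):
    """Toy model of Kafka leader election: the first broker in the assigned
    replica list that is alive and in the ISR becomes leader.
    (Simplified; the real election happens in the Kafka controller.)"""
    live_isr = [b for b in isr if b not in dead]
    for broker in replicas:
        if broker in live_isr:
            return broker
    return None  # no eligible replica left: the partition goes offline

# The test22 partition from above: Replicas: 2,1,3, then broker 2 is killed.
replicas, isr = [2, 1, 3], [2, 1, 3]
new_leader = elect_leader(replicas, isr, dead={2})
print(new_leader)  # -> 1, matching the Leader: 1 seen after the kill
```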
Last updated: 2019-07-05 16:21   Author: 子木