基础环境
服务器:
test11 192.168.37.11
test12 192.168.37.12
test13 192.168.37.13
系统版本:
centos 7.6
安装
1、下载安装包:wget http://mirror-hk.koddos.net/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
2、解压并进入目录:
tar zxf kafka_2.12-2.3.0.tgz -C /usr/local/
cd /usr/local/kafka_2.12-2.3.0/
3、只需要修改server.properties文件的部分配置,如下:
test11:
vim config/server.properties
broker.id=1
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=192.168.37.11:2181,192.168.37.12:2181,192.168.37.13:2181
test12:
vim config/server.properties
broker.id=2
log.dirs=/tmp/kafka-logs-2
zookeeper.connect=192.168.37.11:2181,192.168.37.12:2181,192.168.37.13:2181
test13:
vim config/server.properties
broker.id=3
log.dirs=/tmp/kafka-logs-3
zookeeper.connect=192.168.37.11:2181,192.168.37.12:2181,192.168.37.13:2181
4、修改三台服务器上的hosts文件使用ip与主机名对应,如下:
192.168.37.11 test11
192.168.37.12 test12
192.168.37.13 test13
4、分别启动test11,12,13服务器上的kafka,命令如下:nohup ./bin/kafka-server-start.sh ./config/server.properties >/dev/null 2>&1 &
5、创建topic,例:./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
当然,你可以连接到指定服务器创建,例:./bin/kafka-topics.sh --create --bootstrap-server test12:9092 --replication-factor 3 --partitions 1 --topic test22
6、查看创建主题的状态,如下:
[root@test11 kafka_2.12-2.3.0]# ./bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test22
Topic:test22 PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: test22 Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
#当然用下面这个命令也是可以的
[root@test11 kafka_2.12-2.3.0]# ./bin/kafka-topics.sh --describe --zookeeper test11:2181 --topic test22
leader:是负责给定分区的所有读写的节点。每个节点将成为随机选择的分区部分的领导者。上面节点2是leader节点
replicas:是复制此分区日志的节点列表,无论它们是否为领导者,或者即使它们当前处于活动状态。
isr:是“同步”复制品的集合。这是副本列表的子集,该列表当前处于活跃状态并且已经被领导者捕获
7、下面我们验证一下,向我们的新主题发送消息
[root@test11 kafka_2.12-2.3.0]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test22
>I send info in node 1
#在test13上接收消息
[root@test13 kafka_2.12-2.3.0]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test22
I send info in node 1
8、因为上面我们清楚节点2是leader节点,我们这时把节点2的kafka kill掉,看一下集群有没有受影响
[root@test12 kafka_2.12-2.3.0]# ps aux|grep kafka
root 5724 1.4 16.6 3657172 313304 pts/0 Sl 7月04 15:41 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Xloggc:/usr/local/kafka_2.12-2.3.0/bin/.
......
[root@test12 kafka_2.12-2.3.0]# kill -9 5724
[root@test12 kafka_2.12-2.3.0]# ps aux|grep kafka
root 15860 0.0 0.0 112724 984 pts/0 R+ 14:37 0:00 grep --color=auto kafka
[1]+ 已杀死 nohup ./bin/kafka-server-start.sh ./config/server.properties > /dev/null 2>&1
[root@test12 kafka_2.12-2.3.0]# ./bin/kafka-topics.sh --describe --bootstrap-server test13:9092 --topic test22
Topic:test22 PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: test22 Partition: 0 Leader: 1 Replicas: 2,1,3 Isr: 1,3
#这时因为原来的leader节点2已经被kill,kafka重新选举出leader节点1来,已经不同步到节点2了。
#我们再看下在test22主题上发消息有没有受到影响
test11发消息:
>I send info after leader 2 is kill
>[2019-07-05 14:38:32,519] WARN [Producer clientId=console-producer] Connection to node 2 (test12/192.168.37.12:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
test13接收消息:
[2019-07-05 14:37:44,032] WARN [Consumer clientId=consumer-1, groupId=console-consumer-19650] Connection to node 2 (test12/192.168.37.12:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
I send info after leader 2 is kill
#从上面看这些消息仍可供消费的,只是会有node 2不能用的警告