kafka(二)kafka环境安装和集群操作
kafka 的部署
zookeeper 集群搭建
- 准备三台服务器。ip分别为
192.168.1.103 192.168.1.104 192.168.1.105
- 先下载安装zookeeper
1 | https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz |
- 下载完毕后将zookeeper上传到3台服务器上,并解压
3台都要操作
1 | /usr/local/software/zookeeper |
解压
1 | tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz |
目前zookeeper home目录为
1 | /usr/local/software/zookeeper/apache-zookeeper-3.6.3-bin |
- 修改配置文件
首先创建数据文件夹
1 | mkdir /usr/local/software/zookeeper/apache-zookeeper-3.6.3-bin/data |
{zookeeper_home}/conf 目录下修改
拷贝模默认配置文件
1 | cp zoo_sample.cfg zoo.cfg |
修改内容如下
1 | # The number of milliseconds of each tick |
- 修改了数据的目录
- 修改了server.1 server.2 server.3 的集群信息,格式为集群内的ip + 服务器通信端口 + 选举端口
- 在每个服务器的 zookeeper 的data 目录分别写入一个myid 文件,用来标识不同的服务id.
192.168.1.103
1 | echo 1 > myid |
192.168.1.104
1 | echo 2 > myid |
192.168.1.105
1 | echo 3 > myid |

- 启动服务器,各个服务器分别执行
启动服务
1 |
|

全部启动后,可使用status 查看服务启动状态
1 | ./zkServer.sh status |

第一个启动的角色为 Mode: leader
后面的启动的两个为 Mode: follower
kafak 集群搭建
- 下载kafak
https://kafka.apache.org/downloads.html
这里下载的是比较新的 Scala 2.13 - kafka_2.13-3.0.0.tgz (asc, sha512)
解压缩
1 | tar xvf kafka_2.13-3.0.0.tgz |
解压缩路径为 /usr/local/software/kafka/kafka_2.13-3.0.0
- 修改配置文件 server.properties
修改broker.id 每个节点给一个数字。分别修改为 0 1 2
修改log.dirs 配置日志也就是数据文件的目录,目录会自动创建,这里修改为安装目录下的数据目录,也支持配置多个目录使用 英文逗号 分开。
修改zookeeper.connect 的集群连接地址
1 | //不同的节点的值必须不同 |
- 配置环境变量
切换到root
1 | su root |
编辑kafka相关的环境变量配置
1 | vim /etc/profile.d/kafka.sh |
写入文件内容
1 |
|
保存后,应用环境变量。
1 | source /etc/profile |
三台主机同样要做此操作
- 启动服务
三台主机都启动
1 | bin/kafka-server-start.sh -daemon config/server.properties |
查看端口号
1 | netstat -an | grep 9092 |
如果有错误从 /log/server.log 查看错误信息.
启动的时候zookeeper 先启动再启动 kafka ,关闭的时候先关闭 kafka再关闭 zookeeper
增加新的机器节点
新创建一个机器(192.168.1.108),安装 kafka。zookeeper 已经有3台了,不再拓展。
修改配置文件中的broker.id 因为broker.id 每个节点要唯一;
1 | broker.id=3 |
保证 data 和 logs 下的数据是空的。
启动新节点的kafka
1 | bin/kafka-server-start.sh -daemon config/server.properties |

可以看到zk 中的节点的ids 已经加上了这个新的节点。
- 对旧的topic执行重新负载操作
在任意一个节点 编辑一个json文件
1 | vim topics-to-move.json |
1 | { |
目前此topic的分布情况如下
1 | kafka-topics.sh --bootstrap-server 192.168.1.103:9092 --describe --topic one_topic |
生成负载计划
1 | bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.1.103:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate |
表示 使用某个文件来生成新的迁移计划.
192.168.1.103 必须是连接原集群中的某个节点。
最终生成这样一个结果
1 | 当前 |
编辑副本存储计划
1 | vim increase-replication-factor.json |
内容就是Proposed partition reassignment configuration
1 | {"version":1,"partitions":[{"topic":"one_topic","partition":0,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"one_topic","partition":1,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"one_topic","partition":2,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"one_topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]}]} |
执行副本生成计划
1 | bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.1.103:9092 --reassignment-json-file increase-replication-factor.json --execute |

如果提交有正在进行中的,可以使用
1 | bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.1.103:9092 --list |
查看当前的进行任务
–cancel 参数可以取消正在执行的任务。
最终验证结果
1 | bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.1.103:9092 --reassignment-json-file increase-replication-factor.json --verify |
可以看到当前按照此计划执行的进度和结果。
等待一会。
退役旧节点
基本上分2步操作,首先将这个节点上的数据迁移到其他节点,然后停机。
- 在节点 编辑一个json文件,定义要执行哪些topic
1 | vim topics-to-move.json |
1 | { |
- 生成新的负载计划
1 | bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.1.103:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate |
注意:去掉了要卸载的节点的broker.id


