kafka 的部署

zookeeper 集群搭建

  1. 准备三台服务器。ip分别为

192.168.1.103 192.168.1.104 192.168.1.105

  1. 先下载安装zookeeper
1
https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
  1. 下载完毕后将zookeeper上传到3台服务器上,并解压

3台都要操作

1
2
/usr/local/software/zookeeper

解压

1
tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz

目前zookeeper home目录为

1
2
/usr/local/software/zookeeper/apache-zookeeper-3.6.3-bin

  1. 修改配置文件

首先创建数据文件夹

1
mkdir /usr/local/software/zookeeper/apache-zookeeper-3.6.3-bin/data

{zookeeper_home}/conf 目录下修改

拷贝模默认配置文件

1
2
cp zoo_sample.cfg  zoo.cfg

修改内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/software/zookeeper/apache-zookeeper-3.6.3-bin/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
server.1=192.168.1.103:2881:3881
server.2=192.168.1.104:2881:3881
server.3=192.168.1.105:2881:3881

  • 修改了数据的目录
  • 修改了server.1 server.2 server.3 的集群信息,格式为集群内的ip + 服务器通信端口 + 选举端口
  1. 在每个服务器的 zookeeper 的data 目录分别写入一个myid 文件,用来标识不同的服务id.

192.168.1.103

1
echo 1 > myid

192.168.1.104

1
echo 2 > myid

192.168.1.105

1
echo 3 > myid

  1. 启动服务器,各个服务器分别执行

启动服务

1
2

./zkServer.sh start

全部启动后,可使用status 查看服务启动状态

1
2
./zkServer.sh  status

第一个启动的角色为 Mode: leader
后面的启动的两个为 Mode: follower

kafak 集群搭建

  1. 下载kafak

https://kafka.apache.org/downloads.html

这里下载的是比较新的 Scala 2.13 - kafka_2.13-3.0.0.tgz (asc, sha512)

解压缩

1
tar xvf kafka_2.13-3.0.0.tgz 

解压缩路径为 /usr/local/software/kafka/kafka_2.13-3.0.0

  1. 修改配置文件 server.properties
  • 修改broker.id 每个节点给一个数字。分别修改为 0 1 2

  • 修改log.dirs 配置日志也就是数据文件的目录,目录会自动创建,这里修改为安装目录下的数据目录,也支持配置多个目录使用 英文逗号 分开。

  • 修改zookeeper.connect 的集群连接地址

1
2
3
4
5
6
7
8
9
10
//不同的节点的值必须不同
broker.id=0

//日志文件目录
log.dirs=/usr/local/software/kafka/kafka_2.13-3.0.0/data



zookeeper.connect=192.168.1.103:2181,192.168.1.104:2181,192.168.1.105:2181

  1. 配置环境变量

切换到root

1
2
su root

编辑kafka相关的环境变量配置

1
vim /etc/profile.d/kafka.sh

写入文件内容

1
2
3
4
5

#KAFKA_HOME
export KAFKA_HOME=/usr/local/software/kafka/kafka_2.13-3.0.0
export PATH=$PATH:$KAFKA_HOME/bin

保存后,应用环境变量。

1
2
source /etc/profile

三台主机同样要做此操作

  1. 启动服务

三台主机都启动

1
2
bin/kafka-server-start.sh -daemon  config/server.properties  

查看端口号

1
netstat -an | grep 9092

如果有错误从 /log/server.log 查看错误信息.

启动的时候zookeeper 先启动再启动 kafka ,关闭的时候先关闭 kafka再关闭 zookeeper

增加新的机器节点

  1. 新创建一个机器(192.168.1.108),安装 kafka。zookeeper 已经有3台了,不再拓展。

  2. 修改配置文件中的broker.id 因为broker.id 每个节点要唯一;

1
broker.id=3
  1. 保证 data 和 logs 下的数据是空的。

  2. 启动新节点的kafka

1
bin/kafka-server-start.sh -daemon  config/server.properties  

可以看到zk 中的节点的ids 已经加上了这个新的节点。

  1. 对旧的topic执行重新负载操作

在任意一个节点 编辑一个json文件

1
vim topics-to-move.json
1
2
3
4
5
6
7
8
9
10
11
12
13
{

"topics":[
{
"topic":"one_topic"

}

],
"version":1

}

目前此topic的分布情况如下

1
2
3
4
5
6
7
kafka-topics.sh   --bootstrap-server 192.168.1.103:9092 --describe   --topic one_topic
Topic: one_topic TopicId: PUEkzODhSgiv9qFSqAM5zg PartitionCount: 4 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: one_topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: one_topic Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: one_topic Partition: 2 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: one_topic Partition: 3 Leader: 2 Replicas: 2,0 Isr: 2,0

生成负载计划

1
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.1.103:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate

表示 使用某个文件来生成新的迁移计划.

192.168.1.103 必须是连接原集群中的某个节点。

最终生成这样一个结果

1
2
3
4
5
6
7
8
9
当前
{"version":1,"partitions":[{"topic":"one_topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"one_topic","partition":1,"replicas":[1,0],"log_dirs":["any","any"]},{"topic":"one_topic","partition":2,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"one_topic","partition":3,"replicas":[2,0],"log_dirs":["any","any"]}]}


计划
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"one_topic","partition":0,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"one_topic","partition":1,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"one_topic","partition":2,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"one_topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]}]}


编辑副本存储计划

1
vim increase-replication-factor.json

内容就是Proposed partition reassignment configuration

1
{"version":1,"partitions":[{"topic":"one_topic","partition":0,"replicas":[2,0],"log_dirs":["any","any"]},{"topic":"one_topic","partition":1,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"one_topic","partition":2,"replicas":[0,2],"log_dirs":["any","any"]},{"topic":"one_topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]}]}

执行副本生成计划

1
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.1.103:9092 --reassignment-json-file increase-replication-factor.json --execute

如果提交有正在进行中的,可以使用

1
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.1.103:9092 --list

查看当前的进行任务

–cancel 参数可以取消正在执行的任务。

最终验证结果

1
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.1.103:9092 --reassignment-json-file increase-replication-factor.json --verify

可以看到当前按照此计划执行的进度和结果。

等待一会。

退役旧节点

基本上分2步操作,首先将这个节点上的数据迁移到其他节点,然后停机。

  1. 在节点 编辑一个json文件,定义要执行哪些topic
1
vim topics-to-move.json
1
2
3
4
5
6
7
8
9
10
11
12
13
{

"topics":[
{
"topic":"par_test"

}

],
"version":1

}

  1. 生成新的负载计划
1
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.1.103:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate

注意:去掉了要卸载的节点的broker.id