消费者

消费者的消费模式

主要有2种模式,推模式和拉模式。

推模式: 由broker主动向消费者推送数据。
优点是消费者不用做主要的操作,只要接收就行。
缺点是消费者不能自行决定消费速度,可能出现消费能力不足的问题。推送的数据大于能消费的能力上限。

拉模式: 由消费者自定去broker拉取指定的数据。
优点是: 消费数据的速度由消费者来控制。
缺点是:消费者需要主要去拉取数据,并且可能循环拉取不到数据(没有数据)导致空循环。

消费者的消费流程

消费者组: 在创建消费者的时候,必须为消费者设置一个消费者组id。相同消费者组id 的一组消费者 称为是一个消费者组。

不同消费者组的消费是相互独立的,比如两个消费者组 id 分别为 0 1 ,都消费topicA 的数据,不同的组都可以消费一遍相同的数据,是不冲突的。

针对不同的消费者组 都记录了一个消费消息的offset,默认这个offset 维护在 __consumer_offsets topic 中存储。

当topic 有多个分区的时候,有多个消费者组成的消费者组就可以共同消费这部分数据,达到协同的作用。比如有4个分区,有4个消费者组成的组,那么每个消费者消费一个分区。由定义的策略决定使用那种方式。

注意:一个分区的数据只能有一个消费者来消费,如果分区数量 > 消费者的数量 那么会出现消费者闲置的情况。

消费者的初始化

在broker的每个broker节点上都有一个 coordinator ,主要作用是 辅助实现消费者组的初始化和分区的分配。

首先 每个消费者组的offset 信息是存放到 __consumer_offsets 主题中的,并且此主题 的分区数量是 50 。

因为在每个broker上都有一个coordinator 那么就需要选择一个特定的作为一个协调者。

会根据groupId 计算hashCode % 50 (分区数量) 得到特定的consumer_offsets 分区,再以此分区所在的broker节点上的 coordinator 作为选中的coordinator。

  1. 已经选定了一个特定的coordinator.
  2. 消费者组中的每个消费者 向coordinator 发送一个JoinGroup 请求。
  3. coordinator 根据接收到的多个消费者,选定其中一个消费者作为 消费者组中的消费者leader。
  4. 消费者leader 会制定消费方案,并将消费方案发送给 coordinator
  5. coordinator 将收到的方案分发给各个消费者。
  6. 消费者会与coordinator 保持3s 每次的心跳,当超过45秒未收到心跳,认为此消费者挂了,就会触发再平衡。同时如果某个消费者消费的时间过长,默认超过5分钟,也会触发再平衡分配。

参数: session.timeout.ms=45s max.poll.interval.ms5分钟

消费者的消费流程

  1. 消费者向broker 发送拉取请求。 consumerNetWorkClient 发送 send()

  2. 监听回调方法completeFetches(queue) 从队列中获取一条一条的数据记录。

  3. 对数据执行反序列化操作。 经过拦截器 处理,最终处理数据。

消费者的相关参数

  • bootstrap.servers broker地址

  • key.deserializer 和 value.deserializer 反序列化类,要序列化的对应

  • group.id 组id。

  • enable.auto.commit 开启自动提交。默认值为 true,消费者会自动周期性地向服务器提交偏移量。

  • auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。

  • auto.offset.reset 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在该如何处理?

    • earliest:自动重置偏移量到最早的偏移量。
    • latest:默认,自动重置偏移量为最新的偏移量。
    • none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。
    • anything:向消费者抛异常。
  • offsets.topic.num.partitions __consumer_offsets 的分区数,默认是 50 个分区。

  • heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的 1/3

  • session.timeout.ms 心跳超时时间

  • max.poll.interval.ms 消息处理最大的时长,默认5分钟,超过后执行再平衡

  • fetch.min.bytes 默认 1 个字节。消费者获取服务器端一批消息最小的字节数。

  • fetch.max.wait.ms 最大拉取等待时间,一直等不导达到指定大小的数据 超过此时间,仍然返回。

  • fetch.max.bytes 默认 Default: 52428800(50 m)。 消费者每次拉取最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝
    对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。

  • max.poll.records 一次poll拉取数据返回消息的最大条数,默认是 500 条

消费数据

消费某个topic的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop1:9092,hadoop2:9092,hadoop3:9092");

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-a");


KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

consumer.subscribe(Arrays.asList("test_topic"));

//循环拉取消息
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println("消息:"+consumerRecord.value());
}
}

消费特定topic的特定分区的数据

1
2
3
4
5
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

TopicPartition topicPartition = new TopicPartition("test_topic",0);
consumer.assign(Arrays.asList(topicPartition));

使用 assign传入topic 和分区信息

测试消费者组 的分区负载均衡

  1. 启动3个相同消费者groupid 的消费者。并且都监听同一个topic ,不指定分区。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop1:9092,hadoop2:9092,hadoop3:9092");

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group-a");

KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

consumer.subscribe(Arrays.asList("test_topic"));

//循环拉取消息
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println("消费者"+args[0]+"消息--->:"+consumerRecord.value() + " 分区信息:"+consumerRecord.partition());
}
}
  1. 因为目前分区数量是3个,启动3个实例运行。

  1. 生产者发送消息

生产者均匀向3个分区发送100条数据。

  1. 各个消费者消费情况。

可以看到消费者组中的各个消费者实例分别消费了一个分区的数据。

分区的消费者分配策略

主要定义分区和消费者的关系,及哪部分消费者来消费哪部分分区的数据。
目前有4种分区策略。

  • Range
  • RoundRobin
  • Sticky
  • CooperativeSticky

可以配合修改参数 partition.assignment.strategy 来修改特定的策略。
默认的策略是 Range + CooperativeSticky 。为什么是2个呢,因为kafka可以多个策略同时使用。

Range

对应的实现类 org.apache.kafka.clients.consumer.RangeAssignor

基本逻辑: 针对某个topic,对订阅了此topic的消费者组信息排序。然后计算出平均每个组应该分得多少个。按照分区号 顺序分配给多个消费者。

假设有5个分区。 2 个消费者。

那么将会给 第一个消费者 分配 [0,1,2] 第二个消费者分配 [3,4]

RoundRobin

对应的实现类 org.apache.kafka.clients.consumer.RoundRobinAssignor

通过循环的方式将分区分配给使用者。

首先会把所有的topic的分区信息 执行排序,然后通过轮训的方式为每个topic分区执行分配对应的消费者信息。

代码配置:

1
2
3
List<String> strategyList = new ArrayList<>();
strategyList.add("org.apache.kafka.clients.consumer.RoundRobinAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,strategyList);

Sticky

粘性分配策略,对应的实现类 org.apache.kafka.clients.consumer.StickyAssignor

此分配策略是下次分配的时候尽量分配给上次相同的消费者。这里的下次主要指的是触发再分配的时候。

比如本来有3个消费者。

C0 C1 C2 已经分配的分配对应关系是

C0 –> p1 p2

C1 –> p3 p4

C2 –> p5 p6

当使用此策略的时候,执行再分配的时候,尽量满足大部分消费者的处理的分区不变。

当C2 挂了,那么执行的结果为

C0 –> p1 p2 p5

C1 –> p3 p4 p6

代码配置:

1
2
3
List<String> strategyList = new ArrayList<>();
strategyList.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,strategyList);

CooperativeSticky

对应的实现类: org.apache.kafka.clients.consumer.CooperativeStickyAssignor

CooperativeSticky 的分区逻辑最终的结果和Sticky是一样的,区别是内容使用的协议。

Sticky 使用的是EAGER协议,而CooperativeSticky 使用COOPERATIVE协议。

EAGER协议在执行重分配的时候,会将所有的之前的分配停止掉,放弃之前的所有分区,然后再建立新的分区。只不过新的分区和之前的分区尽量保持粘性关系。在此过程中,不需要变动的消费者也得停止消费。

而COOPERATIVE 协议这个过程是一个渐进平衡的过程,不需要变动的消费者保留原有的分区信息,等待协调者处理需要变动的分区。

Offset

默认offset 是存储在 特定的topic (**__consumer_offsets**)中的。既然是topic那么就可以监听和消费此topic的。直接消费的话发现是无法消费的。

需要开启个系统的参数:

1.

在配置文件

1
2
config/consumer.properties 中添加配置 exclude.internal.topics=true (默认是false)

  1. 执行命令行的消费
1
kafka-console-consumer.sh --topic  __consumer_offsets --bootstrap-server hadoop1:9092 --consumer.config ../config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" --from-beginning 
  1. 查看消费结果

offset的提交

消费者消费完指定的数据后,将消费的数据执行提交,将消费的offset信息提交给broker,broker写入到__consumer_offsets 的topic中。

默认情况下,消费者开启了自动提交机制。每隔一段时间将一批数据自动提交。

自动提交配置: enable.auto.commit=true 默认是true

如果设置自动提交为false ,并且不手动提交,那么将会重复消费消费过的数据,因为offset不能被记录到topic中。

关闭自动提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop1:9092,hadoop2:9092,hadoop3:9092");

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"offset-d");
//关闭自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);


KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

consumer.subscribe(Arrays.asList(Constant.TOPIC));

//循环拉取消息
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
if(!consumerRecords.isEmpty()){
System.out.println("拉取到了消息=====================>");
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.offset() + "===>"+consumerRecord.value());
}
System.out.println("消息结束========================>");
}
}

自动提交的时间很长

1
2
3
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
//auto.commit.interval.ms
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,50000000);

如果自动提交的时间设置的很长,并且还未的及自动提交,仍然会导致再次消费没有提交的数据。

使用手动提交

使用手动提交,就不依赖自动提交机制更方便的控制消息的提交。

  1. 首先关闭自动提交
1
2
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");

  1. 在获取消息后,调用提交APi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//循环拉取消息
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
if(!consumerRecords.isEmpty()){
System.out.println("拉取到了消息=====================>");
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.offset() + "===>"+consumerRecord.value());
}
System.out.println("消息结束========================>");
}
//同步
consumer.commitSync();
//异步
//consumer.commitAsync();
}

需要注意的是,提供了2种提交方式,同步提交(commitSync)和异步提交(commitAsync)
主要的区别是同步提交会等待提交完成才能继续操作,在一定程度上减低的程序的吞吐量。

  • consumer.commitSync() 同步提交
  • consumer.commitAsync() 异步提交

从中间消费

第一次从哪里消费

这里涉及到一个参数
auto.offset.reset 表示在一个没有记录消费位置的消费者组第一次消费的时候从什么位置消费。比如一个新的消费者组,默认是从最新的偏移量来消费,而不会消费更早的数据。

  • latest 最近的数据 (默认)
  • earliest 最早的数据
  • none 不指定,但是如果之间没有记录会抛出异常
1
2
3
4
5
6
7
8
//1
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

//2
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

//3
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"none");

从指定位置消费

拉取分区号100 以后的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
    Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop1:9092,hadoop2:9092,hadoop3:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"offset-111");

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

consumer.subscribe(Arrays.asList(Constant.TOPIC));

Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0){
//获取分区信息
consumer.poll(Duration.ofMillis(100L));
assignment = consumer.assignment();
}

for (TopicPartition topicPartition : assignment) {
consumer.seek(topicPartition,100);
}

//循环拉取消息
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
if(!consumerRecords.isEmpty()){
System.out.println("拉取到了消息=====================>");
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println("分区号:"+consumerRecord.partition() + "==> "+ consumerRecord.offset() + "===>"+consumerRecord.value());
}
System.out.println("消息结束========================>");
}
}
}

重点使用 consumer.seek(topicPartition,100); API 指定offset

消费某个时间段内的数据

假设要消费从指定的时间前开始。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//10分钟前开始的数据
Long beforeTimeMillion = 10 * 60 * 1000L;

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop1:9092,hadoop2:9092,hadoop3:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"offset-111");

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

consumer.subscribe(Arrays.asList(Constant.TOPIC));
//获取分区信息
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0){
consumer.poll(Duration.ofMillis(100L));
assignment = consumer.assignment();
}
Map<TopicPartition,Long> topicPartitionLongMap = new HashMap<>();
for (TopicPartition topicPartition : assignment) {
//减去指定时间前的
topicPartitionLongMap.put(topicPartition, System.currentTimeMillis() - beforeTimeMillion);
}
//通过时间获取offset等信息 ,获取每个分区的对应的OffsetAndTimestamp 对象
Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(topicPartitionLongMap);

//再次对每个分区进行seek操作
for (TopicPartition topicPartition : assignment) {
OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
if(offsetAndTimestamp != null){
//还是获取offset
consumer.seek(topicPartition,offsetAndTimestamp.offset());
}
}

//循环拉取消息
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
if(!consumerRecords.isEmpty()){
System.out.println("拉取到了消息=====================>");
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println("分区号:"+consumerRecord.partition() + "==> "+ consumerRecord.offset() + "===>"+consumerRecord.value());
}
System.out.println("消息结束========================>");
}
}

其实主要是通过时间获取一个时间到offset的转换,最终还是调用了seek的api