0%

kafka消费者

消费者

kafka的消费者以pull的方式获取消息,同时采用了消费者组的模式,每个消费者都属于某个消费者组。在创建消费者时,若不指定消费者的groupId,则该消费者属于默认消费者组

对于同一条消息而言,只能被同组下的某一个消费者消费,但不同的消费者组的消费者能消费同一条消息,可以通过消费者组来实现消息的单播和广播。

kafka提供kafka-console-consumer脚本实现用户在终端模拟消费者消费消息。

脚本

1
2
3
4
5
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"

消费者命令

1
2
3
4
5
6
7
8
9
#表示从 latest 位移位置开始消费该主题的所有分区消息,即仅消费正在写入的消息。
>kafka-console-consumer --bootstrap-server localhost:9092 --topic kafka-test


#从开始位置消费,表示从指定主题中有效的起始位移位置开始消费所有分区的消息
>kafka-console-consumer --bootstrap-server localhost:9092 --topic kafka-test --from-beginning

#消费出的消息结果将打印出消息体的 key 和 value
>kafka-console-consumer --bootstrap-server localhost:9092 --topic kafka-test --property print.key=true --from-beginning

参数说明

  • bootstrap-server
  • topic 消费的主题
  • whitelist 正则表达式,指定要包含以供使用的主题白名单
  • partition 指定分区,除非指定-offset 否则从分区结束(lastest)开始消费
  • offset 执行消费的起始offset位置 默认lastest lastest earliest
  • consumer-property 将用户定义的属性以key=value的形式传给使用者
  • consumer.config 消费者配置文件,consumer-property 优先级高于该配置
  • formatter 格式化kafka消息以供显示的类的名称 默认kafka.tools.DefaultMessageFormatter 可选kafka.tools.DefaultMessageFormatter kafka.tools.LoggingMessageFormatter
    kafka.tools.NoOpMessageFormatter kafka.tools.ChecksumMessageFormatter
  • property 初始化消息格式化程序的属性 print.timestamp=true|false print.key=true|false
    print.value=true|false key.separator=<key.separator> line.separator=<line.separator>
    key.deserializer=<key.deserializer> value.deserializer=<value.deserializer>
  • from-beginning 如果消费者尚没有已建立的可用于消费的偏移量,则从存在的最早消息开始,而不是从最新消息开始
  • max-messages 消费的最大数据量,若不指定,则持续消费下去
  • timeout-ms 在指定时间间隔内没有消息可用时退出
  • skip-message 处理消息时出错,跳过该消息而不是暂停
  • key-deserializer
  • value-deserializer
  • enable-systest-events 除记录消费者外,还记录消费者的生命周期
  • isolation-level 设置为read_committed以过滤掉未提交的事务性消息 设置为read_uncommitted以读取所有消息,默认值read_uncommitted
  • group 指定消费者组
  • blacklist 要从消费中排除的主题黑名单
  • cvs-reporter-enabled 如果设置,将启用csv metrics报告器
  • delete-consumer-offsets 如果指定,启动时删除zookeeper中的消费者信息
  • metrics-dir 输出csv度量值
  • zookeeper (旧版时使用)相当于bootstrap-server

当使用新版本时,消费者不在依赖于zookeeper,当启动一个消费者时,不向zookeeper注册,而是由消费组协调器(GroupCoordinator)统一管理。消费者已消费的消息的偏移量提交后会保存到名为”__consumer_offsets”的内部主题中。

__consumer_offsets主题默认的副本数量是1,即使使用–relication-factor设置了多个副本,如果__consumer_offsets当前副本所在的broker宕机,会导致消费者无法读取和写入topic的offset信息,从而导致生产者可以发送消息但是消费者不可用,需要设置

offsets.topic.replicaton.factor的值大于1

通过计算消费组名的hashcode值与内部主题分区总数(默认是50个分区)取模来确定消费者偏移量存储的分区。

__consumer_offsets中存储的为key/value格式,key为消费者组+分区,value为该消费者的offset

查看消费情况

describe 查看某个消费者组当前的消费情况

1
2
3
4
5
6
7
8
9
10
11
#查看消费者组名
>kafka-consumer-groups --bootstrap-server localhost:9092 --list

#查看每个分区的偏移量
>kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group test1
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
kafka-test 3 200003 200003 0 - - -
kafka-test 4 200004 200004 0 - - -
kafka-test 1 200003 200003 0 - - -
kafka-test 0 200002 200002 0 - - -
kafka-test 2 200002 200002 0 - - -

消费多主题

kafka消费者的topic参数不支持同时指定多个主题,但是脚本中有另一个参数whitelist,该参数可以指定多个主题

1
2
#同时消费多个主题
>kafka-console-consumer --bootstrap-server localhost:9092 --whitelist "kafka-test|kafka-action"

消费者性能测试工具

使用kafka-consumer-perf-test脚本

脚本

1
2
3
4
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerPerformance "$@"

命令

1
>kafka-consumer-perf-test --broker-list localhost:9092 --threads 5 --messages 10000 num-fetch-threads 5 --topic kafka-action

消费者源码

KafkaCunsumer实现了Consume接口

方法列表

  • subscribe 订阅主题
  • assign 订阅主题的分区
  • poll 拉取消息
  • seek/seekToBeginning/seekToEnd 指定消费位置
  • commitSync/commitAsync 以同步/异步方式提交偏移量
  • assignment 获取分区分配关系
  • position 获取下一次消费消息位置
  • pause/resume 对分区控制

配置说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
CONFIG = (new ConfigDef()).define("bootstrap.servers", Type.LIST, Collections.emptyList(), new NonNullValidator(), Importance.HIGH, "")
.define("group.id", Type.STRING, "", Importance.HIGH, "消费组id")
.define("session.timeout.ms", Type.INT, 10000, Importance.HIGH, "组协调器没有收到消费者发来的心跳请求,则协调器会将该消费者从消费组中移除")
.define("heartbeat.interval.ms", Type.INT, 3000, Importance.HIGH, "发送心跳请求的时间间隔")
.define("partition.assignment.strategy", Type.LIST, Collections.singletonList(RangeAssignor.class), new NonNullValidator(), Importance.MEDIUM, "分区分配策略")
.define("metadata.max.age.ms", Type.LONG, 300000, Range.atLeast(0), Importance.LOW, "每隔多长时间强制更新元数据")
.define("enable.auto.commit", Type.BOOLEAN, true, Importance.MEDIUM, "是否开启自动提交消费偏移量")
.define("auto.commit.interval.ms", Type.INT, 5000, Range.atLeast(0), Importance.LOW, "自动提交消费偏移量的时间间隔")
.define("client.id", Type.STRING, "", Importance.LOW, "客户端id")
.define("max.partition.fetch.bytes", Type.INT, 1048576, Range.atLeast(0), Importance.HIGH, "")
.define("send.buffer.bytes", Type.INT, 131072, Range.atLeast(-1), Importance.MEDIUM, "Socket发送消息缓冲区大小")
.define("receive.buffer.bytes", Type.INT, 65536, Range.atLeast(-1), Importance.MEDIUM, "Socket接收消息缓冲区大小")
.define("fetch.min.bytes", Type.INT, 1, Range.atLeast(0), Importance.HIGH, "一次拉取操作等待消息的最小字节数")
.define("fetch.max.bytes", Type.INT, 52428800, Range.atLeast(0), Importance.MEDIUM, "一次拉取操作等待消息的最大字节数")
.define("fetch.max.wait.ms", Type.INT, 500, Range.atLeast(0), Importance.LOW, "客户端等待请求的最长等待时间")
.define("reconnect.backoff.ms", Type.LONG, 50L, Range.atLeast(0L), Importance.LOW, "")
.define("reconnect.backoff.max.ms", Type.LONG, 1000L, Range.atLeast(0L), Importance.LOW, "")
.define("retry.backoff.ms", Type.LONG, 100L, Range.atLeast(0L), Importance.LOW, "")
.define("auto.offset.reset", Type.STRING, "latest", ValidString.in(new String[]{"latest", "earliest", "none"}), Importance.MEDIUM, "重置偏移量,只有在消费者组没有初始化偏移量(即该消费者组第一次消费)或者当前偏移量已经不存在了(已经过期删除了)的时候才会生效,earliest 取最早的偏移量,latest 取最新的偏移量")
.define("check.crcs", Type.BOOLEAN, true, Importance.LOW, "")
.define("metrics.sample.window.ms", Type.LONG, 30000, Range.atLeast(0), Importance.LOW, "")
.define("metrics.num.samples", Type.INT, 2, Range.atLeast(1), Importance.LOW, "")
.define("metrics.recording.level", Type.STRING, RecordingLevel.INFO.toString(), ValidString.in(new String[]{RecordingLevel.INFO.toString(), RecordingLevel.DEBUG.toString()}), Importance.LOW, "")
.define("metric.reporters", Type.LIST, Collections.emptyList(), new NonNullValidator(), Importance.LOW, "")
.define("key.deserializer", Type.CLASS, Importance.HIGH, "")
.define("value.deserializer", Type.CLASS, Importance.HIGH, "")
.define("request.timeout.ms", Type.INT, 30000, Range.atLeast(0), Importance.MEDIUM, "客户端发送请求后等待回应的超时时间")
.define("default.api.timeout.ms", Type.INT, 60000, Range.atLeast(0), Importance.MEDIUM, "")
.define("connections.max.idle.ms", Type.LONG, 540000, Importance.MEDIUM, "")
.define("interceptor.classes", Type.LIST, Collections.emptyList(), new NonNullValidator(), Importance.LOW, "")
.define("max.poll.records", Type.INT, 500, Range.atLeast(1), Importance.MEDIUM, "一次拉取消息的最大数量")
.define("max.poll.interval.ms", Type.INT, 300000, Range.atLeast(1), Importance.MEDIUM, "拉取消息线程最长空闲时间,若超过这个时间还没有发起poll操作,则消费组认为该消费者已经离开了消费组") // 此时会触发重平衡,将分区分配给其他成员,如果消息消费时间过长,需要增大该时间
.define("exclude.internal.topics", Type.BOOLEAN, true, Importance.MEDIUM, "")
.defineInternal("internal.leave.group.on.close", Type.BOOLEAN, true, Importance.LOW)
.define("isolation.level", Type.STRING, DEFAULT_ISOLATION_LEVEL, ValidString.in(new String[]{IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)}), Importance.MEDIUM, "")
.define("security.protocol", Type.STRING, "PLAINTEXT", Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC).withClientSslSupport().withClientSaslSupport();

消费者初始化

KafkaConsumer是非线程安全的。有一个acquire()方法用来检测每个方法的调用是不是只有一个线程在操作

1
2
3
4
5
6
7
8
9
10
11
private void acquire() {
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
refcount.incrementAndGet();
}

private void release() {
if (refcount.decrementAndGet() == 0)
currentThread.set(NO_CURRENT_THREAD);
}
1
2
3
4
5
6
// 客户端没有指定client.id时,使用该变量自增
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
// 记录当前操作消费者的线程id,起始值为-1,若该值为-1表示目前还没有线程操作该消费者,在acquire()方法中检测
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
// 记录当前操作消费者的线程数,初始值为0,acquire()方法中加一,release方法中减一,并将currentThread重置为-1
private final AtomicInteger refcount = new AtomicInteger(0);

消费订阅

kafkaConsumer提供了两种订阅消息的方法,一种使用subscribe()方法指定消息对应的主题,支持正则;另一种使用assign()方法指定分区。这两种订阅方式互斥,只能选择一种订阅方式。

主题订阅(subscribe方法)

该订阅方式由同一个消费者组的Leader消费者根据各消费者都支持的分区分配策略为消费者分配分区,可以指定ConsumerRebalanceListener进行平衡操作回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public void subscribe(Collection<String> topics) {
this.subscribe((Collection)topics, new NoOpConsumerRebalanceListener());
}

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
// 调用acquire方法检测是否并发操作
this.acquireAndEnsureOpen();

try {
if (topics == null) {
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
}

if (topics.isEmpty()) {
this.unsubscribe();
} else {
Iterator var3 = topics.iterator();

String topic;
do {
if (!var3.hasNext()) {
this.throwIfNoAssignorsConfigured();
this.log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
// 将订阅的主题列表保存到subscriptions
this.subscriptions.subscribe(new HashSet(topics), listener);
// 更新metadata中主题的过期时间
this.metadata.setTopics(this.subscriptions.groupSubscription());
return;
}

topic = (String)var3.next();
} while(topic != null && !topic.trim().isEmpty());

throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}
} finally {
// 重置
this.release();
}

}

分区订阅(assign方法)

该订阅方式客户端直接指定了消费者与分区的对应关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public void assign(Collection<TopicPartition> partitions) {
// 检测并发
this.acquireAndEnsureOpen();

try {
if (partitions == null) {
throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
}

if (partitions.isEmpty()) {
this.unsubscribe();
} else {
Set<String> topics = new HashSet();
Iterator var3 = partitions.iterator();

while(true) {
if (var3.hasNext()) {
TopicPartition tp = (TopicPartition)var3.next();
String topic = tp != null ? tp.topic() : null;
if (topic != null && !topic.trim().isEmpty()) {
topics.add(topic);
continue;
}

throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
}
// 提交消费偏移量,保证同一个消费者组下的消费者对分区的消费偏移量已提交,防止重复消费
this.coordinator.maybeAutoCommitOffsetsAsync(this.time.milliseconds());
this.log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
this.subscriptions.assignFromUser(new HashSet(partitions));
// 更新metadata的主题过期时间
this.metadata.setTopics(topics);
break;
}
}
} finally {
this.release();
}

}

消费消息

KafkaConsumer提供了一个poll方法来拉取消息,通过Fetcher类来完成消息的拉取及更新消费偏移量

Fetcher

Fetcher主要功能就是负责构造拉取消息的FetchRequest请求,然后通过ConsumerNetworkClient发送该请求,最后对返回结果进行处理并更新缓存中记录的消费位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
private final Logger log;
private final LogContext logContext;
// 用于向kafka节点发送请求
private final ConsumerNetworkClient client;
private final Time time;
private final int minBytes;
private final int maxBytes;
private final int maxWaitMs;
private final int fetchSize;
private final long retryBackoffMs;
private final long requestTimeoutMs;
private final int maxPollRecords;
private final boolean checkCrcs;
private final Metadata metadata;
private final FetchManagerMetrics sensors;
private final SubscriptionState subscriptions;
// 用于保存FetchResponse原始结果
private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
private final IsolationLevel isolationLevel;
private final Map<Integer, FetchSessionHandler> sessionHandlers;
private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
// 用于保存从completedFetches中解析后的结果
private PartitionRecords nextInLineRecords = null;

拉取消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
acquireAndEnsureOpen();
try {
if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}

// poll for new data until the timeout expires
do {
client.maybeTriggerWakeup();

if (includeMetadataInTimeout) {
if (!updateAssignmentMetadataIfNeeded(timer)) {
return ConsumerRecords.empty();
}
} else {
while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
log.warn("Still waiting for metadata");
}
}

final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.pollNoWakeup();
}

return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
} while (timer.notExpired());

return ConsumerRecords.empty();
} finally {
release();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
long pollTimeout = coordinator == null ? timer.remainingMs() :
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

// if data is available already, return it immediately
// 从completedFetches中取数据,第一次进来肯定没有数据,此时返回空的map
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty()) {
return records;
}

// send any new fetches (won't resend pending fetches)
// 构造请求并调用client.send发送请求
fetcher.sendFetches();

// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure

// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
// updateAssignmentMetadataIfNeeded before this method.
if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}

Timer pollTimer = time.timer(pollTimeout);
// 阻塞等待服务端返回结果
client.poll(pollTimer, () -> {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
});
timer.update(pollTimer.currentTimeMs());

// after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
if (coordinator != null && coordinator.rejoinNeededOrPending()) {
return Collections.emptyMap();
}

return fetcher.fetchedRecords();
}

偏移量提交

kafka将偏移量存储在内部主题”_consumer_offsets”中,内部主题配置了compact策略(类似于redis的aof持久化rewrite功能。由一个logCleaner线程来完成的,会把重复的、并且较老的消息清除掉),保证了该主题总保留各分区被消费的最新偏移量,而且控制了该主题的日志容量。通过该消费社对应的消费者组与该主题分区总数取模的方式确定消费偏移量提交的分区

(Math.abs(${group.id}.hashcode() % ${offsets.topic.num.partitions}))

0.9版本之前偏移量存储在zk(在/consumers/<group.id>/offsets//目录下),0.9版本之后偏移量存储在kafka中,kafka定义了一个系统的topic,专用用来存储每个消费者组的偏移量的数据

String GROUP_METADATA_TOPIC_NAME = “__consumer_offsets”;

消息内容示例

key: .anonymous.fc38a59a-1103-477f-8b71-3895d4f82e18 payload:consumerrange/consumer-2-a4f940d7-ddd6-4852-8c87-48deb9a634f8���C�/consumer-2-a4f940d7-ddd6-4852-8c87-48deb9a634f8

由于偏移量更改频繁,对zk压力过大,所以改为kafka自己维护

手动提交

enable.auto.commit设置为false,kafka提供了同步提交commitSync()和异步提交commitAsync()供客户端提交偏移量。分别调用的是ConsumerCoordinator的commitOffsetsSync方法和commitOffsetsAsync方法。

通过消费者协调器ConsumerCoordinator发送OffsetCommitRequest请求,服务器组协调器GroupCoordinator进行处理,最终将消费者偏移量追加到kafka内部主题中

同步提交时,消费者在提交请求响应结果返回前会一直被阻塞,在成功提交后才会进行下一次拉取消息操作;异步提交不阻塞,当提交发生异常时有可能发生重复消费

自动提交
1
2
3
4
5
6
7
8
9
public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
nextAutoCommitTimer.update(now);
if (nextAutoCommitTimer.isExpired()) {
nextAutoCommitTimer.reset(autoCommitIntervalMs);
doAutoCommitOffsetsAsync();
}
}
}

检测时间是否超过了${auto.commit.interval.ms},如果超过了则提交偏移量

在以下操作时检测是否提交偏移量

  • 通过kafkaConsumer.assign()订阅分区。
  • kafkaConsumer.poll()拉取消息,即在Coordinator.poll方法处理时会进行消费者偏移量提交检测
  • 消费者进行平衡操作前,即在ConsumerCoordinator.onJoinPrepare()方法处理时会进行消费偏移量提交检测
  • ConsumerCoordinator关闭操作

心跳检测

kafkaConsumer启动后会定期向服务端组协调器GroupCoordinator发送心跳检测HeartbeatRequest请求,通过心跳检测通信双方相互感知对方是否存在并进行相应处理。核心为HeartbeatThread线程,该线程为ConsumerCoordinator父类AbstractCoordinator中的内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
public void run() {
try {
log.debug("Heartbeat thread started");
while (true) {
synchronized (AbstractCoordinator.this) {
if (closed)
return;

if (!enabled) {
AbstractCoordinator.this.wait();
continue;
}

if (state != MemberState.STABLE) {
// the group is not stable (perhaps because we left the group or because the coordinator
// kicked us out), so disable heartbeats and wait for the main thread to rejoin.
disable();
continue;
}

client.pollNoWakeup();
long now = time.milliseconds();

if (coordinatorUnknown()) {
if (findCoordinatorFuture != null || lookupCoordinator().failed())
// the immediate future check ensures that we backoff properly in the case that no
// brokers are available to connect to.
AbstractCoordinator.this.wait(retryBackoffMs);
}
// 会话超时时间sessionTimeout是否过期,过期表明HeartbeatRequest发送后没有收到响应,认为GroupCoordinator不可达
else if (heartbeat.sessionTimeoutExpired(now)) {
// the session timeout has expired without seeing a successful heartbeat, so we should
// probably make sure the coordinator is still healthy.
markCoordinatorUnknown();
}
// 距离上次poll操作时间是否大于最大空闲时间max.poll.interval.ms,超过的话认为消费者已离开消费者组
else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll(), so we explicitly leave the group.
log.warn("This member will leave the group because consumer poll timeout has expired. This " +
"means the time between subsequent calls to poll() was longer than the configured " +
"max.poll.interval.ms, which typically implies that the poll loop is spending too " +
"much time processing messages. You can address this either by increasing " +
"max.poll.interval.ms or by reducing the maximum size of batches returned in poll() " +
"with max.poll.records.");
//发送LeaveGroupRequest请求,准备进行消费者平衡操作
maybeLeaveGroup();
}
// 还未到发送心跳检测时间
else if (!heartbeat.shouldHeartbeat(now)) {
// poll again after waiting for the retry backoff in case the heartbeat failed or the
// coordinator disconnected
// 继续等待
AbstractCoordinator.this.wait(retryBackoffMs);
} else {
heartbeat.sentHeartbeat(now);
// 发送心跳检测
sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
synchronized (AbstractCoordinator.this) {
heartbeat.receiveHeartbeat();
}
}

@Override
public void onFailure(RuntimeException e) {
synchronized (AbstractCoordinator.this) {
if (e instanceof RebalanceInProgressException) {
// it is valid to continue heartbeating while the group is rebalancing. This
// ensures that the coordinator keeps the member in the group for as long
// as the duration of the rebalance timeout. If we stop sending heartbeats,
// however, then the session timeout may expire before we can rejoin.
heartbeat.receiveHeartbeat();
} else {
heartbeat.failHeartbeat();

// wake up the thread if it's sleeping to reschedule the heartbeat
AbstractCoordinator.this.notify();
}
}
}
});
}
}
}
} catch (AuthenticationException e) {
log.error("An authentication error occurred in the heartbeat thread", e);
this.failed.set(e);
} catch (GroupAuthorizationException e) {
log.error("A group authorization error occurred in the heartbeat thread", e);
this.failed.set(e);
} catch (InterruptedException | InterruptException e) {
Thread.interrupted();
log.error("Unexpected interrupt received in heartbeat thread", e);
this.failed.set(new RuntimeException(e));
} catch (Throwable e) {
log.error("Heartbeat thread failed due to unexpected error", e);
if (e instanceof RuntimeException)
this.failed.set((RuntimeException) e);
else
this.failed.set(new RuntimeException(e));
} finally {
log.debug("Heartbeat thread has closed");
}
}

消费者平衡

消费者平衡是指消费者重新加入消费组并重新分配分区给消费者的过程。

以下情况会引起消费者平衡操作

  • 新的消费者加入消费组
  • 当前消费者从消费组退出
  • 消费者取消对某个主题的订阅
  • 订阅主题的分区增加
  • 代理宕机新的协调器当选
  • 当消费者在${session.timeout.ms}毫秒还没有发送心跳请求,组协调器认为消费者已退出

欢迎关注我的其它发布渠道