消费者
kafka的消费者以pull的方式获取消息,同时采用了消费者组的模式,每个消费者都属于某个消费者组。在创建消费者时,若不指定消费者的groupId,则该消费者属于默认消费者组
对于同一条消息而言,只能被同组下的某一个消费者消费,但不同的消费者组的消费者能消费同一条消息,可以通过消费者组来实现消息的单播和广播。
kafka提供kafka-console-consumer脚本实现用户在终端模拟消费者消费消息。
脚本
1 | if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then |
消费者命令
1 | #表示从 latest 位移位置开始消费该主题的所有分区消息,即仅消费正在写入的消息。 |
参数说明
- bootstrap-server
- topic 消费的主题
- whitelist 正则表达式,指定要包含以供使用的主题白名单
- partition 指定分区,除非指定-offset 否则从分区结束(lastest)开始消费
- offset 执行消费的起始offset位置 默认lastest lastest earliest
- consumer-property 将用户定义的属性以key=value的形式传给使用者
- consumer.config 消费者配置文件,consumer-property 优先级高于该配置
- formatter 格式化kafka消息以供显示的类的名称 默认kafka.tools.DefaultMessageFormatter 可选kafka.tools.DefaultMessageFormatter kafka.tools.LoggingMessageFormatter
kafka.tools.NoOpMessageFormatter kafka.tools.ChecksumMessageFormatter - property 初始化消息格式化程序的属性 print.timestamp=true|false print.key=true|false
print.value=true|false key.separator=line.separator=
key.deserializer=value.deserializer= - from-beginning 如果消费者尚没有已建立的可用于消费的偏移量,则从存在的最早消息开始,而不是从最新消息开始
- max-messages 消费的最大数据量,若不指定,则持续消费下去
- timeout-ms 在指定时间间隔内没有消息可用时退出
- skip-message 处理消息时出错,跳过该消息而不是暂停
- key-deserializer
- value-deserializer
- enable-systest-events 除记录消费者外,还记录消费者的生命周期
- isolation-level 设置为read_committed以过滤掉未提交的事务性消息 设置为read_uncommitted以读取所有消息,默认值read_uncommitted
- group 指定消费者组
- blacklist 要从消费中排除的主题黑名单
- cvs-reporter-enabled 如果设置,将启用csv metrics报告器
- delete-consumer-offsets 如果指定,启动时删除zookeeper中的消费者信息
- metrics-dir 输出csv度量值
- zookeeper (旧版时使用)相当于bootstrap-server
当使用新版本时,消费者不再依赖于zookeeper,当启动一个消费者时,不向zookeeper注册,而是由消费组协调器(GroupCoordinator)统一管理。消费者已消费的消息的偏移量提交后会保存到名为”__consumer_offsets”的内部主题中。
该
__consumer_offsets
主题默认的副本数量是1,即使使用—relication-factor设置了多个副本,如果__consumer_offsets当前副本所在的broker宕机,会导致消费者无法读取和写入topic的offset信息,从而导致生产者可以发送消息但是消费者不可用,需要设置
offsets.topic.replicaton.factor
的值大于1
通过计算消费组名的hashcode值与内部主题分区总数(默认是50个分区)取模来确定消费者偏移量存储的分区。
__consumer_offsets
中存储的为key/value格式,key为消费者组+分区,value为该消费者的offset
查看消费情况
describe 查看某个消费者组当前的消费情况
1 | #查看消费者组名 |
消费多主题
kafka消费者的topic参数不支持同时指定多个主题,但是脚本中有另一个参数whitelist,该参数可以指定多个主题
1 | #同时消费多个主题 |
消费者性能测试工具
使用kafka-consumer-perf-test脚本
脚本
1 | if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then |
命令
1 | >kafka-consumer-perf-test --broker-list localhost:9092 --threads 5 --messages 10000 num-fetch-threads 5 --topic kafka-action |
消费者源码
KafkaCunsumer实现了Consume接口
方法列表
- subscribe 订阅主题
- assign 订阅主题的分区
- poll 拉取消息
- seek/seekToBeginning/seekToEnd 指定消费位置
- commitSync/commitAsync 以同步/异步方式提交偏移量
- assignment 获取分区分配关系
- position 获取下一次消费消息位置
- pause/resume 对分区控制
配置说明
1 | CONFIG = (new ConfigDef()).define("bootstrap.servers", Type.LIST, Collections.emptyList(), new NonNullValidator(), Importance.HIGH, "") |
消费者初始化
KafkaConsumer是非线程安全的。有一个acquire()方法用来检测每个方法的调用是不是只有一个线程在操作
1 | private void acquire() { |
1 | // 客户端没有指定client.id时,使用该变量自增 |
消费订阅
kafkaConsumer提供了两种订阅消息的方法,一种使用subscribe()方法指定消息对应的主题,支持正则;另一种使用assign()方法指定分区。这两种订阅方式互斥,只能选择一种订阅方式。
主题订阅(subscribe方法)
该订阅方式由同一个消费者组的Leader消费者根据各消费者都支持的分区分配策略为消费者分配分区,可以指定ConsumerRebalanceListener进行平衡操作回调
1 | public void subscribe(Collection<String> topics) { |
分区订阅(assign方法)
该订阅方式客户端直接指定了消费者与分区的对应关系
1 | public void assign(Collection<TopicPartition> partitions) { |
消费消息
KafkaConsumer提供了一个poll方法来拉取消息,通过Fetcher类来完成消息的拉取及更新消费偏移量
Fetcher
Fetcher主要功能就是负责构造拉取消息的FetchRequest请求,然后通过ConsumerNetworkClient发送该请求,最后对返回结果进行处理并更新缓存中记录的消费位置
1 | public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { |
拉取消息
1 | private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) { |
1 | private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) { |
偏移量提交
kafka将偏移量存储在内部主题”_consumer_offsets”中,内部主题配置了compact策略(类似于redis的aof持久化rewrite功能。由一个logCleaner线程来完成的,会把重复的、并且较老的消息清除掉),保证了该主题总保留各分区被消费的最新偏移量,而且控制了该主题的日志容量。通过该消费者对应的消费者组与该主题分区总数取模的方式确定消费偏移量提交的分区
(Math.abs(${group.id}.hashcode() % ${offsets.topic.num.partitions}))
0.9版本之前偏移量存储在zk(在
/consumers/<group.id>/offsets/<topic>/<partitionId>
目录下),0.9版本之后偏移量存储在kafka中,kafka定义了一个系统的topic,专门用来存储每个消费者组的偏移量的数据
String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets";
消息内容示例
key: .anonymous.fc38a59a-1103-477f-8b71-3895d4f82e18 payload:consumerrange/consumer-2-a4f940d7-ddd6-4852-8c87-48deb9a634f8���C�/consumer-2-a4f940d7-ddd6-4852-8c87-48deb9a634f8
由于偏移量更改频繁,对zk压力过大,所以改为kafka自己维护
手动提交
enable.auto.commit设置为false,kafka提供了同步提交commitSync()和异步提交commitAsync()供客户端提交偏移量。分别调用的是ConsumerCoordinator的commitOffsetsSync方法和commitOffsetsAsync方法。
通过消费者协调器ConsumerCoordinator发送OffsetCommitRequest请求,服务器组协调器GroupCoordinator进行处理,最终将消费者偏移量追加到kafka内部主题中
同步提交时,消费者在提交请求响应结果返回前会一直被阻塞,在成功提交后才会进行下一次拉取消息操作;异步提交不阻塞,当提交发生异常时有可能发生重复消费
自动提交
1 | public void maybeAutoCommitOffsetsAsync(long now) { |
检测时间是否超过了${auto.commit.interval.ms},如果超过了则提交偏移量
在以下操作时检测是否提交偏移量
- 通过kafkaConsumer.assign()订阅分区。
- kafkaConsumer.poll()拉取消息,即在Coordinator.poll方法处理时会进行消费者偏移量提交检测
- 消费者进行平衡操作前,即在ConsumerCoordinator.onJoinPrepare()方法处理时会进行消费偏移量提交检测
- ConsumerCoordinator关闭操作
心跳检测
kafkaConsumer启动后会定期向服务端组协调器GroupCoordinator发送心跳检测HeartbeatRequest请求,通过心跳检测通信双方相互感知对方是否存在并进行相应处理。核心为HeartbeatThread线程,该线程为ConsumerCoordinator父类AbstractCoordinator中的内部类
1 | public void run() { |
消费者平衡
消费者平衡是指消费者重新加入消费组并重新分配分区给消费者的过程。
以下情况会引起消费者平衡操作
- 新的消费者加入消费组
- 当前消费者从消费组退出
- 消费者取消对某个主题的订阅
- 订阅主题的分区增加
- 代理宕机新的协调器当选
- 当消费者在${session.timeout.ms}毫秒还没有发送心跳请求,组协调器认为消费者已退出