0%

kafka API操作之消费者

kafka提供了4类核心API

  • Producer API 生产消息相关接口,自定义生产者、自定义分区分配

  • Consumer API 消费消息相关接口,创建消费者、消费偏移量管理

  • Streams API 构建流处理程序的接口

  • Connect API kafka和外部系统进行数据流连接的连接器,实现将数据导入到kafka或从kafka中导出到外部系统。

1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.1</version>
</dependency>
</dependencies>

消费者

消费者创建需要指定连接的kafka的bootstrap-server属性、消息key反序列化类的key.deserializer属性、消息value反序列化类的value.deserializer属性、是否自动提交偏移量enable.auto.commit属性、消费者组group.id

如果不指定消费偏移量提交方式的话,默认是1s自动提交偏移量,使用auto.commit.interval.ms参数设置偏移量提交的时间间隔

订阅主题

kafka提供了四种订阅主题的方式

  • subscribe(Collection<String> topics)

  • subscribe(Collection<String> topics, ConsumerRebalanceListener listener)订阅主题时指定一个监听器,用于在消费者发生平衡操作时回调进行相应的业务处理 ConsumerRebalanceListener接口,当消费者发生平衡操作时,可以在该接口的相应方法中完成必要的应用程序逻辑处理,如提交偏移量操作,有两个回调方法:

  • 一个是在消费者平衡操作前、消费者停止拉取消息之后被调用的onPartitionsRevoked(Collection<TopicPartition> partitions),在该方法中可以提交偏移量操作以避免数据重复消费

    • 一个是在平衡之后,消费者开始拉取消息之前被调用的onPartitionsAssigned(Collection<TopicPartition> partitions)方法,在该方法中保证各消费者回滚到正确的偏移量,即重置各消费者偏移量
  • subscribe(Pattern pattern)

  • subscribe(Pattern pattern, ConsumerRebalanceListener listener)

消费偏移量管理

kafka消费者API提供了两种方法用于查询消费偏移量的操作

  • committed(TopicPartition partition)方法,该方法返回一个OffsetAndMetadata对象,通过OffsetAndMetadata对象可获取指定分区已提交的偏移量

  • position(TopicPartition partition)方法,返回下一次拉取位置

kafka提供了重置消费者偏移量的方法

  • seek(TopicPartition partition, long offset)方法用于将消费起始位置重置到指定的偏移量位置

  • seekToBeginning(Collection<TopicPartition> partitions)方法指定从消息起始位置开始消费,对应偏移量重置策略auto.offset.reset=earliest

  • seekToEnd(Collection<TopicPartition> partitions)方法指定从最新消息对应的消息开始消费,要等新的消息写入后才进行拉取,对应偏移量重置策略auto.offset.reset=latest

消费者消费位移确定有自动提交与手动提交两种策略。创建消费者对象的时候,参数enable.auto.commit设定,true表示自动提交,默认是自动提交。自动提交策略由消费者协调器(ConsumerCoordinator)每隔${auto.commit.interval.ms}毫秒执行一次偏移量的提交。手动提交需要由客户端自己控制偏移量的提交

  • 自动提交

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    private static KafkaConsumer<String, String> consumer;

    private static final String TOPIC_NAME = "test-topic";

    private static final long TIME = 1000L;

    static {
    Properties properties = initConfig();
    consumer = new KafkaConsumer<String, String>(properties);
    }

    private static Properties initConfig() {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "test");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 设置偏移量自动提交
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    // 设置偏移量提交时间间隔
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
    return properties;
    }

    public static void main(String[] args) {
    // 订阅主题
    consumer.subscribe(Collections.singletonList(TOPIC_NAME));
    while (true) {
    // poll指定的是一个超时时长,避免轮询过于频繁
    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(TIME));
    for (ConsumerRecord<String,String> record : consumerRecords) {
    System.out.printf("partition = %d,offset = %d,key = %s,value = %s%n", record.partition(), record.offset(), record.key(), record.value());
    }
    }
    }

如果上次提交的偏移量为80,当前消费者已经消费到了偏移量为100的消息,在没有到auto.commit.interval.ms的情况下消费者挂了,此时会将该分区分配给其他消费者,其他消费者会从80开始消费,导致了消息的重复消费

  • 手动提交 提供了同步提交(commitSync)和异步提交(commitAsync)两种提交方式。同步提交失败时一直尝试提交,直到遇到无法重试的情况下才会结束,同时同步方式下消费者线程在拉取消息的时候会被阻塞,直到偏移量提交操作成功或者在提交过程出现错误。 异步方式下消费者线程不会被阻塞,可能在提交偏移量操作的结果还未返回时就开始下一次的拉取操作,在提交失败时也不会尝试提交

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    private static KafkaConsumer<String, String> consumer;

    private static final String TOPIC_NAME = "test-topic";

    private static final long TIME = 1000L;

    static {
    Properties properties = initConfig();
    consumer = new KafkaConsumer<String, String>(properties);
    }

    private static Properties initConfig() {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "test");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 设置偏移量手动提交
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return properties;
    }

    public static void main(String[] args) {
    consumer.subscribe(Collections.singletonList(TOPIC_NAME));
    while (true) {
    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(TIME));
    for (ConsumerRecord<String, String> record : consumerRecords) {
    System.out.printf("partition = %d,offset = %d,key = %s,value = %s%n", record.partition(), record.offset(), record.key(), record.value());
    // 同步提交,程序会等待提交成功后在处理下一条消息,会限制程序的吞吐量
    // consumer.commitSync();
    // 异步提交
    consumer.commitAsync((offsets, exception) -> {
    if(exception == null){
    System.out.println("提交成功");
    } else {
    System.out.println("发生异常");
    }
    });
    }
    }
    }
  • 自定义存储偏移量

    由于自动提交可能会导致数据丢失(已确定消费,但是业务代码没有执行完成,如果系统宕机,会导致数据丢失)

    手动提交可能会导致重复消费(业务已经执行完成,在进行偏移量提交时,提交失败,系统宕机,会导致数据重复消费)

    所以可以使用自定义存储偏移量的方式,在获取分区偏移量时,根据自己的存储来获取偏移量(可以存储在mysql或redis中)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    private static KafkaConsumer<String, String> consumer;

    private static final String TOPIC_NAME = "test-topic";

    private static final long TIME = 1000L;

    static {
    Properties properties = initConfig();
    consumer = new KafkaConsumer<String, String>(properties);
    }

    private static Properties initConfig() {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // 消费者组
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "test");
    // 重置偏移量
    // 只有在消费者组没有初始化偏移量(即该消费者组第一次消费)或者当前偏移量已经不存在了(已经过期删除了)的时候才会生效
    // earliest 取最早的偏移量
    // latest 取最新的偏移量
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // 反序列化
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 设置偏移量自动提交
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    // 设置偏移量自动提交的时间间隔
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
    return properties;
    }

    public static void main(String[] args) {
    // 订阅主题
    consumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {// 自定义存储offset
    // 在Rebalance之前调用
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    consumer.commitSync();//提交偏移量
    }

    // 在rebalance之后调用
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // 由于进行分区的重新分配了,所以需要重新获取一下该分区的偏移量
    long committedOffset = -1;
    for (TopicPartition partition : partitions) {
    // 获取该分区已消费的偏移量
    committedOffset = consumer.committed(partition).offset();
    // 重置偏移量到该分区上一次提交的偏移量下一个位置处开始消费
    consumer.seek(partition, committedOffset + 1);
    }
    }
    });
    while (true) {
    // 拉取数据,是一个长轮询,如果kafka中没有数据,就会导致一直进行循环,传入的时间是延迟时间,如果当前没有数据可供消费,消费者会等待一段时间之后在返回,防止没有数据时一直轮询
    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(TIME));
    for (ConsumerRecord<String,String> record : consumerRecords) {
    System.out.printf("partition = %d,offset = %d,key = %s,value = %s%n", record.partition(), record.offset(), record.key(), record.value());
    }
    }
    }

订阅指定分区

使用consumer.assign(Collection<TopicPartition> partitions)订阅指定分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private static KafkaConsumer<String, String> consumer;

private static final String TOPIC_NAME = "test-topic";

private static final long TIME = 1000L;

static {
Properties properties = initConfig();
consumer = new KafkaConsumer<>(properties);
}

private static Properties initConfig() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "test");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置偏移量自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 设置偏移量提交时间间隔
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
return properties;
}

public static void main(String[] args) {
consumer.assign(Collections.singletonList(new TopicPartition(TOPIC_NAME,0)));
while (true){
ConsumerRecords<String,String> consumerRecords = consumer.poll(Duration.ofMillis(1000L));
for (ConsumerRecord<String,String> record: consumerRecords) {
System.out.printf("partition = %d,offset = %d,key = %s,value = %s%n", record.partition(), record.offset(), record.key(), record.value());
}
}
}

消费速度控制

使用pause(Collection<TopicPartition> partitions)resume(Collection<TopicPartition> partitions),分别用来暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据操作

多线程消费者

KafkaConsumer为非线程安全的,多线程需要处理好线程同步。常见的实现方式是每个线程各自实例化一个KafkaConsumer对象。将分区作为消费者线程的最小划分单位。(消费者数大于分区数时就会有部分消费者一直处于空闲状态,若多个消费者消费同一分区时需要考虑偏移量提交处理的问题)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class ConsumerThread extends Thread {
private KafkaConsumer<String, String> consumer;

public ConsumerThread(Properties config, String topic) {
this.consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe(Collections.singletonList(topic));
}

@Override
public void run() {
try{
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000L));
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.printf("threadId=%s,partition=%d,offset%d,key=%s,value=%s%n", Thread.currentThread().getId(), record.partition(), record.offset(), record.key(), record.value());
}
}
} catch (Exception e){
e.printStackTrace();
} finally {
consumer.close();
}

}
}


public class TestThead {
private static final String TOPIC_NAME = "test-topic";

private static Properties initConfig() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 使用相同的groupid,属于同一消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "test");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置偏移量自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 设置偏移量提交时间间隔
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
return properties;
}

public static void main(String[] args) {
//假设该主题有六个分区,则开启六个线程
for (int i = 0; i < 6; i++) {
new ConsumerThread(initConfig(), TOPIC_NAME).start();
}
}
}

欢迎关注我的其它发布渠道