Kafka API Operations

Kafka provides four categories of core APIs:

  • Producer API: interfaces for producing messages, including custom producers and custom partition assignment

  • Consumer API: interfaces for consuming messages, including creating consumers and managing consumer offsets

  • Streams API: interfaces for building stream-processing applications

  • Connect API: connectors that stream data between Kafka and external systems, importing data into Kafka or exporting it from Kafka to external systems

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.1</version>
    </dependency>
</dependencies>

Topics

Creating a topic

Newer versions of Kafka use AdminClient in place of the old ZkUtils class.

public static void createTopic(String topic) {
    Properties properties = new Properties();
    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    AdminClient adminClient = AdminClient.create(properties);
    // topic name, partition count, replication factor
    NewTopic newTopic = new NewTopic(topic, 5, (short) 1);
    CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
    try {
        result.all().get();
        System.out.println("topic created");
    } catch (InterruptedException | ExecutionException e) {
        System.out.println("topic creation failed");
    } finally {
        adminClient.close();
    }
}

Listing topics

/**
 * Create an AdminClient.
 */
public static AdminClient createAdmin() {
    Properties properties = new Properties();
    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return AdminClient.create(properties);
}

/**
 * List all topics.
 */
public static void listTopic() throws ExecutionException, InterruptedException {
    AdminClient adminClient = createAdmin();
    ListTopicsResult listTopicsResult = adminClient.listTopics();
    Set<String> topicNameSet = listTopicsResult.names().get();
    System.out.println("---------- topics ----------");
    topicNameSet.forEach(System.out::println);
    adminClient.close();
}

Describing a topic

/**
 * Print the detailed description of a topic.
 */
public static void descTopic(String topic) throws ExecutionException, InterruptedException {
    AdminClient adminClient = createAdmin();
    DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(topic));
    TopicDescription topicDescription = describeTopicsResult.all().get().get(topic);
    System.out.println(topicDescription);
    adminClient.close();
}

Modifying topic configuration

/**
 * Modify a topic-level configuration entry.
 */
public static void updateTopic(String topic) throws ExecutionException, InterruptedException {
    AdminClient adminClient = createAdmin();
    ConfigEntry configEntry = new ConfigEntry(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "102400");
    AlterConfigOp alterConfigOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);
    ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
    Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
    configs.put(configResource, Collections.singletonList(alterConfigOp));
    AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configs);
    alterConfigsResult.all().get();
    adminClient.close();
}

Deleting a topic

/**
 * Delete a topic.
 */
public static void deleteTopic(String topic) {
    AdminClient adminClient = createAdmin();
    adminClient.deleteTopics(Collections.singletonList(topic));
    // close() waits for in-flight requests, so the delete request is still sent
    adminClient.close();
}

Adding partitions

/**
 * Increase the partition count of a topic.
 * Note that Kafka only supports increasing, never decreasing, the number of partitions.
 *
 * @param num the total partition count after the increase
 */
public static void addPartitions(String topic, int num) {
    NewPartitions newPartitions = NewPartitions.increaseTo(num);
    AdminClient adminClient = createAdmin();
    Map<String, NewPartitions> config = new HashMap<>();
    config.put(topic, newPartitions);
    adminClient.createPartitions(config);
    adminClient.close();
}

Producers

The general steps for implementing a Kafka producer:

  • Create a Properties object with the producer-level configuration; three settings are mandatory:

    • bootstrap.servers: the address of the Kafka cluster

    • key.serializer: the class used to serialize the message key

    • value.serializer: the class used to serialize the message payload

  • Instantiate a KafkaProducer from the Properties object.

  • Instantiate a ProducerRecord; each message corresponds to one ProducerRecord.

  • Call one of KafkaProducer's send methods to deliver the ProducerRecord to Kafka; there are two: send(ProducerRecord) and send(ProducerRecord, Callback). KafkaProducer sends asynchronously by default: messages are cached in a message buffer and, once enough have accumulated, are sent as a RecordBatch. Sending therefore has two phases. In the first, the message is placed into the buffer; in the second, a Sender thread ships the buffered messages to Kafka, performing the real I/O. The first phase returns a Future object, and depending on how that Future is handled there are two styles of sending:

    • Asynchronous: if you need to know whether the send succeeded, handle the result in the Callback.

    • Synchronous: call get() on the Future returned by send to block until the Sender thread's final result is available.

  • Close the KafkaProducer to release connection resources.

Single-threaded producer

/**
 * Total number of messages to produce.
 */
private static final int MSG_SIZE = 10;
/**
 * Topic name.
 */
private static final String TOPIC_NAME = "test-topic";

private static KafkaProducer<String, String> producer;

static {
    Properties properties = initConfig();
    producer = new KafkaProducer<>(properties);
}

/**
 * Initialize the producer configuration.
 */
private static Properties initConfig() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    return properties;
}

Synchronous sending

try {
    for (int i = 0; i < MSG_SIZE; i++) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>(TOPIC_NAME, "message-key" + i, "message-body" + i);
        // get() blocks until the send result is available
        producer.send(record).get();
        Thread.sleep(1000);
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
} finally {
    producer.close();
}

Asynchronous sending

try {
    for (int i = 0; i < MSG_SIZE; i++) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>(TOPIC_NAME, "callback-key" + i, "callback-body" + i);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.out.println("send failed: " + exception.getMessage());
                }
                if (metadata != null) {
                    System.out.printf("offset:%s,partition:%s%n", metadata.offset(), metadata.partition());
                }
            }
        });
        Thread.sleep(1000);
    }
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    producer.close();
}

Multi-threaded producer

public class ProducerThread implements Runnable {

    // KafkaProducer is thread-safe, so a single instance can be shared across threads
    private final KafkaProducer<String, String> producer;

    private final ProducerRecord<String, String> record;

    public ProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
        this.producer = producer;
        this.record = record;
    }

    @Override
    public void run() {
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.out.println("send failed: " + exception.getMessage());
                }
                if (metadata != null) {
                    System.out.printf("offset:%s,partition:%s%n", metadata.offset(), metadata.partition());
                }
            }
        });
    }
}


// driver
public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(4);
    try {
        for (int i = 0; i < MSG_SIZE; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(TOPIC_NAME, "mt-callback-key" + i, "mt-callback-body" + i);
            executorService.submit(new ProducerThread(producer, record));
        }
    } catch (Exception e) {
        System.out.println("kafka send failed: " + e.getMessage());
    } finally {
        // let every submitted task call send() before the shared producer is closed
        executorService.shutdown();
        try {
            executorService.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        producer.close();
    }
}

Consumers

Creating a consumer requires the bootstrap.servers property for the Kafka connection, the key.deserializer property naming the class that deserializes message keys, the value.deserializer property naming the class that deserializes message values, the enable.auto.commit property controlling automatic offset commits, and the consumer group group.id.

If no commit strategy is specified, offsets are committed automatically (by default every 5 seconds); the interval is set with the auto.commit.interval.ms parameter.

Subscribing to topics

Kafka provides four ways to subscribe to topics:

  • subscribe(Collection topics)

  • subscribe(Collection topics, ConsumerRebalanceListener listener): subscribes and registers a listener that is called back when a consumer rebalance takes place, so the application can run the necessary logic, such as committing offsets. ConsumerRebalanceListener has two callback methods:

    • onPartitionsRevoked(Collection partitions) is called before the rebalance, after the consumer has stopped fetching messages; committing offsets here avoids duplicate consumption.
    • onPartitionsAssigned(Collection partitions) is called after the rebalance, before the consumer resumes fetching; here each consumer can be rolled back to the correct offset, i.e. the consumer offsets can be reset.
  • subscribe(Pattern pattern)

  • subscribe(Pattern pattern, ConsumerRebalanceListener listener)
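
The pattern-based forms subscribe to every topic whose name matches a regular expression; a one-line sketch, with an illustrative pattern:

// java.util.regex.Pattern; matches e.g. test-topic, test-events (illustrative regex)
consumer.subscribe(Pattern.compile("test-.*"));

The full example below uses the second form, registering a ConsumerRebalanceListener: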

private static KafkaConsumer<String, String> consumer;

private static final String TOPIC_NAME = "test-topic";

private static final long TIME = 1000L;

static {
    Properties properties = initConfig();
    consumer = new KafkaConsumer<>(properties);
}

private static Properties initConfig() {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "test");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // enable automatic offset commits
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    // offset commit interval
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
    return properties;
}

public static void main(String[] args) {
    // subscribe with a rebalance listener
    consumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // commit offsets before the partitions are taken away
            consumer.commitSync();
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            for (TopicPartition partition : partitions) {
                OffsetAndMetadata committed = consumer.committed(partition);
                if (committed != null) {
                    // the committed offset is already the position of the next record to consume
                    consumer.seek(partition, committed.offset());
                }
            }
        }
    });
    while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(TIME));
        for (ConsumerRecord<String, String> record : consumerRecords) {
            System.out.printf("partition = %d,offset = %d,key = %s,value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }
    }
}

Subscribing to specific partitions

Use consumer.assign(Collection partitions) to subscribe to specific partitions directly.

private static KafkaConsumer<String, String> consumer;

private static final String TOPIC_NAME = "test-topic";

private static final long TIME = 1000L;

static {
    Properties properties = initConfig();
    consumer = new KafkaConsumer<>(properties);
}

private static Properties initConfig() {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "test");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // enable automatic offset commits
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    // offset commit interval
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
    return properties;
}

public static void main(String[] args) {
    // consume only partition 0 of the topic
    consumer.assign(Collections.singletonList(new TopicPartition(TOPIC_NAME, 0)));
    while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(TIME));
        for (ConsumerRecord<String, String> record : consumerRecords) {
            System.out.printf("partition = %d,offset = %d,key = %s,value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }
    }
}

Managing consumer offsets

The Kafka consumer API provides two methods for querying consumer offsets:

  • committed(TopicPartition partition) returns an OffsetAndMetadata object from which the committed offset of the given partition can be read.

  • position(TopicPartition partition) returns the position of the next fetch.

Kafka also provides methods for resetting the consumer offset (see the sketch after this list):

  • seek(TopicPartition partition, long offset) resets the consumption start position to the given offset.

  • seekToBeginning(Collection partitions) starts consuming from the beginning of the partitions, corresponding to the reset policy auto.offset.reset=earliest.

  • seekToEnd(Collection partitions) starts consuming after the latest existing message, so fetching only returns records written afterwards, corresponding to the reset policy auto.offset.reset=latest.
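
A minimal sketch of these offset operations, assuming a consumer configured as in the earlier examples; the topic name, partition number, and the target offset 42 are illustrative:

// illustrative topic/partition; configuration as in the earlier examples
TopicPartition tp = new TopicPartition("test-topic", 0);
consumer.assign(Collections.singletonList(tp));

// committed(): the last committed offset for the partition (null if nothing committed yet)
OffsetAndMetadata committed = consumer.committed(tp);
System.out.println("committed offset: " + (committed == null ? "none" : committed.offset()));

// position(): the offset of the next record that poll() would fetch
System.out.println("next fetch position: " + consumer.position(tp));

// seek(): jump to an arbitrary offset
consumer.seek(tp, 42L);

// seekToBeginning()/seekToEnd(): equivalent to auto.offset.reset=earliest / latest
consumer.seekToBeginning(Collections.singletonList(tp));
consumer.seekToEnd(Collections.singletonList(tp));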

Consumer offsets can be committed with either an automatic or a manual strategy, selected by the enable.auto.commit parameter when the consumer is created: true means automatic, which is the default. With the automatic strategy, the consumer coordinator (ConsumerCoordinator) commits offsets every ${auto.commit.interval.ms} milliseconds; with the manual strategy, the client controls offset commits itself.

  • Automatic commit

private static KafkaConsumer<String, String> consumer;

private static final String TOPIC_NAME = "test-topic";

private static final long TIME = 1000L;

static {
    Properties properties = initConfig();
    consumer = new KafkaConsumer<>(properties);
}

private static Properties initConfig() {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "test");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // enable automatic offset commits
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    // offset commit interval
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
    return properties;
}

public static void main(String[] args) {
    consumer.subscribe(Collections.singletonList(TOPIC_NAME));
    while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(TIME));
        for (ConsumerRecord<String, String> record : consumerRecords) {
            System.out.printf("partition = %d,offset = %d,key = %s,value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }
    }
}
  • Manual commit: there are two variants, synchronous (commitSync) and asynchronous (commitAsync). A synchronous commit keeps retrying on failure until it hits an unrecoverable error, and it blocks the consumer thread until the commit succeeds or an error occurs. An asynchronous commit does not block the consumer thread, so the next poll may start before the commit result has come back, and it does not retry on failure. The example below uses commitAsync; a commitSync variant is sketched after it.

private static KafkaConsumer<String, String> consumer;

private static final String TOPIC_NAME = "test-topic";

private static final long TIME = 1000L;

static {
    Properties properties = initConfig();
    consumer = new KafkaConsumer<>(properties);
}

private static Properties initConfig() {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "test");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // disable automatic commits; offsets are committed manually
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return properties;
}

public static void main(String[] args) {
    consumer.subscribe(Collections.singletonList(TOPIC_NAME));
    while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(TIME));
        for (ConsumerRecord<String, String> record : consumerRecords) {
            System.out.printf("partition = %d,offset = %d,key = %s,value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
            // commit asynchronously; the callback reports the result
            consumer.commitAsync((offsets, exception) -> {
                if (exception == null) {
                    System.out.println("commit succeeded");
                } else {
                    System.out.println("commit failed");
                }
            });
        }
    }
}
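
For comparison, a minimal synchronous variant of the same poll loop (same configuration as above; committing once per batch rather than per record is a common pattern, not the only option):

public static void main(String[] args) {
    consumer.subscribe(Collections.singletonList(TOPIC_NAME));
    try {
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(TIME));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.printf("partition = %d,offset = %d,key = %s,value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
            // blocks until the offsets of the last poll are committed;
            // retries internally until it succeeds or hits an unrecoverable error
            consumer.commitSync();
        }
    } finally {
        consumer.close();
    }
}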

Controlling consumption speed

pause(Collection partitions) and resume(Collection partitions) are used, respectively, to stop certain partitions from returning data to the client on fetch and to resume returning data from those partitions. A sketch follows.
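
A minimal sketch of pause/resume inside a poll loop, assuming a consumer configured as in the earlier examples; isBacklogged() is a hypothetical application-side check (e.g. a full local processing queue):

TopicPartition tp = new TopicPartition(TOPIC_NAME, 0); // illustrative partition
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
    // isBacklogged() is a hypothetical placeholder for whatever triggers throttling
    if (isBacklogged()) {
        // poll() keeps the group membership alive but returns no data for paused partitions
        consumer.pause(Collections.singletonList(tp));
    } else if (consumer.paused().contains(tp)) {
        consumer.resume(Collections.singletonList(tp));
    }
    records.forEach(record -> System.out.println(record.value()));
}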

Multi-threaded consumers

KafkaConsumer is not thread-safe, so multi-threaded use requires careful synchronization. The common approach is to instantiate a separate KafkaConsumer per thread, with the partition as the smallest unit of work assigned to a consumer thread. (If there are more consumers than partitions, some consumers stay permanently idle; if multiple consumers consume the same partition, offset-commit handling must be coordinated.)

public class ConsumerThread extends Thread {
    private final KafkaConsumer<String, String> consumer;

    public ConsumerThread(Properties config, String topic) {
        // one KafkaConsumer instance per thread, since KafkaConsumer is not thread-safe
        this.consumer = new KafkaConsumer<>(config);
        consumer.subscribe(Collections.singletonList(topic));
    }

    @Override
    public void run() {
        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000L));
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    System.out.printf("threadId=%s,partition=%d,offset=%d,key=%s,value=%s%n",
                            Thread.currentThread().getId(), record.partition(), record.offset(),
                            record.key(), record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}


public class TestThread {
    private static final String TOPIC_NAME = "test-topic";

    private static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // enable automatic offset commits
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // offset commit interval
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        return properties;
    }

    public static void main(String[] args) {
        // assuming the topic has six partitions, start six consumer threads
        for (int i = 0; i < 6; i++) {
            new ConsumerThread(initConfig(), TOPIC_NAME).start();
        }
    }
}

Integration with Spring

spring-kafka consumes messages through a listener model. It defines a message listener container interface, MessageListenerContainer, with two implementations, KafkaMessageListenerContainer and ConcurrentMessageListenerContainer, representing a single-threaded container and a multi-threaded concurrent container respectively.

The concurrent container creates multiple single-threaded containers according to a user-specified concurrency. These are called listener containers because the consumer threads are managed by the message listener container; the container does not manage the consumer threads directly, however, but manages a consumer factory.

It also provides a MessageListener interface; implement it and override the onMessage method to process records, as in the sketch below.
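
A minimal sketch of wiring these pieces together programmatically, without Spring Boot auto-configuration; the broker address and topic name carry over from the earlier examples, so treat this as an illustration rather than a definitive configuration:

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

// the container manages this factory, which in turn creates the KafkaConsumer instances
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(props);

ContainerProperties containerProps = new ContainerProperties("test-topic");
// MessageListener#onMessage is invoked for every record the container polls
containerProps.setMessageListener((MessageListener<String, String>) record ->
        System.out.printf("partition = %d,offset = %d,value = %s%n",
                record.partition(), record.offset(), record.value()));

// single-threaded container; ConcurrentMessageListenerContainer additionally takes a concurrency setting
KafkaMessageListenerContainer<String, String> container =
        new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
container.start();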