0%

sparing集成kafka

与spring集成

spring-kafka是通过监听模式消费消息的。定义了一个消息监听者容器接口MessageListerContainer,有两个实现类KafkaMessageListenerContainer和ConcurrentMessageListenerContainer,分别表示单线程容器和多线程并发容器。

多线程并发容器是根据用户指定的并发数来创建多个单线程容器。称为线程容器,是由于消费者线程是交由消息监听者容器来管理,然而监听者容器并不是直接管理消费者线程,而是管理消费者工厂。

提供了一个MessageListener接口,实现该接口,重写onMessage方法即可。

生产者

生产者直接使用kafkaTemplate就可以发送数据,分为同步发送和异步发送

异步发送

1
2
3
4
5
6
7
8
9
10
11
12
kafkaTemplate.send("topic","message").addCallback(new ListenableFutureCallback<SendResult<String,String>>(){

@Override
public void onSuccess(SendResult<String, String> result) {

}

@Override
public void onFailure(Throwable ex) {

}
});

同步发送

1
2
3
4
5
6
ListenableFuture<SendResult<String,String>> future = kafkaTemplate.send("topic", "message");
try {
SendResult<String, String> sendResult = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

消费者

消费者可以使用注解@KafkaListener来标注

1
2
3
4
5
6
7
8
9
10
11
@KafkaListener(topicPartitions = { // 设置主题分区
@TopicPartition(topic = "testTopic",partitions = {"0","1"},partitionOffsets = {
@PartitionOffset(partition = "0",initialOffset = "10"),
@PartitionOffset(partition = "1",initialOffset = "100")
})
},
concurrency = "10", // 并发度
errorHandler = "myErrorHandler") // 失败处理,需要实现一个KafkaListenerErrorHandler接口
public void accept(String msg){

}

控制监听器生命周期

spring-kafka中提供了一个KafkaListenerEndpointRegistry类来控制消费者的生命周期,可以进行启动、暂停和继续

1
2
3
4
5
6
7
// listenId是@KafkaListener中的id属性
// 启动
kafkaListenerEndpointRegistry.getListenerContainer("listenId").start();
// 暂停
kafkaListenerEndpointRegistry.getListenerContainer("listenId").pause();
// 继续
kafkaListenerEndpointRegistry.getListenerContainer("listenId").resume();

欢迎关注我的其它发布渠道