与spring集成
spring-kafka是通过监听模式消费消息的。定义了一个消息监听者容器接口MessageListerContainer,有两个实现类KafkaMessageListenerContainer和ConcurrentMessageListenerContainer,分别表示单线程容器和多线程并发容器。
多线程并发容器是根据用户指定的并发数来创建多个单线程容器。称为线程容器,是由于消费者线程是交由消息监听者容器来管理,然而监听者容器并不是直接管理消费者线程,而是管理消费者工厂。
提供了一个MessageListener接口,实现该接口,重写onMessage方法即可。
生产者
生产者直接使用kafkaTemplate就可以发送数据,分为同步发送和异步发送
异步发送
1 2 3 4 5 6 7 8 9 10 11 12
| kafkaTemplate.send("topic","message").addCallback(new ListenableFutureCallback<SendResult<String,String>>(){
@Override public void onSuccess(SendResult<String, String> result) {
}
@Override public void onFailure(Throwable ex) { } });
|
同步发送
1 2 3 4 5 6
| ListenableFuture<SendResult<String,String>> future = kafkaTemplate.send("topic", "message"); try { SendResult<String, String> sendResult = future.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
|
消费者
消费者可以使用注解@KafkaListener来标注
1 2 3 4 5 6 7 8 9 10 11
| @KafkaListener(topicPartitions = { // 设置主题分区 @TopicPartition(topic = "testTopic",partitions = {"0","1"},partitionOffsets = { @PartitionOffset(partition = "0",initialOffset = "10"), @PartitionOffset(partition = "1",initialOffset = "100") }) }, concurrency = "10", // 并发度 errorHandler = "myErrorHandler") public void accept(String msg){
}
|
控制监听器生命周期
spring-kafka中提供了一个KafkaListenerEndpointRegistry类来控制消费者的生命周期,可以进行启动、暂停和继续
1 2 3 4 5 6 7
|
kafkaListenerEndpointRegistry.getListenerContainer("listenId").start();
kafkaListenerEndpointRegistry.getListenerContainer("listenId").pause();
kafkaListenerEndpointRegistry.getListenerContainer("listenId").resume();
|