0%

kafka API操作之生产者

kafka提供了4类核心API

  • Producer API 生产消息相关接口,自定义生产者、自定义分区分配

  • Consumer API 消费消息相关接口,创建消费者、消费偏移量管理

  • Streams API 构建流处理程序的接口

  • Connect API kafka和外部系统进行数据流连接的连接器,实现将数据导入到kafka或从kafka中导出到外部系统。

1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.1</version>
</dependency>
</dependencies>

生产者

实现kafka生产者的一般步骤

  • 创建Properties对象,设置生产者级别配置,必须包含的3个配置

    • bootstrap-server:配置kafka地址

    • key.serializer:配置用于序列化key的类

    • value.serializer:配置用于序列化实际数据的类

  • 选填的配置(参照ProducerConfig类)

    • acks ack的应答级别,0/1/-1
    • retries 重试次数
    • batch.size 批次大小(达到该大小才会发送)
    • linger.ms 等待时间(如果达到该时间时,batch.size依然没有达到,也会进行发送)
    • buffer.memory RecordAccumulator缓冲区大小,消息会组织成批次放入缓冲区中
  • 根据Properties对象实例化一个KafkaProducer对象

  • 实例化ProducerRecord对象,每条消息对应ProducerRecord对象

  • 调用KafkaProducer发送消息的方法将ProducerRecord发送到kafka,有两种方法send(ProducerRecord),send(ProducerRecord,Callback)。 KafkaProducer默认是异步发送,会将消息缓存到消息缓冲区中,当消息在消息缓冲区中累计到一定数量之后作为一个RecordBatch发送。生产者发送消息分为两个阶段:第一阶段是将消息发送到消息缓冲区;第二阶段是一个Sender线程负责将缓冲区的消息发送到kafka,执行真正的I/O操作,在第一阶段执行完会返回一个Feature对象,根据对Feature对象处理方式不同,分为两种发送方式

    • 异步方式: 使用异步方式如果希望知道消息发送成功与否,在回调函数Callback中进行相应处理

    • 同步方式: 通过调用send方法返回的Feature对象的get()方法以阻塞式获取执行结果,即等待Sender线程处理的最终结果

  • 关闭KafkaProducer,释放连接资源

单线程生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 设置生产消息的总数
*/
private static final int MSG_SIZE = 10;
/**
* 主题名称
*/
private static final String TOPIC_NAME = "test-topic";

private static KafkaProducer<String, String> producer;

static {
Properties properties = initConfig();
producer = new KafkaProducer<String, String>(properties);
}

/**
* 初始化kafka配置
*
* @return
*/
private static Properties initConfig() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return properties;
}

同步方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ProducerRecord<String, String> record = null;
try {
int num = 0;
for (int i = 0; i < MSG_SIZE; i++) {
record = new ProducerRecord<>(TOPIC_NAME,"消息key"+i,"消息体"+i);
// send方法默认就是异步的,返回的是Future<RecordMetadata>对象,所以需要使用get来进行获取结果
producer.send(record).get();
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
producer.close();
}

异步方式(使用回调函数)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ProducerRecord<String, String> record = null;
try {
int num = 0;
for (int i = 0; i < MSG_SIZE; i++) {
record = new ProducerRecord<>(TOPIC_NAME,"callback消息key"+i,"callback消息体"+i);
// 也可以使用回调函数来进行获取是否发送成功
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception == null){
System.out.printf("offset:%s,partition:%s",metadata.offset(),metadata.partition());
System.out.println();
} else { // 只有发送失败了exception才会有异常信息,同时metadata为空

System.out.println("出现异常");

}
}
});
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
producer.close();
}

多线程生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class ProducerThread implements Runnable {

private KafkaProducer<String, String> producer = null;

private ProducerRecord<String, String> record = null;

public ProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
this.producer = producer;
this.record = record;
}

@Override
public void run() {
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("出现异常");
}
if (metadata != null) {
System.out.printf("offset:%s,partition:%s", metadata.offset(), metadata.partition());
System.out.println();
}
}
});
}
}


// 执行方法
public static void main(String[] args) {
ProducerRecord<String, String> record = null;
ExecutorService executorService = Executors.newFixedThreadPool(4);
try{
for (int i = 0; i < MSG_SIZE; i++) {
record = new ProducerRecord<>(TOPIC_NAME,"多线程callback消息key"+i,"多线程callback消息体"+i);
executorService.submit(new ProducerThread(producer,record));
}
} catch (Exception e){
System.out.println("kakfa发送出现异常"+e.getMessage());
} finally {
producer.close();
executorService.shutdown();
}
}

欢迎关注我的其它发布渠道