Producer
Initialization
During initialization, KafkaProducer creates a Sender object, which is responsible for sending messages:
```java
this.sender = newSender(logContext, kafkaClient, this.metadata);
```
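It also creates an I/O thread (ioThread) that runs the Sender, and starts it. The third argument to KafkaThread marks it as a daemon thread: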
```java
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
```
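The same threading pattern can be sketched with plain java.lang.Thread: a named daemon thread that runs a Runnable loop until it is told to stop. SenderSketch, IoThreadDemo and shutdown() are hypothetical stand-ins rather than Kafka classes, and the prefix string is an assumed value. On each pass, the real Sender loop handles metadata, draining and network I/O.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Kafka's Sender: a Runnable that loops until close() is called.
final class SenderSketch implements Runnable {
    private volatile boolean running = true;
    final AtomicInteger iterations = new AtomicInteger();

    @Override
    public void run() {
        while (running) {
            // One pass of the loop; the real Sender drains the accumulator and polls the network here.
            iterations.incrementAndGet();
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void close() {
        running = false;
    }
}

final class IoThreadDemo {
    // Assumed prefix value, used only to build a readable thread name.
    static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";

    // Same shape as the producer code: a named daemon thread wrapping the sender Runnable.
    static Thread startIoThread(String clientId, Runnable sender) {
        String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
        Thread ioThread = new Thread(sender, ioThreadName);
        ioThread.setDaemon(true); // daemon: does not keep the JVM alive by itself
        ioThread.start();
        return ioThread;
    }

    // Asks the loop to stop and waits up to timeoutMs for the thread to exit.
    static boolean shutdown(Thread ioThread, SenderSketch sender, long timeoutMs) {
        sender.close();
        try {
            ioThread.join(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !ioThread.isAlive();
    }
}
```

Because the thread is a daemon, a producer that is never closed will not prevent the JVM from exiting. In-flight records can be lost in that case, which is why closing the producer explicitly matters.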
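Sending messages
Fetching metadata
The sender thread fetches the metadata while the main thread waits. Before requesting an update, the main thread records the current metadata version. It then blocks until the sender thread wakes it with a newer version, or until the wait times out.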
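This version-based handshake can be sketched with Java's built-in monitor (wait/notifyAll). MetadataSketch and its methods are hypothetical names, and the real producer metadata class carries far more state. This sketch returns false on timeout, whereas the real client reports a metadata timeout by throwing an exception.

```java
// Hypothetical, heavily simplified metadata holder shared by the main and sender threads.
final class MetadataSketch {
    private int version = 0;       // incremented on every successful update
    private String cluster = null; // placeholder for the real cluster view

    // Main thread: remember the version seen before asking for an update.
    synchronized int requestUpdate() {
        return version;
    }

    // Main thread: block until the version moves past lastVersion or maxWaitMs elapses.
    // Returns false on timeout (or interrupt) instead of throwing.
    synchronized boolean awaitUpdate(int lastVersion, long maxWaitMs) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) { // loop also guards against spurious wakeups
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            try {
                wait(remaining); // releases the monitor until notifyAll() or timeout
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // Sender thread: install fresh metadata, bump the version and wake every waiter.
    synchronized void update(String newCluster) {
        cluster = newCluster;
        version++;
        notifyAll();
    }

    synchronized String cluster() {
        return cluster;
    }
}
```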
The RecordAccumulator buffer holds a field named batches. It maps each partition to its own queue (a Deque<ProducerBatch>) of pending batches.
Serializing the message
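Serialization only has to turn a key or a value into a byte[]. The sketch below illustrates that contract using a simplified SimpleSerializer interface, which is hypothetical. Kafka's own interface is org.apache.kafka.common.serialization.Serializer, and it also has a headers-aware overload. The sketch also includes a UTF-8 string serializer that is similar in spirit to Kafka's StringSerializer.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified serializer contract: object in, bytes out.
interface SimpleSerializer<T> {
    byte[] serialize(String topic, T data);
}

// UTF-8 string serializer; a null input stays null so null keys/values survive.
final class Utf8StringSerializer implements SimpleSerializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }
}

final class SerializeDemo {
    // Mirrors the producer step: serialize key and value separately with their own serializers.
    static byte[][] serializeRecord(SimpleSerializer<String> keySerializer,
                                    SimpleSerializer<String> valueSerializer,
                                    String topic, String key, String value) {
        byte[] serializedKey = keySerializer.serialize(topic, key);
        byte[] serializedValue = valueSerializer.serialize(topic, value);
        return new byte[][] {serializedKey, serializedValue};
    }
}
```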
```java
// Serialize the key and the value with the configured key and value serializers
byte[] serializedKey;
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
byte[] serializedValue;
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
```
Partition routing
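```java
int partition = partition(record, serializedKey, serializedValue, cluster);
TopicPartition tp = new TopicPartition(record.topic(), partition);
```
Inside partition(), a partition set explicitly on the ProducerRecord takes precedence. If no partition was specified, the configured partitioner chooses one. The partitioner is set with partitioner.class and defaults to DefaultPartitioner.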
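For a record that has a key, DefaultPartitioner hashes the serialized key with murmur2 and takes the result modulo the number of partitions. The sketch below follows that idea. The murmur2 routine is intended to match the variant in Kafka's Utils class, but check it against the source before relying on exact partition numbers. The null-key branch is a simplified round-robin assumption: depending on the client version, Kafka uses either round-robin or sticky partitioning for records without a key. PartitionerSketch is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified partitioner following the DefaultPartitioner idea.
final class PartitionerSketch {
    // Simplified null-key strategy (assumption): plain round-robin over all partitions.
    private final AtomicInteger counter = new AtomicInteger(0);

    // 32-bit murmur2, intended to follow the variant in Kafka's Utils class.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1 to 3 bytes (the cases deliberately fall through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit so the modulo below is never negative.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    int partition(Integer explicitPartition, byte[] keyBytes, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition; // a partition set on the record always wins
        }
        if (keyBytes == null) {
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        return toPositive(murmur2(keyBytes)) % numPartitions; // same key -> same partition
    }
}
```

The useful property is determinism: as long as the partition count does not change, every record with the same key lands in the same partition. This is what gives per-key ordering.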
```java
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    // An explicit partition on the record wins; otherwise delegate to the configured partitioner
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
```
Writing to the internal buffer
At this point the record is appended to the internal buffer, that is, to the batches map inside RecordAccumulator:
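The batches structure can be sketched as a concurrent map from each partition to a deque of batches. A record is appended to the last batch of its partition, and a new batch is opened when that batch has no room left. In this sketch, a plain String such as "topic-0" stands in for TopicPartition, and raw byte counts stand in for Kafka's real batch-size estimation. BatchSketch and AccumulatorSketch are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical batch: collects serialized records up to a byte budget.
final class BatchSketch {
    final int capacityBytes;
    final List<byte[]> records = new ArrayList<>();
    int sizeBytes = 0;

    BatchSketch(int capacityBytes) {
        this.capacityBytes = capacityBytes;
    }

    // An empty batch accepts any record (even an oversized one); otherwise the record must fit.
    boolean tryAppend(byte[] record) {
        if (!records.isEmpty() && sizeBytes + record.length > capacityBytes) {
            return false;
        }
        records.add(record);
        sizeBytes += record.length;
        return true;
    }
}

final class AccumulatorSketch {
    private final int batchSize;
    // One deque of batches per partition, keyed by a "topic-partition" string.
    final ConcurrentHashMap<String, Deque<BatchSketch>> batches = new ConcurrentHashMap<>();

    AccumulatorSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    void append(String topicPartition, byte[] record) {
        Deque<BatchSketch> dq = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        synchronized (dq) { // the deque itself is not thread-safe, so guard it
            BatchSketch last = dq.peekLast();
            if (last == null || !last.tryAppend(record)) {
                BatchSketch fresh = new BatchSketch(batchSize);
                fresh.tryAppend(record);
                dq.addLast(fresh);
            }
        }
    }
}
```

Appending at the tail while the sender takes from the head keeps each partition's batches in send order.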
```java
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs);
```
Draining the buffer
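The ioThread executes the Sender's run() method. That method repeatedly checks which nodes have batches ready, drains those batches from the buffer and sends them as produce requests: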
```java
// Find the nodes that currently have batches ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
sendProduceRequests(batches, now);
```
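The ready() and drain() steps can be sketched as follows. The sketch assumes that a batch is ready when it is full or has waited at least linger.ms; the real check also accounts for retry backoff, memory pressure and flush. Batches are grouped by the leader broker of their partition, and each request is capped at maxRequestSize. DrainSketch and its Batch type are hypothetical.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

final class DrainSketch {
    // Hypothetical batch: partition name, size in bytes, creation time and a "full" flag.
    static final class Batch {
        final String topicPartition;
        final int sizeBytes;
        final long createdMs;
        final boolean full;

        Batch(String topicPartition, int sizeBytes, long createdMs, boolean full) {
            this.topicPartition = topicPartition;
            this.sizeBytes = sizeBytes;
            this.createdMs = createdMs;
            this.full = full;
        }
    }

    // Simplified readiness rule: full, or has waited at least lingerMs.
    static boolean isReady(Batch batch, long nowMs, long lingerMs) {
        return batch.full || nowMs - batch.createdMs >= lingerMs;
    }

    // ready(): leader node ids whose partitions have a sendable head batch.
    static Set<Integer> ready(Map<String, Deque<Batch>> batches, Map<String, Integer> leaderOf,
                              long nowMs, long lingerMs) {
        Set<Integer> readyNodes = new HashSet<>();
        for (Map.Entry<String, Deque<Batch>> e : batches.entrySet()) {
            Batch head = e.getValue().peekFirst();
            Integer leader = leaderOf.get(e.getKey()); // null: leader unknown, metadata needed
            if (head != null && leader != null && isReady(head, nowMs, lingerMs)) {
                readyNodes.add(leader);
            }
        }
        return readyNodes;
    }

    // drain(): per ready node, take at most one head batch from each partition it leads,
    // stopping before the request would exceed maxRequestSize.
    static Map<Integer, List<Batch>> drain(Map<String, Deque<Batch>> batches, Map<String, Integer> leaderOf,
                                           Set<Integer> readyNodes, int maxRequestSize) {
        Map<Integer, List<Batch>> perNode = new HashMap<>();
        for (Integer node : readyNodes) {
            List<Batch> request = new ArrayList<>();
            int size = 0;
            for (Map.Entry<String, Deque<Batch>> e : batches.entrySet()) {
                if (!Objects.equals(node, leaderOf.get(e.getKey()))) {
                    continue; // only the partition leader receives produce requests
                }
                Batch head = e.getValue().peekFirst();
                if (head == null) {
                    continue;
                }
                if (!request.isEmpty() && size + head.sizeBytes > maxRequestSize) {
                    break;
                }
                request.add(e.getValue().pollFirst());
                size += head.sizeBytes;
            }
            perNode.put(node, request);
        }
        return perNode;
    }
}
```

Grouping by node rather than by partition means a single produce request can carry batches for several partitions that share the same leader broker.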
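The sender thread
Under the hood, the sender thread sends requests through NetworkClient, which delegates I/O to Kafka's Selector. The Selector manages one KafkaChannel per broker connection and is built on Java NIO.
The producer sends messages only to the leader replica of each partition, that is, to the broker that currently hosts that partition's leader.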
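Kafka's Selector builds on Java NIO's non-blocking multiplexing. The sketch below shows that underlying primitive, not Kafka's Selector itself. It registers a non-blocking channel with a java.nio.channels.Selector and reads from the channel only after the selector reports it readable. A Pipe stands in for a broker socket so that the example runs without a network. NioSketch is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

final class NioSketch {
    // Writes a message into a Pipe and reads it back through a Selector.
    // Keep messages small: the sink is written fully before any read happens.
    static String roundTrip(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                source.configureBlocking(false);                 // must be non-blocking to register
                source.register(selector, SelectionKey.OP_READ); // interest: readable
                ByteBuffer out = ByteBuffer.wrap(payload);
                while (out.hasRemaining()) {
                    sink.write(out);
                }
                ByteArrayOutputStream received = new ByteArrayOutputStream();
                ByteBuffer buf = ByteBuffer.allocate(256);
                while (received.size() < payload.length) {
                    if (selector.select(1000) == 0) {
                        throw new IOException("timed out waiting for readable data");
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove(); // the selected-key set is not cleared automatically
                        if (key.isReadable()) {
                            buf.clear();
                            int n = ((Pipe.SourceChannel) key.channel()).read(buf);
                            if (n > 0) {
                                received.write(buf.array(), 0, n);
                            }
                        }
                    }
                }
                return new String(received.toByteArray(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

One selector can watch many channels at once. That is why a single sender thread can serve connections to every broker without dedicating a thread to each one.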