0%

kafka生产者

kafka生产者

kafka使用脚本kafka-console-producer.sh命令发送消息

脚本

1
2
3
4
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

该脚本调用的是ConsoleProducer类

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def main(args: Array[String]): Unit = {

try {
val config = new ProducerConfig(args)
val reader = Class.forName(config.readerClass).getDeclaredConstructor().newInstance().asInstanceOf[MessageReader]
reader.init(System.in, getReaderProps(config))
// kafkaProducer是一个用java语言实现的kafka客户端,实现了producer接口,用于将消息(ProducerRecord)发送至代理
val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))

Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
producer.close()
}
})

var record: ProducerRecord[Array[Byte], Array[Byte]] = null
do {
record = reader.readMessage()
if (record != null)
send(producer, record, config.sync)
} while (record != null)
} catch {
case e: joptsimple.OptionException =>
System.err.println(e.getMessage)
Exit.exit(1)
case e: Exception =>
e.printStackTrace
Exit.exit(1)
}
Exit.exit(0)
}

配置说明

在server.properties中配置kafka的各个属性

  • acks确认机制,用于配置代理接收到消息后向生产者发送确认信号,以便生产者根据acks进行相应的处理,该机制通过属性request.required.acks设置,0、-1、1三种,默认1

    • acks=0时,生产者不需要等待代理返回确认消息,而连续发送消息。消息投递速度加快,但是无法保证消息是否已经被代理接收,可能丢失数据

    • acks=1时,生产者需要等待leader副本已成功将消息写入日志文件。降低了数据丢失的可能性,但还是存在,如果在leader成功存储数据后,follower没有来得及同步,此时leader挂掉了,此时数据虽然已经存储了,但是原来的leader已经从集群中下线了,同时存活的代理再也不会有原来的leader副本存储的数据,数据就会丢失

    • acks=-1时,leader副本和所有ISR列表中的副本都完成数据存储之后才会向生产者发送确认消息,这种策略保证只要leader副本和follower副本中至少有一个节点存货,数据就不会丢失

  • batch.num.messages配置,kafka批量向代理发送消息,该配置表示每次批量发送消息的最大消息数,当生产者使用同步模式发送时该配置失效

  • message.send.max.retries 默认3 重试次数

  • retry.backoff.ms 默认值100 在生产者每次重试前,生产者会更新主题的MetaData信息,以此来检测新的Leader是否已经选举出来。因为Leader选举需要一定时间,所以此选项指定更新主题的MetaData之前生产者需要等待的时间,单位ms

  • queue.buffering.max.ms 默认1000 在异步模式下,表示消息被缓存的最长时间,单位ms,当到达该时间后消息将开始批量发送;若在异步模式下同时配置了缓存数据的最大值batch.num.messages,则达到这两个阈值之一都将开始批量发送消息

  • queue.buffering.max.messages 默认值10000 在异步模式下,在生产者必须被阻塞或者数据必须丢失之前,可以缓存到队列中的未发送的最大消息条数,即初始化消息队列的长度

  • batch.num.messages 默认值200 在异步模式每次批量发送消息的最大消息数

  • request.timeout.ms 默认值1500 在需要acks时,生产者等待代理应答超时时间,单位ms 若在该时间范围内还没有收到应答,则会发送错误到客户端

  • send.buffer.bytes 默认100k Socket发送缓冲区大小

  • topic.metadata.refresh.interval.ms 默认5min 生产者定时请求更新主题元数据的时间间隔。若设置为0,则在每个消息发送至后都会去请求数据

  • client.id 默认为console-producer 生产者指定的一个标识字段,在每次请求中包含该字段,用于追踪调用,根据该字段在逻辑上可以确认是哪个应用发出的请求

  • queue.enqueue.timeout.ms 默认2147483647 该值为0表示当队列没满时直接入队,满了则立即丢弃,负数表示无条件阻塞且不丢弃,正数表示阻塞达到该值时长后抛出QueueFullException异常

生产者命令

kafka-console-producer脚本提供了生产者发布消息调用kafka.tools.ConsoleProducer类

生产者发送消息

1
2
3
4
5
# 无key消息
>kafka-console-producer --broker-list localhost:9092 --topic kafka-test

# 有key消息 key和value之间用"Tab键"分隔
>kafka-console-producer --broker-list localhost:9092 --topic kafka-test --property parse.key=true

参数说明

  • broker-list 要连接的服务器(必传,kafka_2.12-2.50版本之前使用该参数)

  • bootstrap-server 要连接的服务器(必传,kafka_2.12-2.50版本之后使用该参数)

  • topic 接收消息的主题名称(必传)

  • batch-size 单个批处理中发送的消息数 200(默认值)

  • compression-codec 压缩编解码器 none、gzip(默认值) snappy、lz4、zstd

  • max-block-ms 在发送请求期间,生产者将阻止的最长时间 60000(默认值)

  • max-memory-ytes 生产者用来缓冲等待发送到服务器的总内存 33554432(默认值)

  • max-partition-memory-bytes 为分区分配的缓冲区大小 16384

  • message-send-max-retries 最大重试发送次数 3

  • metadata-expiry-ms 强制更新元数据的时间阈值(ms) 300000

  • producer-property 将自定义属性传递给生成器的机制 key=value 会覆盖配置文件中的配置

  • producer.config 生产者配置属性文件(producer-property优先于此配置)

  • property 自定义消息读取器 key.separator= 分隔符 ignore.error=true|false

  • request-required-acks 生产者请求的确认方式 0/1(默认)/all

  • request-timeout-ms 生产者请求的确认超时时间 1500(默认)

  • retry-backoff-ms 生产者重试前,刷新元数据的等待时间阈值 100(默认)

  • socket-buffer-size TCP接收缓冲大小 102400(默认)

  • timeout 消息排队异步等待处理的时间阈值 1000(默认)

  • sync 同步发送消息

  • version 显示kafka版本 不需要配合其他参数,显示本地版本

查看消息是否发送使用kafka-run-class脚本,该命令有一个time参数,支持-1(lastest)、-2(esrliest),默认-1

1
2
3
4
5
6
7
>kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka-test
//每个分区的偏移量
kafka-test:0:1
kafka-test:1:1
kafka-test:2:1
kafka-test:3:1
kafka-test:4:2

查看信息

kafka生产的消息以二进制的形式存在文件中,kafka提供了查看日志文件的kafka.tools.DumpLogSegments,files是必传参数,可传多个文件,逗号分隔

1
2
3
4
5
6
7
8
9
>kafka-run-class kafka.tools.DumpLogSegments --files ../../logs/kafka-test-1/00000000000000000000.log

Dumping ..\..\logs\kafka-test-1\00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1589276743099 size: 74 magic: 2 compresscodec: NONE crc: 2451158880 isvalid: true
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 74 CreateTime: 1589277087794 size: 72 magic: 2 compresscodec: NONE crc: 3428219219 isvalid: true

## 查看消息内容
>kafka-run-class kafka.tools.DumpLogSegments --files ../../logs/kafka-test-1/00000000000000000000.log --print-data-log

生产者性能工具

kafka提供了一个生产者性能测试脚本kafka-producer-perf-test,可以对生产者进行调优。

脚本

1
2
3
4
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance "$@"

命令

脚本调优调用的是org.apache.kafka.tools.ProducerPerformance类

  • topic参数 指定生产者发送消息的目标主题

  • num-records 测试时发送消息的总条数

  • record-size 每条消息的字节数

  • throughput 限流控制(小于0时不进行限流;参数大于0,当已发送的消息总字节数与当前已执行的时间取整大于该字段时生产者线程会阻塞一段时间,生产者线程被阻塞时,在控制台可以看到输出一行吞吐量统计信息;参数等于0,生产者在发送一次消息之后检测满足阻塞条件时将会一直被阻塞)

  • producer-props 以键值对的形式指定配置,可同时指定多组配置,多组配置之间以空格隔开

  • producer.config 加载生产者级别的配置文件

1
2
3
4
5
6
>kafka-producer-perf-test --num-records 1000000 --record-size 1000 --topic kafka-test --throughput 1000000 --producer-props bootstrap.servers=localhost:9092 acks=all

147041 records sent, 29396.4 records/sec (28.03 MB/sec), 833.0 ms avg latency, 1111.0 ms max latency.
262480 records sent, 52485.5 records/sec (50.05 MB/sec), 651.0 ms avg latency, 1074.0 ms max latency.
338240 records sent, 67648.0 records/sec (64.51 MB/sec), 485.6 ms avg latency, 681.0 ms max latency.
1000000 records sent, 54188.793757 records/sec (51.68 MB/sec), 570.38 ms avg latency, 1111.00 ms max latency, 482 ms 50th, 974 ms 95th, 1039 ms 99th, 1101 ms 99.9th.
  • records sent 测试时发送的消息总数

  • records/sec 每秒发送的消息数来统计吞吐量

  • MB/sec 每秒发送的消息大小来统计吞吐量

  • avg latency 消息处理的平均耗时

  • max latency 消息处理最大耗时

  • 50th/95th/99th/99.9th 百分比的消息处理耗时

测试数据工具

kafka提供了一个用于生成测试数据的脚本kafka-verifiable-producer.sh,用于向指定主题发送自增整型数字消息

脚本

1
2
3
4
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer "$@"

命令

1
2
# max-message参数用于指定要发送的消息总数
kafka-verifiable-producer --broker-list localhost:9092 --topic test-verifiable --max-message 1000

kafkaProducer

kafkaProducer是一个用Java语言实现的kafka客户端,实现了Producer接口,用于将消息(ProducerRecord)发送到代理。kafkaProducer是线程安全的,在一个kafka集群中多线程之间共享同一个kafkaProducer实例比创建多个性能好。

kafkaProducer有一个缓存池,用于存储尚未向代理发送的消息,同时一个后台IO线程负责从缓存池读取消息构造请求,将消息发送至代理

源码解析

实例化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//默认情况下的配置,为了防止篇幅过大,将后面的注释去掉了
// 该默认配置在ProducerConfig类中
CONFIG = (new ConfigDef()).
define("bootstrap.servers", Type.LIST, Collections.emptyList(), new NonNullValidator(), Importance.HIGH, "")
// RecordAccumulator中BufferPool的大小,默认32M
.define("buffer.memory", Type.LONG, 33554432L, Range.atLeast(0L), Importance.HIGH, "")
// 发送失败的重试次数
.define("retries", Type.INT, 0, Range.between(0, 2147483647), Importance.HIGH, "")
.define("acks", Type.STRING, "1", ValidString.in(new String[]{"all", "-1", "0", "1"}), Importance.HIGH, "")
.define("compression.type", Type.STRING, "none", Importance.HIGH, "")
//RecordBatch的大小,默认16K
.define("batch.size", Type.INT, 16384, Range.atLeast(0), Importance.MEDIUM, "")
// 生产者默认会将两次发送时间间隔内收集到的所有发送消息的请求聚合然后进行发送,该间隔默认1000ms
.define("linger.ms", Type.LONG, 0, Range.atLeast(0L), Importance.MEDIUM, "")
.define("client.id", Type.STRING, "", Importance.MEDIUM, "")
.define("send.buffer.bytes", Type.INT, 131072, Range.atLeast(-1), Importance.MEDIUM, "")
.define("receive.buffer.bytes", Type.INT, 32768, Range.atLeast(-1), Importance.MEDIUM, "")
// 生产者每次请求的最大字节数,默认1M
.define("max.request.size", Type.INT, 1048576, Range.atLeast(0), Importance.MEDIUM, "")
.define("reconnect.backoff.ms", Type.LONG, 50L, Range.atLeast(0L), Importance.LOW, "")
.define("reconnect.backoff.max.ms", Type.LONG, 1000L, Range.atLeast(0L), Importance.LOW, "")
.define("retry.backoff.ms", Type.LONG, 100L, Range.atLeast(0L), Importance.LOW, "")
// 消息发送或获取分区元数据信息时最大等待时间,默认60s
.define("max.block.ms", Type.LONG, 60000, Range.atLeast(0), Importance.MEDIUM, "")
.define("request.timeout.ms", Type.INT, 30000, Range.atLeast(0), Importance.MEDIUM, "")
// 强制更新metadata的时间间隔,默认5min
.define("metadata.max.age.ms", Type.LONG, 300000, Range.atLeast(0), Importance.LOW, "")
.define("metrics.sample.window.ms", Type.LONG, 30000, Range.atLeast(0), Importance.LOW, "")
.define("metrics.num.samples", Type.INT, 2, Range.atLeast(1), Importance.LOW, "")
.define("metrics.recording.level", Type.STRING, RecordingLevel.INFO.toString(), ValidString.in(new String[]{RecordingLevel.INFO.toString(), RecordingLevel.DEBUG.toString()}), Importance.LOW, "")
.define("metric.reporters", Type.LIST, Collections.emptyList(), new NonNullValidator(), Importance.LOW, "")
// 设置每个连接的最大请求个数,默认5
.define("max.in.flight.requests.per.connection", Type.INT, 5, Range.atLeast(1), Importance.LOW, "")
.define("key.serializer", Type.CLASS, Importance.HIGH, "")
.define("value.serializer", Type.CLASS, Importance.HIGH, "")
.define("connections.max.idle.ms", Type.LONG, 540000, Importance.MEDIUM, "")
.define("partitioner.class", Type.CLASS, DefaultPartitioner.class, Importance.MEDIUM, "")
.define("interceptor.classes", Type.LIST, Collections.emptyList(), new NonNullValidator(), Importance.LOW, "")
.define("security.protocol", Type.STRING, "PLAINTEXT", Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC).withClientSslSupport().withClientSaslSupport()
.define("enable.idempotence", Type.BOOLEAN, false, Importance.LOW, "")
.define("transaction.timeout.ms", Type.INT, 60000, Importance.LOW, "")
.define("transactional.id", Type.STRING, (Object)null, new NonEmptyString(), Importance.LOW, "");

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer, Metadata metadata, KafkaClient kafkaClient) {
try {
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
this.time = Time.SYSTEM;
//1、从配置项中解析出clientId,客户端指定该配置项的值来追踪程序运行情况
String clientId = config.getString("client.id");
if (clientId.length() <= 0) {
// 如果不存在clientId
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
}

this.clientId = clientId;
String transactionalId = userProvidedConfigs.containsKey("transactional.id") ? (String)userProvidedConfigs.get("transactional.id") : null;
LogContext logContext;
if (transactionalId == null) {
logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
} else {
logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
}

this.log = logContext.logger(KafkaProducer.class);
this.log.trace("Starting the Kafka producer");
//2、创建和注册监控的相关对象
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = (new MetricConfig()).samples(config.getInt("metrics.num.samples")).timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS).recordLevel(RecordingLevel.forName(config.getString("metrics.recording.level"))).tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class);
reporters.add(new JmxReporter("kafka.producer"));
this.metrics = new Metrics(metricConfig, reporters, this.time);
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
// 实例化分区器。分区器是用于为消息指定分区,客户端可以通过实现Partitioner接口自定义消息分配分区的规则。若用户没有自定义分区器,使用默认的DefaultPartitioner,该分区器的分配规则为:若消息指定了key,则对key取hash值,然后与可用分区数取模;若没有指定key,则通过一个随机数与可用分区数取模
this.partitioner = (Partitioner)config.getConfiguredInstance("partitioner.class", Partitioner.class);
long retryBackoffMs = config.getLong("retry.backoff.ms");
// 实例化消息Key和Value进行序列化操作的Serializer
if (keySerializer == null) {
this.keySerializer = Wrapper.ensureExtended((Serializer)config.getConfiguredInstance("key.serializer", Serializer.class));
this.keySerializer.configure(config.originals(), true);
} else {
config.ignore("key.serializer");
this.keySerializer = Wrapper.ensureExtended(keySerializer);
}

if (valueSerializer == null) {
this.valueSerializer = Wrapper.ensureExtended((Serializer)config.getConfiguredInstance("value.serializer", Serializer.class));
this.valueSerializer.configure(config.originals(), false);
} else {
config.ignore("value.serializer");
this.valueSerializer = Wrapper.ensureExtended(valueSerializer);
}

userProvidedConfigs.put("client.id", clientId);
// 根据配置实例化一组拦截器,可以指定多个拦截器
List<ProducerInterceptor<K, V>> interceptorList = (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances("interceptor.classes", ProducerInterceptor.class);
this.interceptors = new ProducerInterceptors(interceptorList);
ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.maxRequestSize = config.getInt("max.request.size");
this.totalMemorySize = config.getLong("buffer.memory");
this.compressionType = CompressionType.forName(config.getString("compression.type"));
this.maxBlockTimeMs = config.getLong("max.block.ms");
this.requestTimeoutMs = config.getInt("request.timeout.ms");
this.transactionManager = configureTransactionState(config, logContext, this.log);
int retries = configureRetries(config, this.transactionManager != null, this.log);
int maxInflightRequests = configureInflightRequests(config, this.transactionManager != null);
short acks = configureAcks(config, this.transactionManager != null, this.log);
this.apiVersions = new ApiVersions();
// 实例化用于存储消息的RecordAccumulator,消息累加器(类似于队列),发送的消息都会先被追加到消息累加器的一个双端队列Deque中,在消息累加器内部每一个主题的每一个分区TopicPartition对应一个双端队列,队列的元素是ProducerBatch ,而RProducerBatch 是由同一个主题发往同一个分区的多条消息组成,并将结果TopicPartition作为key,该TopicPartition所对应的双端队列作为value保存到一个ConcurrentMap类型的batches中。目的是为了当消息发送失败需要重试时,将消息优先插入到队列的头部,而最新的消息总是插入到队列的尾部,只有需要重试发送时才在队列头部插入,发送消息是从头部获取ProducerBatch ,实现了对发送失败的消息进行重试发送。消息累加器中存在一个BufferPool缓存数据结构,用于存储消息。
this.accumulator = new RecordAccumulator(logContext, config.getInt("batch.size"), this.totalMemorySize, this.compressionType, config.getLong("linger.ms"), retryBackoffMs, this.metrics, this.time, this.apiVersions, this.transactionManager);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"));
// 实例化用于消息发送相关的元数据信息。metadata的主要数据结构由两部分组成,一类是用于控制MetaData进行更新操作的相关配置信息,另一类是集群信息Cluster。
if (metadata != null) {
this.metadata = metadata;
} else {
this.metadata = new Metadata(retryBackoffMs, config.getLong("metadata.max.age.ms"), true, true, clusterResourceListeners);
this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), this.time.milliseconds());
}
// 根据指定的安全协议${security.protocol}创建ChannelBuilder
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
//创建NetworkClient实例,底层是通过维持一个Socket连接来进行TCP通信的,用于生产者与各个代理进行Socket通信
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(new Selector(config.getLong("connections.max.idle.ms"), this.metrics, this.time, "producer", channelBuilder, logContext), this.metadata, clientId, maxInflightRequests, config.getLong("reconnect.backoff.ms"), config.getLong("reconnect.backoff.max.ms"), config.getInt("send.buffer.bytes"), config.getInt("receive.buffer.bytes"), this.requestTimeoutMs, this.time, true, this.apiVersions, throttleTimeSensor, logContext);
// 用于数据发送的Sender
this.sender = new Sender(logContext, (KafkaClient)client, this.metadata, this.accumulator, maxInflightRequests == 1, config.getInt("max.request.size"), acks, retries, metricsRegistry.senderMetrics, Time.SYSTEM, this.requestTimeoutMs, config.getLong("retry.backoff.ms"), this.transactionManager, this.apiVersions);
String ioThreadName = "kafka-producer-network-thread | " + clientId;
// 守护线程,在后台不断轮询,发送消息给代理
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
config.logUnused();
AppInfoParser.registerAppInfo("kafka.producer", clientId, this.metrics);
this.log.debug("Kafka producer started");
} catch (Throwable var26) {
this.close(0L, TimeUnit.MILLISECONDS, true);
throw new KafkaException("Failed to construct kafka producer", var26);
}
}
send方法分析
1
2
3
4
5
6
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
// 如果配置了拦截器,会调用onSend方法
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}

doSend方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// first make sure the metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime;
try {
// 获取metaData,只有获取到MetaData元数据信息才能真正进行消息的投递,该方法会一直被阻塞尝试去获取MetaData信息,超过maxBlockTimeMs依然没有获取到的话会抛异常
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
//序列化
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
// 获取分区,如果客户端指定了分区则直接使用该分区,否则根据分区器分区策略计算
int partition = partition(record, serializedKey, serializedValue, cluster);
// 创建TopicPartition
tp = new TopicPartition(record.topic(), partition);

setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
// 检测消息长度
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
// 创建Callback对象
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
// 写BufferPool,将消息写入到RecordAccumulator的BufferPool中
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
// 唤醒sender线程
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (BufferExhaustedException e) {
this.errors.record();
this.metrics.sensor("buffer-exhausted-records").record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (KafkaException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
this.interceptors.onSendError(record, tp, e);
throw e;
}
}

waitOnMetadata方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
// add topic to metadata topic list if it is not there already and reset expiry
Cluster cluster = metadata.fetch();

if (cluster.invalidTopics().contains(topic))
throw new InvalidTopicException(topic);

metadata.add(topic);

Integer partitionsCount = cluster.partitionCountForTopic(topic);
// Return cached metadata if we have it, and if the record's partition is either undefined
// or within the known partition range
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);

long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
long elapsed;
// Issue metadata requests until we have metadata for the topic and the requested partition,
// or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
// is stale and the number of partitions for this topic has increased in the meantime.
do {
if (partition != null) {
log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
} else {
log.trace("Requesting metadata update for topic {}.", topic);
}
metadata.add(topic);
int version = metadata.requestUpdate();
sender.wakeup();
try {
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
throw new TimeoutException(
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs));
}
cluster = metadata.fetch();
elapsed = time.milliseconds() - begin;
if (elapsed >= maxWaitMs) {
throw new TimeoutException(partitionsCount == null ?
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs) :
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
partition, topic, partitionsCount, maxWaitMs));
}
if (cluster.unauthorizedTopics().contains(topic))
throw new TopicAuthorizationException(topic);
if (cluster.invalidTopics().contains(topic))
throw new InvalidTopicException(topic);
remainingWaitMs = maxWaitMs - elapsed;
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null || (partition != null && partition >= partitionsCount));

return new ClusterAndWaitTime(cluster, elapsed);
}

RecordAccumulator的append方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 变量        
private final Logger log;
private volatile boolean closed;
// 用于flush操作控制计数器
private final AtomicInteger flushesInProgress;
// 用于append操作控制计数器 为了追踪正在进行追加操作的线程数,以便在生产者close方法时,sender线程调用RecordAccumulator的abortIncompleteBatches方法时,放弃未处理完的请求,释放资源
private final AtomicInteger appendsInProgress;
private final int batchSize;
private final CompressionType compression;
private final long lingerMs;
private final long retryBackoffMs;
private final long deliveryTimeoutMs;
private final BufferPool free;
private final Time time;
private final ApiVersions apiVersions;
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
// 用于保存已写入内存而未被Sender处理的ProducerBatch
private final IncompleteBatches incomplete;
// The following variables are only accessed by the sender thread, so we don't need to protect them.
// 用于保存消息已发送但还未收到ack的TopicPartition
private final Map<TopicPartition, Long> muted;
private int drainIndex;
private final TransactionManager transactionManager;
private long nextBatchExpiryTimeMs = Long.MAX_VALUE;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
//append计数器+1
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// check if we have an in-progress batch
// 获取TopicPartition中的双端队列
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
// 锁住dp,保证了只有一个TopicPartition可以进行append操作,保证同一分区数据是有序的
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
// 尝试进行消息写入
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
}

// we don't have an in-progress record batch try to allocate a new batch
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
// 向BufferPool申请内存
buffer = free.allocate(size, maxTimeToBlock);
// 检查一下是否有相同TopicPartition的其他线程创建了ProducerBatch或者部分消息已经被sender处理释放掉了
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new KafkaException("Producer closed while send in progress");

RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}

MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

dq.addLast(batch);
incomplete.add(batch);

// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
if (buffer != null)
// 释放内存
free.deallocate(buffer);
// append计数器减一
appendsInProgress.decrementAndGet();
}
}

tryAppend方法

1
2
3
4
5
6
7
8
9
10
11
12
13
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque) {
// 先从队列中取出队尾ProducerBatch
ProducerBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
if (future == null)
last.closeForRecordAppends();
else
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
}
return null;
}

ProducerBatch的tryAppend方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
// 是否有足够的空间
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
return null;
} else {
// 写入ByteBuffer
Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
// 取当前的maxRecordSize与写入的消息总长度相比更新当前ProducerBatch的maxRecordSize
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
// 更新lastAppendTime,记录最后追加时间
this.lastAppendTime = now;

FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
key == null ? -1 : key.length,
value == null ? -1 : value.length,
Time.SYSTEM);
// we have to keep every future returned to the users in case the batch needs to be
// split to several new batches and resent.
thunks.add(new Thunk(callback, future));
// ProducerBatch中的消息数+1
this.recordCount++;
return future;
}
}
sender线程

sender是在KafkaProducer实例化的时候启动的,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public void run() {
log.debug("Starting Kafka producer I/O thread.");

// main loop, runs until close is called
while (running) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
if (forceClose) {
// We need to fail all the incomplete batches and wake up the threads waiting on
// the futures.
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}

log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void run(long now) {
if (transactionManager != null) {
try {
if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
// Check if the previous run expired batches which requires a reset of the producer state.
transactionManager.resetProducerId();
if (!transactionManager.isTransactional()) {
// this is an idempotent producer, so make sure we have a producer id
maybeWaitForProducerId();
} else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
transactionManager.transitionToFatalError(
new KafkaException("The client hasn't received acknowledgment for " +
"some previously sent messages and can no longer retry them. It isn't safe to continue."));
} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
// as long as there are outstanding transactional requests, we simply wait for them to return
client.poll(retryBackoffMs, now);
return;
}

// do not continue sending if the transaction manager is in a failed state or if there
// is no producer id (for the idempotent case).
if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
RuntimeException lastError = transactionManager.lastError();
if (lastError != null)
maybeAbortBatches(lastError);
client.poll(retryBackoffMs, now);
return;
} else if (transactionManager.hasAbortableError()) {
accumulator.abortUndrainedBatches(transactionManager.lastError());
}
} catch (AuthenticationException e) {
// This is already logged as error, but propagated here to perform any clean ups.
log.trace("Authentication exception while processing transactional request: {}", e);
transactionManager.authenticationFailed(e);
}
}

long pollTimeout = sendProducerData(now);
// 真正的发送方法
client.poll(pollTimeout, now);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
private long sendProducerData(long now) {
// 获取Cluster信息
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

// if there are any partitions whose leaders are not known yet, force metadata update
// 存在没有找到分区Leader的主题
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
// 将这些主题信息加入到metadata中,强制更新metadata信息
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);

log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);
this.metadata.requestUpdate();
}

// remove any nodes we aren't ready to send to
// 检测readyNodes中各节点的连接状态
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
// 检测状态,并根据一定条件决定是否为还未建立连接的节点创建连接
if (!this.client.ready(node, now)) {
// 如果与某个节点的连接还未就绪,则将该节点移除
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}

// create produce requests
// 根据nodeId进行分组
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}

// 过滤掉超时的ProducerBatch
accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);

// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
// we need to reset the producer id here.
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
}
}
sensors.updateProduceRequestMetrics(batches);

// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
// time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
// sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
// that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
pollTimeout = 0;
}
// 将Node转换为ClientRequest对象
sendProduceRequests(batches, now);
return pollTimeout;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
String destination = clientRequest.destination();
RequestHeader header = clientRequest.makeHeader(request.version());
if (log.isDebugEnabled()) {
int latestClientVersion = clientRequest.apiKey().latestVersion();
if (header.apiVersion() == latestClientVersion) {
log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
clientRequest.correlationId(), destination);
} else {
log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination);
}
}
Send send = request.toSend(destination, header);
InFlightRequest inFlightRequest = new InFlightRequest(
clientRequest,
header,
isInternalRequest,
request,
send,
now);

this.inFlightRequests.add(inFlightRequest);
// 暂存在selector内部的kafkaChannel,没有真正的发送
selector.send(send);
}