
Kafka Client Flow Analysis

Producer

Initialization

  • When the producer is initialized, it instantiates a Sender object that is used to send messages

    this.sender = newSender(logContext, kafkaClient, this.metadata);
  • Initialization also creates and starts the ioThread thread

    String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
    this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
    this.ioThread.start();
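
The thread setup above can be sketched with plain JDK classes. This is a minimal sketch rather than Kafka's own code: `SketchSender` is a hypothetical stand-in for Sender, the prefix value is an assumption, and a plain `Thread` with `setDaemon(true)` plays the role of `KafkaThread(name, runnable, true)`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class IoThreadSketch {
    // Assumed value, for illustration only.
    static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";

    /** Hypothetical stand-in for Sender: loops until close() is called. */
    static class SketchSender implements Runnable {
        private volatile boolean running = true;
        private final CountDownLatch started = new CountDownLatch(1);

        @Override
        public void run() {
            started.countDown();
            while (running) {
                try {
                    Thread.sleep(5); // placeholder for one round of network I/O
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        void close() {
            running = false;
        }

        /** Waits up to timeoutMs for run() to begin; false on timeout or interrupt. */
        boolean awaitStarted(long timeoutMs) {
            try {
                return started.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Wraps the sender in a named daemon thread and starts it. */
    static Thread startIoThread(String clientId, Runnable sender) {
        Thread t = new Thread(sender, NETWORK_THREAD_PREFIX + " | " + clientId);
        t.setDaemon(true); // a daemon thread does not keep the JVM alive on its own
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        SketchSender sender = new SketchSender();
        Thread ioThread = startIoThread("producer-1", sender);
        System.out.println(ioThread.getName() + " daemon=" + ioThread.isDaemon()
                + " started=" + sender.awaitStarted(1000));
        sender.close();
        ioThread.join(1000);
    }
}
```

The point of the design is that the thread calling send() never performs network I/O itself; it only hands work to this background thread.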

Sending Messages

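  • Fetch metadata

    The sender thread fetches the metadata while the main thread waits. The main thread compares the current metadata version with the version it last saw, and blocks until the sender thread wakes it up or the wait times out.

    The RecordAccumulator buffer holds an object named batches, which contains multiple queues; each queue corresponds to one partition.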
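
The version-based wait described above can be sketched with a monitor. This is a minimal sketch, assuming a single integer version counter; `MetadataWaitSketch` and its method names are hypothetical, and returning a boolean on timeout is a simplification chosen for the example.

```java
class MetadataWaitSketch {
    private int version = 0;

    /** Called by the sender thread once fresh metadata has been applied. */
    synchronized void update() {
        version++;
        notifyAll(); // wake any thread blocked in awaitUpdate
    }

    synchronized int version() {
        return version;
    }

    /**
     * Called by the main thread: blocks until the version moves past lastVersion
     * or maxWaitMs elapses. Returns true if an update was observed.
     */
    synchronized boolean awaitUpdate(int lastVersion, long maxWaitMs) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false; // timed out without seeing a newer version
            }
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        MetadataWaitSketch metadata = new MetadataWaitSketch();
        int last = metadata.version();
        // A second thread plays the role of the sender thread.
        new Thread(metadata::update, "sketch-sender").start();
        System.out.println("updated=" + metadata.awaitUpdate(last, 1000));
    }
}
```

Comparing versions inside a loop, rather than waiting once, guards against spurious wakeups and against an update that lands before the waiter starts waiting.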

  • Serialize the message

    byte[] serializedKey;
    serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());

    byte[] serializedValue;
    serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
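
To make the serialization step concrete, here is a minimal sketch of a serializer that turns a string key or value into bytes. `SketchSerializer` is a hypothetical, trimmed-down contract (it omits the headers argument used above), and the topic name and payloads are made up for illustration.

```java
import java.nio.charset.StandardCharsets;

class SerializationSketch {
    /** Hypothetical, trimmed-down serializer contract (no headers argument). */
    interface SketchSerializer<T> {
        byte[] serialize(String topic, T data);
    }

    /** Encodes strings as UTF-8; a null key or value stays null. */
    static final SketchSerializer<String> UTF8 =
            (topic, data) -> data == null ? null : data.getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) {
        byte[] serializedKey = UTF8.serialize("orders", "user-42");
        byte[] serializedValue = UTF8.serialize("orders", "{\"amount\": 10}");
        System.out.println(serializedKey.length + " key bytes, "
                + serializedValue.length + " value bytes");
    }
}
```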

  • Route to a partition

    int partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);

    The partition method first uses the partition set on the ProducerRecord. If none was specified, it falls back to the routing strategy, which is configured through partitioner.class and defaults to DefaultPartitioner.

    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }
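
The fallback order above can be illustrated with a simplified routing function. This is a sketch under stated assumptions, not the DefaultPartitioner implementation: `Arrays.hashCode` stands in for the partitioner's real hash function, and keyless records are spread round-robin purely for illustration. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class PartitionRoutingSketch {
    private final AtomicInteger counter = new AtomicInteger();

    int choosePartition(Integer explicitPartition, byte[] keyBytes, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition; // the caller pinned the partition on the record
        }
        if (keyBytes == null) {
            // Assumption: keyless records are distributed round-robin.
            return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
        }
        // Assumption: Arrays.hashCode stands in for the real hash function.
        // Masking the sign bit keeps the result non-negative.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionRoutingSketch router = new PartitionRoutingSketch();
        byte[] key = "user-42".getBytes();
        System.out.println("explicit: " + router.choosePartition(2, key, 6));
        System.out.println("by key:   " + router.choosePartition(null, key, 6));
        System.out.println("no key:   " + router.choosePartition(null, null, 6));
    }
}
```

Hashing the key means every record with the same key lands on the same partition, which is what preserves per-key ordering.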
  • Write to the internal buffer

    At this point the record is written into the internal buffer, batches

    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
    serializedValue, headers, interceptCallback, remainingWaitMs);
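
The buffer layout, one queue of batches per partition, can be sketched as follows. This is a minimal sketch, assuming a string key of the form topic-partition and a fixed byte limit per batch; `AccumulatorSketch`, `Batch`, and the `batchSize` parameter are hypothetical names for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class AccumulatorSketch {
    /** A group of records headed for the same partition. */
    static class Batch {
        final List<byte[]> records = new ArrayList<>();
        int sizeBytes = 0;
    }

    private final int batchSize;
    // One queue per partition, keyed here by "topic-partition".
    final Map<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

    AccumulatorSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Appends a record; returns true if a new batch had to be created. */
    boolean append(String topic, int partition, byte[] value) {
        Deque<Batch> queue =
                batches.computeIfAbsent(topic + "-" + partition, k -> new ArrayDeque<>());
        synchronized (queue) {
            Batch last = queue.peekLast();
            boolean newBatch = last == null || last.sizeBytes + value.length > batchSize;
            if (newBatch) {
                last = new Batch(); // the tail batch is missing or full
                queue.addLast(last);
            }
            last.records.add(value);
            last.sizeBytes += value.length;
            return newBatch;
        }
    }

    public static void main(String[] args) {
        AccumulatorSketch accumulator = new AccumulatorSketch(16);
        accumulator.append("orders", 0, new byte[10]);
        accumulator.append("orders", 0, new byte[10]); // does not fit, opens a second batch
        accumulator.append("orders", 1, new byte[4]);
        System.out.println("orders-0 batches: " + accumulator.batches.get("orders-0").size());
        System.out.println("orders-1 batches: " + accumulator.batches.get("orders-1").size());
    }
}
```

Appending always targets the tail of the queue, which leaves the head free for the sender thread to take the oldest batch first.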
  • Drain the buffer

    The ioThread thread executes the sender's run method, which drains the buffered data and sends it

    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // create produce requests
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);

    sendProduceRequests(batches, now);
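
The drain step groups batches by the broker node that leads each partition, which is why the result above is keyed by an Integer node id. Here is a minimal sketch of that grouping, assuming batches are plain strings and leadership is given as a simple map; `DrainSketch` and its parameters are hypothetical. As a simplification it takes at most one batch per partition and ignores the request size limit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class DrainSketch {
    /**
     * Takes the oldest batch from each partition queue and groups the batches
     * by the node id of the partition leader.
     */
    static Map<Integer, List<String>> drain(Map<String, Deque<String>> batches,
                                            Map<String, Integer> leaderOf) {
        Map<Integer, List<String>> byNode = new TreeMap<>();
        for (Map.Entry<String, Deque<String>> entry : batches.entrySet()) {
            Integer node = leaderOf.get(entry.getKey());
            if (node == null) {
                continue; // leader unknown: leave the batch queued
            }
            String batch = entry.getValue().pollFirst();
            if (batch == null) {
                continue; // nothing buffered for this partition
            }
            byNode.computeIfAbsent(node, n -> new ArrayList<>()).add(batch);
        }
        return byNode;
    }

    public static void main(String[] args) {
        Map<String, Deque<String>> batches = new LinkedHashMap<>();
        batches.put("orders-0", new ArrayDeque<>(Arrays.asList("batch-a", "batch-b")));
        batches.put("orders-1", new ArrayDeque<>(Arrays.asList("batch-c")));

        Map<String, Integer> leaderOf = new HashMap<>();
        leaderOf.put("orders-0", 1);
        leaderOf.put("orders-1", 2);

        // One produce request would then be built per node id.
        System.out.println(drain(batches, leaderOf));
    }
}
```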

Sender thread
NetworkClient, Selector, KafkaChannel, NIO
Kafka sends messages only to the leader of each partition
