0%

生产者如何保证不丢数据

生产者如何保证不丢数据

生产者是通过ack确认机制来进行保证数据可靠的,topic的每一个partition收到生产者发送的数据后,都需要向生产者发送ack,如果生产者收到ack,则会进行下一轮的发送,否则就会重发数据。

而kafka中一个partition分区包含有一个leader和多个follower副本,而leader是与生产者进行读写交互的,那么partition中的leader何时向生产者回复ack呢?leader下有一个ISR(in-sync replica set),ISR存储的是与leader保持同步的follower集合,只有当ISR中的follower完成数据同步后,leader才会向生产者发送ack,如果follower长时间没有向leader同步数据,则该follower会被踢出ISR,该时间由replica.lag.time.max.ms 参数设定,如果leader发生故障之后,会从ISR中进行选举出新的leader

而kafka中ack确认机制有三种级别

acks确认机制,用于配置代理接收到消息后向生产者发送确认信号,以便生产者根据acks进行相应的处理,该机制通过属性request.required.acks设置,0、-1、1三种,默认1

  • acks=0时,生产者不需要等待代理返回确认消息,而连续发送消息。消息投递速度加快,broker接收了但是没有写入磁盘,但是无法保证消息的持久化,如果broker故障就可能丢失数据 (at most once)

  • acks=1时,生产者需要等待leader副本已成功将消息写入日志文件。降低了数据丢失的可能性,但还是存在,如果在leader成功存储数据后,follower没有来得及同步,此时leader挂掉了,而follower没有进行同步,从ISR中选举出来新的leader也就没有这条数据,数据就会丢失,默认为1

  • acks=-1(all)时,leader副本和所有ISR列表中的副本都完成数据存储之后才会向生产者发送确认消息,这种策略保证只要leader副本和follower副本中至少有一个节点存活,数据就不会丢失,但是如果数据同步完成之后,leader没有返回ack之前leader发生故障,导致生产者数据重发,会因此而导致数据重复 (at least once)

对于数据重复的情况,可以采用幂等性,保证同一条数据只存储一次,要启用幂等性,只需要将 Producer 的参数中 enable.idempotence 设置为 true 即可。Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。
但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的重复。

在配置enable.idempotence为true时,需要保证max.in.flight.requests.per.connection<=5,且retries大于0,且acks为all

也可以在业务处理层面来为每条消息设置唯一id,数据库中没有该id的才进行消费

欢迎关注我的其它发布渠道