kafka消息丢失和重复消费问题
消息丢失
消息丢失可能发生在生产者端、消息队列、消费者端
生产者丢失数据
Kafka消息发送有两种方式:同步(sync)和异步(async),
默认是同步方式,可通过producer.type
属性进行配置。
Kafka通过配置request.required.acks
属性来确认消息的生产
- 0—-表示不进行消息接收是否成功的确认;
- 1—-表示当Leader接收成功时确认;
- -1—-表示Leader和Follower都接收成功时确认;
所以存在消息丢失的场景如下:
- acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失;
- acks=1,同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失;
解决方法:
同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态
消息队列丢失数据
消息队列丢数据的原因是数据还没有同步,leader就挂了,这时其他的follower就会切换为leader导致数据丢失,那么解决这个问题首先就是要保证leader和follower的数据一致
- 可以设置acks为-1,此时leader和follower需要全部接收成功才会进行确认
二需要保证消息队列的高可用
- 设置
replication.factor
参数,该值大于1,保证每个partition至少有2个副本 - 设置
min.insync.replicas
参数,该值大于1,保证每个leader感知到至少有一个follower还在保持联系
消费者丢失数据
Kafka消息消费有两个consumer接口,Low-level API
和High-level API
:
- Low-level API:消费者自己维护offset等值,可以实现对Kafka的完全控制;
- High-level API:封装了对parition和offset的管理,使用简单;
如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就消失了,导致数据丢失
解决方法:
可以采用手动提交的方式,将业务逻辑完成之后在进行提交offset
消息重复消费
将消息的唯一标识保存到数据库或内存中,每次消费时判断是否处理过即可。