0%

生产者如何保证不丢数据

生产者是通过ack确认机制来进行保证数据可靠的,topic的每一个partition收到生产者发送的数据后,都需要向生产者发送ack,如果生产者收到ack,则会进行下一轮的发送,否则就会重发数据。

而kafka中一个partition分区包含有一个leader和多个follower副本,而leader是与生产者进行读写交互的,那么partition中的leader何时向生产者回复ack呢?leader下有一个ISR(in-sync replica set),ISR存储的是与leader保持同步的follower集合,只有当ISR中的follower完成数据同步后,leader才会向生产者发送ack,如果follower长时间没有向leader同步数据,则该follower会被踢出ISR,该时间由replica.lag.time.max.ms 参数设定,如果leader发生故障之后,会从ISR中进行选举出新的leader

阅读全文 »

消费者的分区分配策略

消费者是存在于消费者组的,如果一个消费者组中包含有多个消费者,那么消费者组内的消费者如何进行分区分配呢?

kafka有四种分区分配策略,

  • RoundRobinAssignor,该分区策略是对于所有的分区和组内所有的消费者进行轮询分配,例如,有两个消费者C0,C1和两个主题T0,T1,且每个主题存在三个分区t0p0,t0p1,t0p2,t1p0,t1p1,t1p2,分区分配结果是

    C0: [t0p0, t0p2, t1p1]

    C1: [t0p1, t1p0, t1p2]

    但是如果消费者组内的消费者所订阅的主题并不相同的话,就会出现问题

  • RangeAssignor,面向topic

  • CooperativeStickyAssignor

  • StickyAssignor

在consumer.properties使用

1
partition.assignment.strategy=class org.apache.kafka.clients.consumer.RangeAssignor

进行配置,默认使用的是RangeAssignor

故障时保证数据一致性

kafka中还存在两个概念

LEO(Log End Offset):表示当前日志文件中下一条要写入的消息的offset,即当前日志分区中最后一条消息的offset值加一,每个副本都有自己的LEO

HW(High Watermark):指消费者能见到的最大的offset,ISR队列中最小的LEO。将一个分区对应的ISR 队列中最小的LEO作为HW,HW之前的消息表示已提交的消息,对消费者是可见的,消费者最多只能消费到HW所在的位置,HW之后的消息表示还没有被Follower副本同步完成。每个副本都有自己的HW,副本Leader和Follower各自负责更新自己的HW状态

这两个概念主要是用来保证kafka在发生故障时的数据一致性问题

  • 如果follower 故障

    follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该Partition的HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了

  • 如果leader 故障

    leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader同步数据

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复

windows进行端口映射

1. 查询端口映射情况

1
netsh interface portproxy show v4tov4

2. 查询某一个IP的所有端口映射情况

1
2
3
netsh interface portproxy show v4tov4 | find "[IP]"
# 例:
netsh interface portproxy show v4tov4 | find "192.168.1.1"

3. 增加一个端口映射

1
2
3
netsh interface portproxy add v4tov4 listenaddress=[外网IP] listenport=[外网端口] connectaddress=[内网IP] connectport=[内网端口]
# 例:
netsh interface portproxy add v4tov4 listenaddress=2.2.2.2 listenport=8080 connectaddress=192.168.1.50 connectport=80

4. 删除一个端口映射

1
2
3
netsh interface portproxy delete v4tov4 listenaddress=[外网IP] listenport=[外网端口]
# 例:
netsh interface portproxy delete v4tov4 listenaddress=2.2.2.2 listenport=8080

kafka数据存储机制

以三个broker为例

先看一下zookeeper中存储的broker信息

1
2
ls /brokers/ids
[0, 1, 2]

kafka数据存储机制之broker

创建一个三个分区两个副本的topic

1
bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 3 --replication-factor 2 --topic first
阅读全文 »