Partition Operations
Partition Leader Balancing
When a topic is created, its partitions and replicas are distributed evenly across the nodes of the Kafka cluster, so the preferred replicas are also spread evenly across the cluster. Once the topic exists, each partition's preferred replica acts as its leader, and the leader handles all reads and writes. When the leader's node fails, a new leader is elected from the followers, which can leave the cluster load unbalanced; when the original node recovers and rejoins the cluster, it does not automatically become the leader again. Kafka offers two ways to re-elect the preferred replica as the partition leader and bring the cluster load back into balance.
Automatic balancing: set auto.leader.rebalance.enable=true on the broker at startup (the default is true). During controller failover the controller starts a scheduled task that runs every ${leader.imbalance.check.interval.seconds} seconds (default 300, i.e. 5 minutes) and triggers a preferred-replica election, but leaders are only actually reassigned for a broker whose imbalance ratio exceeds ${leader.imbalance.per.broker.percentage} (default 10, i.e. 10%). If the option is set to false, a replica that was the leader before the failure remains a plain follower after its node recovers.
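A minimal broker configuration sketch showing these settings together (the values shown are the defaults):

```properties
# server.properties: automatic preferred-leader rebalancing
auto.leader.rebalance.enable=true
# how often the controller checks for leader imbalance, in seconds
leader.imbalance.check.interval.seconds=300
# rebalance a broker only when its imbalance ratio exceeds this percentage
leader.imbalance.per.broker.percentage=10
```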
Manual balancing: Kafka provides a script, kafka-preferred-replica-election.sh, that elects the preferred replicas as partition leaders and thereby brings the cluster's partitions back into balance.
For example, if the whole Kafka cluster goes down while running and only one node is restarted at first, all leader replicas end up on that node; manual balancing can then be used to redistribute the leaders.
First, write a JSON file listing the partitions to elect:
```json
{
    "partitions": [
        { "topic": "test-kafka-cluster", "partition": 0 },
        { "topic": "test-kafka-cluster", "partition": 1 },
        { "topic": "test-kafka-cluster", "partition": 2 },
        { "topic": "test-kafka-cluster", "partition": 3 },
        { "topic": "test-kafka-cluster", "partition": 4 }
    ]
}
```
```bash
# Run the leader election for this topic; --path-to-json-file points at the JSON above
>kafka-preferred-replica-election --bootstrap-server localhost:9091,localhost:9092,localhost:9093 --path-to-json-file ../../config/111.json
```
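As a side note, newer Kafka releases (2.4 and later) deprecate this script in favor of kafka-leader-election.sh; a rough sketch of the equivalent call, assuming such a version:

```bash
# Elect the preferred leaders for all partitions with the newer tool
>kafka-leader-election --bootstrap-server localhost:9091,localhost:9092,localhost:9093 --election-type preferred --all-topic-partitions
```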
If only a single node went down, you only need to rebalance the affected partitions; simply list those partition numbers in the JSON file.
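For instance, a sketch of the JSON when only partition 0 needs its preferred replica elected (topic name carried over from the example above):

```json
{
    "partitions": [
        { "topic": "test-kafka-cluster", "partition": 0 }
    ]
}
```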
The script itself:
```bash
exec $(dirname $0)/kafka-run-class.sh kafka.admin.PreferredReplicaLeaderElectionCommand "$@"
```
Partition Migration
The kafka-reassign-partitions.sh script performs partition migration in scenarios such as cluster expansion and node decommissioning.
Before a node is taken offline, its partition replicas must be migrated to other available nodes. Kafka does not migrate partition replicas automatically, and skipping the manual reassignment can leave some topics with lost or unavailable data. Similarly, when nodes are added, only newly created topics are placed on them; partitions of existing topics are not automatically reassigned to the new nodes.
The script itself:
```bash
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand "$@"
```
Node Decommissioning
Suppose the node with brokerId 2 needs to be taken offline. Before decommissioning it, migrate its partition replicas to other nodes with the kafka-reassign-partitions.sh script.
1 2 3 4 5 6 7 8
| #重新分配之前 >kafka-topics --zookeeper localhost:2181 --describe --topic test-replication Topic:test-replication PartitionCount:5 ReplicationFactor:2 Configs: Topic: test-replication Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0 Topic: test-replication Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: test-replication Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: test-replication Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1 Topic: test-replication Partition: 4 Leader: 0 Replicas: 0,2 Isr: 0,2
|
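A candidate plan that excludes broker 2 can be produced with the script's --generate mode (topics-to-move.json is an assumed file name) or written by hand, as in the JSON that follows:

```bash
# topics-to-move.json (assumed content): { "version": 1, "topics": [ { "topic": "test-replication" } ] }
>kafka-reassign-partitions --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "0,1" --generate
```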
```json
{
    "version": 1,
    "partitions": [
        { "topic": "test-replication", "partition": 0, "replicas": [0, 1], "log_dirs": ["any", "any"] },
        { "topic": "test-replication", "partition": 3, "replicas": [1, 0], "log_dirs": ["any", "any"] },
        { "topic": "test-replication", "partition": 2, "replicas": [0, 1], "log_dirs": ["any", "any"] },
        { "topic": "test-replication", "partition": 4, "replicas": [0, 1], "log_dirs": ["any", "any"] },
        { "topic": "test-replication", "partition": 1, "replicas": [1, 0], "log_dirs": ["any", "any"] }
    ]
}
```
```bash
>kafka-reassign-partitions --zookeeper localhost:2181 --reassignment-json-file ../../config/124.json --execute
```
Partition migration works by creating the partition directories on the target nodes, copying the data of the original partitions over, and finally deleting the data on the original nodes. Progress can be checked with --verify:
```bash
>kafka-reassign-partitions --zookeeper localhost:2181 --reassignment-json-file ../../config/124.json --verify
```
1 2 3 4 5 6 7 8
| #重新分配之后 >kafka-topics --zookeeper localhost:2181 --describe --topic test-replication Topic:test-replication PartitionCount:5 ReplicationFactor:2 Configs: Topic: test-replication Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: test-replication Partition: 1 Leader: 0 Replicas: 1,0 Isr: 0,1 Topic: test-replication Partition: 2 Leader: 1 Replicas: 0,1 Isr: 1,0 Topic: test-replication Partition: 3 Leader: 1 Replicas: 1,0 Isr: 1,0 Topic: test-replication Partition: 4 Leader: 0 Replicas: 0,1 Isr: 0,1
|
Cluster Expansion
The operation is the same as the partition migration above, except that instead of removing a node you are adding one.
Generate the partition replica plan:
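The plan below can again be produced with the script's --generate mode, this time passing the enlarged broker list (topics-to-move.json is an assumed file name):

```bash
>kafka-reassign-partitions --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
```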
```json
{
    "version": 1,
    "partitions": [
        { "topic": "test-replication", "partition": 0, "replicas": [2, 1], "log_dirs": ["any", "any"] },
        { "topic": "test-replication", "partition": 3, "replicas": [2, 0], "log_dirs": ["any", "any"] },
        { "topic": "test-replication", "partition": 2, "replicas": [1, 0], "log_dirs": ["any", "any"] },
        { "topic": "test-replication", "partition": 4, "replicas": [0, 1], "log_dirs": ["any", "any"] },
        { "topic": "test-replication", "partition": 1, "replicas": [0, 2], "log_dirs": ["any", "any"] }
    ]
}
```
Replication traffic can be throttled during partition migration in two ways:
- Set the throttled replica lists and throttle rates directly with the kafka-configs script
```bash
# Set the list of replicas whose replication is throttled
>kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name test-replication --alter --add-config leader.replication.throttled.replicas=[0:0,1:1,2:2],follower.replication.throttled.replicas=[0:0,1:1,2:2]
# Set the replication rate for broker 2
>kafka-configs --zookeeper localhost:2181 --entity-type brokers --entity-name 2 --alter --add-config follower.replication.throttled.rate=1024,leader.replication.throttled.rate=1024
```
- Use the --throttle parameter supported by the kafka-reassign-partitions script
```bash
# Set the throttle value (bytes/sec) with the --throttle parameter
>kafka-reassign-partitions --zookeeper localhost:2181 --reassignment-json-file ../../config/124.json --execute --throttle 1024
```
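Throttles applied either way are not removed automatically; re-running the script with --verify after the move completes clears them, or they can be deleted explicitly with kafka-configs. A sketch of the explicit cleanup, reusing the topic and broker from above:

```bash
# Remove the per-topic throttled-replica lists
>kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name test-replication --alter --delete-config leader.replication.throttled.replicas,follower.replication.throttled.replicas
# Remove the per-broker throttle rates
>kafka-configs --zookeeper localhost:2181 --entity-type brokers --entity-name 2 --alter --delete-config leader.replication.throttled.rate,follower.replication.throttled.rate
```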
Increasing Replicas
Increasing the number of replicas is a special case of partition migration; in essence it is also a reassignment of partition replicas.
You decide the assignment plan yourself; just make sure the replicas are spread evenly across all nodes.
Here the original two replicas per partition are increased to three:
```json
{
    "version": 1,
    "partitions": [
        { "topic": "test-replication", "partition": 0, "replicas": [0, 1, 2] },
        { "topic": "test-replication", "partition": 1, "replicas": [1, 2, 0] },
        { "topic": "test-replication", "partition": 2, "replicas": [2, 0, 1] },
        { "topic": "test-replication", "partition": 3, "replicas": [0, 1, 2] },
        { "topic": "test-replication", "partition": 4, "replicas": [1, 2, 0] }
    ]
}
```
Then execute the partition replica reassignment:
```bash
>kafka-reassign-partitions --zookeeper localhost:2181 --reassignment-json-file ../../config/125.json --execute
```
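Once the reassignment finishes, describing the topic again should show three replicas per partition:

```bash
>kafka-topics --zookeeper localhost:2181 --describe --topic test-replication
```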