0%

kafka分区

分区操作

分区Leader平衡

当创建一个主题时,该主题的分区及副本会被均匀的分配到kafka集群的相应节点上,这样优先副本也在集群中均匀分布。当一个主题创建后优先副本会作为分区的Leader副本,Leader负责所有的读写操作。当Leader节点发生故障时,就会从Follower节点中选出新的Leader,这样可能会导致集群负载不均衡,当原Leader节点恢复后再次加入集群也不会主动成为Leader副本。kafka提供了两种方式重新选择优先副本作为分区Leader的方法,使得集群负载重新达到平衡。

  • 自动平衡 在代理节点启动时,设置auto.leader.rebalance.enable=true,默认为true,控制器在故障转移操作时会启动一个定时任务,每个${leader.imbalance.check.interval.secords}秒(默认5min)触发一次分区分配均衡操作,且只有在代理的不均衡的百分比达到${leader.imbalance.er.broker.percentage}(该值默认为10,即不均衡比例达到10%)以上才会真正执行分区重新分配操作。若配置为false,失效前是Leader副本,节点恢复后只是一个Follower副本

  • 手动平衡 kafka提供一个对分区Leader重新平衡的工具脚本kafka-preferred-replica-election.sh,该工具将优先副本选举为Leader,从而重新让集群分区达到平衡。

在运行期间kafka集群整个宕机了,然后重启了一个节点之后,此时所有的Leader副本都在该节点下,可以使用手动平衡来重新分配

首先先写一个分配的json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
{
"partitions":[
{
"topic":"test-kafka-cluster",
"partition":0
},
{
"topic":"test-kafka-cluster",
"partition":1
},
{
"topic":"test-kafka-cluster",
"partition":2
},
{
"topic":"test-kafka-cluster",
"partition":3
},
{
"topic":"test-kafka-cluster",
"partition":4
}
]
}
1
2
# 对该主题进行Leader平衡  path-to-json-file即为上述json的文件路径
>kafka-preferred-replica-election --bootstrap-server localhost:9091,localhost:9092,localhost:9093 --path-to-json-file ../../config/111.json

对于某一个节点挂掉的情况,只需要对该分区进行重新分配即可,在json中只需要写该分区编号就可以了

脚本

1
exec $(dirname $0)/kafka-run-class.sh kafka.admin.PreferredReplicaLeaderElectionCommand "$@"

分区迁移

kafka-reassign-partitions.sh脚本,在集群扩容、节点下线等场景时对分区迁移操作

当下线一个节点前,需要将该节点的分区副本迁移到其他可用节点上,kafka并不会自动进行分区副本迁移,如果不进行手动重新分配,就会导致某些主题数据丢失和不可用的情况。在新增节点时,也只有新创建的主题才会分配到这些节点上,而之前主题的分区并不会自动分配到新加入的节点

脚本

1
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand "$@"

节点下线

若需要将brokerId为2的节点下线,在下线前先通过kafka-reassign-partitions.sh脚本将该分区迁移到其他节点上

1
2
3
4
5
6
7
8
#重新分配之前
>kafka-topics --zookeeper localhost:2181 --describe --topic test-replication
Topic:test-replication PartitionCount:5 ReplicationFactor:2 Configs:
Topic: test-replication Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: test-replication Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: test-replication Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test-replication Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: test-replication Partition: 4 Leader: 0 Replicas: 0,2 Isr: 0,2
  • 生成分区分配方案 首先创建一个文件,以json字符串指定要进行分区重分配的主题。

    1
    2
    3
    4
    5
    6
    7
    8
    {
    "topics":[
    {
    "topic":"test-replication"
    }
    ],
    "version": 1
    }
    1
    2
    3
    4
    # broker-list 指定分区可迁移的broke列表,由于2要下线,需要将该节点的分区迁移到1/0
    # generate 表示表示生成一个分区副本分配方案
    >kafka-reassign-partitions --zookeeper localhost:2181 --topics-to-move-json-file ../../config/123.json --broker-list "1,0" --generate
    # 该命令原理为:从zookeeper中读取主题元数据信息及指定的有效代理,根据分区副本分配算法重新计算指定主题的分区副本分配方案

    该命令执行输出信息包含两部分,一部分是当前分区分配信息,一部分是根据指定的代理列表生成的分区分配方案

  • 生成分区副本分配方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
{
"version": 1,
"partitions": [{
"topic": "test-replication",
"partition": 0,
"replicas": [0, 1],
"log_dirs": ["any", "any"]
}, {
"topic": "test-replication",
"partition": 3,
"replicas": [1, 0],
"log_dirs": ["any", "any"]
}, {
"topic": "test-replication",
"partition": 2,
"replicas": [0, 1],
"log_dirs": ["any", "any"]
}, {
"topic": "test-replication",
"partition": 4,
"replicas": [0, 1],
"log_dirs": ["any", "any"]
}, {
"topic": "test-replication",
"partition": 1,
"replicas": [1, 0],
"log_dirs": ["any", "any"]
}]
}
  • 执行分区迁移
1
>kafka-reassign-partitions --zookeeper localhost:2181 --reassignment-json-file  ../../config/124.json --execute

分区迁移的原理是在目标节点上创建分区目录,然后复制原分区数据到目标节点,最后删除原节点数据

  • 查看分区进度
1
>kafka-reassign-partitions --zookeeper localhost:2181 --reassignment-json-file  ../../config/124.json --verify
1
2
3
4
5
6
7
8
#重新分配之后
>kafka-topics --zookeeper localhost:2181 --describe --topic test-replication
Topic:test-replication PartitionCount:5 ReplicationFactor:2 Configs:
Topic: test-replication Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: test-replication Partition: 1 Leader: 0 Replicas: 1,0 Isr: 0,1
Topic: test-replication Partition: 2 Leader: 1 Replicas: 0,1 Isr: 1,0
Topic: test-replication Partition: 3 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: test-replication Partition: 4 Leader: 0 Replicas: 0,1 Isr: 0,1

集群扩容

与上述分区迁移相同的操作,只是由之前的减少节点,变成增加节点

生成分区副本方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
{
"version": 1,
"partitions": [{
"topic": "test-replication",
"partition": 0,
"replicas": [2, 1],
"log_dirs": ["any", "any"]
}, {
"topic": "test-replication",
"partition": 3,
"replicas": [2, 0],
"log_dirs": ["any", "any"]
}, {
"topic": "test-replication",
"partition": 2,
"replicas": [1, 0],
"log_dirs": ["any", "any"]
}, {
"topic": "test-replication",
"partition": 4,
"replicas": [0, 1],
"log_dirs": ["any", "any"]
}, {
"topic": "test-replication",
"partition": 1,
"replicas": [0, 2],
"log_dirs": ["any", "any"]
}]
}

分区迁移时复制限流有两种方法

  • 通过动态修改配置
1
2
3
4
#设置数据复制被限流的副本列表
>kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name test-replication --alter --add-config leader.replication.throttled.replicas=[0:0,1:1,2:2],follower.replication.throttled.replicas=[0:0,1:1,2:2]
#设置节点2的复制速率
>kafka-configs --zookeeper localhost:2181 --entity-type brokers --entity-name 2 --alter --add-config follower.replication.throttled.rate=1024,leader.replication.throttled.rate=1024
  • 通过kafka-reassign-partitions脚本支持的throttle参数设置
1
2
#使用throttle参数设置限流值
>kafka-reassign-partitions --zookeeper localhost:2181 --reassignment-json-file ../../config/124.json --execute --throttle 1024

增加副本

增加副本是分区迁移的一个特例,本质也是分区副本的重分配

分配方案自己确定,只是要保证各副本均匀分配在所有的节点即可

将原本的两个副本改为三个副本,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
{
"version": 1,
"partitions": [
{
"topic": "test-replication",
"partition": 0,
"replicas": [
0,
1,
2
]
},
{
"topic": "test-replication",
"partition": 1,
"replicas": [
1,
2,
0
]
},
{
"topic": "test-replication",
"partition": 2,
"replicas": [
2,
0,
1
]
},
{
"topic": "test-replication",
"partition": 3,
"replicas": [
0,
1,
2
]
},
{
"topic": "test-replication",
"partition": 4,
"replicas": [
1,
2,
0
]
}
]
}

执行分区副本重分配即可

1
>kafka-reassign-partitions --zookeeper localhost:2181 --reassignment-json-file  ../../config/125.json --execute