0%

kafka主题

kafka主题

kafka提供了一个kafka-topics.sh用于对主题的相关操作,如创建主题、删除主题、修改主题分区数和副本分配以及修改主题级别的配置信息,查看主题信息等操作。指令包括—list、—describe、—create、—alter和—delete。

主题脚本

1
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

主题源码

根据脚本可以看出,主题的脚本实际调用的是TopicCommand来进行操作的

创建主题包括两个阶段,第一阶段是客户端将主题元数据写入zookeeper,称为客户端创建主题;第二阶段是控制器负责管理主题的创建,称为服务端创建主题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def main(args: Array[String]): Unit = {

val opts = new TopicCommandOptions(args)
if(args.length == 0)
CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, describe, or change a topic.")

// should have exactly one action
val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
if(actions != 1)
CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete")

opts.checkArgs()

val time = Time.SYSTEM
val zkClient = KafkaZkClient(opts.options.valueOf(opts.zkConnectOpt), JaasUtils.isZkSecurityEnabled, 30000, 30000,
Int.MaxValue, time)

var exitCode = 0
try {
// 创建操作
if(opts.options.has(opts.createOpt))
createTopic(zkClient, opts)
// 修改操作
else if(opts.options.has(opts.alterOpt))
alterTopic(zkClient, opts)
// list列表操作
else if(opts.options.has(opts.listOpt))
listTopics(zkClient, opts)
// 详情操作
else if(opts.options.has(opts.describeOpt))
describeTopic(zkClient, opts)
// 删除操作
else if(opts.options.has(opts.deleteOpt))
deleteTopic(zkClient, opts)
} catch {
case e: Throwable =>
println("Error while executing topic command : " + e.getMessage)
error(Utils.stackTrace(e))
exitCode = 1
} finally {
zkClient.close()
Exit.exit(exitCode)
}

}

kafka主题命令

创建主题

  • 如果设置了auto.create.topics.enabled=true(默认为true),当生产者向一个未创建的主题发送消息时,就会自动创建一个拥有${num.partitions}个分区和${default.replication.factor}个副本的主题

  • 手动创建主题,使用kafka-topics.sh脚本

创建kafka-action主题 (3个分区 1个副本[由于我只有一个brokers] )

  • zookeeper参数 必传,zookeeper连接地址,已过期
  • bootstrap-server 必传,替换原本的zookeeper参数,连接kafka地址
  • partitions参数 必传,用于设置主题分区,通过分区分配策略,将一个主题的消息分散到多个分区并分别保存到不同的代理上,提高消息处理的吞吐量。kafka消费者和生产者可以采用多线程并行对主题消息进行处理,而每个线程处理的是一个分区的数据,因此分区实际上是kafka并行处理的基本单位。
  • replication-factor参数 必传,设置主题副本数,副本会被分不在不同的集群节点上,副本数不能超过节点数,3个节点的kafka集群最多只能有3个副本
  • config参数 可选,用来覆盖主题级别的默认配置
1
2
3
4
>kafka-topics --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 --topic kafka-action
#创建成功
Created topic kafka-action.

创建好之后,logs文件夹里就会出现kafka-action-0、kafka-action-1、kafka-action-2三个分区的文件夹

之后去zookeeper客户端查看信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ls /brokers/topics/kafka-action/partitions
[0, 1, 2]

get /brokers/topics/kafka-action
{"version":1,"partitions":{"2":[0],"1":[0],"0":[0]}}
cZxid = 0x36
ctime = Tue May 12 14:32:07 CST 2020
mZxid = 0x36
mtime = Tue May 12 14:32:07 CST 2020
pZxid = 0x38
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 52
numChildren = 1

删除主题

删除主题的两种方式

  • 手动删除各节点${log.dir}目录下该主题文件夹,同时登录zookeeper客户端删除待删除主题对应的节点,主题保存在/brokers/topics和/config/topics目录下

  • 执行kafka-topics脚本删除,如果要彻底删除需要配置delete.topic.enable=true(默认false),否则并未真正删除,而是在zookeeper的/admin/delete_topics目录下创建一个与待删除主题同名的节点,将该主题标记为删除状态

1
2
3
4
>kafka-topics --delete --bootstrap-server localhost:9092 --topic kafka-action

Topic kafka-action is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

查看主题

kafka提供了list和describe命令查看主题信息,list列出所有主题名,describe查看所有主题或者某个特定主题的信息

查看所有主题

1
2
>kafka-topics --list --bootstrap-server localhost:9092
kafka-test

查看详细信息

1
2
3
4
5
6
7
8
9
10
11
>kafka-topics --describe --bootstrap-server localhost:9092
Topic:kafka-test PartitionCount:3 ReplicationFactor:1 Configs:
Topic: kafka-test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: kafka-test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: kafka-test Partition: 2 Leader: 0 Replicas: 0 Isr: 0

>kafka-topics --describe --zookeeper bootstrap-server:9092 --topic kafka-test
Topic:kafka-test PartitionCount:3 ReplicationFactor:1 Configs:
Topic: kafka-test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: kafka-test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: kafka-test Partition: 2 Leader: 0 Replicas: 0 Isr: 0

查看正在同步的主题

通过describe与under-replicated-partitions命令组合使用,可以查看处于”under replicated”状态的分区,处于该状态的主题可能正在进行同步操作。

1
>kafka-topics --describe --bootstrap-server localhost:9092 --under-replicated-partitions

查看没有Leader的分区

通过describe与unavailable-partitions命令组合使用,可以查看没有leader副本的主题。

1
>kafka-topics --describe --bootstrap-server localhost:9092 --unavailable-partitions

查看主题覆盖的配置

通过describe和topics-with-overrides命令组合使用,可以查看主题覆盖了哪些配置。

1
>kafka-topics --describe --bootstrap-server localhost:9092 --topics-with-overrides

修改主题

创建一个主题后,可以通过alter命令对主题进行修改,包括修改主题级别的配置、增加主题分区、修改副本分配方案、修改主题offset等

修改主题级别配置

可以使用alter和config参数组合来修改或者增加新的配置以覆盖相应配置原来的值,或者alter和delete-config来删除相应配置使其恢复默认值

alter修改配置已被废除,现在使用kafka-configs脚本(后续介绍)

1
2
3
4
>kafka-topics --alter --bootstrap-server localhost:9092 --topic kafka-test --config max.message.bytes=204800
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
Going forward, please use kafka-configs.sh for this functionality
Updated config for topic kafka-test.

增加分区

1
>kafka-topics --alter --bootstrap-server localhost:9092 --topic kafka-test --partitions 5

欢迎关注我的其它发布渠道