kafka主题
kafka提供了一个kafka-topics.sh用于对主题的相关操作,如创建主题、删除主题、修改主题分区数和副本分配以及修改主题级别的配置信息,查看主题信息等操作。指令包括—list、—describe、—create、—alter和—delete。
主题脚本
1 | exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@" |
主题源码
根据脚本可以看出,主题的脚本实际调用的是TopicCommand来进行操作的
创建主题包括两个阶段,第一阶段是客户端将主题元数据写入zookeeper,称为客户端创建主题;第二阶段是控制器负责管理主题的创建,称为服务端创建主题。
1 | def main(args: Array[String]): Unit = { |
kafka主题命令
创建主题
如果设置了auto.create.topics.enabled=true(默认为true),当生产者向一个未创建的主题发送消息时,就会自动创建一个拥有${num.partitions}个分区和${default.replication.factor}个副本的主题
手动创建主题,使用kafka-topics.sh脚本
创建kafka-action主题 (3个分区 1个副本[由于我只有一个brokers] )
- zookeeper参数 必传,zookeeper连接地址,已过期
- bootstrap-server 必传,替换原本的zookeeper参数,连接kafka地址
- partitions参数 必传,用于设置主题分区,通过分区分配策略,将一个主题的消息分散到多个分区并分别保存到不同的代理上,提高消息处理的吞吐量。kafka消费者和生产者可以采用多线程并行对主题消息进行处理,而每个线程处理的是一个分区的数据,因此分区实际上是kafka并行处理的基本单位。
- replication-factor参数 必传,设置主题副本数,副本会被分不在不同的集群节点上,副本数不能超过节点数,3个节点的kafka集群最多只能有3个副本
- config参数 可选,用来覆盖主题级别的默认配置
1 | >kafka-topics --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 --topic kafka-action |
创建好之后,logs文件夹里就会出现kafka-action-0、kafka-action-1、kafka-action-2三个分区的文件夹
之后去zookeeper客户端查看信息
1 | ls /brokers/topics/kafka-action/partitions |
删除主题
删除主题的两种方式
手动删除各节点${log.dir}目录下该主题文件夹,同时登录zookeeper客户端删除待删除主题对应的节点,主题保存在/brokers/topics和/config/topics目录下
执行kafka-topics脚本删除,如果要彻底删除需要配置delete.topic.enable=true(默认false),否则并未真正删除,而是在zookeeper的/admin/delete_topics目录下创建一个与待删除主题同名的节点,将该主题标记为删除状态
1 | >kafka-topics --delete --bootstrap-server localhost:9092 --topic kafka-action |
查看主题
kafka提供了list和describe命令查看主题信息,list列出所有主题名,describe查看所有主题或者某个特定主题的信息
查看所有主题
1 | >kafka-topics --list --bootstrap-server localhost:9092 |
查看详细信息
1 | >kafka-topics --describe --bootstrap-server localhost:9092 |
查看正在同步的主题
通过describe与under-replicated-partitions命令组合使用,可以查看处于”under replicated”状态的分区,处于该状态的主题可能正在进行同步操作。
1 | >kafka-topics --describe --bootstrap-server localhost:9092 --under-replicated-partitions |
查看没有Leader的分区
通过describe与unavailable-partitions命令组合使用,可以查看没有leader副本的主题。
1 | >kafka-topics --describe --bootstrap-server localhost:9092 --unavailable-partitions |
查看主题覆盖的配置
通过describe和topics-with-overrides命令组合使用,可以查看主题覆盖了哪些配置。
1 | >kafka-topics --describe --bootstrap-server localhost:9092 --topics-with-overrides |
修改主题
创建一个主题后,可以通过alter命令对主题进行修改,包括修改主题级别的配置、增加主题分区、修改副本分配方案、修改主题offset等
修改主题级别配置
可以使用alter和config参数组合来修改或者增加新的配置以覆盖相应配置原来的值,或者alter和delete-config来删除相应配置使其恢复默认值
alter修改配置已被废除,现在使用kafka-configs脚本(后续介绍)
1 | >kafka-topics --alter --bootstrap-server localhost:9092 --topic kafka-test --config max.message.bytes=204800 |
增加分区
1 | >kafka-topics --alter --bootstrap-server localhost:9092 --topic kafka-test --partitions 5 |