
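Kafka API Operations: Topics

Kafka provides four core APIs:

  • Producer API: interfaces for producing messages, including custom producers and custom partition assignment

  • Consumer API: interfaces for consuming messages, including creating consumers and managing consumer offsets

  • Streams API: interfaces for building stream-processing applications

  • Connect API: connectors that stream data between Kafka and external systems, importing data into Kafka or exporting it from Kafka to an external system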

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.1</version>
    </dependency>
</dependencies>

Topics

Creating a topic

Newer versions of Kafka use AdminClient in place of the earlier ZkUtils class.

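import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

public static void createTopic(String topic) {
    Properties properties = new Properties();
    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // try-with-resources closes the client even if creation fails
    try (AdminClient adminClient = AdminClient.create(properties)) {
        // Arguments: topic name, partition count, replication factor
        NewTopic newTopic = new NewTopic(topic, 5, (short) 1);
        CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
        // createTopics is asynchronous; get() blocks until the broker responds
        result.all().get();
        System.out.println("Topic created");
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        System.out.println("Topic creation interrupted");
    } catch (ExecutionException e) {
        System.out.println("Topic creation failed: " + e.getCause());
    }
}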
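
createTopic hands the name straight to the broker, so an illegal name only shows up as a failed request. The sketch below is a client-side pre-check under the assumption that the broker accepts ASCII letters, digits, '.', '_' and '-', limits names to 249 characters, and rejects "." and "..". TopicNames and isValid are hypothetical names, not part of kafka-clients.

```java
import java.util.regex.Pattern;

/** Hypothetical helper (not part of kafka-clients): pre-checks a topic name. */
final class TopicNames {
    // Assumed legal characters: ASCII letters, digits, '.', '_' and '-'
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    // Assumed maximum topic name length
    private static final int MAX_LENGTH = 249;

    private TopicNames() {
    }

    /** Returns true if the name passes the assumed naming rules. */
    static boolean isValid(String name) {
        if (name == null || name.isEmpty() || name.length() > MAX_LENGTH) {
            return false;
        }
        // "." and ".." use only legal characters but are still rejected
        if (name.equals(".") || name.equals("..")) {
            return false;
        }
        return LEGAL.matcher(name).matches();
    }
}
```

One possible use is to call TopicNames.isValid(topic) at the top of createTopic and return early with a clear message instead of waiting for the broker to reject the request.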

Listing topics

/**
 * Creates an AdminClient connected to the local broker.
 *
 * @return a new AdminClient; the caller is responsible for closing it
 */
public static AdminClient createAdmin() {
    Properties properties = new Properties();
    properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return AdminClient.create(properties);
}

/**
 * Lists the topic names.
 */
public static void listTopic() throws ExecutionException, InterruptedException {
    // try-with-resources closes the client even if the lookup throws
    try (AdminClient adminClient = createAdmin()) {
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        Set<String> topicNameSet = listTopicsResult.names().get();
        System.out.println("---------- Topics -----------");
        topicNameSet.forEach(System.out::println);
    }
}
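
createAdmin hardcodes a single broker address. For a multi-broker cluster, the bootstrap list is a comma-separated string of host:port pairs. The sketch below builds the Properties from a list of addresses. AdminProps and forBrokers are hypothetical names, and the literal key "bootstrap.servers" is the string behind CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, written out so that the snippet compiles without kafka-clients.

```java
import java.util.List;
import java.util.Properties;

/** Hypothetical helper (not part of kafka-clients): builds admin client properties. */
final class AdminProps {
    private AdminProps() {
    }

    /** Joins the given host:port addresses into a bootstrap.servers setting. */
    static Properties forBrokers(List<String> brokers) {
        if (brokers == null || brokers.isEmpty()) {
            throw new IllegalArgumentException("at least one broker address is required");
        }
        Properties props = new Properties();
        // Same key as CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", String.join(",", brokers));
        return props;
    }
}
```

The result can be passed to AdminClient.create(...) in place of the Properties built inside createAdmin.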

Describing a topic

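/**
 * Prints the description of a topic.
 *
 * @param topic name of the topic to describe
 * @throws ExecutionException if the request to the broker fails
 * @throws InterruptedException if the thread is interrupted while waiting
 */
public static void descTopic(String topic) throws ExecutionException, InterruptedException {
    // try-with-resources closes the client once the description is printed
    try (AdminClient adminClient = createAdmin()) {
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(topic));
        // all() resolves to a map from topic name to its description
        TopicDescription topicDescription = describeTopicsResult.all().get().get(topic);
        System.out.println(topicDescription);
    }
}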

Altering topic configuration

/**
 * Alters a topic's configuration.
 *
 * @param topic name of the topic to alter
 */
public static void updateTopic(String topic) throws ExecutionException, InterruptedException {
    try (AdminClient adminClient = createAdmin()) {
        // Set max.message.bytes to 102400 bytes (100 KiB) for this topic
        ConfigEntry configEntry = new ConfigEntry(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "102400");
        AlterConfigOp alterConfigOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
        configs.put(configResource, Collections.singletonList(alterConfigOp));
        // incrementalAlterConfigs changes only the listed entries and leaves other settings untouched
        AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configs);
        // Block until the broker has applied the change
        alterConfigsResult.all().get();
    }
}

Deleting a topic

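/**
 * Deletes a topic.
 *
 * @param topic name of the topic to delete
 */
public static void deleteTopic(String topic) throws ExecutionException, InterruptedException {
    // try-with-resources closes the client when the call completes
    try (AdminClient adminClient = createAdmin()) {
        // deleteTopics is asynchronous; wait for the result so that failures surface
        adminClient.deleteTopics(Collections.singletonList(topic)).all().get();
    }
}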

Adding partitions

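/**
 * Increases the number of partitions of a topic.
 *
 * @param topic name of the topic
 * @param num   total number of partitions after the increase
 */
public static void addPartitions(String topic, int num) throws ExecutionException, InterruptedException {
    // increaseTo takes the new total partition count, not the number of partitions to add
    NewPartitions newPartitions = NewPartitions.increaseTo(num);
    try (AdminClient adminClient = createAdmin()) {
        Map<String, NewPartitions> config = new HashMap<>();
        config.put(topic, newPartitions);
        // createPartitions is asynchronous; wait for the result so that failures surface
        adminClient.createPartitions(config).all().get();
    }
}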
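
Because NewPartitions.increaseTo expects the new total rather than the number of partitions to add, it is easy to pass the wrong value. The sketch below computes the total from the current count plus the number of extra partitions and rejects non-positive inputs. PartitionMath and targetTotal are hypothetical names, not part of kafka-clients. The check assumes that the broker refuses a total that is not larger than the current count.

```java
/** Hypothetical helper (not part of kafka-clients): computes the total for increaseTo. */
final class PartitionMath {
    private PartitionMath() {
    }

    /**
     * Returns the total partition count to pass to NewPartitions.increaseTo.
     *
     * @param current current number of partitions of the topic
     * @param extra   number of partitions to add
     */
    static int targetTotal(int current, int extra) {
        if (current < 1) {
            throw new IllegalArgumentException("current must be at least 1");
        }
        if (extra < 1) {
            throw new IllegalArgumentException("extra must be at least 1");
        }
        // addExact throws ArithmeticException instead of overflowing silently
        return Math.addExact(current, extra);
    }
}
```

The current count could be read from the TopicDescription returned by describeTopics, for example topicDescription.partitions().size(), and the result passed on as addPartitions(topic, PartitionMath.targetTotal(current, extra)).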