kafka提供了4类核心API
Producer API 生产消息相关接口,自定义生产者、自定义分区分配
Consumer API 消费消息相关接口,创建消费者、消费偏移量管理
Streams API 构建流处理程序的接口
Connect API kafka和外部系统进行数据流连接的连接器,实现将数据导入到kafka或从kafka中导出到外部系统。
1 2 3 4 5 6 7
| <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.1</version> </dependency> </dependencies>
|
主题
主题创建
新版本的kafka使用AdminClient代替了之前的ZkUtils类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static void createTopic(String topic){ AdminClient adminClient; Properties properties = new Properties(); properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); adminClient = AdminClient.create(properties); NewTopic newTopic = new NewTopic(topic,5,(short)1); CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic)); try{ result.all().get(); System.out.println("创建成功"); }catch (InterruptedException | ExecutionException e) { System.out.println("创建失败"); } finally { adminClient.close(); } }
|
主题查询
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
public static AdminClient createAdmin() { AdminClient adminClient; Properties properties = new Properties(); properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); adminClient = AdminClient.create(properties); return adminClient; }
public static void listTopic() throws ExecutionException, InterruptedException { AdminClient adminClient = createAdmin(); ListTopicsResult listTopicsResult = adminClient.listTopics(); Set<String> topicNameSet = listTopicsResult.names().get(); System.out.println("----------列出主题-----------"); topicNameSet.forEach(System.out::println); adminClient.close(); }
|
主题详细信息
1 2 3 4 5 6 7 8 9 10 11 12 13
|
public static void descTopic(String topic) throws ExecutionException, InterruptedException { AdminClient adminClient = createAdmin(); DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(topic)); TopicDescription topicDescription = describeTopicsResult.all().get().get(topic); System.out.println(topicDescription); }
|
修改主题配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
public static void updateTopic(String topic) throws ExecutionException, InterruptedException { AdminClient adminClient = createAdmin(); ConfigEntry configEntry = new ConfigEntry(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "102400"); AlterConfigOp alterConfigOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic); Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>(); configs.put(configResource,Collections.singletonList(alterConfigOp)); AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configs); alterConfigsResult.all().get(); adminClient.close(); }
|
删除主题
1 2 3 4 5 6 7 8
|
public static void deleteTopic(String topic){ AdminClient adminClient = createAdmin(); adminClient.deleteTopics(Collections.singletonList(topic)); }
|
新增分区
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
public static void addPartitions(String topic, int num) { NewPartitions newPartitions = NewPartitions.increaseTo(num); AdminClient adminClient = createAdmin(); Map<String, NewPartitions> config = new HashMap<>(); config.put(topic, newPartitions); adminClient.createPartitions(config); adminClient.close(); }
|