0%

kafka streams

kafka Streams

kafka Streams是一个用来构建流处理程序的java库,基于Kafka的分区水平扩展来对数据进行有序高效的处理,利用kafka的并发模型来实现透明的负载均衡,其充当了kafka普通消费者与生产者一样的角色,可以处理输入是一个kafka主题,输出是另一个kafka主题

可以解决的问题

  • 一次一件事件的处理而不是微批处理
  • 有状态的处理,包括连接操作(join)和聚合类操作
  • 提供了必要的流处理原语,包括高级流处理DSL和低级处理器API
    • 高级流处理DSL 提供了常用流处理变换操作
    • 低级流处理API 支持客户端自定义处理器并与状态仓库交互
  • 使用类似于DataFlow的模型来处理乱序数据的时间窗口问题
  • 分布式处理,并且有容错机制,可以快速地实现容错
  • 有重新处理数据的能力

基本概念

流(stream)是kafka Streams提供的最重要的抽象,它代表的是一个无限的、不断更新的数据集。

一个流就是由一个有序的、可重放的、支持故障转移的不可变的数据记录序列,其中每个数据记录被定义为一个键值对。

流处理器

一个流处理器(stream processor)是处理拓扑的一个节点,它代表了拓扑中的处理步骤。一个流处理器从它所在的拓扑上游接收数据,通过kafka streams提供的流处理的基本方法,如map()、filter()、join()以及聚合等方法,对数据进行处理,然后将处理后的一个或多个输出结果发送给下游流处理器。一个拓扑中的流处理器当中有Source处理器和Sink处理器两个特殊的流处理器。

  • Source处理器:在一个处理拓扑中该处理器没有任何上游处理器。该处理器从kafka的一个或多个主题消费数据作为处理拓扑的输入流,将该输入流发送到下游处理器。
  • Sink处理器:在一个处理拓扑中该处理器没有任何下游处理器。该处理器将从上游处理器接收到的任何数据发送到指定的主题当中。

处理器拓扑

处理器拓扑(processor topology)是流处理应用程序进行数据处理的计算逻辑。一个处理器拓扑是由流处理器和相连接的流组成的有向无环图,其中流处理器是图的节点,流是图的边。

kafka提供两种定义流处理拓扑的API。

  • kafka streams DSL API:提供了一些开箱即用的数据转换操作算子,如map、filter、join和聚合类算子,操作简单,但是不够灵活
  • Low-Level Processor API:允许开发者自定义处理器,构造处理器拓扑,还可以与状态仓库进行交互操作。由于可定制处理器,该类API相对来说更加灵活,开发者根据自己业务需要定制开发相应的处理逻辑,但同时涉及底层的操作处理,开发成本相对较高

时间

流处理定义了通用3种类型的时间

  • 事件时间(event time):指事件或数据记录产生的时间,即kafka消息源对应的时间
  • 处理时间(processing time):指事件或数据记录被流处理应用处理的时间点,也就是对消息源进行处理时的时间。处理时间一般晚于事件时间。
  • 摄入时间(ingestion time):指消息被处理后保存到kafka主题时间。有可能消息不会被处理而直接保存到主题当中,此时指的是存储时间。

kafka有时间戳类型,每条消息都会被附加一个时间戳,可以根据配置来设置消息的时间戳类型。通过message.timestamp.type配置来设置时间戳类型是LogAppendTime还是CreateTime,分别对应处理时间和存储时间。

Kafka Streams通过TimestampExtractor接口给每个数据记录赋一个时间戳,开发者可以根据不同的需要来确定时间戳的实现。每个数据记录赋予时间戳之后就可以对数据进行聚合操作,实现窗口功能,能够方便地解决数据乱序的问题。

状态

一些流处理并不关注状态,即对每个消息的处理都是相互独立的,如对消息进行简单的转换操作或者基于某些条件对消息进行筛选操作等。

某些场景可能需要保存流处理的中间结果,即流的中间状态。同时保存状态的话可以提供更多复杂的操作,如对流进行join,group和聚合操作等。Kafka Streams DSL提供了很多这样的包含状态的DSL。Kafka Streams提供了一种状态仓库,被流处理应用用来存储和查询状态数据,默认状态存储在本地RocksDB当中,存储路径通过参数state.dir配置,默认路径是/tmp/kafka-streams。Kafka Streams的每个Task使用一个或多个状态仓库,可通过API来访问和存储流处理需要的数据。这种状态仓库可以是持久化的键值对引擎,也可以是内存中的HashMap或其他方便合理的数据结构。

Kafka Streams对本地状态仓库提供了容错和自动恢复,这是由于本地状态本身是通过kafka进行复制的,所以当一个机器出现故障时,其他机器可以自动恢复本地状态,并且从故障出错点继续处理。

KStream和KTable

Kafka Stream定义了KStream和KTable两种基本抽象。两者的区别在于,KStream是一个由键值对构成的抽象记录流,每个键值对是一个独立单元,即使相同的key也不会被覆盖,类似数据库的插入操作;KTable可以理解为一个基于表主键的日志更新流,相同key的每条记录只保存最新的一条记录,类似数据库基于主键更新。

无论是记录流(用KStream定义)还是更新日志流(用KTable定义),都可以从一个或多个Kafka主题数据源来创建。一个KStream可以与另一个KStream或者KTable进行join操作,或者聚合成一个KTable,一个KTable也可以转换成一个KStream。KStream和KTable都提供了一系列的转换操作,每个转换操作都可以转化为一个KStream或者KTable对象,将这些转换操作连接在一起就构成了一个处理器拓扑。

窗口

对流处理时可能需要把数据记录按时间分组。窗口是流处理状态转换操作的基本条件,一个窗口相关操作通常需要存储中间状态,根据窗口的设置旧的状态在窗口中持续时间大于窗口大小之后就会被删除。一个窗口包括窗口大小滑动步长两个属性。

窗口大小是指一条记录在窗口中持续的时间,持续时间超过窗口大小的记录就会被删除。

滑动步长指定了一个窗口每次相对于前一个窗口向前移动的距离。滑动步长不可以大于窗口大小,否则会导致部分记录不属于任何窗口而不被处理。

跳跃时间窗口

滑动步长小于窗口大小,基于时间间隔,描述了一个大小固定、可能会重叠的窗口模型。

翻转时间窗口

滑动步长等于窗口大小,基于时间间隔,描述了一个大小固定、不可重叠、无间隙的窗口模型。一条记录仅属于唯一的窗口。

滑动窗口

大小固定并沿着时间轴连续滑动的窗口模型。如果两条数据的时间戳之差在窗口大小之内,则两条数据记录属于同一个窗口。在kafka流中,滑动窗口只有在join操作中才会用到

使用介绍

maven依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.6.0</version>
</dependency>

KStream 与 KTable

KStream

KStream是一个记录流,每条数据集都是一个独立的数据单元。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class KStreamTest {
public static void main(String[] args) {
Properties prop = init();

StreamsBuilder builder = new StreamsBuilder();
// 构造kstream日志流(数据来源来自 kafka的主题stream-input)
KStream<String,String> source = builder.stream("stream-input");
// 打印到控制台
source.print(Printed.<String, String>toSysOut());
Topology topology = builder.build();
System.out.println(topology.describe());
final KafkaStreams kafkaStreams = new KafkaStreams(topology,prop);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("hook"){
@Override
public void run() {
System.out.println("线程停止");
kafkaStreams.close();
latch.countDown();
}
});
try {
kafkaStreams.start();
latch.await();
} catch (InterruptedException e){
System.exit(1);
}
System.out.println("项目终止");
System.exit(0);


}

public static Properties init(){
Properties properties = new Properties();
// 流处理应用的id,必须指定
properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"KStream-test");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
// key序列化反序列化
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
//value序列化反序列化
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
return properties;
}
}
1
2
3
4
 kafka-console-producer --bootstrap-server localhost:9092 --topic stream-input --property parse.key=true                         
>aa bb
>aa cc
>aa dd
1
2
3
[KSTREAM-SOURCE-0000000000]: aa, bb
[KSTREAM-SOURCE-0000000000]: aa, cc
[KSTREAM-SOURCE-0000000000]: aa, dd
KTable

KTable是一个日志更新流,即相同Key数据集是进行更新操作,同一个Key总是保留最新的值

1
2
// 构造kstream日志流(数据来源来自  kafka的主题stream-input)
KTable<String,String> kTable = builder.table("stream-input");

计数程序

此程序是用来进行统计value出现的次数的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class TestCount {
public static void main(String[] args) {
Properties prop = init();
StreamsBuilder builder = new StreamsBuilder();
KStream<String,String> stream = builder.stream("testCountInputTopic");
stream.print(Printed.toSysOut());
KTable<String,Long> table = stream
// .flatMapValues(line -> Arrays.asList(line.split("\\t")))
.groupBy((key,value) -> value)
//指定状态仓库的名称 (路径默认在/tmp/kafka-streams中)防止重启之后数据从头开始
.count(Materialized.as("counts-store"));
// 指定序列化
table.toStream().to("testCountOutputTopic", Produced.with(Serdes.String(),Serdes.Long()));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(),prop);
kafkaStreams.start();
}

public static Properties init(){
Properties properties = new Properties();
// 流处理应用的id,必须指定
properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"KStream-test");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
// key序列化反序列化
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
//value序列化反序列化
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
return properties;
}
}

欢迎关注我的其它发布渠道