0%

转换操作

转换操作

转换操作分为有状态转换和无状态转换

  • 在无状态转换操作中,每个批次的处理不依赖于之前批次的数据
  • 在有状态状态操作中,需要使用之前批次的数据或是中间结果来计算当前批次的数据,有状态操作基于滑动窗口的转化操作和追踪状态变化的转化操作

无状态转换

无状态转换的方法包括有map、flatMap、filter、repartition、reduceByKey、groupByKey等,如reduceByKey是只操作每个时间区间的数据,但是并不会统计不同区间的数据

有状态转换

有状态操作主要有两种类型:滑动窗口updateStateByKey,updateStateByKey操作需要设置检查点来确保容错性

使用updateStateByKey

有时在DStream操作时需要跨批次的进行数据统计,如需要统计累加的wordcount,可以使用有状态的转换操作updateStateByKey方法。
updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
updateStateByKey() 的结果会是一个新的 DStream,内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的

使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态

如果不设置检查点会出现错误

requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming")
// 第二个参数是采集周期
val streamingContext = new StreamingContext(sparkConf, Seconds(3))

// 设置检查点
streamingContext.checkpoint("./checkpoint")

val stream = streamingContext.receiverStream(new MyReceiver)

val map = stream.map((_, 1))
// updateStateByKey内需要传入一个函数
// 函数第一个参数 values 为当前批次单词频度
// 函数第二个参数 state 为以往批次单词频度
val wordCount = map.updateStateByKey[Int]((values: Seq[Int], state: Option[Int]) => {
val current = values.foldLeft(0)(_ + _)
val pre = state.getOrElse(0)
Some(current + pre)
})

wordCount.print()

// 启动采集器
streamingContext.start()

// 等待采集器停止
streamingContext.awaitTermination()

使用滑动窗口

滑动窗口的操作会在一个比StreamContext的批次间隔更长的时间范围内,通过整合多个批次的结果,来计算出整个窗口的结果,窗口需要设置两个重要的参数,窗口时长以及滑动步长,这两个参数都必须是StreamContext批次间隔的整数倍。窗口时长是控制每次计算最近的多少批次的数据,而滑动步长的默认值是与批次间隔一致,用来控制对新的DStream进行计算的间隔

1
2
3
4
5
6
7
// 统计的最新的十秒的数据
val window = map.reduceByKeyAndWindow(
{ (x, y) => x + y }, // 加上新进入窗口的批次中的元素
{ (x, y) => x - y }, //移除离开窗口的老批次中的元素
Seconds(10), //窗口时长
Seconds(1) //滑动步长
)

欢迎关注我的其它发布渠道