转换操作
转换操作分为有状态转换和无状态转换
- 在无状态转换操作中,每个批次的处理不依赖于之前批次的数据
- 在有状态状态操作中,需要使用之前批次的数据或是中间结果来计算当前批次的数据,有状态操作基于滑动窗口的转化操作和追踪状态变化的转化操作
无状态转换
无状态转换的方法包括有map、flatMap、filter、repartition、reduceByKey、groupByKey等,如reduceByKey是只操作每个时间区间的数据,但是并不会统计不同区间的数据
有状态转换
有状态操作主要有两种类型:滑动窗口和updateStateByKey,updateStateByKey操作需要设置检查点来确保容错性
使用updateStateByKey
有时在DStream操作时需要跨批次的进行数据统计,如需要统计累加的wordcount,可以使用有状态的转换操作updateStateByKey方法。
updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
updateStateByKey() 的结果会是一个新的 DStream,内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的
使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态
如果不设置检查点会出现错误
requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
1 | val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming") |
使用滑动窗口
滑动窗口的操作会在一个比StreamContext的批次间隔更长的时间范围内,通过整合多个批次的结果,来计算出整个窗口的结果,窗口需要设置两个重要的参数,窗口时长以及滑动步长,这两个参数都必须是StreamContext批次间隔的整数倍。窗口时长是控制每次计算最近的多少批次的数据,而滑动步长的默认值是与批次间隔一致,用来控制对新的DStream进行计算的间隔
1 | // 统计的最新的十秒的数据 |