0%

spark streaming容错

spark streaming容错

为了spark streaming可以有强大的容错保障,必须设置检查点,检查点机制主要有两个目的

  • 控制发生失败时需要重算的状态数
  • 提供驱动器程序容错

驱动器程序容错

在驱动器重启的时候,如果想要从检查点来恢复数据,必须要使用特殊的方式来进行创建StreamingContext,使用getOrCreate方法可以使得在重启驱动器程序的时候,如果检查点文件存在,会重新从检查点目录中初始化出StreamingContext,然后继续处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
val checkPointDir = "./checkpoint3"

// 驱动器程序容错
val context = StreamingContext.getOrCreate(checkPointDir, () => {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(checkPointDir)
// 业务逻辑要放在该方法内部,否则无法生效
val stream = ssc.receiverStream(new MyReceiver)
val map = stream.map((_, 1))

val window = map.reduceByKeyAndWindow(
{ (x, y) => x + y }, // 加上新进入窗口的批次中的元素
{ (x, y) => x - y }, //移除离开窗口的老批次中的元素
Seconds(10), //窗口时长
Seconds(1) //滑动步长
)

window.print()

ssc
})