spark streaming容错
为了spark streaming可以有强大的容错保障,必须设置检查点,检查点机制主要有两个目的
- 控制发生失败时需要重算的状态数
- 提供驱动器程序容错
驱动器程序容错
在驱动器重启的时候,如果想要从检查点来恢复数据,必须要使用特殊的方式来进行创建StreamingContext,使用getOrCreate方法可以使得在重启驱动器程序的时候,如果检查点文件存在,会重新从检查点目录中初始化出StreamingContext,然后继续处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| val checkPointDir = "./checkpoint3"
val context = StreamingContext.getOrCreate(checkPointDir, () => { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming") val ssc = new StreamingContext(sparkConf, Seconds(1)) ssc.checkpoint(checkPointDir) val stream = ssc.receiverStream(new MyReceiver) val map = stream.map((_, 1))
val window = map.reduceByKeyAndWindow( { (x, y) => x + y }, { (x, y) => x - y }, Seconds(10), Seconds(1) )
window.print() ssc })
|