RDD持久化
默认情况下,Spark的RDD会在进行行动算子时重新计算,如果多个行动算子想要重用一个RDD怎么办呢?可以使用rdd的持久化
缓存
使用cache方法或者persist方法来进行持久化
1 | // 该方法内部调用的是persist(StorageLevel.MEMORY_ONLY),即缓存在内存中 |
1 | // persist方法可以传不同的存储级别来进行缓存 |
常见的方式有
- DISK_ONLY 存储到磁盘
- MEMORY_ONLY 存储到内存
- MEMORY_ONLY_SER 内存存放序列化后的数据
- MEMORY_AND_DISK 如果内存放不下,则溢写到磁盘上
- MEMORY_AND_DISK_SER 如果内存放不下,则溢写到磁盘上,在内存中存放序列化后的数据
由于RDD中并不存储数据,所以在调用持久化方法时并没有进行持久化,而是在执行后续的行动算子时,将该RDD执行的数据计算结果进行的持久化
1 | // 把持久化的RDD从缓存中移除 |
作用
- 进行持久化避免多次计算同一个RDD
- 持久化存储一个RDD时,计算出RDD的节点会分别保存它们所求出的分区数据,如果一个有持久化数据的节点发生故障,Spark会在需要用到缓存的数据时重新计算丢失的数据分区
持久化也可以防止任务执行失败导致从头开始计算,所以在不进行RDD重用时,对于需要长时间计算的RDD操作也可以进行持久化
检查点
还可以使用检查点来进行持久化,检查点将中间结果写入磁盘
1 | // 设置检查点的存储路径 |
如果检查点之后的节点出现问题,可以从检查点重做血缘关系,减少开销
checkpoint也是在执行行动算子的时候才会触发
缓存和检查点的区别是什么的?
缓存只是将数据存储起来进行数据重用,会在血缘关系中添加新的依赖CachedPartitions,并不会切断血缘关系,而检查点会切断血缘关系,在血缘关系中添加新的依赖ReliableCheckpointRDD,并重新建立血缘关系(即改变了数据源)
1
2
3
4
5
6
7
8
9
10
11// 缓存后最终的血缘关系
(1) ShuffledRDD[4] at reduceByKey at TestPersist.scala:41 []
+-(1) MapPartitionsRDD[3] at map at TestPersist.scala:28 []
| CachedPartitions: 1; MemorySize: 616.0 B; DiskSize: 0.0 B
| MapPartitionsRDD[2] at flatMap at TestPersist.scala:25 []
| data/test.txt MapPartitionsRDD[1] at textFile at TestPersist.scala:22 []
| data/test.txt HadoopRDD[0] at textFile at TestPersist.scala:22 []
// 检查点后最终的血缘关系
(1) ShuffledRDD[4] at reduceByKey at TestPersist.scala:41 []
+-(1) MapPartitionsRDD[3] at map at TestPersist.scala:28 []
| ReliableCheckpointRDD[5] at collect at TestPersist.scala:43 []
缓存数据通常存储在内存或者磁盘(临时文件,作业执行完毕则删除),而检查点通常存于HDFS等高可用的分布式文件系统(可以长久的保存进行数据重用)
检查点会单独在执行一次RDD任务,所以做检查点的同时做缓存,这样检查点的任务就可以从缓存中读取数据,不需要从头计算一次RDD
1 | // 执行任务的源码 |