0%

RDD持久化

RDD持久化

默认情况下,Spark的RDD会在进行行动算子时重新计算,那么有时候如果多个行动算子想要重用一个RDD怎么办呢?可以使用rdd的持久化

缓存

使用cache方法或者persist方法来进行持久化

1
2
3
// 该方法内部调用的是persist(StorageLevel.MEMORY_ONLY),即缓存在内存中
// 内存如果不够,会采用LRU的缓存策略把最老的分区从内存中移除
rdd.cache()
1
2
3
// persist方法可以传不同的存储级别来进行缓存  
// 使用StorageLevel
rdd.persist(StorageLevel.DISK_ONLY)

常见的方式有

  • DISK_ONLY 存储到磁盘
  • MEMORY_ONLY 存储到内存
  • MEMORY_ONLY_SER 内存存放序列化后的数据
  • MEMORY_AND_DISK 如果内存放不下,则溢写到磁盘上
  • MEMORY_AND_DISK_SER 如果内存放不下,则溢写到磁盘上,在内存中存放序列化后的数据

由于RDD中并不存储数据,所以在调用持久化方法时并没有进行持久化,而是在执行后续的行动算子时,将该RDD执行的数据计算结果进行的持久化

1
2
// 把持久化的RDD从缓存中移除
rdd.unpersist()

作用

  • 进行持久化避免多次计算同一个RDD
  • 持久化存储一个RDD时,计算出RDD的节点会分别保存它们所求出的分区数据,如果一个有持久化数据的节点发生故障,Spark会在需要用到缓存的数据时重新计算丢失的数据分区

持久化也可以防止任务执行失败导致从头开始计算,所以在不进行RDD重用时,对于需要长时间计算的RDD操作也可以进行持久化

检查点

还可以使用检查点来进行持久化,检查点将中间结果写入磁盘

1
2
3
4
5
6
7
// 设置检查点的存储路径
sc.setCheckpointDir("./checkpoint")

// 缓存持久化,避免重新跑一个job做checkpoint
wordToOne.cache()
// 数据检查点,进行检查点计算
wordToOne.checkpoint()

如果检查点之后的节点出现问题,可以从检查点重做血缘关系,减少开销

checkpoint也是在执行行动算子的时候才会触发

缓存和检查点的区别是什么的?

  • 缓存只是将数据存储起来进行数据重用,会在血缘关系中添加新的依赖CachedPartitions,并不会切断血缘关系,而检查点会切断血缘关系,在血缘关系中添加新的依赖ReliableCheckpointRDD,并重新建立血缘关系(即改变了数据源)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 缓存后最终的血缘关系
    (1) ShuffledRDD[4] at reduceByKey at TestPersist.scala:41 []
    +-(1) MapPartitionsRDD[3] at map at TestPersist.scala:28 []
    | CachedPartitions: 1; MemorySize: 616.0 B; DiskSize: 0.0 B
    | MapPartitionsRDD[2] at flatMap at TestPersist.scala:25 []
    | data/test.txt MapPartitionsRDD[1] at textFile at TestPersist.scala:22 []
    | data/test.txt HadoopRDD[0] at textFile at TestPersist.scala:22 []
    // 检查点后最终的血缘关系
    (1) ShuffledRDD[4] at reduceByKey at TestPersist.scala:41 []
    +-(1) MapPartitionsRDD[3] at map at TestPersist.scala:28 []
    | ReliableCheckpointRDD[5] at collect at TestPersist.scala:43 []
  • 缓存数据通常存储在内存或者磁盘(临时文件,作业执行完毕则删除),而检查点通常存于HDFS等高可用的分布式文件系统(可以长久的保存进行数据重用)

  • 检查点会单独在执行一次RDD任务,所以做检查点的同时做缓存,这样检查点的任务就可以从缓存中读取数据,不需要从头计算一次RDD

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 执行任务的源码
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
// 执行任务
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
// 任务执行完之后还需要在进行检查点执行,再次执行一次任务
rdd.doCheckpoint()
}

欢迎关注我的其它发布渠道