0%

共享变量之累加器

当一个传递给Spark操作的函数在远程节点上运行时,Spark实际上操作的是这个函数所用变量的独立副本。这些变量被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序。如果我们需要获取更新后的数据需要用到共享变量。

spark中有两种共享变量,累加器和广播变量

累加器用来对信息进行聚合,将工作节点的值聚合到驱动器

为什么需要累加器

先来个简单地示例,在不考虑分区的情况的情况下(只有一个分区),来进行数值的求和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
val rdd = sc.makeRDD(List(1, 2, 3, 4))

var sum = 0
rdd.foreach(
num => {
println("加上数字"+num)
sum += num
println("累加为"+sum)
}
)
println("累加结束,最终结果"+sum)

加上数字1
累加为1
加上数字2
累加为3
加上数字3
累加为6
加上数字4
累加为10
累加结束,最终结果0

前面的计算一直都正常,为什么最后会是0?

阅读全文 »

scala闭包

闭包就是一个函数和与其相关的引用环境组成的一个整体

1
2
3
4
5
// minusNum函数体是一个匿名函数
def minusNum(num : Int) = (y:Int) => num-y
// 返回函数是一个对象,而 num 就是该对象的一个字段,他们共同形成一个闭包
// fun函数是一个闭包
val fun = minusNum(20)

能组成闭包的函数也是高阶函数,因为需要该函数返回一个函数,而在上例中

1
val fun = minusNum(20)
阅读全文 »

RDD持久化

默认情况下,Spark的RDD会在进行行动算子时重新计算,如果多个行动算子想要重用一个RDD怎么办呢?可以使用rdd的持久化

缓存

使用cache方法或者persist方法来进行持久化

1
2
3
// 该方法内部调用的是persist(StorageLevel.MEMORY_ONLY),即缓存在内存中
// 内存如果不够,会采用LRU的缓存策略把最老的分区从内存中移除
rdd.cache()
1
2
3
// persist方法可以传不同的存储级别来进行缓存  
// 使用StorageLevel
rdd.persist(StorageLevel.DISK_ONLY)
阅读全文 »

RDD依赖关系

在执行RDD的过程中如果某一个RDD出现问题,且RDD本身并不存储数据,需要从数据源的位置重新开始执行,那么出现问题的RDD如何找到它的数据源在哪里?

RDD在创建的时候会保存一系列的RDD依赖关系,根据这些依赖关系可以重新找到数据源来进行重新计算,两个相邻RDD之间的关系就是依赖关系,而将一系列依赖关系串联起来就是血缘关系

可以使用dependencies方法来查看RDD的依赖关系

1
2
3
4
5
rdd.dependencies

------------
List(org.apache.spark.OneToOneDependency@2133661d)
List(org.apache.spark.ShuffleDependency@1a3071d8)
阅读全文 »

RDD序列化

算子以外的操作都是在Driver端执行的,算子内的都是在Executor端执行的,而在进行Driver端向Executor端传值时,需要进行序列化,虽然Java序列化也可以完成,但是产生的字节过多,导致数据传输过慢,Spark在2.0开始支持Kryo序列化机制,速度是Serializable的10倍,默认不配置的情况下使用的是Java序列化,可以进行配置为Kryo序列化

1
2
3
4
5
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("kryo")
// 默认为org.apache.spark.serializer.JavaSerializer
// 如果注册了org.apache.spark.serializer.KryoSerializer
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array(classOf[Teacher]))