0%

共享变量之累加器

共享变量之累加器

当一个传递给Spark操作的函数在远程节点上运行时,Spark实际上操作的是这个函数所用变量的独立副本。这些变量被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序。如果我们需要获取更新后的数据需要用到共享变量。

spark中有两种共享变量,累加器和广播变量

累加器用来对信息进行聚合,将工作节点的值聚合到驱动器

为什么需要累加器

先来个简单地示例,在不考虑分区的情况的情况下(只有一个分区),来进行数值的求和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
val rdd = sc.makeRDD(List(1, 2, 3, 4))

var sum = 0
rdd.foreach(
num => {
println("加上数字"+num)
sum += num
println("累加为"+sum)
}
)
println("累加结束,最终结果"+sum)

加上数字1
累加为1
加上数字2
累加为3
加上数字3
累加为6
加上数字4
累加为10
累加结束,最终结果0

前面的计算一直都正常,为什么最后会是0?

首先,由于数据是由Driver端提供的,然后Executor端进行执行的,foreach中所传入的函数其实组成了一个闭包,而sum作为参数就需要序列化传递进去,传递到Executor,此时Executor中使用的其实是变量的一份新的副本,Executor中的sum是一个定值0,然后进行计算,计算到结果为10,但是Executor计算完之后的变量并没有返回,而且更新这些副本的值并不会影响到Driver中对应的变量,所以Driver端的sum没有变化,还是0

如何解决这个问题呢?spark提供了累加器

累加器是什么

累加器是一个分布式共享只写变量(各个Executor之间并不能获取到),用来将Executor端变量信息聚合到Driver端,在Driver端定义的变量,在Executor端的每一个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge

累加器怎么用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
val rdd = sc.makeRDD(List(1, 2, 3, 4),2)
// 声明累加器
val sumAcc = sc.longAccumulator
rdd.foreach(
num => {
println("加上数字"+num)
sumAcc.add(num)
println("累加为"+sumAcc.value)
}
)
println("累加结束,最终结果"+sumAcc.value)


加上数字1
累加为1
加上数字2
累加为3
加上数字3
累加为3
加上数字4
累加为7
累加结束,最终结果10

根据结果可以看到,Executor端无法看到整体累加器中的值,只可以获取到自身累加器中累加的值

欢迎关注我的其它发布渠道