共享变量之累加器
当一个传递给Spark操作的函数在远程节点上运行时,Spark实际上操作的是这个函数所用变量的独立副本。这些变量被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序。如果我们需要获取更新后的数据需要用到共享变量。
spark中有两种共享变量,累加器和广播变量
累加器用来对信息进行聚合,将工作节点的值聚合到驱动器
为什么需要累加器
先来个简单地示例,在不考虑分区的情况的情况下(只有一个分区),来进行数值的求和
1 | val rdd = sc.makeRDD(List(1, 2, 3, 4)) |
前面的计算一直都正常,为什么最后会是0?
首先,由于数据是由Driver端提供的,然后Executor端进行执行的,foreach中所传入的函数其实组成了一个闭包,而sum作为参数就需要序列化传递进去,传递到Executor,此时Executor中使用的其实是变量的一份新的副本,Executor中的sum是一个定值0,然后进行计算,计算到结果为10,但是Executor计算完之后的变量并没有返回,而且更新这些副本的值并不会影响到Driver中对应的变量,所以Driver端的sum没有变化,还是0
如何解决这个问题呢?spark提供了累加器
累加器是什么
累加器是一个分布式共享只写变量(各个Executor之间并不能获取到),用来将Executor端变量信息聚合到Driver端,在Driver端定义的变量,在Executor端的每一个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge
累加器怎么用
1 | val rdd = sc.makeRDD(List(1, 2, 3, 4),2) |
根据结果可以看到,Executor端无法看到整体累加器中的值,只可以获取到自身累加器中累加的值