0%

RDD编程

RDD编程

RDD的创建方式

从集合(内存)中创建RDD

使用parallelize或者makeRDD方法传入序列即可

1
2
3
4
5
6
7
8
9
// 准备集合
val list = List(1,2,3,4)
// 创建RDD
val rdd = context.parallelize(list)
// 该方法内部也是直接调用的parallelize(seq, numSlices)
context.makeRDD(list)
rdd.collect().foreach(println)

context.stop()

从外部存储(文件)创建 RDD

使用textFile和wholeTextFiles方法传入文件路径即可

其中,textFile方法是以行为单位的,返回的是文件内的一行一行的数据

而wholeTextFiles是以文件单位的,返回的是文件名称和文件内容的元组

1
2
3
4
5
6
7
8
9
// 从文件中读取,以行为单位
val rdd = context.textFile("data")
rdd.collect().foreach(println)

// wholeTextFiles是以文件为单位,返回的是一个元组,第一个为文件,第二个为文件内容
// (file:/Users/zhanghe/Desktop/user/myself/GitProject/test/data/11.txt,hello world)
val rdd1 = context.wholeTextFiles("data")
rdd1.collect().foreach(println)
context.stop()

设置分区

在进行RDD创建的时候可以设置分区数,makeRDD的第二个参数即是分区数,能够并行计算的任务数量

1
2
3
4
5
6
7
8
9
10
// 准备集合
val list = List(1,2,3,4)
// 创建RDD
// 第二个参数默认是从SparkConf中取spark.default.parallelism,如果没有则使用电脑的CPU核数
// scheduler.conf.getInt("spark.default.parallelism", totalCores)
val rdd = context.makeRDD(list,2)

rdd.saveAsTextFile("output")

context.stop()

RDD操作

RDD中的方法又叫做RDD算子,可以将计算逻辑发送到Executor执行

RDD分为两种类型的操作,一种是转换,一种是执行(行动)

转换就代表着对于RDD的功能的补充和封装(即由一个RDD转换为另一个RDD),如map、flatmap方法,转化出来的RDD是惰性求值的,只有在行动算子中用到这些RDD才会进行计算

执行就代表着触发任务的调度和作业的执行,并将结果返回给Driver端,如collect方法

RDD转换算子

RDD的转换算子根据数据处理方式不同分为单值类型、双值类型和键值类型

单值类型
map操作

map操作是将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

map操作的定义

1
2
3
4
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}

可以发现map的入参是一个函数,返参是一个MapPartitionsRDD,相当于是对于调用者RDD对象的再次封装,封装为了MapPartitionsRDD

1
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 准备集合
val list = List(1, 2, 3, 4)
// 创建RDD
val rdd = context.makeRDD(list)

// 所要执行的函数
def mapFunction(num: Int) = {
num * 2
}

val result = rdd.map(mapFunction)
//2
//4
//6
//8
result.collect().foreach(println)

context.stop()
mapPartitions操作

mapPartitions操作是将待处理的数据以分区为单位发送到计算节点进行处理

mapPartitions操作的定义

1
2
3
4
5
6
7
8
9
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}

mapPartitions的入参也是一个函数,不过这个函数的入参和返参都是一个迭代器

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 准备集合
val list = List(1, 2, 3, 4)
// 创建RDD
val rdd = context.makeRDD(list,2)

// 化简
val result = rdd.mapPartitions(
(iter:Iterator[Int]) => {
println("########")
iter.map(_*2)
}
)

result.collect().foreach(println)

context.stop()

//########
//########
//2
//4
//6
//8

map和mapParttitions有什么区别呢?

➢ 数据处理角度

Map算子是分区内一个数据一个数据的执行,类似于串行操作;而 mapPartitions 算子是以分区为单位进行批处理操作

➢ 功能的角度

Map 算子主要目的将数据源中的数据进行转换和改变,但是不会减少或增多数据;MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据

➢ 性能的角度

Map 算子因为类似于串行操作,所以性能比较低;而 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,推荐使用 map 操作

mapPartitionsWithIndex操作

有时候会根据不同的分区进行不同的操作,那么可以使用mapPartitionsWithIndex操作来进行,mapPartitionsWithIndex操作可以将待处理的数据以分区为单位发送到计算节点进行处理,在处理时同时可以获取当前分区索引,相对于mapParttitions来说,多了一个分区索引

mapPartitionsWithIndex操作的定义

1
2
3
4
5
6
7
8
9
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning)
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 准备集合
val list = List(1, 2, 3, 4)
// 创建RDD
val rdd = context.makeRDD(list,2)

val result = rdd.mapPartitionsWithIndex(
// index是分区索引,从0开始
(index:Int,iter:Iterator[Int]) => {
if(index == 0){
iter.map(_*100)
} else if(index == 1){
iter.map(_*200)
} else {
Nil.iterator
}
}
)

result.collect().foreach(println)

context.stop()

//100
//200
//600
//800
flatMap操作

将处理的数据进行扁平化后(将集合进行拆分)再进行映射处理

flatMap的定义

1
2
3
4
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 准备集合
val list = List(List(1, 2, 3), 4, 5, List(6, 7))
// 创建RDD
val rdd = context.makeRDD(list)

val result = rdd.flatMap {
list => {
list match {
// 如果是list则输出list
case list: List[_] => list
// 如果不是list,则转换为list
case list => List(list)
}

}

}

//1
//2
//3
//4
//5
//6
//7
result.collect().foreach(println)

context.stop()
glom操作

glom操作会将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

glom定义

1
2
3
def glom(): RDD[Array[T]] = withScope {
new MapPartitionsRDD[Array[T], T](this, (_, _, iter) => Iterator(iter.toArray))
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 准备集合
val list = List(1, 2, 3, 4)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list,2)

// 会将分区内数据组合成数组
val result: RDD[Array[Int]] = rdd.glom()


result.collect().foreach(data => {
println(">>>>>>>>>")
println(data.mkString(","))
})

context.stop()

//>>>>>>>>>
//1,2
//>>>>>>>>>
//3,4
groupBy操作

groupBy操作会将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle

groupBy定义

1
2
3
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
groupBy[K](f, defaultPartitioner(this))
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 准备集合
val list = List(1, 2, 3, 4)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list, 2)

// 会根据函数返回值作为key来分组
val result = rdd.groupBy((num: Int) => num % 2)

result.collect().foreach(println)

context.stop()

//(0,CompactBuffer(2, 4))
//(1,CompactBuffer(1, 3))
filter操作

filter操作是将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃

filter定义

1
2
3
4
5
6
7
def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(_, _, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 准备集合
val list = List(1, 2, 3, 4)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list, 2)

// 根据函数来筛选
val result = rdd.filter((num: Int) => num % 2 == 0)

result.collect().foreach(println)

context.stop()

//2
//4
distinct操作

distinct操作是将数据集中重复的数据去重

distinct定义

1
2
3
def distinct(): RDD[T] = withScope {
distinct(partitions.length)
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 准备集合
val list = List(1, 2, 3, 4, 5, 4, 2)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list, 2)

// 去重
val result = rdd.distinct()

result.collect().foreach(println)

context.stop()

//4
//2
//1
//3
//5
coalesce操作

coalesce操作是根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

coalesce定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]

// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 准备集合
val list = List(1, 2, 3, 4, 5, 6)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list, 3)

// 将原本的三个分区缩减为2个分区,但是默认情况下并不会将分区的数据重新打乱,可能会造成数据倾斜,造成的结果为
// 第一个分区 1 2
// 第二个分区 3 4 5 6
// 此时,可以使用第二个参数shuffle,默认为false,传入true,此时就会将数据重新打乱进行分配
// 第一个分区 1 4 5
// 第二个分区 2 3 6
// val result = rdd.coalesce(2)
val result = rdd.coalesce(2,true)
result.saveAsTextFile("output")

context.stop()
repartition操作

repartition是对于coalesce的又一层封装,shuffle设置为true,可以进行缩减分区和扩大分区

repartition定义

1
2
3
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
sortBy操作

排序,中间存在 shuffle 的过程,会打乱数据,默认是升序

sortBy定义

1
2
3
4
5
6
7
8
9
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
.values
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 准备集合
val list = List(6, 2, 3, 1, 5, 4)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list, 3)
// 默认是升序,第二个参数可以设置升序或降序
val result = rdd.sortBy(num => num)
result.collect().foreach(println)
context.stop()

//1
//2
//3
//4
//5
//6
双值操作

双值操作是指对两个RDD数据集进行操作

intersection操作

intersection操作是进行交集,源 RDD 和参数 RDD 求交集后返回一个新的 RDD

intersection定义

1
2
3
4
5
def intersection(other: RDD[T]): RDD[T] = withScope {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 准备集合
val list = List(1, 2, 3, 4)
val list2 = List(5, 6, 3, 4)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list, 2)
val rdd2: RDD[Int] = context.makeRDD(list2, 2)

// 交集
val result = rdd.intersection(rdd2)

result.collect().foreach(println)

context.stop()

//4
//3
union操作

union操作是进行并集,源 RDD 和参数 RDD 求并集后返回一个新的 RDD

union定义

1
2
3
def union(other: RDD[T]): RDD[T] = withScope {
sc.union(this, other)
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 准备集合
val list = List(1, 2, 3, 4)
val list2 = List(5, 6, 3, 4)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list, 2)
val rdd2: RDD[Int] = context.makeRDD(list2, 2)

// 并集
val result = rdd.union(rdd2)

result.collect().foreach(println)

context.stop()

//1
//2
//3
//4
//5
//6
//3
//4
subtract操作

subtract操作是进行差集,源 RDD 和参数 RDD 求差集后返回一个新的 RDD

subtract定义

1
2
3
def subtract(other: RDD[T]): RDD[T] = withScope {
subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 准备集合
val list = List(1, 2, 3, 4)
val list2 = List(5, 6, 3, 4)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list, 2)
val rdd2: RDD[Int] = context.makeRDD(list2, 2)

// 差集
val result = rdd.subtract(rdd2)

result.collect().foreach(println)

context.stop()

//2
//1
zip操作

zip操作是进行拉练操作,将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD
中的元素,Value 为第 2 个 RDD 中的相同位置的元素

zip定义

1
2
3
4
5
6
7
8
9
10
11
12
13
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
new Iterator[(T, U)] {
def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
case (true, true) => true
case (false, false) => false
case _ => throw new SparkException("Can only zip RDDs with " +
"same number of elements in each partition")
}
def next(): (T, U) = (thisIter.next(), otherIter.next())
}
}
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 准备集合
val list = List(1, 2, 3, 4)
val list2 = List(5, 6, 3, 4)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list, 2)
val rdd2: RDD[Int] = context.makeRDD(list2, 2)

// 拉链
val result = rdd.zip(rdd2)

result.collect().foreach(println)

context.stop()

//(1,5)
//(2,6)
//(3,3)
//(4,4)
键值操作

键值操作中的函数都不属于RDD类中的,而是属于PairRDDFunctions类中的,那么为什么RDD对象却可以调用呢,是由于在RDD中有一个隐式转换函数

1
2
3
4
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}

可以看到只有RDD[(K,V)]类型的才会进行转换,所以只有键值类型的才可以调用该方法

partitionBy操作

partitionBy操作是将数据按照指定 Partitioner 重新进行分区

partitionBy定义

1
2
3
4
5
6
7
8
9
10
11
// 入参是一个分区器
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
if (self.partitioner == Some(partitioner)) {
self
} else {
new ShuffledRDD[K, V, V](self, partitioner)
}
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
// 准备集合
val list = List(1, 2, 3, 4)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list, 2)

// 将原本的数据转为K-V类型的
val mapRdd = rdd.map((_, null))
// 分区
val result = mapRdd.partitionBy(new HashPartitioner(2))

result.saveAsTextFile("output")

context.stop()
reduceByKey操作

reduceByKey操作是将数据按照相同的 Key 对 Value 进行聚合,合并相同键的值

reduceByKey定义

1
2
3
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
// 准备集合
val list = List(("a",1),("b",2),("a",3),("b",4),("a",5))
// 创建RDD
val rdd = context.makeRDD(list, 2)

// 聚合
val result = rdd.reduceByKey(_+_)
result.collect().foreach(println)

context.stop()

//(b,6)
//(a,9)
groupByKey操作

groupByKey操作是将数据源的数据根据 key 对 value 进行分组

groupByKey定义

1
2
3
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(defaultPartitioner(self))
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
// 准备集合
val list = List(("a",1),("b",2),("a",3),("b",4),("a",5))
// 创建RDD
val rdd = context.makeRDD(list, 2)

// 分组
val result = rdd.groupByKey()
result.collect().foreach(println)

context.stop()

//(b,CompactBuffer(2, 4))
//(a,CompactBuffer(1, 3, 5))

reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高

aggregateByKey操作

由于reduceBykey对于分区内和分区间的操作都是聚合操作,但是实际的业务中可能分区内和分区间采用不同的计算方式,aggregateByKey操作是将数据根据不同的规则进行分区内计算和分区间计算

aggregateByKey定义

1
2
3
4
5
6
7
8
9
// 该方法采用了柯里化
// 第一个参数是初始值
// 第二个参数包含两个函数
// 第一个函数是分区内操作
// 第二个函数是分区间操作
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 准备集合
val list = List(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5))
// 创建RDD
val rdd = context.makeRDD(list, 2)

// 第一个参数是初始值
// 第二个参数包含两个函数
// 第一个函数是分区内操作 分区内根据key取最大值
// 第二个函数是分区间操作 分区间根据key聚合
val result = rdd.aggregateByKey(0)((x, y) => math.max(x, y), (x, y) => x + y)
result.collect().foreach(println)

context.stop()

//(b,6)
//(a,6)
flodByKey操作

由于aggregateByKey操作是将数据根据不同的规则进行分区内计算和分区间计算,如果分区内和分区间的计算规则相同,还需要写两次函数,而foldByKey就是对于aggregateByKey这种情况的简化,当分区内计算规则和分区间计算规则相同时使用foldByKey即可

foldByKey定义

1
2
3
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
foldByKey(zeroValue, defaultPartitioner(self))(func)
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 准备集合
val list = List(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5))
// 创建RDD
val rdd = context.makeRDD(list, 2)

// 第一个参数是初始值
// 分区内和分区间都是进行求最大值
val result = rdd.foldByKey(0)((x, y) => math.max(x, y))
result.collect().foreach(println)

context.stop()

//(b,4)
//(a,5)
combineByKey操作

combineByKey操作是最通用的对 key-value 型 rdd 进行聚集操作的聚集函数,允许用户返回值的类型与输入不一致

combineByKey定义

1
2
3
4
5
6
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 准备集合
val list = List(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5))
// 创建RDD
val rdd = context.makeRDD(list, 2)

// 第一个参数是更改数据的格式
// 第二个参数 分区内操作 分区内根据key分别进行累加和计数 元组中第一个元素为累加,第二个元素为计数
// 第三个参数 分区间操作 分区内根据key分别进行累加和计数 元组中第一个元素为累加,第二个元素为计数
val combineResult = rdd.combineByKey(
(v => (v, 1)),
(x: (Int, Int), y) => {
(x._1 + y, x._2 + 1)
},
(x: (Int, Int), y: (Int, Int)) => {
(x._1 + y._1, x._2 + y._2)
})

// 求平均值
val result = combineResult.mapValues((res) => {
res._1 / res._2
})

result.collect().foreach(println)

context.stop()

//(b,3)
//(a,3)
join操作

join操作与数据库中的join操作(内连接)相似,根据key来进行关联操作,会每条数据依次进行匹配,可能会造成数据的笛卡尔积乘积,数据的几何式增长,同样的还有leftOuterJoin左外连接和rightOuterJoin右外连接

join定义

1
2
3
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
join(other, defaultPartitioner(self, other))
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 准备集合
val list = List(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5))
val list1 = List(("a", 6), ("b", 5), ("a", 4), ("b", 3), ("a", 2))
// 创建RDD
val rdd = context.makeRDD(list, 2)
val rdd1 = context.makeRDD(list1, 2)

val result = rdd.join(rdd1)

result.collect().foreach(println)

context.stop()

//(b,(2,5))
//(b,(2,3))
//(b,(4,5))
//(b,(4,3))
//(a,(1,6))
//(a,(1,4))
//(a,(1,2))
//(a,(3,6))
//(a,(3,4))
//(a,(3,2))
//(a,(5,6))
//(a,(5,4))
//(a,(5,2))

RDD行动算子

上述的转换算在都是在进行RDD的再次封装,而行动算子是进行RDD的真正执行的,会触发作业的执行

reduce操作

进行数据的聚合,先聚合分区内数据,再聚合分区间数据

reduce定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (_: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

调用sc.runJob来执行作业

示例

1
2
3
4
5
6
7
8
9
10
11
12
// 准备集合
val list = List(1, 2, 3, 4)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list, 2)

// 聚合
val result = rdd.reduce(_+_)

// 10
println(result)

context.stop()
collect操作

进行数据的采集,按照分区顺序进行采集,返回数组

collect定义

1
2
3
4
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}

调用sc.runJob来执行作业

示例

1
2
3
4
5
6
7
8
9
// 准备集合
val list = List(1, 2, 3, 4)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list, 2)

// 采集
val result = rdd.collect()

context.stop()
count操作

返回元素的个数

count定义

1
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

调用sc.runJob来执行作业

示例

1
2
3
4
5
6
7
8
9
10
11
12
// 准备集合
val list = List(1, 2, 3, 4)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list, 2)

// 计数
val result = rdd.count()

// 4
println(result)

context.stop()
first操作

返回第一个元素

first定义

1
2
3
4
5
6
def first(): T = withScope {
take(1) match {
case Array(t) => t
case _ => throw new UnsupportedOperationException("empty collection")
}
}

调用sc.runJob来执行作业

示例

1
2
3
4
5
6
7
8
9
10
11
12
// 准备集合
val list = List(1, 2, 3, 4)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list, 2)

// 获得第一个元素
val result = rdd.first()

// 1
println(result)

context.stop()
take操作

返回前几个元素

take定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def take(num: Int): Array[T] = withScope {
val scaleUpFactor = Math.max(conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2)
if (num == 0) {
new Array[T](0)
} else {
val buf = new ArrayBuffer[T]
val totalParts = this.partitions.length
var partsScanned = 0
while (buf.size < num && partsScanned < totalParts) {
// The number of partitions to try in this iteration. It is ok for this number to be
// greater than totalParts because we actually cap it at totalParts in runJob.
var numPartsToTry = 1L
val left = num - buf.size
if (partsScanned > 0) {
// If we didn't find any rows after the previous iteration, quadruple and retry.
// Otherwise, interpolate the number of partitions we need to try, but overestimate
// it by 50%. We also cap the estimation in the end.
if (buf.isEmpty) {
numPartsToTry = partsScanned * scaleUpFactor
} else {
// As left > 0, numPartsToTry is always >= 1
numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
}
}

val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

res.foreach(buf ++= _.take(num - buf.size))
partsScanned += p.size
}

buf.toArray
}
}

调用sc.runJob来执行作业

示例

1
2
3
4
5
6
7
8
9
// 准备集合
val list = List(1, 2, 3, 4)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list, 2)

// 获得前两个元素
val result = rdd.take(2)

context.stop()
aggregate操作

分区的数据通过初始值和分区内的数据进行运算,然后再和初始值进行分区间的数据运算

aggregate定义

1
2
3
4
5
6
7
8
9
10
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
sc.runJob(this, aggregatePartition, mergeResult)
jobResult
}

调用sc.runJob来执行作业

示例

1
2
3
4
5
6
7
8
9
10
11
12
// 准备集合
val list = List(1, 2, 3, 4)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list, 2)

// 与aggregateByKey相似 不过aggregate的初始值既参与分区内运算也参与分区间运算
val result = rdd.aggregate(10)(_+_,_+_)

// 40
println(result)

context.stop()
fold操作

与aggregate不同的是分区内和分区间进行的操作相同

fold定义

1
2
3
4
5
6
7
8
9
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanOp = sc.clean(op)
val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
val mergeResult = (_: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
sc.runJob(this, foldPartition, mergeResult)
jobResult
}

调用sc.runJob来执行作业

示例

1
2
3
4
5
6
7
8
9
10
11
// 准备集合
val list = List(1, 2, 3, 4)
// 创建RDD
val rdd: RDD[Int] = context.makeRDD(list, 2)

val result = rdd.fold(10)(_+_)

// 40
println(result)

context.stop()
countByKey操作

统计每种 key 的个数

countByKey定义

1
2
3
def countByKey(): Map[K, Long] = self.withScope {
self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}

示例

1
2
3
4
5
6
7
8
9
10
11
// 准备集合
val list = List("a","b","c","a","a","c")
// 创建RDD
val rdd = context.makeRDD(list, 2)

val mapRdd = rdd.map((_,1))

val result = mapRdd.countByKey()

// Map(b -> 1, a -> 3, c -> 2)
println(result)