def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](
        mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
        new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}
Example
// Prepare the collection
val list = List(1, 2, 3, 4, 5, 6)
// Create an RDD with 3 partitions
val rdd: RDD[Int] = context.makeRDD(list, 3)

// Shrink the original three partitions down to two. By default the data is not
// reshuffled, which may cause data skew; the result here is:
//   partition 1: 1 2
//   partition 2: 3 4 5 6
// Passing true for the second parameter, shuffle (default false), redistributes
// the data evenly instead:
//   partition 1: 1 4 5
//   partition 2: 2 3 6
// val result = rdd.coalesce(2)
val result = rdd.coalesce(2, true)
result.saveAsTextFile("output")
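Incidentally, repartition in the Spark source simply delegates to coalesce with shuffle = true, so rdd.repartition(2) behaves exactly like the rdd.coalesce(2, true) call above:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}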
reduce operation
Aggregates all elements with the given function, first within each partition and then across the partitions' partial results.
reduce definition

def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  val mergeResult = (_: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
It calls sc.runJob to execute the job.
Example
// Prepare the collection
val list = List(1, 2, 3, 4)
// Create an RDD with 2 partitions
val rdd: RDD[Int] = context.makeRDD(list, 2)
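The excerpt breaks off before the action itself; following the pattern of the other examples in this post, the call was presumably along these lines:

// Sum: reduce within each partition, then merge the partial sums across partitions
val result = rdd.reduce(_ + _)

// 10
println(result)

context.stop()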
take operation
Returns the first num elements of the RDD, scanning one batch of partitions at a time.
take definition

def take(num: Int): Array[T] = withScope {
  val scaleUpFactor = Math.max(conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2)
  if (num == 0) {
    new Array[T](0)
  } else {
    val buf = new ArrayBuffer[T]
    val totalParts = this.partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts) {
      // The number of partitions to try in this iteration. It is ok for this number to be
      // greater than totalParts because we actually cap it at totalParts in runJob.
      var numPartsToTry = 1L
      val left = num - buf.size
      if (partsScanned > 0) {
        // If we didn't find any rows after the previous iteration, quadruple and retry.
        // Otherwise, interpolate the number of partitions we need to try, but overestimate
        // it by 50%. We also cap the estimation in the end.
        if (buf.isEmpty) {
          numPartsToTry = partsScanned * scaleUpFactor
        } else {
          // As left > 0, numPartsToTry is always >= 1
          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
          numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
        }
      }

      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
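      // (The excerpt is truncated above; in the Spark source the loop closes by
      // appending the fetched rows and advancing the scan position, roughly:)
      res.foreach(buf ++= _.take(num - buf.size))
      partsScanned += p.size
    }
    buf.toArray
  }
}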
Example

// Prepare the collection
val list = List(1, 2, 3, 4)
// Create an RDD with 2 partitions
val rdd: RDD[Int] = context.makeRDD(list, 2)
// Take the first two elements
val result = rdd.take(2)
context.stop()
aggregate operation
Each partition's data is first combined with the initial value within the partition; the per-partition results are then combined with the initial value again across partitions.
aggregate definition
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}
It calls sc.runJob to execute the job.
Example
// Prepare the collection
val list = List(1, 2, 3, 4)
// Create an RDD with 2 partitions
val rdd: RDD[Int] = context.makeRDD(list, 2)
// Similar to aggregateByKey, except that aggregate's initial value participates in
// both the intra-partition and the inter-partition computation:
// partition 1: 10 + 1 + 2 = 13; partition 2: 10 + 3 + 4 = 17; across partitions: 10 + 13 + 17 = 40
val result = rdd.aggregate(10)(_ + _, _ + _)

// 40
println(result)
context.stop()
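Because the result type U may differ from the element type T, aggregate can express computations that reduce cannot. A minimal sketch, reusing the rdd above, that computes an average by carrying a (sum, count) pair:

// zeroValue is a neutral (sum, count) pair; seqOp folds one element into it,
// combOp merges the pairs produced by different partitions
val (sum, count) = rdd.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2))
// (1 + 2 + 3 + 4) / 4 = 2.5
println(sum.toDouble / count)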
fold operation
Unlike aggregate, the same operation is applied both within partitions and across partitions, so the zero value and the result share the element type T.
fold definition
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  val cleanOp = sc.clean(op)
  val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
  val mergeResult = (_: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}
It calls sc.runJob to execute the job.
Example
// Prepare the collection
val list = List(1, 2, 3, 4)
// Create an RDD with 2 partitions
val rdd: RDD[Int] = context.makeRDD(list, 2)
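The excerpt ends here; following the pattern of the aggregate example, the fold call was presumably along these lines:

// One function for both phases; the zero value joins each phase:
// partition 1: 10 + 1 + 2 = 13; partition 2: 10 + 3 + 4 = 17; final: 10 + 13 + 17 = 40
val result = rdd.fold(10)(_ + _)

// 40
println(result)

context.stop()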