Shuffle机制
Map阶段之后,Reduce阶段之前的数据处理操作叫做Shuffle,包含了分区、压缩、排序、分组等处理
- 每个Mapper任务都有一个内存缓冲区,存储这mapper的输出结果
mapreduce.task.io.sort.mb
来调整缓冲区大小 - 当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,也就是溢写(spill.percent,默认是0.8,即缓冲区的0.8会触发溢写)
mapreduce.map.sort.spill.percent
配置阈值,默认0.8 - 溢写是由单独线程来完成的,不影响往缓冲区写mapper结果的线程
- 当溢写线程启动后,需要对这部分缓冲区空间内的key进行排序
graph TD; B1[数据]--流入-->B2[Mapper任务处理] B2--分区-->B3[内存缓冲区] B3--spill溢写:sort&Combiner-->B4(数据进行Merge)
- 如果client设置过Combiner,会将有相同key的value加起来,减少溢写到磁盘的数据量
- 当整个Mapper任务结束后再对磁盘中产生的所有临时文件进行Merge
- reduce从Task Tracker中copy数据,先放入内存缓冲区中
partition分区
默认的分区是HashPartitioner
1 | public class HashPartitioner<K, V> extends Partitioner<K, V> { |
根据key的hashCode对ReduceTask的个数取模,没办法控制哪个key存储在哪个分区
自定义Partitioner
继承Partitioner重写getPartition方法,设置job.setPartitionerClass();即可
分区数是由ReduceTask的数两决定的,所以需要设置ReduceTask的数量job.setNumReduceTasks();
排序
MapTask和ReduceTask都会对数据按照key进行排序,该操作属于Hadoop的默认行为,任何应用程序中的数据均会被排序,而不管逻辑上是否需要,默认排序是按照字典顺序排序,且实现该排序的方法是快速排序
序列化的对象实现WritableComparable接口重写compareTo方法即可实现排序
分为三种排序
- 部分排序 对最终输出的每一个文件进行内部排序
- 全排序 对所有数据进行排序,通常只有一个reduce
- 二次排序 排序条件有两个
分组
继承WritableComparator类实现compare方法
job.setGroupingComparatorClass();