数据输入
InputFormat作为MapReduce的数据输入,会将读入的数据拆分成一个个InputSplit,称为分片。每个InputSplit对应一个Map处理,RecordReader读取InputSplit的内容给Map。有很多的实现类,如TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat等。
InputSplit
InputSplit代表的是一个个的逻辑分片,并没有真正的存储数据,只是提供了一个如何将数据分片的方法
1 | public abstract class InputSplit { |
实现类
TextInputFormat
框架默认的TextInputFormat是按行读取每条记录,键是存储该行在整个文件中的起始字节偏移量, LongWritable类型;值是这行的内容,不包括任何行终止符,Text类型
切片机制
对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下
- 简单地按照文件的内容长度进行切片
- 切片大小,默认等于Block大小
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
配置切片大小
底层中使用Math.max(minSize, Math.min(maxSize, blockSize));来计算切片大小
可以配置
1 | <!-- 切片最小值 --> |
CombineTextInputFormat
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理,避免过多的Map任务(因为split的数目决定了Map的数目)
采用虚拟存储的方式,将文件与MaxInputSplitSize进行比较
1 | // 默认是TextInputFormat,需要改为CombineFileInputFormat |
KeyValueTextInputFormat
每一行均为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, “\t”);来设定分隔符。默认分隔符是tab
NLineInputFormat
代表每个map进程处理的InputSplit不再按Block块去划分,而是按NlineInputFormat指定的行数N来划分。即输入文件的总行数/N=切片数,如果不整除,切片数=商+1
由mapred.line.input.format.linespermap来设置N值
自定义
自定义一个类继承FileInputFormat,并重写getRecordReader方法,自定义RecordReader