0%

数据输入

数据输入

InputFormat作为MapReduce的数据输入,会将读入的数据拆分成一个个InputSplit,称为分片。每个InputSplit对应一个Map处理,RecordReader读取InputSplit的内容给Map。有很多的实现类,如TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat等。

InputSplit

InputSplit代表的是一个个的逻辑分片,并没有真正的存储数据,只是提供了一个如何将数据分片的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public abstract class InputSplit {
// 获取Split的大小,支持根据size对InputSplit排序
public abstract long getLength() throws IOException, InterruptedException;


// 获取存储该分片的数据所在节点的位置
public abstract
String[] getLocations() throws IOException, InterruptedException;


// 获取存储该分片的数据所在节点的节点信息
@Evolving
public SplitLocationInfo[] getLocationInfo() throws IOException {
return null;
}
}

实现类

TextInputFormat

框架默认的TextInputFormat是按行读取每条记录,键是存储该行在整个文件中的起始字节偏移量, LongWritable类型;值是这行的内容,不包括任何行终止符,Text类型

切片机制

对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下

  • 简单地按照文件的内容长度进行切片
  • 切片大小,默认等于Block大小
  • 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
配置切片大小

底层中使用Math.max(minSize, Math.min(maxSize, blockSize));来计算切片大小

可以配置

1
2
3
4
5
6
7
8
9
10
<!-- 切片最小值 -->
<property>
<name>mapreduce.input.fileinputformat.split.minsize</name>
<value></value>
</property>
<!-- 切片最大值 -->
<property>
<name>mapreduce.input.fileinputformat.split.maxsize</name>
<value></value>
</property>

CombineTextInputFormat

CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理,避免过多的Map任务(因为split的数目决定了Map的数目)

采用虚拟存储的方式,将文件与MaxInputSplitSize进行比较

1
2
3
4
// 默认是TextInputFormat,需要改为CombineFileInputFormat
job.setInputFormatClass(CombineFileInputFormat.class);
// 设置切片的最大值
CombineFileInputFormat.setMaxInputSplitSize(job,20*1024*1024);

KeyValueTextInputFormat

每一行均为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, “\t”);来设定分隔符。默认分隔符是tab

NLineInputFormat

代表每个map进程处理的InputSplit不再按Block块去划分,而是按NlineInputFormat指定的行数N来划分。即输入文件的总行数/N=切片数,如果不整除,切片数=商+1

由mapred.line.input.format.linespermap来设置N值

自定义

自定义一个类继承FileInputFormat,并重写getRecordReader方法,自定义RecordReader

欢迎关注我的其它发布渠道