0%

操作Parquet

操作Parquet

由于性能问题,将文件存储格式从text改为了Parquet,那么hadoop如何读取Parquet文件呢?

先添加一下parquet依赖

1
2
3
4
5
6
7
8
9
10
11
<!--添加Parquet依赖-->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.8.1</version>
</dependency>

主要还是看Mapper的读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class H2EMapper extends Mapper<Void, Group, NullWritable, Text> {

private static final Logger LOGGER = LoggerFactory.getLogger(H2EMapper.class);

Gson gson = new Gson();

Text v = new Text();

@Override
public void run(Context context) throws IOException, InterruptedException {

setup(context);

ParquetInputFormat<Group> parquetInputFormat = new ParquetInputFormat<>(GroupReadSupport.class);
try {
while (context.nextKeyValue()) {
InputSplit inputSplit = context.getInputSplit();

if(inputSplit instanceof FileSplit){
// 获取文件名
String name = ((FileSplit) inputSplit).getPath().getName();

TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), context.getTaskAttemptID());
// 读取parquet文件
try(RecordReader<Void,Group> recordReader = parquetInputFormat.createRecordReader(inputSplit, hadoopAttemptContext)){
recordReader.initialize(inputSplit,hadoopAttemptContext);

while (recordReader.nextKeyValue()){
map(recordReader.getCurrentKey(), recordReader.getCurrentValue(), context);
}
}


}


}
} finally {
cleanup(context);
}
}

@Override
protected void map(Void key, Group value, Context context) throws IOException, InterruptedException {
v.set(groupToJson(value));
context.write(NullWritable.get(), v);
}

// 解析parquet文件
private String groupToJson(Group value){

Map<String,Object> object = new HashMap<>();

// 这里按照字段名去进行解析
object.put("name", result.getString("name", 0).toString());
object.put("age", result.getInteger("age", 0));

return gson.toJson(object);
}

}

欢迎关注我的其它发布渠道