| public class H2EMapper extends Mapper<Void, Group, NullWritable, Text> {
private static final Logger LOGGER = LoggerFactory.getLogger(H2EMapper.class);
Gson gson = new Gson();
Text v = new Text();
@Override public void run(Context context) throws IOException, InterruptedException {
setup(context); ParquetInputFormat<Group> parquetInputFormat = new ParquetInputFormat<>(GroupReadSupport.class); try { while (context.nextKeyValue()) { InputSplit inputSplit = context.getInputSplit(); if(inputSplit instanceof FileSplit){ String name = ((FileSplit) inputSplit).getPath().getName();
TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), context.getTaskAttemptID()); try(RecordReader<Void,Group> recordReader = parquetInputFormat.createRecordReader(inputSplit, hadoopAttemptContext)){ recordReader.initialize(inputSplit,hadoopAttemptContext);
while (recordReader.nextKeyValue()){ map(recordReader.getCurrentKey(), recordReader.getCurrentValue(), context); } }
}
} } finally { cleanup(context); } }
@Override protected void map(Void key, Group value, Context context) throws IOException, InterruptedException { v.set(groupToJson(value)); context.write(NullWritable.get(), v); }
private String groupToJson(Group value){
Map<String,Object> object = new HashMap<>();
object.put("name", result.getString("name", 0).toString()); object.put("age", result.getInteger("age", 0)); return gson.toJson(object); }
}