0%

hadoop数据存入es

hadoop数据存入es

ES-Hadoop是Elasticsearch推出的专门用于对接Hadoop生态的工具,可以让数据在Elasticsearch和Hadoop之间双向移动,无缝衔接Elasticsearch与Hadoop服务,充分使用Elasticsearch的快速搜索及Hadoop批处理能力,实现交互式数据处理

使用依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>${elasticsearch.version}</version>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>

编写Job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class H2EJob {

public static void main(String[] args) {

JobConf hdfsConf = new JobConf();
JobConf conf = new JobConf();
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
// 队列名称
conf.set("mapreduce.job.queuename", "dev");
//ElasticSearch节点
conf.set("es.nodes", "192.168.1.222:9200");
//ElaticSearch Index/Type
conf.set("es.resource", "muindex/mytype");
conf.set("es.index.auto.create", "true");
//Hadoop上的数据格式为JSON,可以直接导入
conf.set("es.input.json", "yes");
// 会关闭节点的自动 discovery,只使用es.nodes声明的节点进行数据读写操作
// conf.set("es.nodes.wan.only","true");
// 指定json对象中那种field对应的值为es中document的id
// conf.set("es.mapping.id", "id");
// 防止只读
// conf.set("es.index.blocks.read_only_allow_delete", "false");
// conf.set("es.index.blocks.read_only", "false");
// 由于数据量过大,增大超时时间
conf.set("es.http.timeout", "30m");
// 认证
conf.set("es.net.http.auth.user", "user");
conf.set("es.net.http.auth.pass", "password");


Job job = Job.getInstance(conf, "h2eJob");
job.setJarByClass(H2EJob.class);
job.setMapperClass(H2EMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(EsOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));

boolean success = job.waitForCompletion(true);

}


}

编写Mapper

1
2
3
4
5
6
7
8
9
10
11
12
public class H2EMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

@Override
public void run(Context context) throws IOException, InterruptedException {
setup(context);
}

@Override
protected void map(Void key, Group value, Context context) throws IOException, InterruptedException {
context.write(NullWritable.get(), value);
}
}

该程序没有reducer,因为只是数据的透传,不需要做额外处理

ES-HADOOP配置参数

基本配置

  • es.resource 默认none,elasticsearch资源,格式为/
  • es.resource.read 该参数用于指定要从elasticsearch中读取的资源和类型,默认为es.resource的值
  • es.resource.write 该参数用于指定要写入的elasticsearch资源和类型,默认为es.resource的值
  • es.nodes 默认为localhost,指定要连接的elasticsearch节点
  • es.port 默认9200

读写配置

  • es.query 默认为none,表示在elasticsearch索引和类型下的所有数据都会返回,从elasticsearch中读取数据时可以使用三种格式
    • uri 用参数指定查询条件,如q=title:java
    • query dsl 如 {“query”:{“mach_all”:{}}}
    • external resource 指定包含uri或query dsl的文件,如/path/to/query.json
  • es.input.json 默认为false,指定输入是否为json格式
  • es.write.operation 默认为index,索引存在/不存在某个文档时的写入行为
    • index 写入一个新的文档或者更新一个旧的文档
    • create 向elasticsearch写入一个新的文档时,索引中原来已经存在了相同id的文档,elasticsearch会抛出异常
    • update 试图更新一个elasticsearch不存在响应id的文档时,会抛出异常
    • upsert 写入新的文档之前,相同id的文档如果存在就将新文档和旧文档合并
  • es.update.script 默认为none,更新文档时使用的脚本
  • es.update.script.lang 默认为none,指定脚本语言
  • es.update.script.params 默认为none,指定脚本参数
  • es.update.script.params.json 默认为none,使用json来指定脚本参数
  • es.batch.size.bytes 默认为1mb,使用elasticsearch bulk时指定单次批处理写入的字节数
  • es.batch.size.entries 默认1000,使用elasticsearch bulk时指定单次批处理写入的文档个数,与es.batch.size.bytes联合使用时,任意一个满足条件都会触发批处理操作
  • es.batch.write.refresh 默认为true,表示在批处理写完时进行刷新操作
  • es.batch.write.retry.count 默认为3,批处理操作重试次数,如果为负数,则会进行无限次的重试
  • es.batch.write.retry.wait 默认10s,表示两次批处理操作的重试时间间隔
  • es.ser.reader.value.class 该参数用于指定将json转为对象的ValueReader的名称
  • es.ser.writer.value.class 该参数用于指定将对象转为json的ValueWriter的名称
  • es.update.retry.on.conflict 默认为0,检测冲突重试的次数

映射配置

  • es.mapping.id 默认为none,如果输入的文档中包含了参数中指定的字段,会将其映射为elasticsearch文档中相应的元数据字段,es.mapping.id=id该配置会将输入文档中的id映射为elasticsearch文档的_id字段
  • es.mapping.parent 默认为none,会将输入文档的字段映射到elasticsearch中的_parent字段
  • es.mapping.version 默认为none,将输入文档的相应字段映射为_version字段
  • es.mapping.version.type 默认为none,如果设置了es.mapping.version,该参数应该指定 internal、external、external_gt、external_gte、force
  • es.mapping.routing 默认为none,将输入文档中指定的字段映射到elasticsearch中的_routing字段
  • es.mapping.ttl 默认为none,将输入文档中指定的字段映射到elasticsearch中的_ttl字段
  • es.mapping.timestamp 默认为none,将输入文档中指定的字段映射到elasticsearch中的_timestamp字段
  • es.mapping.date.rich 默认true,用于指定返回Date类型还是string类型还是long类型
  • es.mapping.include 默认none,默认是将所有字段都包含在内,配置的话字段逗号分隔
  • es.mapping.exclude 默认none,默认是没有将字段排除在外,配置的话字段逗号分隔

索引配置

  • es.index.auto.create 默认yes,设置为no表示如果之前elasticsearch中不存在该索引,将会失败
  • es.index.read.missing.as.empty 默认为no,默认表示如果索引不存在将抛出异常,设置为yes则表示如果索引不存在就返回一个空数据集
  • es.field.read.empty.as.null 默认为yes,是否将空字段作为null
  • es.field.read.validate.presence 默认为warn,出现字段丢失时的行为
    • ignore 表示不进行任何校验
    • warn 表示会将警告信息记录在日志
    • strict 抛出一个异常

网络配置

  • es.nodes.discovery 默认为true,指定是否对集群中的其他节点进行寻找,还是只使用es.nodes中的节点
  • es.nodes.client.only 默认为false,是否将所有请求重定向到急群众的客户端
  • es.http.timeout 默认为1m,指定elasticsearch连接的超时时间
  • es.http.retries 默认为3,指定HTTP请求失败时重试的次数
  • es.scroll.keepalive 默认为10m,用于指定滚动查询的超时时间
  • es.scroll.size 默认值50,用于指定每次滚动返回的文档个数
  • es.action.heart.beat.lead 默认15s,确认hadoop的作业在正常运行的超时时间,超过该时间如果检测不到作业就会重启该任务

认证配置

  • es.net.http.auth.user 认证用户
  • es.net.http.auth.pass 认证密码

SSL配置

  • es.net.ssl 默认false,表示是否启用ssl
  • es.net.ssl.keystore.location 指定key store的位置
  • es.net.ssl.keystore.pass 指定key store的密码
  • es.net.ssl.keystore.type 指定key store的类型,默认JKS
  • es.net.ssl.truststore.location 指定trust store的位置
  • es.net.ssl.truststore.pass 指定trust store的密码
  • es.net.ssl.cert.allow.self.signed 默认false,指定是否可以使用自签名的证书
  • es.net.ssl.protocol 默认TLS,指定SSL协议

代理配置

  • es.net.proxy.http.host HTTP代理主机名称
  • es.net.proxy.http.port HTTP代理主机端口
  • es.net.proxy.http.user HTTP代理用户
  • es.net.proxy.http.pass HTTP代理密码
  • es.net.proxy.http.use.system.props 默认yes,用于确定是否使用http.proxyHost和http.proxyPort这样的HTTP代理参数
  • es.net.proxy.socks.host HTTP代理主机名称
  • es.net.proxy.socks.port HTTP代理主机端口
  • es.net.proxy.socks.user HTTP代理用户
  • es.net.proxy.socks.pass HTTP代理密码
  • es.net.proxy.socks.use.system.props 默认yes,用于确定是否使用http.socksProxyHost和http.socksProxyPort这样的HTTP代理参数

欢迎关注我的其它发布渠道