0%

MDC日志跟踪

MDC是log4j,logback和java.util.logging中提供的一种方便在多线程下记录日志的功能,全称是Mapped Diagnostic Context,可以看做是一个与当前线程绑定的哈希表,依赖于ThreadLocalMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class LogInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//如果有上层调用就用上层的ID
String traceId = request.getHeader("traceId");
if (traceId == null) {
traceId = TraceIdUtil.getTraceId();
}

MDC.put("traceId", traceId);
return true;
}

@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView)
throws Exception {
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)
throws Exception {
//调用结束后删除
MDC.remove("traceId");
}
}

在配置日志格式时,使用%X{}来获取MDC中的值

1
<PatternLayout charset="UTF-8" pattern="[%-5p %d{yyyy-MM-dd HH:mm:ss.SSS} [%X{traceId}]] %l [%m]%n"/>

如果想要子线程获取到父线程的MDC内容,在父线程新建子线程之前调用MDC.getCopyOfContextMap()方法将MDC内容取出来传给子线程,子线程在执行操作前先调用MDC.setContextMap()方法将父线程的MDC内容设置到子线程

netty线程模型

目前存在的线程模型有

  • 传统阻塞I/O服务模型,采用阻塞IO获取输入的数据,每个连接都需要独立的线程操作,无法实现高并发

  • Reactor模式

    • 基于IO复用模型:多个连接共用一个阻塞对象,应用程序只需在一个阻塞对象等待,无需阻塞等待多个连接,当某个连接有新的数据可以处理时,操作系统通知应用程序,线程开始进行业务处理
    • 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务

    根据Reactor的数量和处理资源线程的数量不同,有三种实现

    • 单Reactor单线程

      所有IO操作都由一个线程完成,即多路复用、事件分发和处理都是在一个Reactor线程上完成的,既要接收客户端的连接请求,向服务器发起连接,又要读取请求或响应消息

      • 优点:模型简单,没有多线程问题,全都在一个线程完成
      • 缺点:单线程无法发挥多核CPU的性能,handler在处理某个连接上的业务时,无法处理其他连接的事件
    • 单Reactor多线程

      一个NIO线程只负责监听服务端,接收客户端的TCP连接请求;一个NIO线程池负责网络IO的操作,即消息的读取、编解码和响应消息

      • 优点:可以充分利用多核CPU
      • 缺点:多线程数据共享比较复杂
    • 主从Reactor多线程

      Acceptor线程用于绑定监听端口,接收客户端连接,将SocketChannel从主线程的Reactor线程的多路复用器上移除,重新注册到Sub线程池的线程上,用于处理IO的读写操作

      • 优点:父子线程数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理
      • 缺点:编程复杂度高
阅读全文 »

hadoop数据存入es

ES-Hadoop是Elasticsearch推出的专门用于对接Hadoop生态的工具,可以让数据在Elasticsearch和Hadoop之间双向移动,无缝衔接Elasticsearch与Hadoop服务,充分使用Elasticsearch的快速搜索及Hadoop批处理能力,实现交互式数据处理

使用依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>${elasticsearch.version}</version>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>

编写Job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class H2EJob {

public static void main(String[] args) {

JobConf hdfsConf = new JobConf();
JobConf conf = new JobConf();
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
// 队列名称
conf.set("mapreduce.job.queuename", "dev");
//ElasticSearch节点
conf.set("es.nodes", "192.168.1.222:9200");
//ElaticSearch Index/Type
conf.set("es.resource", "muindex/mytype");
conf.set("es.index.auto.create", "true");
//Hadoop上的数据格式为JSON,可以直接导入
conf.set("es.input.json", "yes");
// 会关闭节点的自动 discovery,只使用es.nodes声明的节点进行数据读写操作
// conf.set("es.nodes.wan.only","true");
// 指定json对象中那种field对应的值为es中document的id
// conf.set("es.mapping.id", "id");
// 防止只读
// conf.set("es.index.blocks.read_only_allow_delete", "false");
// conf.set("es.index.blocks.read_only", "false");
// 由于数据量过大,增大超时时间
conf.set("es.http.timeout", "30m");
// 认证
conf.set("es.net.http.auth.user", "user");
conf.set("es.net.http.auth.pass", "password");


Job job = Job.getInstance(conf, "h2eJob");
job.setJarByClass(H2EJob.class);
job.setMapperClass(H2EMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(EsOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));

boolean success = job.waitForCompletion(true);

}


}

编写Mapper

1
2
3
4
5
6
7
8
9
10
11
12
public class H2EMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

@Override
public void run(Context context) throws IOException, InterruptedException {
setup(context);
}

@Override
protected void map(Void key, Group value, Context context) throws IOException, InterruptedException {
context.write(NullWritable.get(), value);
}
}

该程序没有reducer,因为只是数据的透传,不需要做额外处理

ES-HADOOP配置参数

基本配置

  • es.resource 默认none,elasticsearch资源,格式为<index>/<type>
  • es.resource.read 该参数用于指定要从elasticsearch中读取的资源和类型,默认为es.resource的值
  • es.resource.write 该参数用于指定要写入的elasticsearch资源和类型,默认为es.resource的值
  • es.nodes 默认为localhost,指定要连接的elasticsearch节点
  • es.port 默认9200
阅读全文 »

索引监控

elasticsearch提供了接口来监控索引的状态,包括索引的统计信息、碎片信息、恢复的状态以及分片信息

索引统计

索引统计提供了索引中不同内容的统计数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// 统计所有索引
GET _stats

// 统计某个索引
GET testmapping/_stats


{
"_all": {
"primaries": {
"docs": { // 文档数据
"count": 0,
"deleted": 0
},
"store": { // 索引的大小
"size_in_bytes": 261
},
"indexing": { // 索引统计数据
"index_total": 0,
"index_time_in_millis": 0,
"index_current": 0,
"index_failed": 0,
"delete_total": 0,
"delete_time_in_millis": 0,
"delete_current": 0,
"noop_update_total": 0,
"is_throttled": false,
"throttle_time_in_millis": 0
},
"get": { // 获取统计数据
"total": 0,
"time_in_millis": 0,
"exists_total": 0,
"exists_time_in_millis": 0,
"missing_total": 0,
"missing_time_in_millis": 0,
"current": 0
},
"search": { // 搜索统计数据
"open_contexts": 0,
"query_total": 1,
"query_time_in_millis": 0,
"query_current": 0,
"fetch_total": 1,
"fetch_time_in_millis": 0,
"fetch_current": 0,
"scroll_total": 0,
"scroll_time_in_millis": 0,
"scroll_current": 0,
"suggest_total": 0,
"suggest_time_in_millis": 0,
"suggest_current": 0
},
"merges": { //混合统计数据
"current": 0,
"current_docs": 0,
"current_size_in_bytes": 0,
"total": 0,
"total_time_in_millis": 0,
"total_docs": 0,
"total_size_in_bytes": 0,
"total_stopped_time_in_millis": 0,
"total_throttled_time_in_millis": 0,
"total_auto_throttle_in_bytes": 20971520
},
"refresh": { //刷新统计数据
"total": 5,
"total_time_in_millis": 2,
"listeners": 0
},
"flush": { // 冲洗统计数据
"total": 1,
"periodic": 0,
"total_time_in_millis": 4
},
"warmer": {
"current": 0,
"total": 2,
"total_time_in_millis": 0
},
"query_cache": {
"memory_size_in_bytes": 0,
"total_count": 0,
"hit_count": 0,
"miss_count": 0,
"cache_size": 0,
"cache_count": 0,
"evictions": 0
},
"fielddata": { // 字段数据统计数据
"memory_size_in_bytes": 0,
"evictions": 0
},
"completion": { // 完成建议统计数据
"size_in_bytes": 0
},
"segments": { // 分片信息
"count": 0,
"memory_in_bytes": 0,
"terms_memory_in_bytes": 0,
"stored_fields_memory_in_bytes": 0,
"term_vectors_memory_in_bytes": 0,
"norms_memory_in_bytes": 0,
"points_memory_in_bytes": 0,
"doc_values_memory_in_bytes": 0,
"index_writer_memory_in_bytes": 0,
"version_map_memory_in_bytes": 0,
"fixed_bit_set_memory_in_bytes": 0,
"max_unsafe_auto_id_timestamp": -1,
"file_sizes": {}
},
"translog": { // 事务日志统计数据
"operations": 0,
"size_in_bytes": 110,
"uncommitted_operations": 0,
"uncommitted_size_in_bytes": 55,
"earliest_last_modified_age": 14932257
},
"request_cache": { // 分片请求缓存统计数据
"memory_size_in_bytes": 0,
"evictions": 0,
"hit_count": 0,
"miss_count": 0
},
"recovery": { // 索引恢复信息
"current_as_source": 0,
"current_as_target": 0,
"throttle_time_in_millis": 0
}
}
}
}
阅读全文 »

maven使用外部依赖

有时候我们使用的依赖在中央仓库和远程仓库中都没有,但是代码中又要使用到,那么如何进行引入呢?

可以使用system作用域,来引用本地的jar包

1
2
3
4
5
6
7
<dependency>
<groupId>abc</groupId>
<artifactId>abc</artifactId>
<version>1.0</version>
<scope>system</scope> <!-- 作用域为system -->
<systemPath>${basedir}\src\lib\abc.jar</systemPath>
</dependency>