0%

ELK

ELK

为了方便分布式部署方便地查询应用日志,需要构建一个统一的日志管理系统。通常一个日志管理系统包括日志采集、日志传输、日志存储、日志搜索、日志分析和日志监控报警等模块。

ELK就是一款用于搭建实施日志分析平台的组件。ELK是指Elasticsearch、Logstash、Kiabana这三款框架首字母的缩写。

Elasticsearch是一个实时的分布式搜索和分析引擎,建立在全文搜索引擎Apache Luccne基础之上,具有分布式、高可用性、易扩展、具有副本和索引自动分片功能,提供基于HTTP协议以及JSON为数据交换格式的REST风格API、多数据源、实时分析存储等特点。

Logstash类似于flume,用于对日志进行收集、过滤,对数据进行格式化处理,并将所收集的日志传输到相关系统进行存储,如HDFS/kafka等。由数据输入端、过滤器和数据输出端3部分组成。数据输入端可以从数据源采集数据,常见的数据源包括文件、syslog、kafka等;过滤器是数据处理层,包括对数据进行格式化处理、数据类型转换、数据过滤等,支持正则;数据输出端是将Logstash收集到的数据经由过滤器处理后输出到其他系统,如kafka、HDFS、Elasticsearch等。

Kibana是针对Elasticsearch开源分析及可视化平台,可用来搜索,展示存储在Elasticsearch中的数据。

ELK配置

Elasticsearch配置

Elasticsearch.yml配置

1
2
3
4
5
6
7
8
9
10
# 集群名称
cluster.name: elasticsearch_test
# 集群中该节点名称
node.name: node-1
# 日志文件存储路径
path.data: /usr/local/var/lib/elasticsearch/
# 索引数据存储路径,可以设置多个路径,逗号分隔
path.logs: /usr/local/var/log/elasticsearch/
network.host: localhost
http.port: 9200
1
2
# 启动elasticsearch
elasticsearch

访问localhost:9200

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"name" : "node-1",
"cluster_name" : "elasticsearch_test",
"cluster_uuid" : "y88KhTnmQu-AAbxKii379w",
"version" : {
"number" : "7.9.2-SNAPSHOT",
"build_flavor" : "oss",
"build_type" : "tar",
"build_hash" : "unknown",
"build_date" : "2020-10-03T08:22:40.976826Z",
"build_snapshot" : true,
"lucene_version" : "8.6.2",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}

Logstash配置

1
2
3
4
5
# 启动 从控制台输入,控制台输出  codec指定了数据输出格式  输出的@timestamp是UTC时间
logstash -e 'input{stdin{}}output{stdout{codec=>json}}'


{"@version":"1","host":"zhanghedeMacBook-Pro.local","@timestamp":"2020-10-26T10:02:02.357Z","message":"hell"}

Kibana配置

1
2
3
4
5
6
server.port: 5601
server.host: "localhost"
# kibana服务器名称
server.name: "test"
# elasticsearch的访问地址
elasticsearch.hosts: ["http://localhost:9200"]

启动

1
kibana

Kafka与Logstash

Logstash收集日志到kafka

编写配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
input{
stdin{}
}
output{
kafka{
# 主题
topic_id => "logstash-input-kafka"
bootstrap_servers => "localhost:9092"
# 数据编码方式
codec => "plain"
}
stdout{
# 设置打印出来的打印数据表现形式
codec => rubydebug
}
}

启动logstash

1
2
# -f后边是之前写的输入输出配置的文件位置
logstash -f ./etc/logstash_input_kafka

上面的配置是输入到控制台以及kafka中

看一下kafka中是否有数据

1
kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/logstash-input-kafka-0/00000000000000000000.log --print-data-log

logstash从kafka消费日志

编写配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
input{
kafka{
# 导出数据解码
codec => "plain"
# 消费组
group_id => "kafka_elk_group"
client_id => logstash
topics => "kafka_elk_log"
bootstrap_servers => "localhost:9092"
auto_offset_reset => "earliest"
}
}
output{
elasticsearch {
# elasticsearch地址
hosts => ["localhost:9200"]
# 导入到elasticsearch格式
codec => "plain"
# 创建索引
index => "kafka_elk_log-%{+YYYY.MM.dd}"
}
}

启动logstash

1
logstash -f ./etc/logstash_output_es

kafka发消息

1
kafka-console-producer --bootstrap-server localhost:9092 --topic kafka_elk_log

查看elasticsearch该索引内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
localhost:9200/kafka_elk_log-2020.10.27/_search

{
"took": 6,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "kafka_elk_log-2020.10.27",
"_type": "_doc",
"_id": "tdsTaHUBepBPXnsJEzMU",
"_score": 1.0,
"_source": {
"@timestamp": "2020-10-27T03:21:20.939Z",
"message": "kafka来的消息",
"@version": "1"
}
}
]
}
}

Logstash

Logstash包含三部分,分别为输入(input)、输出(output)、数据清洗(filter)

上述使用了input和output,在这演示一下filter

192.168.1.1 [2020-10-27 15:10:20.909 +0800] “POST /account/home {“token”:”dla02okvbk9791bdj”}HTTP/1.1” 200 - “-“ “Apache-HttpClient/4.1.1 (Java 1.8)” 32 “C32A5405VNWVWVNK67”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
input{
kafka{
# 定义类型,进行区分
Type => "access"
# 导出数据解码
codec => "plain"
# 消费组
group_id => "kafka_access_group"
client_id => logstash
topics => "kafka_access_log"
bootstrap_servers => "localhost:9092"
auto_offset_reset => "earliest"
}
}
filter{
if [type] == "access"{
grok {
match => ["message", "%{IP:client}\s+\[%{NOTSPACE:access_date}\s+%{NOTSPACE:access_time}\s+(\+0800\)]\s"%{WORD:method} %{URIPATHPARAM:request}[\s\S]* (HTTP/1.1\") %{NUMBER:response}[\s\S]* %{NUMBER:cost}"]
}
}
}
output{
elasticsearch {
# elasticsearch地址
hosts => ["localhost:9200"]
# 导入到elasticsearch格式
codec => "plain"
# 创建索引
index => "kafka_access_log-%{+YYYY.MM.dd}"
}