
ELK

To make it easy to query application logs in a distributed deployment, a unified log management system is needed. A log management system typically includes modules for log collection, log transport, log storage, log search, log analysis, and log monitoring/alerting.

ELK is a set of components for building a real-time log analysis platform. ELK is an acronym formed from the initials of three frameworks: Elasticsearch, Logstash, and Kibana.

Elasticsearch is a real-time distributed search and analytics engine built on top of the full-text search library Apache Lucene. It is distributed, highly available, and easy to scale, supports replicas and automatic index sharding, exposes a REST-style API over HTTP with JSON as the data exchange format, and supports multiple data sources and real-time analytical storage.
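
As a quick illustration of that REST API (a minimal sketch assuming a local node on the default port 9200; the index name test-index is arbitrary), a document can be indexed and searched with plain HTTP requests:

# Index a document (the index is created automatically if it does not exist)
curl -X PUT "localhost:9200/test-index/_doc/1" -H 'Content-Type: application/json' -d '{"message": "hello elk"}'

# Full-text search for it
curl "localhost:9200/test-index/_search?q=message:hello&pretty"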

Logstash is similar to Flume: it collects and filters logs, formats the data, and ships the collected logs to other systems for storage, such as HDFS or Kafka. It consists of three parts: inputs, filters, and outputs. Inputs collect data from data sources; common sources include files, syslog, and Kafka. Filters form the data processing layer, handling formatting, data type conversion, and filtering, with regular expression support. Outputs send the data collected by Logstash and processed by the filters to other systems, such as Kafka, HDFS, or Elasticsearch. A pipeline definition therefore always has the three-block shape sketched below.
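
The following is only a structural sketch (the comments are placeholders; concrete, complete examples follow later in this post):

input {
  # where data comes from: file, syslog, kafka, stdin, ...
}
filter {
  # optional processing: grok, mutate, date, ...
}
output {
  # where data goes: elasticsearch, kafka, hdfs, stdout, ...
}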

Kibana is an open-source analytics and visualization platform for Elasticsearch that can be used to search and display data stored in Elasticsearch.

ELK configuration

Elasticsearch configuration

elasticsearch.yml configuration

# Cluster name
cluster.name: elasticsearch_test
# Name of this node in the cluster
node.name: node-1
# Index data storage path; multiple comma-separated paths can be set
path.data: /usr/local/var/lib/elasticsearch/
# Log file storage path
path.logs: /usr/local/var/log/elasticsearch/
network.host: localhost
http.port: 9200
# Start Elasticsearch
elasticsearch

Visit localhost:9200:

{
  "name" : "node-1",
  "cluster_name" : "elasticsearch_test",
  "cluster_uuid" : "y88KhTnmQu-AAbxKii379w",
  "version" : {
    "number" : "7.9.2-SNAPSHOT",
    "build_flavor" : "oss",
    "build_type" : "tar",
    "build_hash" : "unknown",
    "build_date" : "2020-10-03T08:22:40.976826Z",
    "build_snapshot" : true,
    "lucene_version" : "8.6.2",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
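
If the node responds as above, the cluster state can also be verified through the health endpoint (optional check; a single-node cluster typically reports yellow or green):

# Check overall cluster health
curl "localhost:9200/_cluster/health?pretty"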

Logstash configuration

# Start Logstash reading from stdin and writing to stdout.
# codec specifies the output format; the emitted @timestamp is in UTC.
logstash -e 'input{stdin{}}output{stdout{codec=>json}}'

# Start Logstash with a config file
logstash -f config/logstash.conf

# A slightly more complex configuration
input {
  file {
    type => "log"
    # Paths of the logs to collect
    path => ["/data/log/*.log"]
    # "end" reads incrementally from the end of the file. "beginning" reads from
    # the start; if read progress has already been recorded, reading resumes from
    # that position instead, so progress is not lost across restarts.
    start_position => "end"
    # Ignore files whose last modification time is older than this many seconds
    ignore_older => 0
    # Multiline codec to stitch multi-line log entries back together
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate => true
      what => "previous"
    }
  }
  beats {
    port => 5044
  }
}
output {
  if [type] == "log" {
    elasticsearch {
      hosts => ["http://127.0.0.1:9200"]
      # Index name
      index => "log-%{+YYYY.MM.dd}"
      user => "user"
      password => "pwd"
    }
  }
}


{"@version":"1","host":"zhanghedeMacBook-Pro.local","@timestamp":"2020-10-26T10:02:02.357Z","message":"hell"}

Kibana configuration

server.port: 5601
server.host: "localhost"
# Kibana server name
server.name: "test"
# Elasticsearch address
elasticsearch.hosts: ["http://localhost:9200"]

Start

kibana

Kafka and Logstash

Collecting logs into Kafka with Logstash

Write the configuration

input {
  stdin {}
}
output {
  kafka {
    # Topic
    topic_id => "logstash-input-kafka"
    bootstrap_servers => "localhost:9092"
    # Data encoding
    codec => "plain"
  }
  stdout {
    # Format used to print data to the console
    codec => rubydebug
  }
}

Start Logstash

# -f points to the input/output configuration file written above
logstash -f ./etc/logstash_input_kafka

The configuration above sends the input to both the console and Kafka.

Check whether the data has arrived in Kafka:

kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/logstash-input-kafka-0/00000000000000000000.log --print-data-log
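
If the Kafka CLI tools are on the PATH, the messages can also be read back with the console consumer (an alternative, simpler check):

# Consume the topic from the beginning to verify the messages arrived
kafka-console-consumer --bootstrap-server localhost:9092 --topic logstash-input-kafka --from-beginning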

Consuming logs from Kafka with Logstash

Write the configuration

input {
  kafka {
    # Codec used to decode the consumed data
    codec => "plain"
    # Consumer group
    group_id => "kafka_elk_group"
    client_id => "logstash"
    topics => ["kafka_elk_log"]
    bootstrap_servers => "localhost:9092"
    auto_offset_reset => "earliest"
  }
}
output {
  elasticsearch {
    # Elasticsearch address
    hosts => ["localhost:9200"]
    # Codec used when writing to Elasticsearch
    codec => "plain"
    # Index to create
    index => "kafka_elk_log-%{+YYYY.MM.dd}"
  }
}

Start Logstash

logstash -f ./etc/logstash_output_es

Produce a message to Kafka

kafka-console-producer --bootstrap-server localhost:9092 --topic kafka_elk_log

Check the index contents in Elasticsearch

localhost:9200/kafka_elk_log-2020.10.27/_search

{
  "took": 6,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "kafka_elk_log-2020.10.27",
        "_type": "_doc",
        "_id": "tdsTaHUBepBPXnsJEzMU",
        "_score": 1.0,
        "_source": {
          "@timestamp": "2020-10-27T03:21:20.939Z",
          "message": "kafka来的消息",
          "@version": "1"
        }
      }
    ]
  }
}
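
To filter the results instead of listing everything, a standard match query can be posted to the same endpoint (a minimal sketch; adjust the index name and the query term to your data):

# Search the index for messages containing "kafka"
curl -H 'Content-Type: application/json' "localhost:9200/kafka_elk_log-2020.10.27/_search?pretty" -d '{"query":{"match":{"message":"kafka"}}}'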

Logstash

Logstash consists of three parts: input, output, and data processing (filter).

The examples above used input and output; here is a demonstration of filter, applied to an access log line like the one below.

192.168.1.1 [2020-10-27 15:10:20.909 +0800] "POST /account/home {"token":"dla02okvbk9791bdj"}HTTP/1.1" 200 - "-" "Apache-HttpClient/4.1.1 (Java 1.8)" 32 "C32A5405VNWVWVNK67"

input {
  kafka {
    # Tag the events with a type so they can be distinguished later
    type => "access"
    # Codec used to decode the consumed data
    codec => "plain"
    # Consumer group
    group_id => "kafka_access_group"
    client_id => "logstash"
    topics => ["kafka_access_log"]
    bootstrap_servers => "localhost:9092"
    auto_offset_reset => "earliest"
  }
}
filter {
  if [type] == "access" {
    grok {
      match => ["message", "%{IP:client}\s+\[%{NOTSPACE:access_date}\s+%{NOTSPACE:access_time}\s+\+0800\]\s+\"%{WORD:method} %{URIPATHPARAM:request}[\s\S]*HTTP/1.1\" %{NUMBER:response}[\s\S]* %{NUMBER:cost}"]
    }
  }
}
output {
  elasticsearch {
    # Elasticsearch address
    hosts => ["localhost:9200"]
    # Codec used when writing to Elasticsearch
    codec => "plain"
    # Index to create
    index => "kafka_access_log-%{+YYYY.MM.dd}"
  }
}
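
For the sample access log line above, the grok filter should extract fields roughly like the following (illustrative only; the actual event also carries @timestamp, message, and other metadata):

{
       "client" => "192.168.1.1",
  "access_date" => "2020-10-27",
  "access_time" => "15:10:20.909",
       "method" => "POST",
      "request" => "/account/home",
     "response" => "200",
         "cost" => "32"
}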

Logstash configuration explained

# Input section
input {
  jdbc {
    # MySQL JDBC driver
    jdbc_driver_library => "/es/mysql-connector-java-5.1.31.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/mstore"
    # Database credentials
    jdbc_user => "root"
    jdbc_password => "123456"
    # Enable paging
    jdbc_paging_enabled => true
    # Page size
    jdbc_page_size => 100000
    # Fetch data in a streaming fashion, 10000 rows at a time
    jdbc_fetch_size => 10000
    # Maximum number of times to try connecting to the database
    connection_retry_attempts => 3
    # Number of seconds to sleep between connection attempts
    connection_retry_attempts_wait_time => 1
    # Connection pool configuration: seconds to wait to acquire a connection
    # before raising a PoolTimeoutError (default 5)
    jdbc_pool_timeout => 5
    # Whether to force lowercasing of identifier fields
    lowercase_column_names => true
    # Whether to save state in last_run_metadata_path;
    # records the last run so data can be extracted incrementally
    record_last_run => true
    # "* * * * *" runs once per minute
    schedule => "* * * * *"
    # Use an incremental column value rather than a timestamp
    use_column_value => true
    # sql_last_value: the value used to calculate which rows to query. Before any
    # query runs it is set to Thursday, 1 January 1970, or 0 if use_column_value
    # is true and tracking_column is set; it is updated after each query runs.
    tracking_column => "id"
    # Query statement
    statement => "SELECT id,name,update_time FROM user WHERE id > :sql_last_value"
  }
}

# Filter section
filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
  date {
    match => ["update_time", "yyyy-MM-dd HH:mm:ss"]
  }
}

# Output to Elasticsearch
output {
  elasticsearch {
    # Elasticsearch cluster address; not all nodes need to be listed, and the
    # default port can be omitted
    hosts => ["127.0.0.1:9200"]
    # Index name used when querying; create a matching mapping in Elasticsearch
    # first, or rely on the default mapping
    index => "user"
    # Document ID in Elasticsearch, taken from the id field of the SQL query
    document_id => "%{id}"
  }
}
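
Assuming the configuration above is saved as, say, ./etc/logstash_jdbc_mysql (the filename here is only an example), it can be started the same way as the earlier pipelines, and the synced documents can then be inspected in the user index:

# Start the JDBC pipeline (example filename)
logstash -f ./etc/logstash_jdbc_mysql

# Check the documents synced from MySQL
curl "localhost:9200/user/_search?pretty"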
