ELK
To make it easy to query application logs across a distributed deployment, a unified log management system is needed. Such a system typically includes modules for log collection, log transport, log storage, log search, log analysis, and log monitoring and alerting.
ELK is a set of components for building a real-time log analysis platform. The name is an acronym formed from the initials of three frameworks: Elasticsearch, Logstash, and Kibana.
Elasticsearch is a real-time distributed search and analytics engine built on top of the full-text search library Apache Lucene. It is distributed, highly available, and easy to scale, supports replicas and automatic index sharding, provides a REST-style API over HTTP with JSON as the data exchange format, and handles multiple data sources with real-time analytical storage.
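As a quick, minimal illustration of that REST API (the index name logs and the document below are made-up examples, not part of the setup that follows), a document can be indexed and searched back with plain HTTP requests:

# index one example document, then query it back over HTTP/JSON
curl -X PUT "localhost:9200/logs/_doc/1" -H 'Content-Type: application/json' -d '{"message": "hello elk"}'
curl -X GET "localhost:9200/logs/_search?q=message:hello&pretty"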
Logstash is similar to Flume: it collects and filters logs, formats the data, and ships the collected logs to other systems for storage, such as HDFS or Kafka. It consists of three parts: an input stage, filters, and an output stage. The input stage collects data from sources such as files, syslog, and Kafka; the filter stage is the data-processing layer, handling formatting, type conversion, and filtering, with regular-expression support; the output stage sends the data collected by Logstash and processed by the filters on to other systems such as Kafka, HDFS, or Elasticsearch.
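For intuition, here is a minimal sketch of a pipeline with all three stages; the file path, grok pattern, and Kafka topic are illustrative assumptions only:

input {
  file {
    path => "/var/log/app/*.log"        # example source: local application log files
    start_position => "beginning"
  }
}
filter {
  grok {
    # parse syslog-style lines into structured fields (illustrative pattern)
    match => { "message" => "%{SYSLOGLINE}" }
  }
}
output {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => "app-logs"              # example topic
  }
}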
Kibana is an open-source analytics and visualization platform for Elasticsearch, used to search and display the data stored in Elasticsearch.
ELK Configuration
Elasticsearch Configuration
elasticsearch.yml configuration
cluster.name: elasticsearch_test
node.name: node-1
path.data: /usr/local/var/lib/elasticsearch/
path.logs: /usr/local/var/log/elasticsearch/
network.host: localhost
http.port: 9200
# Start Elasticsearch
elasticsearch
Visit localhost:9200:
{
  "name" : "node-1",
  "cluster_name" : "elasticsearch_test",
  "cluster_uuid" : "y88KhTnmQu-AAbxKii379w",
  "version" : {
    "number" : "7.9.2-SNAPSHOT",
    "build_flavor" : "oss",
    "build_type" : "tar",
    "build_hash" : "unknown",
    "build_date" : "2020-10-03T08:22:40.976826Z",
    "build_snapshot" : true,
    "lucene_version" : "8.6.2",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
Logstash Configuration
# Start Logstash reading from the console and writing to the console;
# codec sets the output format; the emitted @timestamp is in UTC
logstash -e 'input{stdin{}}output{stdout{codec=>json}}'
{"@version":"1","host":"zhanghedeMacBook-Pro.local","@timestamp":"2020-10-26T10:02:02.357Z","message":"hell"}
Kibana Configuration
server.port: 5601
server.host: "localhost"
server.name: "test"
elasticsearch.hosts: ["http://localhost:9200"]
Start Kibana
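The exact start command depends on how Kibana was installed; assuming a local install like the Elasticsearch one above (which was started with a bare elasticsearch command), it is simply:

# start Kibana (picks up the kibana.yml above), then open http://localhost:5601
kibana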
Kafka and Logstash
Shipping logs to Kafka with Logstash
Write the configuration
input{
  stdin{}
}
output{
  kafka{
    topic_id => "logstash-input-kafka"
    bootstrap_servers => "localhost:9092"
    codec => "plain"
  }
  stdout{
    codec => rubydebug
  }
}
Start Logstash
# -f points to the input/output configuration file written above
logstash -f ./etc/logstash_input_kafka
The configuration above sends the input both to the console and to Kafka.
Check whether the data has arrived in Kafka:
kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/logstash-input-kafka-0/00000000000000000000.log --print-data-log
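Alternatively (assuming the same Kafka CLI wrappers as the producer command used later in this post), the topic can be read directly with the console consumer:

kafka-console-consumer --bootstrap-server localhost:9092 --topic logstash-input-kafka --from-beginning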
Consuming logs from Kafka with Logstash
Write the configuration
input{
  kafka{
    codec => "plain"
    group_id => "kafka_elk_group"
    client_id => "logstash"
    topics => "kafka_elk_log"
    bootstrap_servers => "localhost:9092"
    auto_offset_reset => "earliest"
  }
}
output{
  elasticsearch {
    hosts => ["localhost:9200"]
    codec => "plain"
    index => "kafka_elk_log-%{+YYYY.MM.dd}"
  }
}
Start Logstash
logstash -f ./etc/logstash_output_es
Send a message with Kafka
kafka-console-producer --bootstrap-server localhost:9092 --topic kafka_elk_log
Check the contents of that index in Elasticsearch:
localhost:9200/kafka_elk_log-2020.10.27/_search

{
  "took": 6,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "kafka_elk_log-2020.10.27",
        "_type": "_doc",
        "_id": "tdsTaHUBepBPXnsJEzMU",
        "_score": 1.0,
        "_source": {
          "@timestamp": "2020-10-27T03:21:20.939Z",
          "message": "kafka来的消息",
          "@version": "1"
        }
      }
    ]
  }
}
Logstash
Logstash has three parts: input, output, and filter (data cleansing).
The examples above used only input and output, so here is a demonstration of filter. The sample access log line to parse:
192.168.1.1 [2020-10-27 15:10:20.909 +0800] "POST /account/home {"token":"dla02okvbk9791bdj"}HTTP/1.1" 200 - "-" "Apache-HttpClient/4.1.1 (Java 1.8)" 32 "C32A5405VNWVWVNK67"
input{
  kafka{
    type => "access"
    codec => "plain"
    group_id => "kafka_access_group"
    client_id => "logstash"
    topics => "kafka_access_log"
    bootstrap_servers => "localhost:9092"
    auto_offset_reset => "earliest"
  }
}
filter{
  if [type] == "access"{
    grok {
      match => ["message", "%{IP:client}\s+\[%{NOTSPACE:access_date}\s+%{NOTSPACE:access_time}\s+(\+0800)\]\s+\"%{WORD:method} %{URIPATHPARAM:request}[\s\S]* (HTTP/1.1\") %{NUMBER:response}[\s\S]* %{NUMBER:cost}"]
    }
  }
}
output{
  elasticsearch {
    hosts => ["localhost:9200"]
    codec => "plain"
    index => "kafka_access_log-%{+YYYY.MM.dd}"
  }
}
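Before wiring the filter to Kafka and Elasticsearch, the grok pattern can be tried on its own; a minimal sketch (same pattern, stdin in, rubydebug out) lets you paste the sample access log line and inspect the extracted client, access_date, access_time, method, request, response, and cost fields:

input { stdin {} }
filter {
  grok {
    # same access-log pattern as above, applied to whatever is pasted on stdin
    match => ["message", "%{IP:client}\s+\[%{NOTSPACE:access_date}\s+%{NOTSPACE:access_time}\s+(\+0800)\]\s+\"%{WORD:method} %{URIPATHPARAM:request}[\s\S]* (HTTP/1.1\") %{NUMBER:response}[\s\S]* %{NUMBER:cost}"]
  }
}
output { stdout { codec => rubydebug } }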