## ELK

To make application logs easy to query in a distributed deployment, a unified log management system needs to be built. A log management system typically includes modules for log collection, log transport, log storage, log search, log analysis, and log monitoring and alerting.

ELK is a set of components for building a real-time log analysis platform. "ELK" is an acronym formed from the initials of three frameworks: Elasticsearch, Logstash, and Kibana.
Elasticsearch is a real-time distributed search and analytics engine built on top of the full-text search library Apache Lucene. It is distributed, highly available, and easy to scale, with replicas and automatic index sharding; it exposes a REST-style API over HTTP with JSON as the data exchange format, supports multiple data sources, and provides real-time analytics and storage.
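As a quick illustration of that HTTP/JSON REST API, a minimal sketch; the `demo` index, the document body, and the query here are made up for illustration:

```bash
# Index a document over HTTP with a JSON body
curl -X PUT "localhost:9200/demo/_doc/1" \
  -H 'Content-Type: application/json' \
  -d '{"user": "alice", "message": "hello elasticsearch"}'

# Full-text search for it
curl "localhost:9200/demo/_search?q=message:hello&pretty"
```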
Logstash, similar to Flume, collects and filters logs, formats the data, and ships the collected logs to systems such as HDFS or Kafka for storage. It consists of three parts: an input stage, filters, and an output stage. The input stage collects data from sources; common sources include files, syslog, and Kafka. The filter stage is the data-processing layer: it formats data, converts data types, and filters records, with regular-expression support. The output stage sends the data collected by Logstash, after filtering, on to other systems such as Kafka, HDFS, or Elasticsearch.
Kibana is an open-source analytics and visualization platform for Elasticsearch, used to search and display data stored in Elasticsearch.
## ELK configuration

### Elasticsearch configuration

elasticsearch.yml:
```yaml
cluster.name: elasticsearch_test
node.name: node-1
path.data: /usr/local/var/lib/elasticsearch/
path.logs: /usr/local/var/log/elasticsearch/
network.host: localhost
http.port: 9200
```
```bash
# Start Elasticsearch
elasticsearch
```
Visit localhost:9200.
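For example, from a shell (any HTTP client will do; curl shown here):

```bash
curl localhost:9200
```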
```json
{
  "name" : "node-1",
  "cluster_name" : "elasticsearch_test",
  "cluster_uuid" : "y88KhTnmQu-AAbxKii379w",
  "version" : {
    "number" : "7.9.2-SNAPSHOT",
    "build_flavor" : "oss",
    "build_type" : "tar",
    "build_hash" : "unknown",
    "build_date" : "2020-10-03T08:22:40.976826Z",
    "build_snapshot" : true,
    "lucene_version" : "8.6.2",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
```
### Logstash configuration

```bash
# Start Logstash reading from the console and writing to the console.
# codec sets the output format; the @timestamp it emits is in UTC.
logstash -e 'input{stdin{}}output{stdout{codec=>json}}'

# Or start from a config file:
logstash -f config/logstash.conf
```

A more complex configuration:

```conf
input {
  file {
    type => "log"
    path => ["/data/log/*.log"]   # paths of the logs to collect
    # "end" reads incrementally from the tail of the file; "beginning" reads
    # from the head. If read offsets have already been recorded, they take
    # precedence, and they survive a restart.
    start_position => "end"
    ignore_older => 0             # skip files last modified more than this many seconds ago
    codec => multiline {          # stitch multi-line log entries back together
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate => true
      what => "previous"
    }
  }
  beats {
    port => 5044
  }
}

output {
  if [type] == "log" {
    elasticsearch {
      hosts => ["http://127.0.0.1:9200"]
      index => "log-%{+YYYY.MM.dd}"   # index name
      user => "user"
      password => "pwd"
    }
  }
}
```

Sample output of the stdin/stdout pipeline:

```json
{"@version":"1","host":"zhanghedeMacBook-Pro.local","@timestamp":"2020-10-26T10:02:02.357Z","message":"hell"}
```
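The sample above can be reproduced by piping a line into the stdin pipeline; Logstash shuts down once stdin is closed:

```bash
echo "hell" | logstash -e 'input{stdin{}}output{stdout{codec=>json}}'
```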
### Kibana configuration

```yaml
server.port: 5601
server.host: "localhost"
server.name: "test"
elasticsearch.hosts: ["http://localhost:9200"]
```
Start Kibana.
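Assuming the kibana binary is on the PATH, as elasticsearch was above:

```bash
# Start Kibana; it picks up kibana.yml from its config directory
kibana
```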
## Kafka and Logstash

### Shipping logs to Kafka with Logstash

Write the configuration:
```conf
input {
  stdin {}
}

output {
  kafka {
    topic_id => "logstash-input-kafka"
    bootstrap_servers => "localhost:9092"
    codec => "plain"
  }
  stdout {
    codec => rubydebug
  }
}
```
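If the broker is not set to auto-create topics, the topic may need to be created first; a sketch with the standard Kafka CLI, with arbitrary partition and replication counts:

```bash
kafka-topics --bootstrap-server localhost:9092 --create \
  --topic logstash-input-kafka --partitions 1 --replication-factor 1
```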
Start Logstash:
```bash
# -f points at the input/output configuration file written above
logstash -f ./etc/logstash_input_kafka
```
The configuration above writes each input line both to the console and to Kafka.
Check whether the data has arrived in Kafka:
```bash
kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/logstash-input-kafka-0/00000000000000000000.log --print-data-log
```
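A simpler sanity check is to consume the topic from the beginning with the console consumer:

```bash
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic logstash-input-kafka --from-beginning
```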
### Consuming logs from Kafka with Logstash

Write the configuration:
```conf
input {
  kafka {
    codec => "plain"
    group_id => "kafka_elk_group"
    client_id => "logstash"
    topics => "kafka_elk_log"
    bootstrap_servers => "localhost:9092"
    auto_offset_reset => "earliest"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    codec => "plain"
    index => "kafka_elk_log-%{+YYYY.MM.dd}"
  }
}
```
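Once this pipeline is running, the consumer group's offsets and lag can be inspected with the standard Kafka CLI (a sketch):

```bash
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group kafka_elk_group
```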
Start Logstash:
```bash
logstash -f ./etc/logstash_output_es
```
Send a message via Kafka:
```bash
kafka-console-producer --bootstrap-server localhost:9092 --topic kafka_elk_log
```
Check the contents of that index in Elasticsearch:
GET `localhost:9200/kafka_elk_log-2020.10.27/_search`:

```json
{
  "took" : 6,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "kafka_elk_log-2020.10.27",
        "_type" : "_doc",
        "_id" : "tdsTaHUBepBPXnsJEzMU",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2020-10-27T03:21:20.939Z",
          "message" : "kafka来的消息",
          "@version" : "1"
        }
      }
    ]
  }
}
```
## Logstash

A Logstash pipeline has three parts: input, output, and filter (data cleansing).
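As a minimal sketch of all three stages in one command (the mutate filter here is purely illustrative):

```bash
# stdin -> mutate (adds a field) -> stdout
logstash -e 'input{stdin{}} filter{mutate{add_field=>{"stage"=>"filtered"}}} output{stdout{codec=>rubydebug}}'
```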
The earlier sections used only input and output; the rest of this section demonstrates filter on an access log line like the following:
```
192.168.1.1 [2020-10-27 15:10:20.909 +0800] "POST /account/home {"token":"dla02okvbk9791bdj"}HTTP/1.1" 200 - "-" "Apache-HttpClient/4.1.1 (Java 1.8)" 32 "C32A5405VNWVWVNK67"
```
The pipeline below reads these lines from Kafka, parses them with grok, and writes the structured result to Elasticsearch:

```conf
input {
  kafka {
    type => "access"
    codec => "plain"
    group_id => "kafka_access_group"
    client_id => "logstash"
    topics => "kafka_access_log"
    bootstrap_servers => "localhost:9092"
    auto_offset_reset => "earliest"
  }
}

filter {
  if [type] == "access" {
    grok {
      match => ["message", "%{IP:client}\s+\[%{NOTSPACE:access_date}\s+%{NOTSPACE:access_time}\s+\+0800\]\s+\"%{WORD:method} %{URIPATHPARAM:request}[\s\S]*HTTP/1.1\" %{NUMBER:response}[\s\S]* %{NUMBER:cost}"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    codec => "plain"
    index => "kafka_access_log-%{+YYYY.MM.dd}"
  }
}
```
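To iterate on the grok pattern without going through Kafka, the sample line can be piped through a stdin pipeline; `test_access.conf` is a hypothetical file containing an `input{stdin{type=>"access"}}` input, the filter block above, and a rubydebug stdout output:

```bash
echo '192.168.1.1 [2020-10-27 15:10:20.909 +0800] "POST /account/home {"token":"dla02okvbk9791bdj"}HTTP/1.1" 200 - "-" "Apache-HttpClient/4.1.1 (Java 1.8)" 32 "C32A5405VNWVWVNK67"' | logstash -f test_access.conf
```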
### Logstash configuration explained

A JDBC input example that incrementally syncs a MySQL table into Elasticsearch:

```conf
input {
  jdbc {
    jdbc_driver_library => "/es/mysql-connector-java-5.1.31.jar"   # path to the JDBC driver jar
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/mstore"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_paging_enabled => true              # page through large result sets
    jdbc_page_size => 100000
    jdbc_fetch_size => 10000
    connection_retry_attempts => 3           # retry the connection up to 3 times
    connection_retry_attempts_wait_time => 1
    jdbc_pool_timeout => 5
    lowercase_column_names => true
    record_last_run => true                  # remember the last tracking-column value between runs
    schedule => "* * * * *"                  # cron schedule: run every minute
    use_column_value => true
    tracking_column => "id"                  # incremental sync keyed on the id column
    statement => "SELECT id,name,update_time FROM user WHERE id > :sql_last_value"
  }
}

filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
  date {
    match => ["update_time", "yyyy-MM-dd HH:mm:ss"]
  }
}

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "user"
    document_id => "%{id}"                   # use the MySQL primary key as the document id
  }
}
```
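After the pipeline has run, the synced rows can be spot-checked in Elasticsearch (a sketch; document id 1 assumes such a row exists in MySQL):

```bash
# List synced documents
curl "localhost:9200/user/_search?pretty"
# Fetch the document whose _id matches a known MySQL primary key
curl "localhost:9200/user/_doc/1?pretty"
```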