0%

flume监控文件写入kafka

flume监控文件写入kafka

之前了解过log4j2将日志直接写入kafka中,这样会导致应用程序直接依赖于kafka运行环境。

而一般通常的做法是应用程序将日志写入本地,通过日志采集工具将本地日志同步到远程服务器,flume就是常用的数据采集工具之一。

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#事件源名称
agent.sources = execSource
#通道名称
agent.channels = memoryChannel
#接收器名称
agent.sinks = kafkaSink

# For each one of the sources, the type is defined
agent.sources.execSource.type = exec
agent.sources.execSource.command = echo "测试一下"

# The channel can be defined as follows.
# 事件源的通道,绑定通道
agent.sources.execSource.channels = memoryChannel

# Each sink's type must be defined
# kafka接收器配置
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafkaSink.kafka.topic = flume-kafka
agent.sinks.kafkaSink.kafka.serializer.class = kafka.serializer.StringEncoder
agent.sinks.kafkaSink.kafka.producer.acks = 1

#Specify the channel the sink should use
# 接收器通道名称,绑定通道
agent.sinks.kafkaSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
# 通道中停留的最大事件数
agent.channels.memoryChannel.capacity = 100

执行命令

1
flume-ng agent -c /usr/local/Cellar/flume/1.9.0_1/libexec/conf -f conf/flume-kafka-conf.properties --name agent

查看kafka结果

1
2
3
4
5
6
>kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/flume-kafka-0/00000000000000000000.log --print-data-log

Dumping /usr/local/var/lib/kafka-logs/flume-kafka-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1603274630544 size: 82 magic: 2 compresscodec: NONE crc: 2637763778 isvalid: true
| offset: 0 CreateTime: 1603274630544 keysize: -1 valuesize: 14 sequence: -1 headerKeys: [] payload: "测试一下"

欢迎关注我的其它发布渠道