0%

flume配置

flume配置

之前了解过log4j2将日志直接写入kafka中,这样会导致应用程序直接依赖于kafka运行环境。

而一般通常的做法是应用程序将日志写入本地,通过日志采集工具将本地日志同步到远程服务器,flume就是常用的数据采集工具之一。

Flume

我下载的最新的flume,为1.9.0版本

1
2
3
4
5
6
>flume-ng version 
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9

Flume NG主要由事件源(Source)、通道(Channel)和接收器(Sink)3个组件组成,由3个组件组成一个代理Agent,一个Flume代理可能包括多个事件源、通道和接收器。

模型

事件

Flume将一个具有有效负荷的字节数据流和可选的字符串属性集称为事件,一个事件带有一个可选的消息头,消息头可用于路由判定或者一些用于消息标识的信息,事件是Flume数据传输的基本单位。

事件源

事件源就是负责从数据源采集数据将事件发送到一到多个通道中,是数据流入的入口。常见事件源有文件、网络、数据库、kafka等

通道

通道负责将事件源流入的数据进行聚合、暂存,通常数据在通道停留的时间不会太长,是位于事件源与接收器之间的构件。通道主要包括非持久性通道(如内存通道Memory Channel)与持久化通道(如文件通道File Channel、数据库JDBC Channel)

接收器

接收器负责从通道消费数据,将数据转移到其他存储系统,例如将数据存储到文件、数据库、HDFS、Kafka、HBase等

其他功能

拦截器

Flume提供了拦截器功能,可以在源之后、接收器之前定义多个拦截器,可以通过拦截器在数据流入通道之前或数据流出通道之后对数据进行处理。

负载均衡

Flume提供了负载均衡功能,提供了轮询(round_robin)和随机(random)两种策略,可以通过processor.selector属性指定

故障转移

Flume提供了故障转移功能,通过为接收器Processor配置维护一个优先级列表,以保证每一个有效事件都能够处理。通过processor.type来指定是故障转移还是负载均衡

命令参数

  • agent 指定以agent角色启动,另一个角色为avro-client
  • conf或c 指定配置源和接收器配置文件的绝对路径,不包括配置文件名
  • config-file或f 指定配置源和接收器配置文件的相对路径,相对于执行该命令的目录,包括文件名
  • name或n 指定agent的名称
  • D -D后接键值对,指定java相关配置

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#事件源名称
agent.sources = seqGenSrc
#通道名称
agent.channels = memoryChannel
#接收器名称
agent.sinks = loggerSink

# For each one of the sources, the type is defined
# 事件源类型 常见的有avro(监听Avro端口并从外部Avro客户端流接收事件)、thrift(监听Thrift端口并从外部Thrift客户端流接收事件)、exec(Exec源在启动时运行给定的Unix命令,并期望该进程在标准输出上连续产生数据)、spooldir(此源允许您通过将要提取的文件放入磁盘上的“spooling”目录中来提取数据。此源将监视新文件的指定目录,并在新文件显示时解析新文件中的事件)、org.apache.flume.source.kafka.KafkaSource(从Kafka主题读取消息的Apache Kafka消费者)、seq(简单的序列发生器,不断的产生事件,值是从0开始每次递增1)
agent.sources.seqGenSrc.type = seq

# The channel can be defined as follows.
# 事件源的通道,绑定通道
agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
# 接收器的类型 常见的有hdfs(将事件写入Hadoop分布式文件系统(HDFS))、hive(将包含定界文本或JSON数据的事件直接传输到Hive表或分区)、hbase、avro、org.apache.flume.sink.kafka.KafkaSink(将数据发布到Kafka主题)
agent.sinks.loggerSink.type = logger

#Specify the channel the sink should use
# 接收器通道名称,绑定通道
agent.sinks.loggerSink.channel = memoryChannel

# Each channel's type is defined.
# 通道类型 常见的有 file(将数据存储到磁盘上)、memory(存储在具有可配置最大大小的内存队列中)、jdbc(存放于一个支持JDBC连接的数据库中)、SPILLABLEMEMORY(存放在内存和磁盘上,内存作为主要存储,当内存达到一定临界点的时候会溢写到磁盘上。其中和了memory channel和File channel的优缺点)
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
# 通道中停留的最大事件数
agent.channels.memoryChannel.capacity = 100

命令

1
flume-ng agent -c /usr/local/Cellar/flume/1.9.0_1/libexec/conf -f conf/flume-conf.properties --name agent

按照上述配置的话,该命令会在log文件中一直打印数字,日志所在位置查看log4j.properties中的配置

写入kafka

配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#事件源名称
agent.sources = execSource
#通道名称
agent.channels = memoryChannel
#接收器名称
agent.sinks = kafkaSink

# For each one of the sources, the type is defined
agent.sources.execSource.type = exec
agent.sources.execSource.command = echo "测试一下"

# The channel can be defined as follows.
# 事件源的通道,绑定通道
agent.sources.execSource.channels = memoryChannel

# Each sink's type must be defined
# kafka接收器配置
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafkaSink.kafka.topic = flume-kafka
agent.sinks.kafkaSink.kafka.serializer.class = kafka.serializer.StringEncoder
agent.sinks.kafkaSink.kafka.producer.acks = 1

#Specify the channel the sink should use
# 接收器通道名称,绑定通道
agent.sinks.kafkaSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
# 通道中停留的最大事件数
agent.channels.memoryChannel.capacity = 100
执行命令
1
flume-ng agent -c /usr/local/Cellar/flume/1.9.0_1/libexec/conf -f conf/flume-kafka-conf.properties --name agent
查看kafka结果
1
2
3
4
5
6
>kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/flume-kafka-0/00000000000000000000.log --print-data-log

Dumping /usr/local/var/lib/kafka-logs/flume-kafka-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1603274630544 size: 82 magic: 2 compresscodec: NONE crc: 2637763778 isvalid: true
| offset: 0 CreateTime: 1603274630544 keysize: -1 valuesize: 14 sequence: -1 headerKeys: [] payload: "测试一下"

flume采集kafka消息写入HDFS

配置

该配置采用kafka作为事件源,kafka作为通道,hdfs作为接收器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#事件源名称
agent1.sources = kafkaSource
#通道名称
agent1.channels = kafka-channel
#接收器名称
agent1.sinks = hdfsSink


###############################事件源source###########################################
## flume-kafka-source-1.9.0.jar
# For each one of the sources, the type is defined
# 使用kafka事件源
agent1.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafkaSource.kafka.bootstrap.servers = localhost:9092
# 注意这里的topic配置是topics 我当初没有写s导致事件源没有启动 数据进不去 找了很长时间问题
agent1.sources.kafkaSource.kafka.topics = kafka-flume
# 消费者组id
agent1.sources.kafkaSource.kafka.consumer.group.id = kafka-flume-hdfs
# 批量写入的最大数量
agent1.sources.kafkaSource.kafka.batchSize = 1000
# 批量写入最长等待时间
agent1.sources.kafkaSource.kafka.batchDurationMillis = 1000

# The channel can be defined as follows.
# 事件源的通道,绑定通道
agent1.sources.kafkaSource.channels = kafka-channel


#############################通道channel#############################################
## flume-kafka-channel-1.9.0.jar
agent1.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.kafka-channel.kafka.bootstrap.servers = localhost:9092
# 指定通道用于缓存数据的主题
agent1.channels.kafka-channel.kafka.topic = kafka-channel



##################################接收器sink##############################################
## flume-hdfs-sink-1.9.0.jar
## 采用HDFSEventSink,将KafkaChannel中的数据写入到HDFS中,以日期和小时来切割文件,即每天的每个小时生成一个子目录

agent1.sinks.hdfsSink.type = hdfs
# 配置hdfs路径,按照日期和小时切割文件
agent1.sinks.hdfsSink.hdfs.path = hdfs://localhost:9000/kafka-flume-hdfs/%Y-%m-%d/%H
# 文件前缀
agent1.sinks.hdfsSink.hdfs.filePrefix = test
# 正在接收数据写操作的临时文件后缀
agent1.sinks.hdfsSink.hdfs.inUseSuffix = .tmp
# 文件归档为目标文件的文件后缀名
agent1.sinks.hdfsSink.hdfs.fileSuffix = .txt
# 使用本地时间
agent1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true

# 若以时间切割文件时,滚动为目标文件之前的最大时间间隔,单位秒
# 如果为0,则表示不根据时间来滚动文件
agent1.sinks.hdfsSink.hdfs.rollInverval = 0

# 若以大小切割文件时,滚动为目标文件之前的最多字节数
# 如果为0,则表示不根据临时文件大小来滚动文件
agent1.sinks.hdfsSink.hdfs.rollSize = 0

# 配置当事件数据达到该数量时,将临时文件滚动成目标文件
# 如果为0,表示不根据事件数据来滚动文件
agent1.sinks.hdfsSink.hdfs.rollCount = 0

# 每个批次刷新到HDFS上的事件数量
agent1.sinks.hdfsSink.hdfs.batchSize = 1

# 文件格式,默认为SequenceFile
agent1.sinks.hdfsSink.hdfs.fileType = DataStream

# 写sequence文件的格式,默认Writable
agent1.sinks.hdfsSink.hdfs.writeFormat = Text
# 配置当前被打开的临时文件在该参数指定的时间内,没有任何数据写入时则将该临时文件关闭并重命名为目标文件
agent1.sinks.hdfsSink.hdfs.idleTimeout = 0

# 接收器启动操作HDFS的线程数,默认10
agent1.sinks.hdfsSink.hdfs.threadsPoolSize = 15
# 执行HDFS操作的超时时间,默认10s
agent1.sinks.hdfsSink.hdfs.callTimeout = 60000

agent1.sinks.hdfsSink.channel = kafka-channel

启动

首先要启动zookeeper和kafka

1
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties &

然后启动hadoop(hadoop的启动要在start-all.sh所在目录启动)

1
./start-all.sh

最后启动flume

1
flume-ng agent -c /usr/local/Cellar/flume/1.9.0_1/libexec/conf -f conf/kafka-flume-hdfs-conf.properties --name agent1

使用kafka发送数据

1
kafka-console-producer --bootstrap-server localhost:9092 --topic kafka-flume

可以查看通道中是否包含数据

1
kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/kafka-channel-0/00000000000000000000.log --print-data-log

查看hdfs数据

1
hdfs dfs -cat /kafka-flume-hdfs/2020-10-26/15/test.1603698368703.txt