0%

flume监控目录文件

flume监控目录文件

使用flume来监控目录文件的变化,并将目录中的文件内容传至hdfs中,有三种事件源可供选择

一是Exec Source来监控文件,适用于监控一个实时追加的文件,但是不能保证数据不丢失

二是Spooldir Source来监控目录,可以保证不丢失数据,且可以做到断点续传,但是延迟过高,无法保证实时监控

三是Tairdir Source来监控,既可以保证不丢失数据,又可以保证断点续传,还可以保证实时监控

使用exec Source

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#事件源名称
agent1.sources = execSource
#通道名称
agent1.channels = memoryChannel
#接收器名称
agent1.sinks = hdfsSink

# For each one of the sources, the type is defined
# 事件源类型 常见的有avro(监听Avro端口并从外部Avro客户端流接收事件)、thrift(监听Thrift端口并从外部Thrift客户端流接收事件)、exec(Exec源在启动时运行给定的Unix命令,并期望该进程在标准输出上连续产生数据)、spooldir(此源允许您通过将要提取的文件放入磁盘上的“spooling”目录中来提取数据。此源将监视新文件的指定目录,并在新文件显示时解析新文件中的事件)、org.apache.flume.source.kafka.KafkaSource(从Kafka主题读取消息的Apache Kafka消费者)、seq(简单的序列发生器,不断的产生事件,值是从0开始每次递增1)
agent1.sources.execSource.type = exec
agent1.sources.execSource.command = tail -F /Users/zhanghe/desktop/user/test/testExec.txt



# Each sink's type must be defined
# 接收器的类型 常见的有hdfs(将事件写入Hadoop分布式文件系统(HDFS))、hive(将包含定界文本或JSON数据的事件直接传输到Hive表或分区)、hbase、avro、org.apache.flume.sink.kafka.KafkaSink(将数据发布到Kafka主题)
agent1.sinks.hdfsSink.type = hdfs

# 配置hdfs路径,按照日期和小时切割文件
agent1.sinks.hdfsSink.hdfs.path = hdfs://localhost:9000/exec-flume-hdfs/%Y-%m-%d/%H
# 文件前缀
agent1.sinks.hdfsSink.hdfs.filePrefix = test-
# 正在接收数据写操作的临时文件后缀
agent1.sinks.hdfsSink.hdfs.inUseSuffix = .tmp
# 文件归档为目标文件的文件后缀名
agent1.sinks.hdfsSink.hdfs.fileSuffix = .txt
# 使用本地时间
agent1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true


##############切割文件
# 若以时间切割文件时,滚动为目标文件之前的最大时间间隔,单位秒
# 如果为0,则表示不根据时间来滚动文件
agent1.sinks.hdfsSink.hdfs.rollInverval = 0

# 若以大小切割文件时,滚动为目标文件之前的最多字节数
# 如果为0,则表示不根据临时文件大小来滚动文件
agent1.sinks.hdfsSink.hdfs.rollSize = 0

# 配置当事件数据达到该数量时,将临时文件滚动成目标文件
# 如果为0,表示不根据事件数据来滚动文件
agent1.sinks.hdfsSink.hdfs.rollCount = 0

############文件夹
# 是否更换文件夹
agent1.sinks.hdfsSink.hdfs.round=true
# 周期值
agent1.sinks.hdfsSink.hdfs.roundValue=1
# 周期单元 注意与hdfs。path的文件夹进行匹配,如果文件夹没有%H文件夹,不生效
agent1.sinks.hdfsSink.hdfs.roundUnit=hour


# 每个批次刷新到HDFS上的事件数量
agent1.sinks.hdfsSink.hdfs.batchSize = 1

# 文件格式,默认为SequenceFile
agent1.sinks.hdfsSink.hdfs.fileType = DataStream

# 写sequence文件的格式,默认Writable
agent1.sinks.hdfsSink.hdfs.writeFormat = Text
# 配置当前被打开的临时文件在该参数指定的时间内,没有任何数据写入时则将该临时文件关闭并重命名为目标文件
agent1.sinks.hdfsSink.hdfs.idleTimeout = 0

# 接收器启动操作HDFS的线程数,默认10
agent1.sinks.hdfsSink.hdfs.threadsPoolSize = 15
# 执行HDFS操作的超时时间,默认10s
agent1.sinks.hdfsSink.hdfs.callTimeout = 60000



# Each channel's type is defined.
# 通道类型 常见的有 file(将数据存储到磁盘上)、memory(存储在具有可配置最大大小的内存队列中)、jdbc(存放于一个支持JDBC连接的数据库中)、SPILLABLEMEMORY(存放在内存和磁盘上,内存作为主要存储,当内存达到一定临界点的时候会溢写到磁盘上。其中和了memory channel和File channel的优缺点)
agent1.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
# 通道中停留的最大事件数
agent1.channels.memoryChannel.capacity = 100



# The channel can be defined as follows.
# 事件源的通道,绑定通道
agent1.sources.execSource.channels = memoryChannel
#Specify the channel the sink should use
# 接收器通道名称,绑定通道
agent1.sinks.hdfsSink.channel = memoryChannel

使用Spooldir Source

接收器/通道的配置与上述一致,只需要调整事件源的配置即可

1
2
3
4
5
6
7
8
9
10
#事件源名称
agent1.sources = spooldirSource
#通道名称
agent1.channels = memoryChannel
#接收器名称
agent1.sinks = hdfsSink

agent1.sources.spooldirSource.type = spooldir
# 监听的目录
agent1.sources.spooldirSource.spoolDir = /Users/zhanghe/desktop/user/test/

使用Taildir Source

1
2
3
4
5
6
7
agent2.sources.tairDirSource.type = TAILDIR
# 文件组,可以有多个,空格隔开
agent2.sources.tairDirSource.filegroups = log1
# log1文件组的文件位置
agent2.sources.tairDirSource.filegroups.log1 = /Users/zhanghe/desktop/user/test/testTailDir/test1.log
# 该文件用于记录最后读取到的文件位置了
agent2.sources.tairDirSource.positionFile = /Users/zhanghe/desktop/user/test/testTailDir/taildir_position.json