0%

flume采集kafka消息写入HDFS

flume采集kafka消息写入HDFS

配置

该配置采用kafka作为事件源,kafka作为通道,hdfs作为接收器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#事件源名称
agent1.sources = kafkaSource
#通道名称
agent1.channels = kafka-channel
#接收器名称
agent1.sinks = hdfsSink


###############################事件源source###########################################
## flume-kafka-source-1.9.0.jar
# For each one of the sources, the type is defined
# 使用kafka事件源
agent1.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafkaSource.kafka.bootstrap.servers = localhost:9092
# 注意这里的topic配置是topics 我当初没有写s导致事件源没有启动 数据进不去 找了很长时间问题
agent1.sources.kafkaSource.kafka.topics = kafka-flume
# 消费者组id
agent1.sources.kafkaSource.kafka.consumer.group.id = kafka-flume-hdfs
# 批量写入的最大数量
agent1.sources.kafkaSource.kafka.batchSize = 1000
# 批量写入最长等待时间
agent1.sources.kafkaSource.kafka.batchDurationMillis = 1000

# The channel can be defined as follows.
# 事件源的通道,绑定通道
agent1.sources.kafkaSource.channels = kafka-channel


#############################通道channel#############################################
## flume-kafka-channel-1.9.0.jar
agent1.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.kafka-channel.kafka.bootstrap.servers = localhost:9092
# 指定通道用于缓存数据的主题
agent1.channels.kafka-channel.kafka.topic = kafka-channel



##################################接收器sink##############################################
## flume-hdfs-sink-1.9.0.jar
## 采用HDFSEventSink,将KafkaChannel中的数据写入到HDFS中,以日期和小时来切割文件,即每天的每个小时生成一个子目录

agent1.sinks.hdfsSink.type = hdfs
# 配置hdfs路径,按照日期和小时切割文件
agent1.sinks.hdfsSink.hdfs.path = hdfs://localhost:9000/kafka-flume-hdfs/%Y-%m-%d/%H
# 文件前缀
agent1.sinks.hdfsSink.hdfs.filePrefix = test
# 正在接收数据写操作的临时文件后缀
agent1.sinks.hdfsSink.hdfs.inUseSuffix = .tmp
# 文件归档为目标文件的文件后缀名
agent1.sinks.hdfsSink.hdfs.fileSuffix = .txt
# 使用本地时间
agent1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true

# 若以时间切割文件时,滚动为目标文件之前的最大时间间隔,单位秒
# 如果为0,则表示不根据时间来滚动文件
agent1.sinks.hdfsSink.hdfs.rollInverval = 0

# 若以大小切割文件时,滚动为目标文件之前的最多字节数
# 如果为0,则表示不根据临时文件大小来滚动文件
agent1.sinks.hdfsSink.hdfs.rollSize = 0

# 配置当事件数据达到该数量时,将临时文件滚动成目标文件
# 如果为0,表示不根据事件数据来滚动文件
agent1.sinks.hdfsSink.hdfs.rollCount = 0

# 每个批次刷新到HDFS上的事件数量
agent1.sinks.hdfsSink.hdfs.batchSize = 1

# 文件格式,默认为SequenceFile
agent1.sinks.hdfsSink.hdfs.fileType = DataStream

# 写sequence文件的格式,默认Writable
agent1.sinks.hdfsSink.hdfs.writeFormat = Text
# 配置当前被打开的临时文件在该参数指定的时间内,没有任何数据写入时则将该临时文件关闭并重命名为目标文件
agent1.sinks.hdfsSink.hdfs.idleTimeout = 0

# 接收器启动操作HDFS的线程数,默认10
agent1.sinks.hdfsSink.hdfs.threadsPoolSize = 15
# 执行HDFS操作的超时时间,默认10s
agent1.sinks.hdfsSink.hdfs.callTimeout = 60000

agent1.sinks.hdfsSink.channel = kafka-channel

启动

首先要启动zookeeper和kafka

1
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties &

然后启动hadoop(hadoop的启动要在start-all.sh所在目录启动)

1
./start-all.sh

最后启动flume

1
flume-ng agent -c /usr/local/Cellar/flume/1.9.0_1/libexec/conf -f conf/kafka-flume-hdfs-conf.properties --name agent1

使用kafka发送数据

1
kafka-console-producer --bootstrap-server localhost:9092 --topic kafka-flume

可以查看通道中是否包含数据

1
kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/kafka-channel-0/00000000000000000000.log --print-data-log

查看hdfs数据

1
hdfs dfs -cat /kafka-flume-hdfs/2020-10-26/15/test.1603698368703.txt