> flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
# For each one of the sources, the type is defined
# Common source types: avro (listens on an Avro port and receives events from external Avro client streams),
# thrift (listens on a Thrift port and receives events from external Thrift client streams),
# exec (runs a given Unix command on start-up and expects the process to continuously produce data on standard output),
# spooldir (ingests data from files dropped into a "spooling" directory on disk; the source watches that directory and parses events out of new files as they appear),
# org.apache.flume.source.kafka.KafkaSource (an Apache Kafka consumer that reads messages from a Kafka topic),
# seq (a simple sequence generator that continuously emits events whose value starts at 0 and increments by 1)
agent.sources.seqGenSrc.type = seq
# The channel can be defined as follows.
# Bind the source to its channel
agent.sources.seqGenSrc.channels = memoryChannel
# Each sink's type must be defined
# Common sink types: hdfs (writes events to the Hadoop Distributed File System),
# hive (streams events containing delimited text or JSON data directly into a Hive table or partition),
# hbase, avro, and org.apache.flume.sink.kafka.KafkaSink (publishes data to a Kafka topic)
agent.sinks.loggerSink.type = logger
# Specify the channel the sink should use
# Bind the sink to its channel
agent.sinks.loggerSink.channel = memoryChannel
# Each channel's type is defined.
# Common channel types: file (stores events on disk),
# memory (stores events in an in-memory queue with a configurable maximum size),
# jdbc (stores events in a database reachable over a JDBC connection),
# SPILLABLEMEMORY (stores events in memory and on disk; memory is the primary store, and events spill to disk once a threshold is reached, combining the strengths and weaknesses of the memory and file channels)
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel (sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
# (the maximum number of events held in the channel)
agent.channels.memoryChannel.capacity = 100
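With the source, channel, and sink wired up, the agent can be launched with the standard flume-ng launcher. A minimal sketch, assuming the properties above are saved as conf/seq-logger.conf (a file name chosen here for illustration) and using the agent name "agent" from the property keys:

> flume-ng agent --conf conf --conf-file conf/seq-logger.conf --name agent -Dflume.root.logger=INFO,console

With the logger sink, each generated sequence event should be printed to the console as it arrives.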
# For each one of the sources, the type is defined
# An exec source runs the given command and turns its stdout into events;
# this one emits a single test message ("测试一下", i.e. "just a test")
agent.sources.execSource.type = exec
agent.sources.execSource.command = echo "测试一下"
# The channel can be defined as follows.
# Bind the source to its channel
agent.sources.execSource.channels = memoryChannel
# Each sink's type must be defined
# Kafka sink configuration
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafkaSink.kafka.topic = flume-kafka
# Note: serializer.class is a legacy Kafka 0.8 producer property; the modern
# producer used by Flume 1.9 ignores it, so this line can safely be removed
agent.sinks.kafkaSink.kafka.serializer.class = kafka.serializer.StringEncoder
agent.sinks.kafkaSink.kafka.producer.acks = 1
# Specify the channel the sink should use
# Bind the sink to its channel
agent.sinks.kafkaSink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel (sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
# (the maximum number of events held in the channel)
agent.channels.memoryChannel.capacity = 100
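To verify that events from the exec source actually land in Kafka, the topic can be tailed with the console consumer that ships with Kafka. A minimal sketch, assuming a local broker on localhost:9092 and the flume-kafka topic configured above:

> kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flume-kafka --from-beginning

Keep in mind that an exec source running echo produces one event and then the command exits; for a continuous feed, something like tail -F on a log file is the more typical exec command.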
############################### Source ###########################################
## provided by flume-kafka-source-1.9.0.jar
# For each one of the sources, the type is defined
# Use a Kafka source
agent1.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafkaSource.kafka.bootstrap.servers = localhost:9092
# Note that the property here is kafka.topics (plural). I originally left off the "s",
# so the source never started and no data came through; it took a long time to find the problem
agent1.sources.kafkaSource.kafka.topics = kafka-flume
# Consumer group id
agent1.sources.kafkaSource.kafka.consumer.group.id = kafka-flume-hdfs
# Maximum number of events written to the channel in one batch
# (note: batchSize and batchDurationMillis take no "kafka." prefix)
agent1.sources.kafkaSource.batchSize = 1000
# Maximum time to wait before a batch is written
agent1.sources.kafkaSource.batchDurationMillis = 1000
# The channel can be defined as follows.
# Bind the source to its channel
agent1.sources.kafkaSource.channels = kafka-channel
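The snippet above configures only the source side of agent1; the kafka-channel it references, a sink, and the agent-level component declarations still need to be defined before the agent will start. A minimal sketch of one plausible completion, assuming a memory channel and an HDFS sink (the consumer group id kafka-flume-hdfs hints at HDFS; the sink name, path, and settings below are illustrative assumptions, not from the original):

# Agent-level declarations (required in a complete config)
agent1.sources = kafkaSource
agent1.channels = kafka-channel
agent1.sinks = hdfsSink

# The channel referenced by the source (assumption: a memory channel)
agent1.channels.kafka-channel.type = memory
agent1.channels.kafka-channel.capacity = 1000

# Hypothetical HDFS sink; hdfs.path is a placeholder
agent1.sinks.hdfsSink.type = hdfs
agent1.sinks.hdfsSink.hdfs.path = /flume/events/%Y-%m-%d
agent1.sinks.hdfsSink.hdfs.fileType = DataStream
agent1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfsSink.channel = kafka-channel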