
flume扩展

flume扩展

Flume 支持自定义拦截器(Interceptor)、事件源(Source)和接收器(Sink),通过自定义这些组件即可对 Flume 进行扩展。

依赖

<!-- flume核心包 -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>

拦截器

Flume 提供了拦截器功能:拦截器配置在 Source 上,在事件由 Source 产生之后、写入 Channel 之前执行。一个 Source 可以配置多个拦截器组成拦截器链,按配置顺序依次对事件进行修改(如添加头信息)或过滤(单事件拦截返回 null 即丢弃该事件)。

通过实现 org.apache.flume.interceptor.Interceptor 接口来自定义拦截器,同时还需提供一个实现 Interceptor.Builder 的静态内部类,配置文件中拦截器的 type 应指向该 Builder(形如 类名$Builder)。

/**
 * Example Flume interceptor that tags every event with a {@code type} header.
 *
 * <p>Events whose body (decoded as UTF-8) starts with {@code "number=="} get
 * {@code type=num}; all other events get {@code type=str}. The header can then be used by a
 * multiplexing channel selector to route events to different channels.
 */
public class MyInterceptor implements Interceptor {

    /** Body prefix that marks an event as "numeric". */
    private static final String NUMBER_PREFIX = "number==";

    @Override
    public void initialize() {
        // No state to initialise.
    }

    /**
     * Intercepts a single event, adding a {@code type} header derived from its body.
     *
     * @param event the event to inspect; its header map is modified in place
     * @return the same event instance (never dropped by this interceptor)
     */
    @Override
    public Event intercept(Event event) {
        // Header map of the event; written to directly.
        Map<String, String> headers = event.getHeaders();

        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        byte[] rawBody = event.getBody();
        String body = rawBody == null
                ? ""
                : new String(rawBody, java.nio.charset.StandardCharsets.UTF_8);

        // Tag the event according to its body content.
        headers.put("type", body.startsWith(NUMBER_PREFIX) ? "num" : "str");

        return event;
    }

    /**
     * Intercepts a batch of events by applying {@link #intercept(Event)} to each of them.
     *
     * <p>The Flume {@code Interceptor} contract requires this method never to return
     * {@code null} (an empty list means "all events dropped"); the previous version returned
     * {@code null}, which broke batched delivery.
     *
     * @param list the events to intercept
     * @return a new list with the intercepted events, excluding any that were dropped
     */
    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> intercepted = new java.util.ArrayList<>(list.size());
        for (Event event : list) {
            Event result = intercept(event);
            // A null result means "drop this event"; keep the output list free of nulls.
            if (result != null) {
                intercepted.add(result);
            }
        }
        return intercepted;
    }

    @Override
    public void close() {
        // No resources to release.
    }

    /**
     * Builder referenced from the agent configuration as
     * {@code ...MyInterceptor$Builder}.
     */
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {
            // This interceptor takes no configuration properties.
        }
    }

}

可以搭配 multiplexing 类型的 Channel 选择器,根据拦截器添加的头信息把事件路由到不同的通道:

#拦截器名称
agent2.sources.mySource.interceptors = myInterceptor
# 配置拦截器
agent2.sources.mySource.interceptors.myInterceptor.type = com.zhanghe.study.custom_flume.interceptor.MyInterceptor$Builder

agent2.sources.mySource.channels = memoryChannel1 memoryChannel2
# 配置channel选择器
agent2.sources.mySource.selector.type = multiplexing
# 配置channel选择器的映射 根据header中的type来进行映射
agent2.sources.mySource.selector.header = type
agent2.sources.mySource.selector.mapping.num = memoryChannel1
agent2.sources.mySource.selector.mapping.str = memoryChannel2

自定义Source

自定义的 Source 需要继承 AbstractSource,并实现 Configurable 接口(读取配置)和 PollableSource 接口(由框架循环调用 process() 轮询数据;若为事件驱动型 Source 则改为实现 EventDrivenSource)。

/**
 * Example pollable Flume source that emits a fixed number of demo events.
 *
 * <p>Each event body is {@code <myProp>==hello}, encoded as UTF-8. After {@link #MAX_EVENTS}
 * events have been emitted, every call to {@link #process()} returns {@link Status#BACKOFF}.
 */
public class MySource extends AbstractSource implements Configurable, PollableSource {

    /** Number of events this source emits (equivalent to the old {@code count <= 5} check). */
    private static final int MAX_EVENTS = 6;

    /** Sleep increment (ms) the source runner applies per consecutive BACKOFF. */
    private static final long BACKOFF_SLEEP_INCREMENT_MS = 1000L;

    /** Upper bound (ms) on the runner's back-off sleep. */
    private static final long MAX_BACKOFF_SLEEP_MS = 5000L;

    /** Value of the {@code myProp} config property, used as the event body prefix. */
    private String myProp;

    // Per-instance counter. A static counter would be shared by every MySource in the JVM, so
    // any instance created after the first six events had been emitted would emit nothing.
    private final AtomicInteger count = new AtomicInteger(0);

    /**
     * Reads the source configuration.
     *
     * @param context agent configuration for this source; {@code myProp} defaults to
     *                {@code "defaultValue"}
     */
    @Override
    public void configure(Context context) {
        this.myProp = context.getString("myProp", "defaultValue");
    }

    /**
     * Produces one event and hands it to the channel processor.
     *
     * <p>Steps: 1. receive data, 2. wrap it in an event, 3. pass the event to the channel(s).
     *
     * @return {@link Status#READY} when an event was delivered, {@link Status#BACKOFF} once all
     *         demo events have been emitted
     * @throws EventDeliveryException if the event could not be delivered to the channel(s)
     */
    @Override
    public Status process() throws EventDeliveryException {
        if (count.get() >= MAX_EVENTS) {
            // Nothing left to emit: let the runner back off.
            return Status.BACKOFF;
        }
        count.incrementAndGet();

        try {
            // "Receive" data and wrap it in an event.
            Event event = new SimpleEvent();
            event.setBody((myProp + "==" + "hello").getBytes(java.nio.charset.StandardCharsets.UTF_8));

            // Hand the event to the channel(s) via the interceptor chain / selector.
            getChannelProcessor().processEvent(event);
            return Status.READY;
        } catch (Throwable t) {
            // Never swallow JVM errors.
            if (t instanceof Error) {
                throw (Error) t;
            }
            // Surface the failure to the source runner instead of silently discarding it.
            throw new EventDeliveryException("Failed to deliver event from MySource", t);
        }
    }

    /**
     * Back-off increment used after a BACKOFF status. Returning 0 (as before) made the runner
     * re-poll with no sleep, turning the idle state into a CPU-bound spin.
     */
    @Override
    public long getBackOffSleepIncrement() {
        return BACKOFF_SLEEP_INCREMENT_MS;
    }

    /** Maximum back-off sleep after consecutive BACKOFF statuses. */
    @Override
    public long getMaxBackOffSleepInterval() {
        return MAX_BACKOFF_SLEEP_MS;
    }

}
#事件源名称
agent2.sources = mySource

agent2.sources.mySource.type = com.zhanghe.study.custom_flume.source.MySource
agent2.sources.mySource.myProp = str

自定义Sink

自定义的 Sink 需要继承 AbstractSink 类并实现 Configurable 接口;在 process() 中通过 Channel 事务(begin / commit / rollback / close)从通道中取出事件。

public class MySink extends AbstractSink implements Configurable {
private String myProp;

// 获取配置信息
@Override
public void configure(Context context) {
String myProp = context.getString("myProp", "defaultValue");

this.myProp = myProp;
}

// 核心方法
@Override
public Status process() throws EventDeliveryException {
Status status = null;


// 获取Channel
Channel ch = getChannel();
// 开启事务
Transaction txn = ch.getTransaction();
txn.begin();
try {
// 从channel中获取事件
Event event = ch.take();
if(event != null){
LOGGER.info(myProp+"--"+new String(event.getBody()));
status = Status.READY;
} else {
status = Status.BACKOFF;
}

txn.commit();
} catch (Throwable t) {
// 事务回滚
txn.rollback();

status = Status.BACKOFF;

if (t instanceof Error) {
throw (Error)t;
}
} finally {
// 关闭事务
txn.close();
}
return status;
}
}
agent2.sinks.mySink.type = com.zhanghe.study.custom_flume.sink.MySink
agent2.sinks.mySink.myProp = mySink//

整体配置

#事件源名称
agent2.sources = mySource
#通道名称
agent2.channels = memoryChannel1 memoryChannel2
#接收器名称
agent2.sinks = loggerSink mySink
#拦截器名称
agent2.sources.mySource.interceptors = myInterceptor

#source
agent2.sources.mySource.type = com.zhanghe.study.custom_flume.source.MySource
agent2.sources.mySource.myProp = str


# 配置拦截器
agent2.sources.mySource.interceptors.myInterceptor.type = com.zhanghe.study.custom_flume.interceptor.MyInterceptor$Builder

# sink
agent2.sinks.loggerSink.type = logger
agent2.sinks.mySink.type = com.zhanghe.study.custom_flume.sink.MySink
agent2.sinks.mySink.myProp = mySink//


#channel
agent2.channels.memoryChannel1.type = memory
agent2.channels.memoryChannel2.type = memory
# 通道中停留的最大事件数
agent2.channels.memoryChannel1.capacity = 100
agent2.channels.memoryChannel2.capacity = 100


agent2.sources.mySource.channels = memoryChannel1 memoryChannel2
# 配置channel选择器
agent2.sources.mySource.selector.type = multiplexing
# 配置channel选择器的映射 根据header中的type来进行映射
agent2.sources.mySource.selector.header = type
agent2.sources.mySource.selector.mapping.num = memoryChannel1
agent2.sources.mySource.selector.mapping.str = memoryChannel2
#Specify the channel the sink should use
# 接收器通道名称,绑定通道
agent2.sinks.loggerSink.channel = memoryChannel1
agent2.sinks.mySink.channel = memoryChannel2