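Extending Flume
Flume lets you write custom interceptors, custom sources, and custom sinks. Extending Flume means implementing one or more of these components and wiring them into the agent configuration.
Dependencies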
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
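Interceptors
Flume supports interceptors. One or more interceptors can be attached to a source, where they run in the configured order and can modify or drop events before those events are written to the channel.
To write your own interceptor, implement org.apache.flume.interceptor.Interceptor.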
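    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;

    public class MyInterceptor implements Interceptor {

        @Override
        public void initialize() {
            // No resources to set up.
        }

        // Tag each event with a "type" header based on the prefix of its body.
        @Override
        public Event intercept(Event event) {
            Map<String, String> headers = event.getHeaders();
            String body = new String(event.getBody(), StandardCharsets.UTF_8);
            if (body.startsWith("number==")) {
                headers.put("type", "num");
            } else {
                headers.put("type", "str");
            }
            return event;
        }

        // Apply the single-event logic to every event in the batch.
        @Override
        public List<Event> intercept(List<Event> events) {
            for (Event event : events) {
                intercept(event);
            }
            return events;
        }

        @Override
        public void close() {
            // No resources to release.
        }

        // Flume instantiates the interceptor through this Builder,
        // which is why the config references MyInterceptor$Builder.
        public static class Builder implements Interceptor.Builder {

            @Override
            public Interceptor build() {
                return new MyInterceptor();
            }

            @Override
            public void configure(Context context) {
                // This interceptor takes no configuration.
            }
        }
    }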
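This interceptor pairs well with the multiplexing channel selector, which routes each event to a channel according to the value of a header.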
    agent2.sources.mySource.interceptors = myInterceptor
    agent2.sources.mySource.interceptors.myInterceptor.type = com.zhanghe.study.custom_flume.interceptor.MyInterceptor$Builder
    agent2.sources.mySource.channels = memoryChannel1 memoryChannel2
    agent2.sources.mySource.selector.type = multiplexing
    agent2.sources.mySource.selector.header = type
    agent2.sources.mySource.selector.mapping.num = memoryChannel1
    agent2.sources.mySource.selector.mapping.str = memoryChannel2
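To make the routing concrete, here is a minimal plain-Java sketch of what the interceptor and the multiplexing selector do together. It is a model for illustration only, not Flume's implementation: the class `RoutingSketch` and its methods `classify` and `route` are hypothetical names, and only the header values and channel names are taken from the configuration above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in that models header tagging plus multiplexing routing.
class RoutingSketch {

    // Mirrors selector.mapping.num and selector.mapping.str from the agent config.
    private static final Map<String, String> MAPPING = new HashMap<>();
    static {
        MAPPING.put("num", "memoryChannel1");
        MAPPING.put("str", "memoryChannel2");
    }

    // Same rule as MyInterceptor: bodies starting with "number==" are tagged "num".
    static String classify(String body) {
        return body.startsWith("number==") ? "num" : "str";
    }

    // Look up the channel for the "type" header value; null when nothing is mapped.
    static String route(String body) {
        return MAPPING.get(classify(body));
    }

    public static void main(String[] args) {
        System.out.println(route("number==42"));  // memoryChannel1
        System.out.println(route("str==hello"));  // memoryChannel2
    }
}
```

Because the interceptor always sets the header to either `num` or `str`, every event in this setup matches one of the two mappings.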
Custom Source
A custom source extends AbstractSource and implements the Configurable and PollableSource interfaces.
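    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.PollableSource;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.SimpleEvent;
    import org.apache.flume.source.AbstractSource;

    public class MySource extends AbstractSource implements Configurable, PollableSource {

        // Counts how many events have been emitted so far.
        private static final AtomicInteger count = new AtomicInteger(0);

        private String myProp;

        // Read "myProp" from the agent config, falling back to a default.
        @Override
        public void configure(Context context) {
            this.myProp = context.getString("myProp", "defaultValue");
        }

        // Emit "<myProp>==hello" while the counter is at most 5 (six events), then back off.
        @Override
        public Status process() throws EventDeliveryException {
            Status status;
            if (count.get() <= 5) {
                count.incrementAndGet();
                try {
                    Event e = new SimpleEvent();
                    e.setBody((myProp + "==" + "hello").getBytes(StandardCharsets.UTF_8));
                    getChannelProcessor().processEvent(e);
                    status = Status.READY;
                } catch (Throwable t) {
                    status = Status.BACKOFF;
                    if (t instanceof Error) {
                        throw (Error) t;
                    }
                }
            } else {
                status = Status.BACKOFF;
            }
            return status;
        }

        // Non-zero back-off values (illustrative) so an idle source does not poll in a tight loop.
        @Override
        public long getBackOffSleepIncrement() {
            return 1000L;
        }

        @Override
        public long getMaxBackOffSleepInterval() {
            return 5000L;
        }
    }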
    agent2.sources = mySource
    agent2.sources.mySource.type = com.zhanghe.study.custom_flume.source.MySource
    agent2.sources.mySource.myProp = str
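The two back-off getters in MySource decide how long the source runner pauses after process() returns BACKOFF. The sketch below assumes the runner sleeps for the smaller of (consecutive back-offs × increment) and the maximum interval, which is how Flume's PollableSourceRunner appears to behave; treat the formula as an assumption and check the Flume source for your version. `BackoffSketch` and `sleepMillis` are hypothetical names, and the 1000 ms and 5000 ms values are illustrative.

```java
// Hypothetical helper modelling the pause after consecutive BACKOFF results.
class BackoffSketch {

    // Assumed formula: min(consecutiveBackoffs * increment, maxInterval), in milliseconds.
    static long sleepMillis(long consecutiveBackoffs, long increment, long maxInterval) {
        return Math.min(consecutiveBackoffs * increment, maxInterval);
    }

    public static void main(String[] args) {
        // With a 1000 ms increment and a 5000 ms cap the pause grows, then levels off.
        for (long n = 1; n <= 6; n++) {
            System.out.println(n + " -> " + sleepMillis(n, 1000L, 5000L) + " ms");
        }
        // With both values at 0 there is no pause at all.
        System.out.println(sleepMillis(3, 0L, 0L) + " ms");
    }
}
```

Under this assumption, returning 0 from both getters means an idle source is polled again immediately, so non-zero values are usually the safer choice once the source has nothing left to emit.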
Custom Sink
A custom sink extends the AbstractSink class and implements the Configurable interface.
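    import java.nio.charset.StandardCharsets;

    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MySink extends AbstractSink implements Configurable {

        // SLF4J logger; assumes slf4j-api is on the classpath alongside flume-ng-core.
        private static final Logger LOGGER = LoggerFactory.getLogger(MySink.class);

        private String myProp;

        // Read "myProp" from the agent config, falling back to a default.
        @Override
        public void configure(Context context) {
            this.myProp = context.getString("myProp", "defaultValue");
        }

        // Take one event from the channel inside a transaction and log it.
        @Override
        public Status process() throws EventDeliveryException {
            Status status;
            Channel ch = getChannel();
            Transaction txn = ch.getTransaction();
            txn.begin();
            try {
                Event event = ch.take();
                if (event != null) {
                    LOGGER.info(myProp + "--" + new String(event.getBody(), StandardCharsets.UTF_8));
                    status = Status.READY;
                } else {
                    // Channel is empty: ask the runner to back off.
                    status = Status.BACKOFF;
                }
                txn.commit();
            } catch (Throwable t) {
                // Roll back so the event stays in the channel for a later attempt.
                txn.rollback();
                status = Status.BACKOFF;
                if (t instanceof Error) {
                    throw (Error) t;
                }
            } finally {
                txn.close();
            }
            return status;
        }
    }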
    agent2.sinks.mySink.type = com.zhanghe.study.custom_flume.sink.MySink
    agent2.sinks.mySink.myProp = mySink//
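The sink wraps its take in a transaction so that an event only leaves the channel once it has been handled: commit makes the removal permanent, while rollback returns the event for a later retry. The following plain-Java sketch models that behaviour with an in-memory queue. It is an illustration under simplifying assumptions (single thread, one open transaction), and `TransactionSketch` with its methods is a hypothetical class, not Flume's Channel or Transaction API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical in-memory model of a channel with one open transaction.
class TransactionSketch {

    private final Deque<String> queue = new ArrayDeque<>();  // events available in the channel
    private final Deque<String> taken = new ArrayDeque<>();  // events taken but not yet committed

    void put(String event) {
        queue.addLast(event);
    }

    // Take the next event and remember it so a rollback can restore it; null when empty.
    String take() {
        String event = queue.pollFirst();
        if (event != null) {
            taken.addLast(event);
        }
        return event;
    }

    // Commit: the taken events are removed for good.
    void commit() {
        taken.clear();
    }

    // Rollback: put the taken events back at the head, preserving their order.
    void rollback() {
        while (!taken.isEmpty()) {
            queue.addFirst(taken.pollLast());
        }
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        TransactionSketch channel = new TransactionSketch();
        channel.put("str==hello");

        channel.take();
        channel.rollback();                  // handling failed: the event is available again
        System.out.println(channel.size());  // 1

        channel.take();
        channel.commit();                    // handling succeeded: the event is gone
        System.out.println(channel.size());  // 0
    }
}
```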
Complete configuration
    agent2.sources = mySource
    agent2.channels = memoryChannel1 memoryChannel2
    agent2.sinks = loggerSink mySink

    # Source and its interceptor
    agent2.sources.mySource.interceptors = myInterceptor
    agent2.sources.mySource.type = com.zhanghe.study.custom_flume.source.MySource
    agent2.sources.mySource.myProp = str
    agent2.sources.mySource.interceptors.myInterceptor.type = com.zhanghe.study.custom_flume.interceptor.MyInterceptor$Builder

    # Sinks
    agent2.sinks.loggerSink.type = logger
    agent2.sinks.mySink.type = com.zhanghe.study.custom_flume.sink.MySink
    agent2.sinks.mySink.myProp = mySink//

    # Channels
    agent2.channels.memoryChannel1.type = memory
    agent2.channels.memoryChannel2.type = memory
    agent2.channels.memoryChannel1.capacity = 100
    agent2.channels.memoryChannel2.capacity = 100

    # Route events by the "type" header set by the interceptor
    agent2.sources.mySource.channels = memoryChannel1 memoryChannel2
    agent2.sources.mySource.selector.type = multiplexing
    agent2.sources.mySource.selector.header = type
    agent2.sources.mySource.selector.mapping.num = memoryChannel1
    agent2.sources.mySource.selector.mapping.str = memoryChannel2

    # Bind each sink to its channel
    agent2.sinks.loggerSink.channel = memoryChannel1
    agent2.sinks.mySink.channel = memoryChannel2