Thrift IDL
Flume Thrift IDL在client包里面,定义如下:
namespace java org.apache.flume.thrift
struct ThriftFlumeEvent {
1: required map <string, string> headers,
2: required binary body,
}
enum Status {
OK,
FAILED,
ERROR,
UNKNOWN
}
service ThriftSourceProtocol {
Status append(1: ThriftFlumeEvent event),
Status appendBatch(1: list<ThriftFlumeEvent> events),
}注意:event在C#里面是关键字,所以利用Thrift编译器生成客户端的接口时,要把所有event关键字改成其他的、比如events.
Thrift Service
Flume的Source分两种:
?实现PollableSource接口
通过SinkRunner管理Source
?实现EventDrivenSource接口
可以自己接受数据、发送到channel。比如ThriftSource
Flume Thrift Service的实现类在core包
public class ThriftSource extends AbstractSource implements Configurable,
EventDrivenSource {
public static final String CONFIG_THREADS = "threads";
public static final String CONFIG_BIND = "bind";
public static final String CONFIG_PORT = "port";
private Integer port;
private String bindAddress;
private int maxThreads = 0;
private SourceCounter sourceCounter;
private TServer server;
private TServerTransport serverTransport;
private ExecutorService servingExecutor;
public void start() {
//创建工作者线程池
...
args.protocolFactory(new TCompactProtocol.Factory());
args.inputTransportFactory(new TFastFramedTransport.Factory());
args.outputTransportFactory(new TFastFramedTransport.Factory());
//ThriftSourceProtocol是Flume Thrift Service的真正实现
args.processor(new ThriftSourceProtocol
.Processor<ThriftSourceHandler>(new ThriftSourceHandler()));
/**
* Start serving.
*/
servingExecutor.submit(new Runnable() {
@Override
public void run() {
server.serve();
}
});
...
}Flume Thrift Service真正的实现类是内部类ThriftSourceHandler
private class ThriftSourceHandler implements ThriftSourceProtocol.Iface {
@Override
public Status append(ThriftFlumeEvent event) throws TException {
Event flumeEvent = EventBuilder.withBody(event.getBody(),
event.getHeaders());
sourceCounter.incrementAppendReceivedCount();
sourceCounter.incrementEventReceivedCount();
try {
//传给channel
getChannelProcessor().processEvent(flumeEvent);
} catch (ChannelException ex) {
logger.warn("Thrift source " + getName() + " could not append events " +
"to the channel.", ex);
return Status.FAILED;
}
sourceCounter.incrementAppendAcceptedCount();
sourceCounter.incrementEventAcceptedCount();
return Status.OK;
}
@Override
public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
sourceCounter.incrementAppendBatchReceivedCount();
sourceCounter.addToEventReceivedCount(events.size());
List<Event> flumeEvents = Lists.newArrayList();
for(ThriftFlumeEvent event : events) {
flumeEvents.add(EventBuilder.withBody(event.getBody(),
event.getHeaders()));
}
try {
getChannelProcessor().processEventBatch(flumeEvents);
} catch (ChannelException ex) {
logger.warn("Thrift source %s could not append events to the " +
"channel.", getName());
return Status.FAILED;
}
sourceCounter.incrementAppendBatchAcceptedCount();
sourceCounter.addToEventAcceptedCount(events.size());
return Status.OK;
}
}
分享到:
相关推荐
flume-ng-sql-source-release-1.5.2.jar 用flume-ng-sql-source 从数据库抽取数据到kafka,支持sql
flume-ng-sql-source-1.5.2.jar从数据库中增量读取数据到hdfs中的jar包
flume-ng-sql-source-1.5.3.jar,flume采集mysql数据jar包,将此文件拖入FLUME_HOME/lib目录下,如果是CM下CDH版本的flume,则放到/opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib下,同样需要的包还有mysql-...
flume断点续传覆盖jar,使用组件flume-taildir-source-1.9.0覆盖flume/bin目录下的jar即可
flume-ng-sql-source-1.5.2源码
flume-ng-sql-source-1.5.1 flume连接数据库 很好用的工具
flume-1.5.0-cdh5.3.6。 大数据日志收集工具 flume-1.5.0-cdh5.3.6。 大数据日志收集工具flume-1.5.0-cdh5.3.6。 大数据日志收集工具flume-1.5.0-cdh5.3.6。 大数据日志收集工具flume-1.5.0-cdh5.3.6。 大数据日志...
Flume-ng-1.6.0-cdh.zip 内压缩了 3 个项目,分别为:flume-ng-1.6.0-cdh5.5.0.tar.gz、flume-ng-1.6.0-cdh5.7.0.tar.gz 和 flume-ng-1.6.0-cdh5.10.1.tar.gz,选择你需要的版本。
flume连接数据库
flume-ng安装
包含flume-ng-sql-source-1.5.1&flume;-ng-sql-source-1.4.1 此内容均为网上下载
flume-ng-sql-source实现oracle增量数据读取 flume连接oracle增量数据读取
flume-ng-1.6.0-cdh5.5.0.tar.gz
Flume读取mysql配置,导入即可用,详细参考https://github.com/keedio/flume-ng-sql-source/tree/release/1.5.1
flume-ng-1.5.0-cdh5.3.6.rarflume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume-ng-1.5.0-cdh5.3.6.rar flume...
flume实现oracle增量数据读取github最新版1.4.4,Flume是最初只是一个日志收集器,但随着flume-ng-sql-source插件的出现,使得Flume从关系数据库采集数据成为可能
flume拦截器 保留binlog es、data、database、table、type字段 分区字段名称: eventDate 放入 /opt/cloudera/parcels/CDH/lib/flume-ng/lib目录重启flume即可