ExecSource会readLine()
读取日志中的每一行,把其作为每一个flume event的body放进去,这对于大部分这种每行就可以结束的日志记录,是完全可以的:
1 2 |
2016-03-18 17:53:48,374 INFO namenode.FSNamesystem (FSNamesystem.java:listCorruptFileBlocks(7217)) - there are no corrupt file blocks. 2016-03-18 17:53:48,278 INFO namenode.FSNamesystem (FSNamesystem.java:listCorruptFileBlocks(7217)) - there are no corrupt file blocks. |
但是,对于有stacktrace
的ERROR
日志记录,如果把一行的内容当作一个flume event会有很大的问题,直观上来看,肯定需要把若干行看作是一个flume event,比如下面这样的日志记录,要作为一个flume event,而不是27个(一共27行):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
2016-03-18 17:53:40,278 ERROR [HiveServer2-Handler-Pool: Thread-26]: Error occurred during processing of message. java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:268) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:129) at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86) at org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:178) at org.apache.thrift.transport.TSaslServerTransport.handleSaslStartMessage(TSaslServerTransport.java:125) at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:271) at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41) at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216) ... 4 more Caused by: java.net.SocketException: Connection reset at java.net.SocketInputStream.read(SocketInputStream.java:196) at java.net.SocketInputStream.read(SocketInputStream.java:122) at java.io.BufferedInputStream.fill(BufferedInputStream.java:235) at java.io.BufferedInputStream.read1(BufferedInputStream.java:275) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127) ... 10 more |
我这里的实现方式是:识别每行的开头部分,如果满足某种条件,就当作一条日志,否则,视作是上一条的日志的一部分。
比如:
对于上面举的例子来说(即符合标准log4j的日志),如果每一行开头满足下面这条正则表达式:
1
|
\s?\d\d\d\d-\d\d-\d\d\s\d\d:\d\d:\d\d,\d\d\d
|
就当作一条新的日志,如果不满足,就说明该行内容是上一条日志(已规定格式开头的那条)的一部分。
当然,我增加了可以自定义配置以哪种方式开头视为一条日志的regex配置,可以对不通的source进行不通的配置,已满足要求。
有了这样的约束,就可以写出将某些多行看作一个flume event的ExecSource,我把它开源到了github上,如有兴趣,欢迎前去试用,如有任何建议,欢迎提出与指正:MultiLineExecSource
1
|
github.com/qwurey/flume-source-multiline
|
该版本基于flume-ng-core 1.6.0
转自:http://blog.csdn.net/asia_kobe/article/details/51003173
相关推荐
由于flume官方并未提供ftp,source的支持; 因此想使用ftp文件服务器的资源作为数据的来源就需要自定义ftpsource,根据github:https://github.com/keedio/flume-ftp-source,提示下载相关jar,再此作为记录。
flume抽取数据库数据的源码,可以自动检测数据库的sql语句是否更新
Flume配置文件kafkaSource 包含Intercepter,包含正则表达式。
flume断点续传覆盖jar,使用组件flume-taildir-source-1.9.0覆盖flume/bin目录下的jar即可
flume-ng-sql-source-1.5.2.jar从数据库中增量读取数据到hdfs中的jar包
Flume自定义Source,数据不丢失,一致性,可以根据自己开发情况选择
flume-ng-sql-source-1.5.1 flume连接数据库 很好用的工具
flume-ng-sql-source-1.5.3.jar,flume采集mysql数据jar包,将此文件拖入FLUME_HOME/lib目录下,如果是CM下CDH版本的flume,则放到/opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib下,同样需要的包还有mysql-...
一个简单的工程,已经设置各种配置,直接只用maven打包好就可以在flume工程包新建plugins.d/custom/lib 目录,并拷贝到下面,并将工程里面的conf文件拷贝到flume的conf目录下启动命令 nohup flume-ng agent -n ...
flume-ng-sql-source 该项目用于与sql数据库进行通信 当前支持SQL数据库引擎 在最后一次更新之后,该代码已与hibernate集成在一起,因此该技术支持的所有数据库均应正常工作。 编译与包装 $ mvn package 部署方式 ...
flume-ng-sql-source-1.5.2源码
flume-ng-sql-source-release-1.5.2.jar 用flume-ng-sql-source 从数据库抽取数据到kafka,支持sql
flume-ng从数据库抽取数据到kafka,支持按数据库中时间字段,准实时抽取实时数据。已经在oracle-kafka中长期测试可用
Flume NG SQS 插件 该项目提供了一个源插件,用于从 Amazon 的简单队列服务 ( ) 中提取消息,这是一个...sudo cp target/flume-sqs-source-1.0.0.jar /usr/lib/flume-ng/plugins.d/flume-sqs-source/lib/ 并将 AWS Jav
改了了flume的sqlsource的源码,直接可以根据时间做增量,解决了之前一定要使用递增主键的增量方式,可以使用任意字段做增量,使用起来更方便。
flume连接数据库
flume-ng extends source jar flume-ng-extends-source-0.8.0.jar