对于flume拦截器,我的理解是:在app(应用程序日志)和 source 之间的,对app日志进行拦截处理的。也即在日志进入到source之前,对日志进行一些包装、清新过滤等等动作。
官方上提供的已有的拦截器有:
Timestamp Interceptor
Host Interceptor
Static Interceptor
Regex Filtering Interceptor
Regex Extractor Interceptor
像很多java的开源项目如springmvc中的拦截器一样,flume的拦截器也是chain形式的,可以对一个source指定多个拦截器,按先后顺序依次处理。
Timestamp Interceptor :在event的header中添加一个key叫:timestamp,value为当前的时间戳。这个拦截器在sink为hdfs 时很有用,后面会举例说到
Host Interceptor:在event的header中添加一个key叫:host,value为当前机器的hostname或者ip。
Static Interceptor:可以在event的header中添加自定义的key和value。
Regex Filtering Interceptor:通过正则来清洗或包含匹配的events。
Regex Extractor Interceptor:通过正则表达式来在header中添加指定的key,value则为正则匹配的部分
下面举例说明这些拦截器的用法,首先我们调整一下第一篇文章中的那个WriteLog类:
public class WriteLog {
protected static final Log logger = LogFactory.getLog(WriteLog.class);
/**
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
while (true) {
logger.info(new Date().getTime());
logger.info("{\"requestTime\":"
+ System.currentTimeMillis()
+ ",\"requestParams\":{\"timestamp\":1405499314238,\"phone\":\"02038824941\",\"cardName\":\"测试商家名称\",\"provinceCode\":\"440000\",\"cityCode\":\"440106\"},\"requestUrl\":\"/reporter-api/reporter/reporter12/init.do\"}");
Thread.sleep(2000);
}
}
}
又多输出了一行日志信息,现在每次循环都会输出两行日志信息,第一行是一个时间戳信息,第二行是一行JSON格式的字符串信息。
接下来我们用regex_filter和 timestamp这两个拦截器来实现这样一个功能:
1 过滤掉LOG4J输出的第一行那个时间戳日志信息,只收集JSON格式的日志信息
2 将收集的日志信息保存到HDFS上,每天的日志保存到以该天命名的目录下面,如2014-7-25号的日志,保存到/flume/events/14-07-25目录下面。
修改后的flume.conf如下:
tier1.sources=source1
tier1.channels=channel1
tier1.sinks=sink1
tier1.sources.source1.type=avro
tier1.sources.source1.bind=0.0.0.0
tier1.sources.source1.port=44444
tier1.sources.source1.channels=channel1
tier1.sources.source1.interceptors=i1 i2
tier1.sources.source1.interceptors.i1.type=regex_filter
tier1.sources.source1.interceptors.i1.regex=\\{.*\\}
tier1.sources.source1.interceptors.i2.type=timestamp
tier1.channels.channel1.type=memory
tier1.channels.channel1.capacity=10000
tier1.channels.channel1.transactionCapacity=1000
tier1.channels.channel1.keep-alive=30
tier1.sinks.sink1.type=hdfs
tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%y-%m-%d
tier1.sinks.sink1.hdfs.fileType=DataStream
tier1.sinks.sink1.hdfs.writeFormat=Text
tier1.sinks.sink1.hdfs.rollInterval=0
tier1.sinks.sink1.hdfs.rollSize=10240
tier1.sinks.sink1.hdfs.rollCount=0
tier1.sinks.sink1.hdfs.idleTimeout=60
我们对source1添加了两个拦截器i1和i2,i1为regex_filter,过滤的正则为\\{.*\\},注意正则的写法用到了转义字符,不然source1无法启动,会报错。
i2为timestamp,在header中添加了一个timestamp的key,然后我们修改了sink1.hdfs.path在后面加上了/%y-%m-%d这一串字符,这一串字符要求event的header中必须有timestamp这个key,这就是为什么我们需要添加一个timestamp拦截器的原因,如果不添加这个拦截器,无法使用这样的占位符,会报错。还有很多占位符,请参考官方文档。
然后运行WriteLog,去hdfs上查看对应目录下面的文件,会发现内容只有JSON字符串的日志,与我们的功能描述一致。
分享到:
相关推荐
Flume1.6.0入门:安装、部署、及flume的案例
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可...
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可...
让你快速认识flume及安装和使用flume1 5传输数据 日志 到hadoop2 2 中文文档 认识 flume 1 flume 是什么 这里简单介绍一下 它是 Cloudera 的一个产品 2 flume 是干什么的 收集日志的 3 flume 如何搜集日志 我们把...
flume-ng安装
尚硅谷大数据技术之Flume
flume官网下载太慢,请从这里下载,次文件是官方网站的1.8版本,也就是支持jdk1.8的,不支持jdk1.7,如果要支持jdk1.7的,请下载我的资源里面也有,flume1.7,
flume1.9采集数据入存入elasticsearch6.2.4,flume1.9本身只支持低版本的elasticsearch,基于apache-flume-1.9.0-src的flume-ng-sinks/flume-ng-elasticsearch-sink源码修改,支持es6.2.4,打的包,直接替换flume/...
Flume+kafka+Storm整合 示例简介: 以下为三个组建整合,这里只做操作也演示结果,原理性方面大家多学习基础。 流程顺序是flume获取telnet数据,将接收到的数据发送至kafak,kafka作为Storm的spout,Storm进行有向无...
flume集群环境搭建,详细讲解,图文并茂,包括flume信息监控和众多文章链接
基于flume+kafka+实时计算引擎(storm,spark,flink)的实时计算框架目前是比较火的一个分支,在实时数据采集组件中flume扮演着极为重要角色,logtash是ELK的重要组件部分,filebeat也是一个实时采集工具;
Apache Flume 是一个分布式、高可靠、高可用的用来收集、聚合、转移不同来源的大量日志数据到中央数据仓库的工具 Apache Flume是Apache软件基金会(ASF)的顶级项目 Event是Flume定义的一个数据流传输的最小单元。...
Log4j直接发送数据到Flume + Kafka (方式一) 通过flume收集系统日记, 收集的方式通常采用以下. 系统logs直接发送给flume系统, 本文主要记录种方式进行说明. 文章链接,请看:...
Flume-ng-1.6.0-cdh.zip 内压缩了 3 个项目,分别为:flume-ng-1.6.0-cdh5.5.0.tar.gz、flume-ng-1.6.0-cdh5.7.0.tar.gz 和 flume-ng-1.6.0-cdh5.10.1.tar.gz,选择你需要的版本。
01_Flume的介绍及其架构组成 02_Flume的安装部署 03_Flume的测试运行 04_Flume中配置使用file channel及HDFS sink 05_Flume中配置HDFS文件生成大小及时间分区 06_Flume中配置Spooling Dir的使用 07_Flume中...
由于flume官方并未提供ftp,source的支持; 因此想使用ftp文件服务器的资源作为数据的来源就需要自定义ftpsource,根据github:https://github.com/keedio/flume-ftp-source,提示下载相关jar,再此作为记录。
CDH版本的flume Flume是Cloudera提供的一个高可用的,高可靠...当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。
Flume-ng在windows环境搭建并测试+log4j日志通过Flume输出到HDFS 11111
flume支持RabbitMQ插件
Flume学习文档(2){Flume安装部署、Flume配置文件}。 记录我的学习之旅,每份文档倾心倾力,带我成我大牛,回头观望满脸笑意,望大家多多给予意见,有问题或错误,请联系 我将及时改正;借鉴文章标明出处,谢谢