转自:http://blog.csdn.net/yujimoyouran/article/details/59104131
简单描述一下这个例子:将项目日志实时采集到elasticsearch,便于统一管理。
1. 收集日志格式为:
log4j.properties : org.apache.log4j.Logger: %d{ISO8601} [%l-%M]-[%p] %t %m%n
logback.xml :org.slf4j.Logger: %date [%logger:%L]-[%level] %thread %msg%n
2. flume 1.7.0 (flume搭建只需解压配置jdk即可,官网教程都有案例,http://flume.apache.org/FlumeUserGuide.html)
source:Taildir Source
channel: File Channel
sinks:ElasticSearchSink
3. elasticsearch1.7.5
Flume搭建:
1. 下载flume安装包并解压apache-flume-1.7.0-bin.tar.gz, http://flume.apache.org/download.html
2. 创建软连接 ln -s apache-flume-1.7.0-bin flume
3. 进入conf目录,cp flume-env.sh.template flume-env.sh
4. 修改jdk,vi flume-env.sh
5. 将elasticsearch lib中的elasticsearch-1.7.5.jar和lucene相关jar包拷贝到flume lib中
6. 创建一些下面需要的目录(不同的source,channel,sink需要的目录不一样,用时看官网即可)
mkdir flume/conf/es (用来存放flume配置文件,并新建文件data-es.conf )
mkdir flume/tmp (Taildir Source生成的positionFile文件目录)
mkdir -p flume/file-channel/checkpoint (File Channel检查点写入间隔)
mkdir -p flume/file-channel/data (File Channel数据存放目录)
mkdir flume/test (存放测试数据)
Taildir Source配置,直接上data-es.conf截图:
File Channel配置:
ElasticSearchSink 配置:
接下来主要讲解一下上图标红的地方,就是flume自定义Serializer
首先大体说一下为什么需要自定义Seralizer,一是也许项目中的log4j日志我们只关心程序员自己输出的日志信息,而不需要log4j其他信息;二是项目日志一般都会统一格式,自定义的格式也许flume不能很精准的拆分。
这里定义日志格式为:
2017-02-27 13:57:19,218 [com.data.test]-[INFO] main {sysName###测试项目@@@status###正在初始化@@@info###开始执行了}
使用正则拆分日志:([\d- :,]{23}) (\[.+?\])-(\[[A-Z]+?\]) (.*) ([{].+}$)
拆分结构:
下载源码,只需导入flume-ng-elasticsearch-sink模块即可:
源码不做详细讲解 (因为我还没来的及仔细研究,只是了解大概照猫画虎)
一. 修改的地方是:通过ElasticSearchSink 传入的数据会按照 “prefix-yyyy-MM-dd” 每天创建index,如果数据量不大的话有点浪费,现在只想按照“prefix-yyyy-MM”每月创建index,修改org.apache.flume.sink.elasticsearch.TimeBasedIndexNameBuilder
二. 仿照org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder.ElasticSearchDynamicSerializer创建自己的Search,其中flume也是封装了es的api操作, Event对象就是你获取到的每条日志,然后按照你自己的逻辑将日志拆分即可,简单的几个截图吧:
三. 因为只要程序员自己输出日志,所以要将正则不匹配的日志过滤掉。修改org.apache.flume.sink.elasticsearch.ElasticSearchSink, 标红为自己修改的地方
最后maven打包替换到flume lib下面相应jar包即可
算是写完了吧。。。
相关推荐
基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目 项目名称:实时的用户轨迹查询项目 项目介绍: 利用企业建设的WIFI基站,实时采集用户的信息,可以基于这些信息做用户画像处理,网络安全...
基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目源代码+文档说明。项目架构: 主要是基于Flume+Kafka+Sparkstreaming +HBase+ES来实现实时的用户信息存储轨迹查询任务。 含有代码注释,满分...
基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目 项目名称:实时的用户轨迹查询项目 项目介绍: 利用企业建设的WIFI基站,实时采集用户的信息,可以基于这些信息做用户画像处理,网络...
基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目 项目名称:实时的用户轨迹查询项目 项目介绍: 利用企业建设的WIFI基站,实时采集用户的信息,可以基于这些信息做用户画像处理,网络安全...
flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【大数据】
在大数据领域,构建一个完整的生态系统是至关重要的,其中包括多个组件,如Hadoop、Spark、Hive、HBase、Oozie、Kafka、Flume、Flink、Elasticsearch和Redash。这些组件协同工作,提供了数据存储、处理、调度、流...
agent.sinks.es.serializer = org.apache.flume.sink.elasticsearch.serializer.JsonEventSerializer ``` 为了确保数据完整性和一致性,可以配置`batchSize`参数来控制每次批量写入Elasticsearch的事件数量。同时,...
在本项目中,我们将搭建一个基于Linux环境的实时数据流处理系统,主要涉及Flume 1.8、Elasticsearch 6.2.4和Kibana 6.2.4。这个组合常用于大数据日志收集、存储和分析。以下是详细步骤: 一、**Elasticsearch安装与...
5. **Elasticsearch**: Elasticsearch 是一个基于 Lucene 的分布式搜索引擎,用于实时、高效地搜索和分析大量数据。在本项目中,Elasticsearch 可能用于对用户轨迹数据进行全文检索和分析,支持快速的实时查询,便于...
标题 "flume1.6+es5.5.1" 暗示我们要讨论的是Apache Flume 1.6版本与Elasticsearch (ES) 5.5.1版本的集成。Flume是Apache Hadoop项目的一个子项目,主要用于收集、聚合和移动大量日志数据。而Elasticsearch是一种...
基于flume+kafka+HBase+spark+ElasticSearch的用户轨迹查询大数据开发项目 项目名称:实时的用户轨迹查询项目 项目介绍: 利用企业建设的WIFI基站,实时采集用户的信息,可以基于这些信息做用户画像处理,网络安全...
项目工程资源经过严格测试可直接运行成功且功能正常的情况才上传,可轻松复刻,拿到资料包后可轻松复现出一样的项目,本人系统开发经验充足(全领域),有任何使用问题欢迎随时与我联系,我会及时为您解惑,提供帮助...
flume1.9采集数据入存入elasticsearch6.2.4,flume1.9本身只支持低版本的elasticsearch,基于apache-flume-1.9.0-src的flume-ng-sinks/flume-ng-elasticsearch-sink源码修改,支持es6.2.4,打的包,直接替换flume/...
log-collector.sinks.es_sink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink log-collector.sinks.es_sink.hostname = localhost log-collector.sinks.es_sink.port = 9200 log-collector....
而Flume-ES5.X是Flume的一个版本,它特别针对Elasticsearch 5.x进行了优化,使得数据能够无缝流入Elasticsearch进行存储和分析。本文将深入探讨Flume-ES5.X的依赖关系,并详细介绍如何自定义Sink以满足特定需求。 ...
在Flume中的ElasticSearchSink支持Flume与Elasticsearch整合,可以将Flume采集的数据传输到Elasticsearch中,Flume版本:1.8.0,ElasticSearchSink版本:6.2.4
这里,`esSink`将从`channel1`获取数据,并将其发送到Elasticsearch,索引名为`flume_log`,类型为`log`,而`localhost:9200`是Elasticsearch服务器的地址。 3. **Elasticsearch 6.5.4兼容性** `flume-ng-elastic...
flume1.8采集数据入存入elasticsearch5.3.0,flume1.8本身基于elasticsearch0.9版本,修改源码,支持es5.3,打的包,直接替换flume/lib下的elasticsearch-sink即可。