flume是一个高可靠性的分布式的大文件收集系统。它提供了transaction来保证数据不会丢失。
flume官网:http://flume.apache.org/
Flume文档:http://flume.apache.org/FlumeUserGuide.html,http://flume.apache.org/FlumeDeveloperGuide.html
安装:从官网下载flume,然后解压
启动:nohup bin/flume-ng agent --conf <conf_file_path> --conf-file <conf_file> --name <agent_name> -Dflume.root.logger=DEBUG,console &
Flume主要包含三部分:source,channel,sink. source 用于接收数据,channel是一个缓冲通道,sink发送数据到目的端。source可以配置多个channel。channel可以通过channelSelect来选择发往那个channel。可以配置往每个channel发送,也可以配置一个参数,当满足特定值时,发往某个channel。每个channel可以配置多个sink。通过sinkprocess来做load balance,或者failover。
flume-ng命令会调用Application的main函数,如果需要reload configure 文件,则注册application到eventBus中,当文件变更时,调用application的handleConfigurationEvent方法
public static void main(String[] args) { Application application; if(reload) { EventBus eventBus = new EventBus(agentName + "-event-bus"); PollingPropertiesFileConfigurationProvider configurationProvider = new PollingPropertiesFileConfigurationProvider(agentName, configurationFile, eventBus, 30); components.add(configurationProvider); application = new Application(components); eventBus.register(application); } else { PropertiesFileConfigurationProvider configurationProvider = new PropertiesFileConfigurationProvider(agentName, configurationFile); application = new Application(); application.handleConfigurationEvent(configurationProvider.getConfiguration()); } application.start(); }
application中的start方法会调用supervisor.supervise(),这个方法会尝试调用component的start方法,component列表中包含了PollingPropertiesFileConfigurationProvider对象,这个对象的start方法启动了一个线程来监控文件的变更,初始状态文件是变更的,接着就会调用application的handleConfigurationEvent方法
public synchronized void start() { for(LifecycleAware component : components) { supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } }
在 handleConfigurationEvent中先调用 PropertiesFileConfigurationProvider的getConfiguration方法,这个方法通过配置文件创建了source,sink,channel,并调用了各个组件的configure方法,然后 调用了startAllComponents方法,启动了channel,source,sink,并且加载了monitor,用于监控flume的metrics
private void startAllComponents(MaterializedConfiguration materializedConfiguration) { logger.info("Starting new configuration:{}", materializedConfiguration); this.materializedConfiguration = materializedConfiguration; for (Entry<String, Channel> entry : materializedConfiguration.getChannels().entrySet()) { try{ logger.info("Starting Channel " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e){ logger.error("Error while starting {}", entry.getValue(), e); } } /* * Wait for all channels to start. */ for(Channel ch: materializedConfiguration.getChannels().values()){ while(ch.getLifecycleState() != LifecycleState.START && !supervisor.isComponentInErrorState(ch)){ try { logger.info("Waiting for channel: " + ch.getName() + " to start. Sleeping for 500 ms"); Thread.sleep(500); } catch (InterruptedException e) { logger.error("Interrupted while waiting for channel to start.", e); Throwables.propagate(e); } } } for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners() .entrySet()) { try{ logger.info("Starting Sink " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } for (Entry<String, SourceRunner> entry : materializedConfiguration .getSourceRunners().entrySet()) { try{ logger.info("Starting Source " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); } } this.loadMonitoring(); }
相关推荐
20-Flume副本机制channel选择器-需求分析.avi 21-Flume副本机制channel选择器-配置信息.avi 22-Flume副本机制channel选择器-案例测试.avi 25-Flume负载均衡案例-案例实操.avi 27-Flume聚合案例-案例实操.avi 29-...
经过对Flume FileChannel相关源码的分析,导致FileChannel吞吐率下降的主要原因集中于事务的提交过程——commit
基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip...
各领域数据集,工具源码,适合毕业设计、课程设计作业,所有源码均经过严格测试,可以直接运行,可以放心下载使用。有任何使用问题欢迎随时与博主沟通,第一时间进行解答! 软件开发设计:PHP、QT、应用软件开发、...
【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子...基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分为控制台版本和Web UI可视化版本).zip
基于Flume+spark+Flask的分布式实时日志分析与入侵检测系统.zip 1、该资源内项目代码经过严格调试,下载即用确保可以运行! 2、该资源适合计算机相关专业(如计科、人工智能、大数据、数学、电子信息等)正在做课程...
毕业设计,课程设计,项目源码均经过助教老师测试,运行无误,欢迎下载交流 ----- 下载后请首先打开README.md文件(如有)
【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业和...基于Flume + Kafka + Spark的电商实时访问日志分析系统源码+项目说明.zip
基于Flume&spark&Flask的分布式实时日志分析与入侵检测系统.zip 本资源中的源码都是经过本地编译过可运行的,下载后按照文档配置好环境就可以运行。资源项目源码系统完整,内容都是经过专业老师审定过的,能够满足...
基于Hadoop网站流量日志数据分析系统项目源码+教程.zip网站流量日志数据分析系统 典型的离线流数据分析系统 技术分析 hadoop nginx flume hive sqoop mysql springboot+mybatisplus+vcharts 基于Hadoop网站流量日志...
1、资源内容:基于spark streaming+flume+kafka+hbase的实时日志处理分析系统(分为控制台版本和基于springboot、Echarts等的Web UI可视化版本) 2、代码特点:内含运行结果,不会运行可私信,参数化编程、参数可方便...
【资源说明】 1、该资源内项目代码都是经过测试运行成功,功能正常的情况下才上传的,请放心下载使用。 2、适用人群:主要针对计算机相关专业(如计科、信息安全、数据科学与大数据技术、人工智能、通信、物联网、...
基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip ...
项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 ...
项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 ...
毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 源码 毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 源码 毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 ...