PropertiesFileConfigurationProvider.java @Override public FlumeConfiguration getFlumeConfiguration() { BufferedReader reader = null; try { reader = new BufferedReader(new FileReader(file)); Properties properties = new Properties(); properties.load(reader); return new FlumeConfiguration(toMap(properties)); } catch (IOException ex) { LOGGER.error("Unable to load file:" + file + " (I/O failure) - Exception follows.", ex); } finally { if (reader != null) { try { reader.close(); } catch (IOException ex) { LOGGER.warn( "Unable to close file reader for file: " + file, ex); } } } return new FlumeConfiguration(new HashMap<String, String>()); } private Map<String, String> toMap(Properties properties) { Map<String, String> result = Maps.newHashMap(); Enumeration<?> propertyNames = properties.propertyNames(); while (propertyNames.hasMoreElements()) { String name = (String) propertyNames.nextElement(); String value = properties.getProperty(name); result.put(name, value); } return result; } /** * Creates a populated Flume Configuration object. */ public FlumeConfiguration(Map<String, String> properties) { agentConfigMap = new HashMap<String, AgentConfiguration>(); errors = new LinkedList<FlumeConfigurationError>(); // Construct the in-memory component hierarchy for(String name : properties.keySet()) { String value = properties.get(name); if (!addRawProperty(name, value)) { logger.warn("Configuration property ignored: " + name + " = " + value); } } // Now iterate thru the agentContext and create agent configs and add them // to agentConfigMap // validate and remove improperly configured components validateConfiguration(); } private boolean addRawProperty(String name, String value) { // Null names and values not supported if (name == null || value == null) { errors .add(new FlumeConfigurationError("", "", FlumeConfigurationErrorType.AGENT_NAME_MISSING, ErrorOrWarning.ERROR)); return false; } // Empty values are not supported if (value.trim().length() == 0) { errors .add(new FlumeConfigurationError(name, "", FlumeConfigurationErrorType.PROPERTY_VALUE_NULL, ErrorOrWarning.ERROR)); return false; } // Remove leading and trailing spaces name = name.trim(); value = value.trim(); int index = name.indexOf('.'); // All configuration keys must have a prefix defined as agent name if (index == -1) { errors .add(new FlumeConfigurationError(name, "", FlumeConfigurationErrorType.AGENT_NAME_MISSING, ErrorOrWarning.ERROR)); return false; } String agentName = name.substring(0, index); // Agent name must be specified for all properties if (agentName.length() == 0) { errors .add(new FlumeConfigurationError(name, "", FlumeConfigurationErrorType.AGENT_NAME_MISSING, ErrorOrWarning.ERROR)); return false; } String configKey = name.substring(index + 1); // Configuration key must be specified for every property if (configKey.length() == 0) { errors .add(new FlumeConfigurationError(name, "", FlumeConfigurationErrorType.PROPERTY_NAME_NULL, ErrorOrWarning.ERROR)); return false; } AgentConfiguration aconf = agentConfigMap.get(agentName); if (aconf == null) { aconf = new AgentConfiguration(agentName, errors); agentConfigMap.put(agentName, aconf); } // Each configuration key must begin with one of the three prefixes: // sources, sinks, or channels. return aconf.addProperty(configKey, value); }
相关推荐
flume采集日志存入MySQL,支持分库分表,动态加载配置文件-flume-mysql
ETL模块:加载原始数据,清洗,加工,为模型训练模块 和 推荐模块 准备所需的各种数据。 模型训练模块:负责产生模型,以及寻找最佳的模型。 推荐模块:包含离线推荐和实时推荐,离线推荐负责把推荐结果存储到存储...
ETL模块:加载原始数据,清洗,加工,为模型训练模块 和 推荐模块 准备所需的各种数据。 模型训练模块:负责产生模型,以及寻找最佳的模型。 推荐模块:包含离线推荐和实时推荐,离线推荐负责把推荐结果存储到存储...
01_自动批量加载数据到hive 02_Hive表批量加载数据的脚本实现(一) 03_Hive表批量加载数据的脚本实现(二) 04_HIve中的case when、cast及unix_timestamp的使用 05_复杂日志分析-需求分析 06_复杂日志分析-需求字段...
32_数仓采集_日志采集Flume配置分析.avi U6 j% Q4 F$ T6 U5 ^ 33_数仓采集_ETL拦截器.avi 34_数仓采集_分类型拦截器.avi! b5 ^. a8 ^; }$ x8 z) l2 U3 }" p 35_数仓采集_日志采集Flume启动停止脚本.avi2 ~/ r- J: h$...
后来读了flume中有关与kafka集成的代码(org.apache.flume.source.kafka.KafkaSource),觉得其中使用kafka Consumer的方式比较合理,可以实现消息的批处理,所以就参考并封装了一下,变得更加通用,形成了这个项目。...
MapReduce、Streaming、Hive、Pig、Spark、Flume、Kafka 的应用程序。 配置和运行 Sqoop 作业以在 Apache Hadoop 和结构化数据存储之间传输批量数据 脚本 HDFS/本地文件系统/FTP 操作 安装说明 需要 Python 2.7 或更...
技术点1 使用Flume 将系统日志文件导入HDFS 2.2.2 导入导出半结构化和二进制文件 技术点2 自动复制文件到HDFS 的机制 技术点3 使用Oozie 定期执行数据导入活动 2.2.3 从数据库中拉数据 技术点4 使用...
Hadoop分布式文件系统(HDFS)和MapReduce的工作原理 如何优化Hadoop机群所需要的硬件配置 搭建Hadoop机群所需要考虑的网络因素 如何利用Hadoop...Hadoop机群维护和监控 如何使用Flume从动态生成的文件加载数据到Hadoop
Flume⽇志采集,Sqoop数据库ETL(抽取、转换、加载),完成Hadoop系统组 件之间的互通,Hadoop系统组件当中数据和关系数据库当中数据之间相互导⼊导出(HDFS数据可以导⼊MySql当中去)。 MapReduce,Map函数和Reduce...
11.2.1 加载数据技术点67 加载Apache 日志文件11.2.2 过滤和投影技术点68 通过过滤和投影减少数据处理量11.2.3 分组和聚合UDF 技术点69 IP 地址的分组和计数 11.2.4 使用UDF 进行定位技术点70 使用...
第一部分 NoSQL入门 第1章 NoSQL的概念及适用范围 2 1.1 定义和介绍 3 1.1.1 背景与历史 3 1.1.2 大数据 5 1.1.3 可扩展性 7 1.1.4 MapReduce 8 1.2 面向列的有序存储 9 ...附录A 安装与配置 278
AutoTips是为解决应用系统对于【自动提示】的需要(如:Google搜索), 而开发的架构无关的公共控件, 以满足该类需求可以通过快速配置来开发。AutoTips基于搜索引擎Apache Lucene实现。AutoTips提供统一UI。 WAP浏览器...
AutoTips是为解决应用系统对于【自动提示】的需要(如:Google搜索), 而开发的架构无关的公共控件, 以满足该类需求可以通过快速配置来开发。AutoTips基于搜索引擎Apache Lucene实现。AutoTips提供统一UI。 WAP浏览器...
AutoTips是为解决应用系统对于【自动提示】的需要(如:Google搜索), 而开发的架构无关的公共控件, 以满足该类需求可以通过快速配置来开发。AutoTips基于搜索引擎Apache Lucene实现。AutoTips提供统一UI。 WAP浏览器...
AutoTips是为解决应用系统对于【自动提示】的需要(如:Google搜索), 而开发的架构无关的公共控件, 以满足该类需求可以通过快速配置来开发。AutoTips基于搜索引擎Apache Lucene实现。AutoTips提供统一UI。 WAP浏览器...
AutoTips是为解决应用系统对于【自动提示】的需要(如:Google搜索), 而开发的架构无关的公共控件, 以满足该类需求可以通过快速配置来开发。AutoTips基于搜索引擎Apache Lucene实现。AutoTips提供统一UI。 WAP浏览器...
AutoTips是为解决应用系统对于【自动提示】的需要(如:Google搜索), 而开发的架构无关的公共控件, 以满足该类需求可以通过快速配置来开发。AutoTips基于搜索引擎Apache Lucene实现。AutoTips提供统一UI。 WAP浏览器...
AutoTips是为解决应用系统对于【自动提示】的需要(如:Google搜索), 而开发的架构无关的公共控件, 以满足该类需求可以通过快速配置来开发。AutoTips基于搜索引擎Apache Lucene实现。AutoTips提供统一UI。 WAP浏览器...
AutoTips是为解决应用系统对于【自动提示】的需要(如:Google搜索), 而开发的架构无关的公共控件, 以满足该类需求可以通过快速配置来开发。AutoTips基于搜索引擎Apache Lucene实现。AutoTips提供统一UI。 WAP浏览器...