`
侯上校
  • 浏览: 217954 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

flume加载配置

阅读更多
PropertiesFileConfigurationProvider.java
@Override
  public FlumeConfiguration getFlumeConfiguration() {
    BufferedReader reader = null;
    try {
      reader = new BufferedReader(new FileReader(file));
      Properties properties = new Properties();
      properties.load(reader);
      return new FlumeConfiguration(toMap(properties));
    } catch (IOException ex) {
      LOGGER.error("Unable to load file:" + file
          + " (I/O failure) - Exception follows.", ex);
    } finally {
      if (reader != null) {
        try {
          reader.close();
        } catch (IOException ex) {
          LOGGER.warn(
              "Unable to close file reader for file: " + file, ex);
        }
      }
    }
    return new FlumeConfiguration(new HashMap<String, String>());
  }


private Map<String, String> toMap(Properties properties) {
    Map<String, String> result = Maps.newHashMap();
    Enumeration<?> propertyNames = properties.propertyNames();
    while (propertyNames.hasMoreElements()) {
      String name = (String) propertyNames.nextElement();
      String value = properties.getProperty(name);
      result.put(name, value);
    }
    return result;
  }

  /**
   * Creates a populated Flume Configuration object.
   */
  public FlumeConfiguration(Map<String, String> properties) {
    agentConfigMap = new HashMap<String, AgentConfiguration>();
    errors = new LinkedList<FlumeConfigurationError>();
    // Construct the in-memory component hierarchy
    for(String name : properties.keySet()) {
      String value = properties.get(name);

      if (!addRawProperty(name, value)) {
        logger.warn("Configuration property ignored: " + name + " = " + value);
      }
    }
    // Now iterate thru the agentContext and create agent configs and add them
    // to agentConfigMap

    // validate and remove improperly configured components
    validateConfiguration();
  }

 private boolean addRawProperty(String name, String value) {
    // Null names and values not supported
    if (name == null || value == null) {
      errors
      .add(new FlumeConfigurationError("", "",
          FlumeConfigurationErrorType.AGENT_NAME_MISSING,
          ErrorOrWarning.ERROR));
      return false;
    }

    // Empty values are not supported
    if (value.trim().length() == 0) {
      errors
      .add(new FlumeConfigurationError(name, "",
          FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
          ErrorOrWarning.ERROR));
      return false;
    }

    // Remove leading and trailing spaces
    name = name.trim();
    value = value.trim();

    int index = name.indexOf('.');

    // All configuration keys must have a prefix defined as agent name
    if (index == -1) {
      errors
      .add(new FlumeConfigurationError(name, "",
          FlumeConfigurationErrorType.AGENT_NAME_MISSING,
          ErrorOrWarning.ERROR));
      return false;
    }

    String agentName = name.substring(0, index);

    // Agent name must be specified for all properties
    if (agentName.length() == 0) {
      errors
      .add(new FlumeConfigurationError(name, "",
          FlumeConfigurationErrorType.AGENT_NAME_MISSING,
          ErrorOrWarning.ERROR));
      return false;
    }

    String configKey = name.substring(index + 1);

    // Configuration key must be specified for every property
    if (configKey.length() == 0) {
      errors
      .add(new FlumeConfigurationError(name, "",
          FlumeConfigurationErrorType.PROPERTY_NAME_NULL,
          ErrorOrWarning.ERROR));
      return false;
    }

    AgentConfiguration aconf = agentConfigMap.get(agentName);

    if (aconf == null) {
      aconf = new AgentConfiguration(agentName, errors);
      agentConfigMap.put(agentName, aconf);
    }

    // Each configuration key must begin with one of the three prefixes:
    // sources, sinks, or channels.
    return aconf.addProperty(configKey, value);
  }

 

分享到:
评论

相关推荐

    flume采集日志存入MySQL,支持分库分表,动态加载配置文件-flume-mysql.zip

    flume采集日志存入MySQL,支持分库分表,动态加载配置文件-flume-mysql

    基于spark+flume+kafka+hbase的实时日志处理分析系统.zip

    ETL模块:加载原始数据,清洗,加工,为模型训练模块 和 推荐模块 准备所需的各种数据。 模型训练模块:负责产生模型,以及寻找最佳的模型。 推荐模块:包含离线推荐和实时推荐,离线推荐负责把推荐结果存储到存储...

    基于Flume&spark&Flask的分布式实时日志分析与入侵检测系统.zip

    ETL模块:加载原始数据,清洗,加工,为模型训练模块 和 推荐模块 准备所需的各种数据。 模型训练模块:负责产生模型,以及寻找最佳的模型。 推荐模块:包含离线推荐和实时推荐,离线推荐负责把推荐结果存储到存储...

    利用Hive进行复杂用户行为大数据分析及优化案例

    01_自动批量加载数据到hive 02_Hive表批量加载数据的脚本实现(一) 03_Hive表批量加载数据的脚本实现(二) 04_HIve中的case when、cast及unix_timestamp的使用 05_复杂日志分析-需求分析 06_复杂日志分析-需求字段...

    尚gg大数据项目实战电商数仓系统开发教程.txt

    32_数仓采集_日志采集Flume配置分析.avi U6 j% Q4 F$ T6 U5 ^ 33_数仓采集_ETL拦截器.avi 34_数仓采集_分类型拦截器.avi! b5 ^. a8 ^; }$ x8 z) l2 U3 }" p 35_数仓采集_日志采集Flume启动停止脚本.avi2 ~/ r- J: h$...

    kafka-effective:更有效地使用apache kafka消费者和生产者

    后来读了flume中有关与kafka集成的代码(org.apache.flume.source.kafka.KafkaSource),觉得其中使用kafka Consumer的方式比较合理,可以实现消息的批处理,所以就参考并封装了一下,变得更加通用,形成了这个项目。...

    java连接sqoop源码-Merlin:标准化的大数据ETL框架

    MapReduce、Streaming、Hive、Pig、Spark、Flume、Kafka 的应用程序。 配置和运行 Sqoop 作业以在 Apache Hadoop 和结构化数据存储之间传输批量数据 脚本 HDFS/本地文件系统/FTP 操作 安装说明 需要 Python 2.7 或更...

    Hadoop硬实战 [(美)霍姆斯著][电子工业出版社][2015.01]_PDF电子书下载 带书签目录 高清完整版.rar )

    技术点1 使用Flume 将系统日志文件导入HDFS 2.2.2 导入导出半结构化和二进制文件 技术点2 自动复制文件到HDFS 的机制 技术点3 使用Oozie 定期执行数据导入活动 2.2.3 从数据库中拉数据 技术点4 使用...

    Hadoop大数据从入门到精通

    Hadoop分布式文件系统(HDFS)和MapReduce的工作原理 如何优化Hadoop机群所需要的硬件配置 搭建Hadoop机群所需要考虑的网络因素 如何利用Hadoop...Hadoop机群维护和监控 如何使用Flume从动态生成的文件加载数据到Hadoop

    大数据技术概述.pdf

    Flume⽇志采集,Sqoop数据库ETL(抽取、转换、加载),完成Hadoop系统组 件之间的互通,Hadoop系统组件当中数据和关系数据库当中数据之间相互导⼊导出(HDFS数据可以导⼊MySql当中去)。 MapReduce,Map函数和Reduce...

    Hadoop实战(第2版)

    11.2.1 加载数据技术点67 加载Apache 日志文件11.2.2 过滤和投影技术点68 通过过滤和投影减少数据处理量11.2.3 分组和聚合UDF 技术点69 IP 地址的分组和计数 11.2.4 使用UDF 进行定位技术点70 使用...

    nosql 入门教程

    第一部分 NoSQL入门 第1章 NoSQL的概念及适用范围 2 1.1 定义和介绍 3 1.1.1 背景与历史 3 1.1.2 大数据 5 1.1.3 可扩展性 7 1.1.4 MapReduce 8 1.2 面向列的有序存储 9 ...附录A 安装与配置 278

    java开源包1

    AutoTips是为解决应用系统对于【自动提示】的需要(如:Google搜索), 而开发的架构无关的公共控件, 以满足该类需求可以通过快速配置来开发。AutoTips基于搜索引擎Apache Lucene实现。AutoTips提供统一UI。 WAP浏览器...

    java开源包11

    AutoTips是为解决应用系统对于【自动提示】的需要(如:Google搜索), 而开发的架构无关的公共控件, 以满足该类需求可以通过快速配置来开发。AutoTips基于搜索引擎Apache Lucene实现。AutoTips提供统一UI。 WAP浏览器...

    java开源包2

    AutoTips是为解决应用系统对于【自动提示】的需要(如:Google搜索), 而开发的架构无关的公共控件, 以满足该类需求可以通过快速配置来开发。AutoTips基于搜索引擎Apache Lucene实现。AutoTips提供统一UI。 WAP浏览器...

    java开源包3

    AutoTips是为解决应用系统对于【自动提示】的需要(如:Google搜索), 而开发的架构无关的公共控件, 以满足该类需求可以通过快速配置来开发。AutoTips基于搜索引擎Apache Lucene实现。AutoTips提供统一UI。 WAP浏览器...

    java开源包6

    AutoTips是为解决应用系统对于【自动提示】的需要(如:Google搜索), 而开发的架构无关的公共控件, 以满足该类需求可以通过快速配置来开发。AutoTips基于搜索引擎Apache Lucene实现。AutoTips提供统一UI。 WAP浏览器...

    java开源包5

    AutoTips是为解决应用系统对于【自动提示】的需要(如:Google搜索), 而开发的架构无关的公共控件, 以满足该类需求可以通过快速配置来开发。AutoTips基于搜索引擎Apache Lucene实现。AutoTips提供统一UI。 WAP浏览器...

    java开源包10

    AutoTips是为解决应用系统对于【自动提示】的需要(如:Google搜索), 而开发的架构无关的公共控件, 以满足该类需求可以通过快速配置来开发。AutoTips基于搜索引擎Apache Lucene实现。AutoTips提供统一UI。 WAP浏览器...

    java开源包4

    AutoTips是为解决应用系统对于【自动提示】的需要(如:Google搜索), 而开发的架构无关的公共控件, 以满足该类需求可以通过快速配置来开发。AutoTips基于搜索引擎Apache Lucene实现。AutoTips提供统一UI。 WAP浏览器...

Global site tag (gtag.js) - Google Analytics