`
qianshangding
  • 浏览: 124366 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Flume之ChannelProcessor源码分析

阅读更多
接着上篇:Flume之ChannelSelector源码分析
ChannelSelector主要是根据Event选择将其发送到哪些Channel。ChannelProcessor是通过ChannelSelector获取到Channels后,如何发送Event到Channel。

ChannelProcessor提供了将Source接收到的Events放入到Channels的一些方法,如果写入Channels发生错误,统一抛出ChannelException异常。每个ChannelProcessor实例都有一个ChannelSelector实例(ChannelSelector实例分别维护了可选的Channels列表和必选的Channels列表)和一个拦截链。


一,configure(Context context)加载配置文件,该方法主要是用来加载拦截器:

  private void configureInterceptors(Context context) {

    List<Interceptor> interceptors = Lists.newLinkedList();
    //获取拦截器
    String interceptorListStr = context.getString("interceptors", "");
    if (interceptorListStr.isEmpty()) {
      return;
    }
    //解析成拦截器名的数组
    String[] interceptorNames = interceptorListStr.split("\\s+");

    //获取interceptors的Context
    Context interceptorContexts =
        new Context(context.getSubProperties("interceptors."));

    // run through and instantiate all the interceptors specified in the Context
    InterceptorBuilderFactory factory = new InterceptorBuilderFactory();
    for (String interceptorName : interceptorNames) {
      Context interceptorContext = new Context(
          interceptorContexts.getSubProperties(interceptorName + "."));
      //得到拦截器的类型,Flume支持TIMESTAMP, HOST, STATIC, REGEX_FILTER, REGEX_EXTRACTOR, SEARCH_REPLACE
      //定义在org.apache.flume.interceptor.InterceptorType类中。
      String type = interceptorContext.getString("type");
      if (type == null) {
        LOG.error("Type not specified for interceptor " + interceptorName);
        throw new FlumeException("Interceptor.Type not specified for " +
          interceptorName);
      }
      try {
        //实例化拦截器,并存放到List中
        Interceptor.Builder builder = factory.newInstance(type);
        builder.configure(interceptorContext);
        interceptors.add(builder.build());
      } catch (ClassNotFoundException e) {
        LOG.error("Builder class not found. Exception follows.", e);
        throw new FlumeException("Interceptor.Builder not found.", e);
      } catch (InstantiationException e) {
        LOG.error("Could not instantiate Builder. Exception follows.", e);
        throw new FlumeException("Interceptor.Builder not constructable.", e);
      } catch (IllegalAccessException e) {
        LOG.error("Unable to access Builder. Exception follows.", e);
        throw new FlumeException("Unable to access Interceptor.Builder.", e);
      }
    }
    //将拦截器List设置到拦截链中。
    interceptorChain.setInterceptors(interceptors);
  }
二,configure方法已经初始化拦截链,接下来org.apache.flume.source.EventDrivenSourceRunner的start方法调用initialize()初始化拦截链并启动Source:

  @Override
  public void start() {
    Source source = getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    //初始化ChannelProcessor,实际上就是初始化拦截链的所有的拦截器
    cp.initialize();
    //启动Source
    source.start();
    lifecycleState = LifecycleState.START;
  }

三,提交Event

ChannelProcessor提交Event有两个方法processEvent(提交单个Event)和processEventBatch(批量提交Event),下面以processEventBatch为例:

  public void processEventBatch(List<Event> events) {
    Preconditions.checkNotNull(events, "Event list must not be null");
    //根据拦截链挨个处理Event
    events = interceptorChain.intercept(events);

    Map<Channel, List<Event>> reqChannelQueue =
        new LinkedHashMap<Channel, List<Event>>();

    Map<Channel, List<Event>> optChannelQueue =
        new LinkedHashMap<Channel, List<Event>>();

    //将Event分类,放入reqChannelQueue 和 optChannelQueue中
    for (Event event : events) {
      List<Channel> reqChannels = selector.getRequiredChannels(event);

      for (Channel ch : reqChannels) {
        List<Event> eventQueue = reqChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          reqChannelQueue.put(ch, eventQueue);
        }
        eventQueue.add(event);
      }

      List<Channel> optChannels = selector.getOptionalChannels(event);

      for (Channel ch: optChannels) {
        List<Event> eventQueue = optChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          optChannelQueue.put(ch, eventQueue);
        }

        eventQueue.add(event);
      }
    }
    //提交Event到必选的Channel
    // Process required channels
    for (Channel reqChannel : reqChannelQueue.keySet()) {
      Transaction tx = reqChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();

        List<Event> batch = reqChannelQueue.get(reqChannel);

        for (Event event : batch) {
          reqChannel.put(event);
        }

        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        if (t instanceof Error) {
          LOG.error("Error while writing to required channel: " +
              reqChannel, t);
          throw (Error) t;
        } else {
          throw new ChannelException("Unable to put batch on required " +
              "channel: " + reqChannel, t);
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }

    //提交Event到可选的Channel
    // Process optional channels
    for (Channel optChannel : optChannelQueue.keySet()) {
      Transaction tx = optChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();

        List<Event> batch = optChannelQueue.get(optChannel);

        for (Event event : batch ) {
          optChannel.put(event);
        }

        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        LOG.error("Unable to put batch on optional channel: " + optChannel, t);
        if (t instanceof Error) {
          throw (Error) t;
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }
  }
以上批量批量提交Event的流程:

1,根据配置的拦截器组成的拦截链挨个处理Event

2,根据Event获取必选和可选的Channel,然后分别根据可选和必选的Channel,将Event和Channel存放在以Channel为Key,Event列表为Value的Map中。

3,遍历可选和必选的Map,通过Channel的Transaction将Event commit到Channel中。

看下可选和必选map提交Channel异常处理有什么区别:

(1)必选Map的异常处理

        if (t instanceof Error) {
          LOG.error("Error while writing to required channel: " +
              reqChannel, t);
          throw (Error) t;
        } else {
          //抛出ChannelException异常,Source可以通过该异常保证数据不丢失的
          throw new ChannelException("Unable to put batch on required " +
              "channel: " + reqChannel, t);
        }
(2)可选Map的异常处理
        tx.rollback();
        LOG.error("Unable to put batch on optional channel: " + optChannel, t);
        if (t instanceof Error) {
          //只有在异常是Error才会抛出异常
          throw (Error) t;
        }
也就是说,只有异常为java.lang.Error类实例,可选和必选Channel才会回滚,不然可选提交到Channel后失败是不会回滚的。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics