`
manzhizhen
  • 浏览: 290015 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Flume快速入门(四):File Channel之FlumeEventQueue

阅读更多

       虽然我们把FlumeEventQueue想象成Event指针的内存队列,但FlumeEventQueue中的内部实现是很绕的,不跑跑Flume的单元测试,很容易看晕。本文的目的就是通过简化模型来剖析FlumeEventQueue中的四种操作:addTail、removeHead、addHeadremove

       上一篇博文地址:http://manzhizhen.iteye.com/admin/categories/357759

       FlumeEventQueue中(准确的说应该是EventQueueBackingStore中)有queueSizequeueHead两个int属性,用来保存队列的长度和取元素时的索引(队列ADT中是从头取元素,从尾放元素)。这两个属性的初始值都是0。还有一个Map类型的overwriteMap属性(准确来说是EventQueueBackingStoreFile中的属性),它就是队列的容器了,它里面存放了所有未被消费的Event指针(准确来说不是所有的,具体细节请看Flume快速入门(三))。当然,还有个int型的队列容量属性capacity(准确来说是EventQueueBackingStore中的),有了对capacity的判断,queueHead在自增的过程中就不会过界了。我们接下来仔细看看FlumeEventQueue是如何通过如上几种属性来制造队列行为的。

 

操作名称:addTail

使用场景:Source将Event写入Channel。

具体操作(伪代码):

       1. index = queueSize;

       2. queueSize++;

       3. put Event to overwriteMap at key is [queueHead+index];

 

操作名称:removeHead

使用场景:Sink从Channel取Event。

具体操作(伪代码):

       1. index = 0;

       2. take Event from overwriteMap at key is [queueHead] ;

       3. set 0 to overwriteMap at key is [queueHead];

       4. queueHead++;

       5. queueSize--;

 

操作名称:addHead

使用场景:Sink从Channel取Event失败,将Event回滚到FlumeEventQueue中

具体操作(伪代码):

       1. index = 0;

       2. queueSize++;

       3. queueHead-- ;

       4. set Event to overwriteMap at key is [queueHead];

 

操作名称:remove

使用场景:当从故障中恢复数据时,需要从log file中加载数据到FlumeEventQueue,每加载一个Event需要将FlumeEventQueue中可能存在的相同的Event删除

具体操作(伪代码):

       1. 遍历overwriteMap,找到该Event对应的index;

       2. 既然已经决定删除index上的Event,则为了保证队列的元素的连续性,需要对部分Event位移,为了位移最少数量的Event,当index大于queueSize的一半,将index右边的元素向左(队尾)移动,否则则将index左边的元素向右(队头)移动;

       3. 如果是左移,则set 0 to overwriteMap at key is [queueHead+queueSize-1] ;如果是右移,则set 0 to overwriteMap at key is [queueHead],并且queueHead++;

       4. queueSize--;

 

       我们已经给出了上面四种操作的详细步骤,当然,为了防止queueHead在自增或自减中越界,它会随时和0capacity来做比较来使queueHead在一个首尾相连的环中移动,但具体映射到overwriteMapkey是根据EventQueueBackingStoreFile#getPhysicalIndex方法来的:

 

  protected int getPhysicalIndex(int index) {
    return HEADER_SIZE + (getHead() + index) % getCapacity();
  }

 其中HEADER_SIZE的值是比1024稍大点的质数1029,而getHead()获取的就是queueHead.

       如果你了解了FlumeEventQueue的这四种操作,就几乎了解了Flume的内存队列在运行状态时候的主体流程,其他细节是无关紧要的,感兴趣的可以读读源码。

       下一篇:http://manzhizhen.iteye.com/blog/2311120

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics