虽然我们把FlumeEventQueue想象成Event指针的内存队列,但FlumeEventQueue中的内部实现是很绕的,不跑跑Flume的单元测试,很容易看晕。本文的目的就是通过简化模型来剖析FlumeEventQueue中的四种操作:addTail、removeHead、addHead和remove。
上一篇博文地址:http://manzhizhen.iteye.com/admin/categories/357759
FlumeEventQueue中(准确的说应该是EventQueueBackingStore中)有queueSize和queueHead两个int属性,用来保存队列的长度和取元素时的索引(队列ADT中是从头取元素,从尾放元素)。这两个属性的初始值都是0。还有一个Map类型的overwriteMap属性(准确来说是EventQueueBackingStoreFile中的属性),它就是队列的容器了,它里面存放了所有未被消费的Event指针(准确来说不是所有的,具体细节请看Flume快速入门(三))。当然,还有个int型的队列容量属性capacity(准确来说是EventQueueBackingStore中的),有了对capacity的判断,queueHead在自增的过程中就不会过界了。我们接下来仔细看看FlumeEventQueue是如何通过如上几种属性来制造队列行为的。
操作名称:addTail
使用场景:Source将Event写入Channel。
具体操作(伪代码):
1. index = queueSize;
2. queueSize++;
3. put Event to overwriteMap at key is [queueHead+index];
操作名称:removeHead
使用场景:Sink从Channel取Event。
具体操作(伪代码):
1. index = 0;
2. take Event from overwriteMap at key is [queueHead] ;
3. set 0 to overwriteMap at key is [queueHead];
4. queueHead++;
5. queueSize--;
操作名称:addHead
使用场景:Sink从Channel取Event失败,将Event回滚到FlumeEventQueue中。
具体操作(伪代码):
1. index = 0;
2. queueSize++;
3. queueHead-- ;
4. set Event to overwriteMap at key is [queueHead];
操作名称:remove
使用场景:当从故障中恢复数据时,需要从log file中加载数据到FlumeEventQueue,每加载一个Event需要将FlumeEventQueue中可能存在的相同的Event删除。
具体操作(伪代码):
1. 遍历overwriteMap,找到该Event对应的index;
2. 既然已经决定删除index上的Event,则为了保证队列的元素的连续性,需要对部分Event位移,为了位移最少数量的Event,当index大于queueSize的一半,将index右边的元素向左(队尾)移动,否则则将index左边的元素向右(队头)移动;
3. 如果是左移,则set 0 to overwriteMap at key is [queueHead+queueSize-1] ;如果是右移,则set 0 to overwriteMap at key is [queueHead],并且queueHead++;
4. queueSize--;
我们已经给出了上面四种操作的详细步骤,当然,为了防止queueHead在自增或自减中越界,它会随时和0、capacity来做比较来使queueHead在一个首尾相连的环中移动,但具体映射到overwriteMap的key是根据EventQueueBackingStoreFile#getPhysicalIndex方法来的:
protected int getPhysicalIndex(int index) { return HEADER_SIZE + (getHead() + index) % getCapacity(); }
其中HEADER_SIZE的值是比1024稍大点的质数1029,而getHead()获取的就是queueHead.
如果你了解了FlumeEventQueue的这四种操作,就几乎了解了Flume的内存队列在运行状态时候的主体流程,其他细节是无关紧要的,感兴趣的可以读读源码。
相关推荐
Flume1.6.0入门:安装、部署、及flume的案例
Flume1.5.0入门:安装、部署、及flume的案例Flume1.5.0入门:安装、部署、及flume的案例
视频详细讲解,需要的小伙伴自行百度网盘下载,链接见附件,永久有效。 1、介绍: ...章节九:Channel选择器 章节十:Sink处理器 章节十一:导入数据到HDFS 章节十二:Flume SDK 章节十三:Flume监控
flume-ng安装
本文讲述了flume中channel和sink简单描述和linux配置 包括:Memory channel、File channel及其它测试阶段的Channel; 及channel通过sink的输出配置Logger Sink、File Roll Sink、HDFS Sink、Avro Sink(多级流动、...
flume介绍与原理: 1、flume的优势 2、Flume具有的特征:source->channel->sink 3、flume配置介绍 4、flume启动命令
尚硅谷大数据技术之Flume
apache-flume-1.9.0-bin.tar,kafka_2.11-0.10.1.0,zookeeper-3.3.6_.tar 压缩 到了一个logs.rar文件中,需要的请下载
hadoop12下载flume安装包wget http://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.7.0/ap
1.Flume Agent的编写: flume_push_streaming.conf 2.代码 4.本地测试总结 1.Flume Agent的编写: flum
04_Flume中配置使用file channel及HDFS sink 05_Flume中配置HDFS文件生成大小及时间分区 06_Flume中配置Spooling Dir的使用 07_Flume中配置Spooling Dir的文件过滤 08_Flume中配置扇入架构的介绍 09_Flume中...
flume_exporter 普罗米修斯水槽出口商。 要运行它: make build ./flume_exporter [flags] 标志帮助: ./flume_exporter --help 配置:config.yml agents: - name: "flume-agents" enabled: true # ...
$ cd flume-round-robin-channel-selector $ mvn clean package $ ls target flume-round-robin-channel-selector-1.0.jar 将JAR添加到Flume类路径 $ cp /etc/flume-ng/conf/flume-env.sh.template /etc/flume-...
第 2 章 Flume 快速入门2.1.1 安装地址2)文档查看地址3)下载地址2.1.2 安装部署2.2.1 监控端口数据官方案例1)案例需求:使用 Flum
Agent: 一个独立的Flume进程,包含组件Source、 Channel、 Sink。(Agent使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。) Source: 数据收集组件。(source从...
flume入门介绍,简单介绍flume的背景和应用场景,flume的实现原理以及案例分享
Flume入门使用.pdf 学习资料 复习资料 教学资源
Flume 快速入门教程,文本数据采集