MapedFileQueue
应用层访问commitlog和consumequeue文件是通过MappFileQueue来操作MapedFile类,从而间接操作磁盘上面的文件;MappFileQueue是由多个MapedFile队列组成的,该类的结果如下图所示。
功能清单如下:
1. 获取在某时间点之后更新的文件(getMapedFileByTime)
方法getMapedFileByTime(final long timestamp),遍历MapedFile列表,若遇到文件的更新时间戳大于某时间点timestamp则返回该MapedFile对象,遍历完之后仍然没有找到则返回列表的最后一个MapedFile对象。
清理指定偏移量所在文件之后的文件(truncateDirtyFiles)
方法truncateDirtyFiles(long offset):遍历MapedFile列表,每个MapedFile对象对应着一个固定大小的文件,若当文件的起始偏移量fileFromOffset<=offset<=fileFromOffset+fileSize,则表示指定的位置偏移量offset落在的该文件上,则将对应的MapedFile对象的wrotepostion和commitPosition设置为offset%fileSize,若文件的起始偏移量fileFromOffset>offset,即是命中的文件之后的文件,则将这些文件删除并且从MappFileQueue的MapedFile列表中清除掉。
获取或创建最后一个文件(getLastMapedFile)
从MapedFile列表中获取最后一个MapedFile对象,若列表为空或者最后一个对象对应的文件已经写满,则创建一个新的文件(即新的MapedFile对象);若存在最后一个文件(对应最后一个MapedFile对象)并且未写满,则直接返回最后一个MapedFile对象。
若列表为空,则创建的新文件的文件名(即fileFromOffset值)为0;若最后一个文件写满,则新文件的文件名等于最后一个文件的fileFromOffset+fileSize;
若在Broker启动初始化的时候会创建了后台服务线程(AllocateMapedFileService服务),则调用AllocateMapedFileService.putRequestAndReturnMapedFile方法,在该方法中用下一个文件的文件路径、下下一个文件的路径、文件大小为参数初始化AllocateRequest对象,并放入该服务线程的requestQueue:PriorityBlockingQueue变量中,由该线程在后台监听requestQueue队列,若该队列中存在AllocateRequest对象,则利用该对象的变量值创建MapedFile对象(即在磁盘中生成了对应的物理文件),并存入AllocateRequest对象的MapedFile变量中,并且在下一个新文件之后继续将下下一个新文件也创建了。若是在当前线程中直接创建MappeFile对象,则只创建一个新的文件。
最后将创建或返回的MapedFile对象存入MapedFileQueue的MapedFile列表中,并返回该MapedFile对象给调用者。
获取列表中的最后一个文件(getLastMapedFile2)
1
2
|
<code>从MapedFile列表中获取最后一个MapFile对象;若列表为空则返回 null 。
</code> |
统计内存的数据还有多少未持久化(howMuchFallBehind)
调用getLastMapedFile方法获取Mapped队列中最后一个MapedFile对象,计算得出未刷盘的消息大小,计算公式为:最后一个MapedFile对象fileFromOffset+写入位置wrotepostion-commitedWhere(上次刷盘的位置)。
获取MapedFile队列中最小Offset值(getMinOffset)
先获取MapedFile队列中第一个MapedFile对象,再取该对象的fileFromOffset值(即文件的名字值),该值即为最小offset值;若MapedFile列表为空,则直接返回-1;
获取MapedFile队列中最大Offset值(getMaxOffset)
获取Mapped队列中最后一个MapedFile对象,将最后一个MapedFile对象fileFromOffset加上写入位置wrotepostion值即为最大offset值。
删除某类文件中的最后一个文件(deleteLastMaped)
例如commit类型的文件下面有多个固定大小(1G)的文件,即对应在MapedFile列表中有多个MapedFile对象,若要删除最后一个文件,首先从磁盘中删除物理文件,然后从列表中删除最后一个MapedFile对象。
根据指定的offset找到所在文件(findMapedFileByOffset)
方法findMapedFileByOffset(final long offset, final boolean returnFirstOnNotFound):首先找到MapedFile队列中的第一个MapedFile对象,取该对象的fileFromOffset值;然后指定的offset减去fileFromOffset值再除以fileSize得到文件在MapedFile队列中的序列号index,最后根据index值从列表中获取MapedFile对象。若index值获取对象时出错误(index<0或者大于列表的总数),则根据returnFirstOnNotFound参数决定是返回null或者第一个MapedFile对象。
MapedFile队列中的消息刷盘(commit)
首先根据commitWhere值(上次刷盘的位置)在队列中找到所处的文件对象MapedFile,即调用findMapedFileByOffset方法;然后调用MapedFile对象的commit方法完成消息刷盘操作;最后利用MapedFile对象的fileFromOffset值加上这次刷盘的消息大小得到的总和更新commitWhere值,作为下次刷盘的开始位置,同时更新MapedFileQueue对象的存储时间戳(storeTimestamp)。
https://www.2cto.com/kf/201708/666767.html
https://www.cnblogs.com/guazi/p/6836112.html
相关推荐
1 . IoT终端消息分析 2. 领域模型设计 3. RocketMQ融合架构——RocketMQ-MQTT
所以在实际产环境中,个Topic会设置成多分区的模式,来持多个消费者,参照下图:在互联企业的实际产环境中,Topic数量和分区都会较多,这就要求消息中间件在多T
cd /opt/rocketmq-all-4.3.0-bin-release # nohup sh bin/mqnamesrv & #启动每个服务器的nameserver # tail -f nohup.out The Name Server boot success #输出此类信息,说明启动成功 启动broker 服务器Namserver1...
基于spring-cloud-alibaba套件的微服务架构的商场停车场实战案例 关键技术点应用: ... 服务发现——nacos ...异步消息处理——rocketmq 分布式缓存——redis 客户端负载均衡——openfeign RPC调用——dubbo
万亿级数据洪峰下的消息引擎——Apache RocketMQ--阿里.pdf
技术文档分享,免费获取请私信博主。
RocketMQ源码分析,分为存储篇、NameServer篇、Broker篇、Producer篇、Consumer篇五大部分进行源码级的讲解。大致如下: 1、讲解commitlog、consumequeue、index、transaction文件等数据结构、数据读写、HA高可用等...
阿里RocketMQ_用户指南_V3.2.4_最新版本.pdf benchmark.pdf Metaq在JDk 7下的异常及解决方案.docx mqvsmq.pdf RocketMQ_原理简介.pdf RocketMQ_admin.pdf RocketMQ_benchmark.pdf RocketMQ_calvinzhan - 类图.pdf ...
rocketmq总结
一、rocketmq入门到精通视频教程目录大纲 001-001_RocketMQ_简介 002-002_RocketMQ_核心概念详解 003-003_RocketMQ_集群构建模型详解(一) 004-004_RocketMQ_集群构建模型详解(二) 005-005_RocketMQ_双主模式集群...
安装 RocketMQ 1. 下载 rocketmq 二进制文件 wget --no-check-certificate https://dlcdn.apache.org/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip 2. 解压缩 rocketmq 将 rocketmq 解压到 /usr/local/ 目录 ...
RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-Connect 二次开发源码。RocketMQ-...
rocketmq设置开机启动脚本
SpringBoot整合RocketMq
2. 高效的 I/O 存储,RocketMQ 追求消息发送的高吞吐量,RocketMQ 的消息存储设计成文件组的概念,组内单个文件固定大小,引入了内存映射机 制,所有主题的消息存储基于顺序读写,极大提高消息写性能,同时为了兼顾...
rocketmq客户端,直接在浏览器中输入地址即可访问。这是个springboot项目,根据实际需要修改对应配置;主要是修改端口号及对应rocketMQ连接地址。
rocketmq可视化控制台最新版
rocketmq-dashboard.zip
Rocketmq可视化工具 使用方法: 打包运行 mvn clean package -Dmaven.test.skip=true java -jar target/rocketmq-console-ng-1.0.1.jar
cd docker-rocketmq cd rmq chmod +x start.sh ./start.sh 不能设置权限777的同学可以设置如下 chown 3000:3000 ./rmqs/logs chown 3000:3000 ./rmqs/store chown 3000:3000 ./rmq/logs chown 3000:3000 ./rmq/...