Consumer(不管pull,push) 获得数据都会发送 topic,queueId,brokerName offset ,请求数据量。
来分析分析传递的参数
topic,这个明显的不会有变化。
brokerName 每个broker是无状态的,如果broker挂了,consumer可以重新拿去其他borker的 MessageQueue 信息。所以影响也不大。
queueId 每个topic都有读写 queue,写queue是一个逻辑行为没有实际行为。一个读队列对应一个写队列。
看下 queueId在Producer的行为,每个线程轮询MessageQueue的list,queue数据不均衡而已。
public class ThreadLocalIndex {
private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
private final Random random = new Random();
public int getAndIncrement() {
Integer index = this.threadLocalIndex.get();
if (null == index) {
index = Math.abs(random.nextInt());
if (index < 0)
index = 0;
this.threadLocalIndex.set(index);
}
index = Math.abs(index + 1);
if (index < 0)
index = 0;
this.threadLocalIndex.set(index);
return index;
}
@Override
public String toString() {
return "ThreadLocalIndex{" +
"threadLocalIndex=" + threadLocalIndex.get() +
'}';
}
}
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
PushConsumer模式主动去请求了数据,正常情况不需要我们关心
PullConsumer模式需要开发自己轮询MessageQueue,一个错误行为是: 一个个messagequeue消费完。
关于queueId需要注意的地方 就是queue的增删
1. 读写队列长度必须一致
2. 不要随意增删,如果增删队列需要通知 consumer与Producer 重新拉取 queue信息,如果你有额外的操作,可能需要更多处理,所以不建议增删队列。按照最大并发量与数据库合理初始化最大 queue。
offset 最大的问题。broker 不会对 offset进行维护,也无法进行持久化。
client 需要自己维护。而维护的两个类RemoteBrokerOffsetStore与LocalFileOffsetStore。都无法正真解决这些问题。
LocalFileOffsetStore (PullComsumer 默认实现方式)直接保存在本地
RemoteBrokerOffsetStore,(PushConsumer 默认实现方式)
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
// TODO Here may be heavily overhead for Name Server,need tuning
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
//发送到服务端
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
}
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
SlaveSynchronize, 会把 salve保存的 offset 数据同步到 master
private void syncConsumerOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
ConsumerOffsetSerializeWrapper offsetWrapper =
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
this.brokerController.getConsumerOffsetManager().getOffsetTable()
.putAll(offsetWrapper.getOffsetTable());
this.brokerController.getConsumerOffsetManager().persist();
log.info("Update slave consumer offset from master, {}", masterAddrBak);
} catch (Exception e) {
log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
}
}
}
问题
- 但是 broker 不会对 offset数据持久化,对于我们这些小公司来说,就是一个问题。
- slave offset 同步 master 是有延迟的。
3. 如果是单Comsumer 使用RemoteBrokerOffsetStore是没有问题的。如果多个comsumer,无法保证原子操作,如果请求重复,需要过滤数据。得不偿失。无法持久化offset
这里几段很有意思的代码
brokerAllowSuspend 永远为true
public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
return this.processRequest(ctx.channel(), request, true);
}
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
······
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
······
·····
}
public static int buildSysFlag(final boolean commitOffset, final boolean suspend,
final boolean subscription, final boolean classFilter) {
int flag = 0;
if (commitOffset) {
flag |= FLAG_COMMIT_OFFSET;
}
if (suspend) {
flag |= FLAG_SUSPEND;
}
if (subscription) {
flag |= FLAG_SUBSCRIPTION;
}
if (classFilter) {
flag |= FLAG_CLASS_FILTER;
}
return flag;
}
public PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final long subVersion,final long offset,final int maxNums,...
if (findBrokerResult != null) {
int sysFlagInner = sysFlag;
if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setSysFlag(sysFlagInner);
public static int clearCommitOffsetFlag(final int sysFlag) {
return sysFlag & (~FLAG_COMMIT_OFFSET);
}
class PullMessageProcessor{
private RemotingCommand processRequest(final Channel channel ....
final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
}
}
// 第一段代码使 storeOffsetEnable = true
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
hasCommitOffsetFlag 在client基本被应变成了 偶数。所以 storeOffsetEnable 是 false,不懂移位,所以可能哪里疏忽了。
总结来说,broker 与 client维护offset 太复杂了,太麻烦了。
首先关注 offset的持久化。
- 写入MySQL 性能方面复杂可能比较大
- 写入redis性能负载比较小,选择redis
操作原子性 首选 redis。
Comsumer设计方案
不管哪个方案,都要对 MessageQueue 进行维护
第一个方案 Producer主导。缺点是需要producer进行主动,一个消息需要一个redis请求,麻烦。
-- 每次启动消费者,获得消费队列,就去缓存看是否存在对应的队列,如果存在什么都不做,如果不存在就初始化为0,目前是足够了。
-- 这样一个生产组,对应一个消费组
-- 目前只支持当maset模式,消费者一个请求线程,几个处理线程
-- 初始化 topic 数据 ,topic+brokerName 值为 队列id,生产多少个
redis:zadd("topic_a",0,"1",1,"2" ,0,"3")
-- 查询操作 得到最大值得队列
local rek = redis:zrevrange( "topic_a" , 0 , 0 , "WITHSCORES" )
redis.log( redis.LOG_NOTICE , rek)
local production = rek[1][2]
if production > 0 then
--一次消费多少个? 是全部消费,还是固定消费
local consume = ( production >= 100 and {100} or { production })[1]
redis:zincrby("test_topic_a" , 0 - consume, rek[1][1])
--得到 是否
local queneStr = redis:hget("")
local quene = cjson.decode( queneStr )
quene["consume"] = consume
queneStr = cjson.encode( quene )
quene["offset"] = quene["offset"] + consume
redis:hset( "" , cjson.encode( quene ) )
return queneStr
else
--1
return "false"
end
相关推荐
d2rq官方下载的zip包,倒腾了大半天才下载下来。
RQ3.0修正版 RQ3.0修正版 RQ3.0修正版 RQ3.0修正版 RQ3.0修正版
d2rq-0.8.1,tar.zip 下载
d2rq.zip安装包,官网上下不下来的,我用github下下来的,需要的朋友可以下载。 D2RQ exposes the contents of relational databases as RDF. It consists of: The D2RQ Mapping Language. Use it to write ...
联想服务器RQ940用户手册,免费下载
联想服务rq940
松下超薄磁带随身听RQ-SX70v说明书PDF版,随身听是指体积小、重量轻便于随身携带的媒体播放器,由索尼创造的Walkman为代表,松下,爱华等日系品牌,飞利浦等欧系品牌,Bose,苹果等美系品牌见证了随身听的由盛到衰。...
D2RQ0.81版本,是最新版本。windows和linux两种版本都有,解压即可得到。因为官网有时无法访问,所以下下来之后放到CSDN上供大家下载
RQ3E120AT P CH-30V -39A Power Mosfet Vdss -30V Rds 8毫欧
d2rq-0.8.1里面有linux与windows版本,
excel转换为rq文件,excel文件需预先保存为CVS文件,适合esale入库导入
QFN32 Allergro封装(FT232RQ) 带有散热焊盘的.dra文件。可以直接用到pcb
瑞基资料RQ系列接线 刚学上传资料,,希大家多多指点。 。。 。。
RQ940系统用户手册
开利中央空调30RB、30RQ电气维修手册
好不容易下载来,里面包含JENA包、d2rq包。
松下RQ-SX77V维修电路图
明纬RQ-85开关电源规格书pdf,明纬RQ-85开关电源规格书
SIMATIC ET 200SP 数字量输出模块 RQ 4x120VDC-230VAC/5A NO MA ST[手册]pdf,
生成RQ码的通用com组件,已破解 经过亲测,很好用