`
jahu
  • 浏览: 58047 次
  • 性别: Icon_minigender_1
  • 来自: 长沙
社区版块
存档分类
最新评论

rq之offset 杂乱之说

 
阅读更多

Consumer(不管pull,push) 获得数据都会发送 topic,queueId,brokerName offset ,请求数据量。

来分析分析传递的参数

topic,这个明显的不会有变化。

brokerName 每个broker是无状态的,如果broker挂了,consumer可以重新拿去其他borker的 MessageQueue 信息。所以影响也不大。

queueId 每个topic都有读写 queue,写queue是一个逻辑行为没有实际行为。一个读队列对应一个写队列。

看下 queueId在Producer的行为,每个线程轮询MessageQueue的list,queue数据不均衡而已。

public class ThreadLocalIndex {
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    private final Random random = new Random();

    public int getAndIncrement() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            index = Math.abs(random.nextInt());
            if (index < 0)
                index = 0;
            this.threadLocalIndex.set(index);
        }

        index = Math.abs(index + 1);
        if (index < 0)
            index = 0;

        this.threadLocalIndex.set(index);
        return index;
    }

    @Override
    public String toString() {
        return "ThreadLocalIndex{" +
            "threadLocalIndex=" + threadLocalIndex.get() +
            '}';
    }
}

 public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);

PushConsumer模式主动去请求了数据,正常情况不需要我们关心

PullConsumer模式需要开发自己轮询MessageQueue,一个错误行为是: 一个个messagequeue消费完。

关于queueId需要注意的地方 就是queue的增删

1. 读写队列长度必须一致
2. 不要随意增删,如果增删队列需要通知 consumer与Producer 重新拉取 queue信息,如果你有额外的操作,可能需要更多处理,所以不建议增删队列。按照最大并发量与数据库合理初始化最大 queue

offset 最大的问题。broker 不会对 offset进行维护,也无法进行持久化。

client 需要自己维护。而维护的两个类RemoteBrokerOffsetStore与LocalFileOffsetStore。都无法正真解决这些问题。

LocalFileOffsetStore (PullComsumer 默认实现方式)直接保存在本地

RemoteBrokerOffsetStore,(PushConsumer 默认实现方式)

public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        if (null == findBrokerResult) {
            // TODO Here may be heavily overhead for Name Server,need tuning
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        }

        if (findBrokerResult != null) {
            UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setConsumerGroup(this.groupName);
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setCommitOffset(offset);

            if (isOneway) {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            } else {
                //发送到服务端
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            }
        } else {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }
    }

SlaveSynchronize, 会把 salve保存的 offset 数据同步到 master

private void syncConsumerOffset() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null) {
            try {
                ConsumerOffsetSerializeWrapper offsetWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
                this.brokerController.getConsumerOffsetManager().getOffsetTable()
                    .putAll(offsetWrapper.getOffsetTable());
                this.brokerController.getConsumerOffsetManager().persist();
                log.info("Update slave consumer offset from master, {}", masterAddrBak);
            } catch (Exception e) {
                log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
            }
        }
}

问题

  1. 但是 broker 不会对 offset数据持久化,对于我们这些小公司来说,就是一个问题。
  2. slave offset 同步 master 是有延迟的。

3. 如果是单Comsumer 使用RemoteBrokerOffsetStore是没有问题的。如果多个comsumer,无法保证原子操作,如果请求重复,需要过滤数据。得不偿失。无法持久化offset

这里几段很有意思的代码

brokerAllowSuspend 永远为true

public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        return this.processRequest(ctx.channel(), request, true);
}
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        ······
        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
        ······
        
        ·····
}        
public static int buildSysFlag(final boolean commitOffset, final boolean suspend,
        final boolean subscription, final boolean classFilter) {
        int flag = 0;

        if (commitOffset) {
            flag |= FLAG_COMMIT_OFFSET;
        }

        if (suspend) {
            flag |= FLAG_SUSPEND;
        }

        if (subscription) {
            flag |= FLAG_SUBSCRIPTION;
        }

        if (classFilter) {
            flag |= FLAG_CLASS_FILTER;
        }

        return flag;
    }
    
public PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final long subVersion,final long offset,final int maxNums,...

     if (findBrokerResult != null) {
            int sysFlagInner = sysFlag;

            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }

            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setSysFlag(sysFlagInner);

public static int clearCommitOffsetFlag(final int sysFlag) {
        return sysFlag & (~FLAG_COMMIT_OFFSET);
}

class PullMessageProcessor{

private RemotingCommand processRequest(final Channel channel ....
    final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());


}    
}
// 第一段代码使  storeOffsetEnable = true
        boolean storeOffsetEnable = brokerAllowSuspend;
        storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
        storeOffsetEnable = storeOffsetEnable
            && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
        if (storeOffsetEnable) {
            this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}

hasCommitOffsetFlag 在client基本被应变成了 偶数。所以 storeOffsetEnable 是 false,不懂移位,所以可能哪里疏忽了。

总结来说,broker 与 client维护offset 太复杂了,太麻烦了。

首先关注 offset的持久化。

  1. 写入MySQL 性能方面复杂可能比较大
  2. 写入redis性能负载比较小,选择redis

操作原子性 首选 redis。

Comsumer设计方案

不管哪个方案,都要对 MessageQueue 进行维护

第一个方案 Producer主导。缺点是需要producer进行主动,一个消息需要一个redis请求,麻烦。

-- 每次启动消费者,获得消费队列,就去缓存看是否存在对应的队列,如果存在什么都不做,如果不存在就初始化为0,目前是足够了。
-- 这样一个生产组,对应一个消费组
-- 目前只支持当maset模式,消费者一个请求线程,几个处理线程
-- 初始化 topic 数据 ,topic+brokerName 值为 队列id,生产多少个

redis:zadd("topic_a",0,"1",1,"2" ,0,"3")

-- 查询操作 得到最大值得队列
local rek = redis:zrevrange( "topic_a" , 0 , 0 , "WITHSCORES" )
redis.log( redis.LOG_NOTICE , rek)
local production = rek[1][2]
if production > 0 then
  --一次消费多少个? 是全部消费,还是固定消费
  local consume    = ( production >= 100 and {100} or { production })[1]
  redis:zincrby("test_topic_a" , 0 - consume, rek[1][1]) 
  --得到 是否
  local queneStr   = redis:hget("")
  local quene      = cjson.decode( queneStr )
  quene["consume"] = consume
  queneStr         = cjson.encode( quene )
  quene["offset"]  = quene["offset"] + consume
  redis:hset( ""   , cjson.encode( quene ) )
  
  
  return queneStr
else
  --1
  return "false"
end

第二个方案 Comsumer 自增轮序

比如有 1个queue,五个comsumer , offset目前值为1000

问题描述

A 去redis 修改 offset 为 1100,A请求的offset是1000,数量是100

B 去redis 修改 offset 为 1200,B请求的offset是1100,数量是100

C 去redis 修改 offset 为 1300,C请求的offset是1200,数量是100

A去请求的时候 broker为1098,A pull 99条数据

A刚刚请求完 broker 为 1012,B pull 1条数据,第1100条数据不会在被读取到。

重复读不行吗?

因为 pull 之后 如果没有pull 100条数据,会在offset 上面键入 100-pull数据量。

A pull 98,当前offset为1300,a pull 完之后会在原有的offset减2 offset值为1298,所以,

b pull 1 , 当前 offset 为1298,offset = 1298-99

c pull 0 , 当前 offset 为1199, offset = 1199-100

offst为1100,下次请求是 1100开始,那么1101这条数据被重复消费。1099数据被无法再次读取把。

所以第二个方案 重复读取,速度丢失。不可用。就算出错概率小,也不敢用。

方案三, comsumer 轮询且阻塞 queue队列。数据结构维护复杂

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics