`
manzhizhen
  • 浏览: 289721 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

ActiveMQ中消费者是如何接收消息的(一)

阅读更多


       事先说明,本博客关于ActiveMQ的文章都是基于ActiveMQ5.10版本。

       初步用过ActiveMQ但又没去研究过源码的朋友肯定有些好奇ActiveMQ中消费者是如何接收消息的呢?本文我就和大家一起从源码角度来初步探讨消费者接收消息的过程。

       我们知道,消息传送有两种模型:点对点(P2P)和发布订阅(PUB/SUB),队列模式中,消息生产者叫做发送者,消息消费者叫做接收者,而在发布订阅模式中,消息生产者叫发布者,消息消费者叫订阅者。点对点模型中队列(Queue)是消息发送和接收的途径和通道,他保证了一个消息最多只能被一个消费者消费,而发布订阅模型中,消息发送和接收的途径是主题(Topic),所有订阅主题的消费者,都可以接收到该主题发布的消息,所以在这个模型中,消息可以被多个消费者消费。

       1)我们先来看看在点对点模型中消费者是如何接收消息的

       如果直接使用过ActiveMQ API的朋友,一定知道消息接收者可以通过两种方式接收消息,一种是使用同步效果的MessageConsumer#receive() 和异步的使用消息监听器的MessageConsumer#setMessageListener(MessageListener listener) 。值得注意的是,在同一个org.apache.activemq.ActiveMQSession会话对象下面的消费者,如果有的是采用消息监听器接收消息,则那些采用同步receive() 接收消息的消费者会抛出 IllegalStateException("Cannot synchronously receive a message when a MessageListener is set")异常,也就是说,同一个Session下面,要么消费者都使用消息监听器,要么都使用receive() 同步接收。

       这是为什么呢?我们先看下org.apache.activemq.ActiveMQMessageConsumer同步接收的源代码: 

 @Override 
    public Message receive() throws JMSException { 
        checkClosed(); 
        checkMessageListener(); 

        sendPullCommand(0);  // 如果预取数为0,则主动向JMS服务器发送拉取消息的报文
        MessageDispatch md = dequeue(-1); 
        if (md == null) { 
            return null; 
        } 

        beforeMessageIsConsumed(md); 
        afterMessageIsConsumed(md, false);   // 给JMS服务器发送接收消息的应答报文

        return createActiveMQMessage(md);   // 取出消息副本并返回
    } 

   
上面的checkMessageListener()就是去做检查的,请看: 

 protected void checkMessageListener() throws JMSException { 

        // 去调用所属会话的checkMessageListener();方法 
        session.checkMessageListener(); 
    } 

 
   而ActiveMQSession中的源码如下: 

  public void checkMessageListener() throws JMSException { 
        if (messageListener != null) { 
            throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 
        } 

        // 遍历由会话创建的消费者中是否有绑定消息监听器的消费者,如果有,则抛异常。 
        for (Iterator i = consumers.iterator(); i.hasNext();) { 
            ActiveMQMessageConsumer consumer = i.next(); 
            if (consumer.hasMessageListener()) { 
                throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 
            } 
        } 
    } 

   
如上所示,checkMessageListener() 调用的是该消费者所属会话的checkMessageListener()方法,而会话中的checkMessageListener()方法正是去该会话下面查看所有的消费者看看是否有采用消息监听的,如果有,则立马抛出IllegalStateException异常。至于ActiveMQ为什么要这样限制,第一是为了防止一个消费者同时采用同步和消息监听器两种方式接收消息,第二就是这样导致了无法采用一致的消息分发方式来将该会话接收到的消息合理的分配给下面的消费者,第三就是如果是事务性会话,采用两种方式的消费者是无法管理的。当然,如果你需要采用同步和异步消息接收共存,那也很简单,你只要通过ActiveMQConnection创建两个会话,一个会话下面创建的消费者都是采用同步接收,另一个会话下面创建的消费者都是采用异步接收就行了。 
        下面,我们来看看采用receive() 的内部是如何工作的。 这里,我们先来了解一下org.apache.activemq.ActiveMQMessageConsumer中几个重要的成员属性:

protected final MessageDispatchChannel unconsumedMessages;// 未消费的消息通道,里面用来储存未消费的消息,该通道容纳的最大消息数为预取值

protected final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();// 分发给该消费者但未应答的消息链表,列表中的消息顺序和被消费的顺序是相反的。

private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;  // 为了对TX的完整性进行验证,我们需要对一个事务中的消息重复发送进行跟踪。

       这里,我们先给出receive()方法的源码:

 

   @Override
    public Message receive() throws JMSException {
        checkClosed(); // 检查unconsumedMessages是否关闭
        checkMessageListener(); // 检查是否有其他消费者使用了消息监听器

        sendPullCommand(0); // 向JMS提供者发送一个拉取命令来拉取消息,为下次消费做准备
        MessageDispatch md = dequeue(-1); // 从unconsumedMessages取出一个消息
        if (md == null) {
            return null;
        }

        beforeMessageIsConsumed(md);
        afterMessageIsConsumed(md, false);

        return createActiveMQMessage(md);
    } 

 

       在ActiveMQ中,通过会话创建一个消费者时,就会为这个消费者创建一个未消费的消息通道,该通道分为两种,如果你采用的是优先级队列,则创建的是SimplePriorityMessageDispatchChannel()简单优先级消息分发的通道,如果不是,则创建的是FifoMessageDispatchChannel()先进先出的分发通道,如果你要问为什么需要有这个东西,第一,消费者处理消息是需要时间的,如果每次处理完一条消息才告知Session我处理完了,你再给我一个,这对于快消费者来说,效率是极低的,所以你得允许Session能够一次性将多条消息分给一个消费者,还记得“预取consumer.prefetchSize”的特性吗?Session将某条消息发送到这个消费者时,会先把消息放入属于这个消费者的未消费的消息通道中,我们每调用一次消费者的receive() 方法,首先要做的是就是去检查这个通道是否被关闭,如果被关闭,则会抛出IllegalStateException("The Consumer is closed");异常,第二步才是去调用上面提到的方法去检查是否有采用消息监听器接收消息的其他消费者“哥们”,如果通过了这两项检查,接下来要做的就是异步向MOM发送一个pull命令消息来拉取消息(注意,只有在预取prefetchSize设置为0且未消费的消息通道unconsumedMessages中已经没消息了才会发送pull命令消息,因为只有这时才需要告诉JMS提供者,消费者我已经把消息处理完了,你得赶紧再给我发一批,当然这个命令的发送过程是异步的,这也是为什么采用receive接收消息可以设置预取为0的原因),在发送这个命令之前,客户端会先清理已分发消息链表deliveredMessages,这一步的处理分为两种,1.Session是非事务的,如果Session的应答模式是CLIENT_ACKNOWLEDGE,也就是需要客户端的消费者主动调用Message#acknowledge()来应答MOM,由于我们这里讨论的是队列,所以只是简单的将deliveredMessages给清空而已(如果是基于主题的,会去遍历deliveredMessages给每个消息调用ActiveMQConnection#rollbackDuplicate做重复回滚处理);如果Session应答模式不是CLIENT_ACKNOWLEDGE,则不管是队列还是主题,都只是清空deliveredMessages而已。2.Session是事务的,则会将遍历deliveredMessages中的消息放入previouslyDeliveredMessages中来为重发做准备,源码如下,false表示还未进行过重发。

for (MessageDispatch delivered : deliveredMessages) {
         previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
}

 

接着,消费者就会直接从unconsumedMessages取出一个消息,从上面的源码可以看出,传入的时间毫秒参数是-1,所以表示如果unconsumedMessages为空将一直阻塞,如果想设置超时时间,可以使用如下方法同步接收消息:

public Message receive(long timeout) throws JMSException;

timeout==0表示一点也不阻塞,直接返回,如果是大于零的值则最多阻塞设置的值的毫秒数。

阻塞取消息这一步走完,如果获得的消息分发对象MessageDispatch不为空,这如上面的源代码,将执行beforeMessageIsConsumed(md);方法,如该方法名所示,该方法主要做消费消息前的准备工作,如果应答模式不是DUPS_OK_ACKNOWLEDGE或者是队列模式,则将该消息分发对象放入deliveredMessages列表的开头;如果Session是事务的,则(这里呆会在补充)。接下来调用的afterMessageIsConsumed(md, false);的主要作用是应答MOM,所以,当这个方法执行完,你就可以通过MQ的控制台看到该消息已经在“Messages Dequeued”中了。最后的createActiveMQMessage(md);作用就更简单了,直接从md对象中取出消息的副本进行返回,这样,消息接收者客户端就完成了一条消息的同步接收。

       接着,我们来看看采用消息监听器是如何接收消息的。 消费者可以调用public void setMessageListener(MessageListener listener) throws JMSException;方法来给自己设置一个消息监听器,下面给出源码:

@Override
    public void setMessageListener(MessageListener listener) throws JMSException {
        checkClosed();
        if (info.getPrefetchSize() == 0) {
            throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
        }
        if (listener != null) {
            boolean wasRunning = session.isRunning();
            if (wasRunning) {
                session.stop();
            }

            this.messageListener.set(listener);
            session.redispatch(this, unconsumedMessages);

            if (wasRunning) {
                session.start();
            }
        } else {
            this.messageListener.set(null);
        }
    }

 

注意看加粗部分代码,可以看出,采用消息监听器接收消息的消费者,预取数必须大于0,JMS给出的说法是异步消费者不支持。我们来一行行分析代码,该方法首先的工作和采用同步接收消息的方法一样去检查unconsumedMessages是否关闭,如果没有关闭,且listener不为空,则看会话Session是否已经Running,在ActiveMQSession中,有一个叫started的AtomicBoolean,他在Session调用自己的启动方法start()方法时会设置成true,而session.isRunning()方法返回的正是此值,下面给出start()方法的源码:

    protected void start() throws JMSException {
        started.set(true);
        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
            ActiveMQMessageConsumer c = iter.next();
            c.start();
        }
        executor.start();
    }

 

可以看出,该方法不是公用的,因为默认是在ActiveMQSession构造函数中调用的:

        if (connection.isStarted()) {
            start();
        }

有人会感到奇怪,我在通过ActiveMQConnection创建ActiveMQSession之前并没有调用ActiveMQConnection的start()方法啊,所以Session的构造函数里面也并没有启动Session自己啊?不用着急,因为你随后调用的ActiveMQConnection的start()方法里面也会去调用Session的start()方法,源码如下:

    @Override
    public void start() throws JMSException {
        checkClosedOrFailed();
        ensureConnectionInfoSent();
        if (started.compareAndSet(false, true)) {
            for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
                ActiveMQSession session = i.next();
                session.start();
            }
        }
    }

 

Connection在启动时会主动去遍历其下创建的Session,挨个让Session启动。经常使用JMSAPI的人应该知道,如果Connection没有调用start()方法时,即使队列中有消息,该Connection下面的消费者都是无法获取到该消息的(发消息不同,即使Connection没有启动,消息发送者仍然可以发送消息到JMS服务器),这下你们都知道原因了吧。好,回归正题,如果发现Session已经启动,它会主动去“关闭”该会话,这是当然的,ActiveMQ得保证该会话下面所有消费者都做好消息接收准备工作再启动自己。所以,如果我们直接使用ActiveMQ的API,最好是所有工作都做好后,再去调用ActiveMQConnection的start()方法。再保证了此时Session没有启动后,很显然我们得保存这个listener,因为我们后面还会去调用它。接着是session.redispatch(this, unconsumedMessages);,这是去消费该消费者unconsumedMessages中遗留的消息并将unconsumedMessages清空,因为我们是新创建的消费者,所以这一步就根本什么也没做。接着,如果Session是刚开始是启动的,由于刚才我们关闭过,所以我们会再次去启动它。这样,设置消息监听器的工作就作完了。

ActiveMQ中消费者是如何接收消息的(二)http://manzhizhen.iteye.com/blog/2102119

 

      

分享到:
评论
2 楼 xiajunhust 2016-05-11  
LZ写的比较清楚,看来是读过源码的。赞一个!
1 楼 小哥1900 2014-10-08  
写得屌爆了!!清楚明了~~

相关推荐

    springboot整合activemq 消费者 ACK手动确认 &消息重发

    springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。 里面有消息重发机制,手动确认ACK模式。 配合 producer 生产者demo使用。

    Spring平台整合消息队列ActiveMQ实现发布订阅、生产者消费者模型(适合新手或者开发人员了解学习ActiveMQ机制)

    3.填入要发送的消息,点击生产消息可以向消息队列添加一条消息,我们可以试着添加了四条消息,并观察控制台结果,可以发现每个消息只被某一个消费者接收; 4.重复以上操作发布四条消息,可以看到订阅者的输出结果,...

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收 后台消费者、生产者、消息发送接口、发送消息业务类等相关配置

    ActiveMQ in Action最新版

    消息生产者发送工作消息到 JMS 队列,消费者从这个队列中接收消息并处理。点对点模式不需要生产者和消费者同时在线。队列会一直保留收到的消息,直到有消费者把它消费掉。当消费者可用时,队列会把消息发给每一个...

    SpringBoot集成ActiveMQ实例详解.docx

    JMS的消息机制有2种模型,一种是队列的形式(Point to Point—)发送的消息只能被一个消费者消费;一种是订阅(Topic)模式,可以被多个订阅者订阅,订阅者都会接收到同样的消息。 而ActiveMQ就是对JMS的实现之一。

    JAVA编程之Spring-activeMQ基础开发

    3.填入要发送的消息,点击生产消息可以向消息队列添加一条消息,我们可以试着添加了四条消息,并观察控制台结果,可以发现每个消息只被某一个消费者接收; 4.重复以上操作发布四条消息,可以看到订阅者的输出结果,...

    Spring 实现远程访问详解——jms和activemq

    把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。 JMS 支持两种消息传递模型: 点对点(point-to-point,简称 PTP)...

    SpringBoot2整合Jms超级简单攻略(基于ActiveMQ)

    SpringBoot2整合Jms超级简单攻略(基于ActiveMQ).。消息主题(Topic),需要一个固定的主题,例如大家都是在谈论外卖,...3.在控制台就可以看到mq打印消息,因为消费者2运算量太大,所以一直等待输出,消费者1消费两个消息

    ActiveMQ——Java连接ActiveMQ(点对点)

    发送者发送后不见得接收者会立即收看(3)消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息创建Maven工程关于怎么创建Maven工程这里就不详细讲解了,不会的朋友网上找一下添加相关依赖消息...

    boot-example-activemq-topic-2.0.5

    * 消费生产者将发布到topic中,同时有多个消息消费者(订阅)消费该消息 * 这种方式和点对点方式不同,发布到topic的消息会被所有订阅者消费 * 当生产者发布消息,不管是否有消费者,都不会保存消息,如果对订阅...

    activemq.rar

    对于消息的传递有两种类型:一种是点对点的,即一个生产者和一个消费者一一对应;另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

    JMS与SPRING的整合实例(基于Apache ActiveMQ)

    JMS与SPRING的整合实例(基于Apache ActiveMQ) 定义JMS连接工厂 ...定义消费者(接收端) 定义发布者 JAVA核心代码一般由三个部分组成: 监听器(Listener),发布端(Publisher), 消息生产者(Creator)

    PHP使用ActiveMQ实例

    只有一个消费者可以接收到消息 不能重复消费 生产者producer.php代码: &lt;?php try { // 1.建立连接 $stomp = new Stomp('tcp://47.52.119.21:61613'); // 2.实例化类 $obj = new Stdclass(); // 3.获取...

    RabbitMQ与SpringBoot整合.docx

    RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache). 消息中间件的工作过程可以用生产...

    rabbit安装包和对应的erlang环境

    使用消息队列,消息生产者在产生消息后,会将消息保存在消息队列中,直到消息消费者来取走它,即消息的发送者和接收者不需要同时与消息队列交互。 使用消息队列可以有效实现服务的解耦,并提高系统的可靠性以及可...

    node-mq:节点-MQ

    node消息队列 优化系统性能最管用的2招 ...MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。 其中较为成熟的MQ产品有 IBM WEBSPHERE MQ(老牌企业

    camel-aggregator

    相应的“direct:start”消费者然后根据名称为“aggregatorid”的标头聚合与消息相关的消息,消息计数条件固定为 3(​​对应从 3 个队列接收消息)。 聚合器实现将在单独行上收到的每个相关消息的内容写入单个文件...

    cloud-security:权限管理和数据权限架构

    把消费者接收到的数据 存放进入数据库,进行action权限的目录管理 并开发 action权限目录管理 平台. 对于平台的action,提前将信息由@ConParameter注解进行注册进入数据库 开发平台用户权限 已提供登录平台. #最终阶段...

Global site tag (gtag.js) - Google Analytics