`

ActiveMQ 实践之路(二) 使用Queue或者Topic发送/接受消息

    博客分类:
  • jms
阅读更多

本篇主要讲解在未使用其他框架(Spring)整合情况下,独立基于ActiveMQ,使用JMS规范进行消息通信。
    
     一.JMS回顾
       因为ActiveMQ是一个JMS Provider的实现,因此在开始实作前,有必要复习下JMS的基础知识
    Java Message Service (JMS)是sun提出来的为J2EE提供企业消息处理的一套规范,JMS目前有2套规范还在使用JMS 1.0.2b和1.1. 1.1已经成为主流的JMS Provider事实上的标准了.
      *1.1主要在session上面有一些重要改变,比如支持建立同一session上的transaction,让他支持同时发送P2P(Queue)消息和接受
Topic消息。
      
       在JMS中间主要定义了2种消息模式Point-to-Point (点对点),Publich/Subscribe Model (发布/订阅者),
    其中在Publich/Subscribe 模式下又有Nondurable subscription和durable subscription (持久化订阅)2种消息处理方式。
    
     下面是JMS规范基本的接口和实现
     JMS Common Interfacse PTP-Specific Interface   Pub/Sub-specific interfaces
     ConnectionFactory     QueueConnectionFactory   TopicConnectionFactory
     Connection            QueueConnection          TopicConnection
     Destination           Queue                    Topic
     Session               QueueSession             TopiSession
     MessageProducer       QueueSender              TopicPublisher
     MessageConsumer       QueueReceiver/QueueBrwer TopicSubscriber


     二.使用Queue

         下面以ActiveMQ example的代码为主进行说明
        
        1. 使用ActiveMQ的Connection,ConnectionFactory 建立连接,注意这里没有用到pool
       

java 代码
  1. import org.apache.activemq.ActiveMQConnection   
  2. import org.apache.activemq.ActiveMQConnectionFactory   

        //建立Connection

java 代码
  1. protected Connection createConnection() throws JMSException, Exception {   
  2.      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);   
  3.      Connection connection = connectionFactory.createConnection();   
  4.      if (durable && clientID!=null) {   
  5.          connection.setClientID(clientID);   
  6.      }   
  7.      connection.start();   
  8.      return connection;   
  9.     }  

        //建立Session
  

java 代码
  1. protected Session createSession(Connection connection) throws Exception {   
  2.          Session session = connection.createSession(transacted, ackMode);   
  3.          return session;   
  4.         }   

        2。发送消息的代码
 //建立QueueSession
 

java 代码
  1. protected MessageProducer createProducer(Session session) throws JMSException {   
  2.         Destincation destination = session.createQueue("queue.hello");   
  3.         MessageProducer producer = session.createProducer(destination);   
  4.         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);   
  5.            
  6.         if( timeToLive!=0 )   
  7.             producer.setTimeToLive(timeToLive);   
  8.         return producer;   
  9.         }   

         //使用Producer发送消息到Queue
    

java 代码
  1. producer.send(message);   

       
        3。接受消息,在JMS规范里面,你可以使用
  

java 代码
  1. QueueReceiver/QueueBrowser直接接受消息,但是更多的情况下我们采用消息通知方式,即实现MessageListener接口   
  2.  public void onMessage(Message message) {   
  3.  //process message   
  4.  }   
  5.           
  6.  //set MessageListner ,receive message   
  7.  Destincation destination = session.createQueue("queue.hello");   
  8.  consumer = session.createConsumer(destination);   
  9.  consumer.setMessageListener(this);   

       
        以上就是使用jms queue发送接受消息的基本方式

 
     三 Topic

        1. 建立连接
   

java 代码
  1. protected Connection createConnection() throws JMSException, Exception {      
  2.         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);      
  3.         Connection connection = connectionFactory.createConnection();      
  4.         //如果你要使用DurableSubScription 方式,你必须为connection设置一个ClientID      
  5.         if (durable && clientID!=null) {      
  6.             connection.setClientID(clientID);      
  7.         }      
  8.         connection.start();      
  9.         return connection;      
  10.        }      

       2. 建立Session

java 代码
  1. protected Session createSession(Connection connection) throws Exception {      
  2.         Session session = connection.createSession(transacted, ackMode);      
  3.         return session;      
  4.         }    

 3.创建Producer 发送消息到Topic   
       

java 代码
  1. //create topic on  session   
  2.        topic = session.createTopic("topic.hello");   
  3.  producer = session.createProducer(topic);   
  4.        //send message    
  5.        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);   
  6.  producer.send(message);   


 4.创建Consumer接受消息(基本上和Queue相同)

java 代码
  1. Destincation destination  = session.createTopic("topic.hello");      
  2. MessageConsumer consumer = session.createConsumer(destination);      
  3. consumer.setMessageListener(this);      
  4.            
  5.      //如果你使用的是Durable Subscription方式,你必须在建立connection的时候      
  6.      //设置ClientID,而且建立comsumer的时候使用createDurableSubscriber方法,为他指定一个consumerName。      
  7.  //connection.setClientID(clientId);      
  8.  //consumer = session.createDurableSubscriber((Topic) destination, consumerName);   

       
 四:连接ActiveMQ的方式
        ActiveMQConnectionFactory 提供了多种连接到Broker的方式activemq.apache.org/uri-protocols.html

 常见的有
 vm://host:port     //vm
 tcp://host:port    //tcp
 ssl://host:port    //SSL
 stomp://host:port  //stomp协议可以跨语言,目前有很多种stomp client 库(java,c#,c/c++,ruby,python...);

分享到:
评论

相关推荐

    Apache ActiveMQ Queue Topic 详解

    Apache ActiveMQ Queue Topic 详解 教程 加入代码解释说明

    一个jms activemq Topic 消息实例

    一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...

    Queue与Topic的比较

    Queue与Topic的比较,学习JMS和activemq必须看的资料之一

    springboot2整合activemq的demo内含queue消息和topic消息

    springboot2整合activemq的demo内含queue消息和topic消息,需要使用者修改application.yml的连接地址信息就可以了,端口号是默认的可以不用修改

    spring集成activemq演示queue和topic 持久化

    本人在学习activemq,然后 测试完成的demo, 包含了queue,topic,持久化到mysql,订阅模式,包好用

    ActiveMQ的队列、topic模式

    ActiveMQ的两种消息模式的例子,queue和topic,注释很详细,告诉你每步做什么,非springboot整合

    JAVA编程之Spring boot-activeMQ示例

    # Springboot-activeMQ 本项目基于Spring boot这一平台,整合流行的开源消息队列中间件ActiveMQ,实现一个向ActiveMQ添加和读取消息的功能。...3.主题类型topic,创建主题,生产者发送主题消息,以及消费着消费主题消息

    Springboot-activeMQ

    ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS ...activemq的queue和topic  JMS中定义了两种消息模型:  点对点(point to point, queue)  发布/订阅(publish/subscribe,topic)。

    Springboot整合ActiveMQ(Queue和Topic两种模式)

    文章目录ActiveMQ简介1、ActiveMQ简介2、ActiveMQ下载SpringBoot整合ActiveMQ1、新建SpringBoot项目2、项目结构3、相关配置信息4、ActiveMQ配置类Queue队列模式1、队列生产者2、队列消费者3、测试效果Topic模式1、...

    spring-boot-activemq-demo

    spring boot activemq集成示例,包含queue和topic消息的发送、接收,连接池的支持。

    activemq-web-console-5.11.2

    activemq-web-console的默认使用方式是通过在activemq.xml中导入jetty.xml配置一个jetty server来实现的。其实activemq-web-console完全可以和activemq-broker分开来部署。 activemq-web-console包含3个apps, 1.一...

    Spring 实现远程访问详解——jms和activemq

    把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。 JMS 支持两种消息传递模型: 点对点(point-to-point,简称 PTP)...

    python 发送和接收ActiveMQ消息的实例

    ActiveMQ是java开发的消息中间件服务。可以支持多种协议(AMQP,MQTT,OpenWire,Stomp),默认的是OpenWire。而python与ActiveMQ的通信使用的是Stomp协议。而如果你的服务没有开启则需要配置开启。 首先需要安装python...

    详解Springboot整合ActiveMQ(Queue和Topic两种模式)

    主要介绍了详解Springboot整合ActiveMQ(Queue和Topic两种模式),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

    java springboot整合activemq工程

    #默认情况下activemq提供的是queue模式 true是可以使用topic,false是仅使用queue模式 spring.jms.pub-sub-domain: true # 设置连接的activemq服务器 spring.activemq.broker-url=failover:(tcp://10.0.1.227:61616,...

    activeMQ JMS 3种创建方式

    activeMQ JMS 3种创建方式 公共方式 QUEUE TOPIC

    springboot整合activemq案例

    springboot整合activemq案例,queue,topic两种模式 定时运行和controller请求运行两种方式

    ActiveMQ详细入门使用教程_java_MQ_

    MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。?特点:?1、支持多种语言...

    消息队列学习(springboot+kafka+activemq)

    启动应用即可测试,可帮助快速了解kafka、activemq 两者在 Queue topic producer consumer 使用异同点,demo仅仅是最简化代码,演示通信和使用,无法对两者的集群模式进行测试,如果有对集群模式有兴趣,可自行扩展...

    activemq-all-5.13.2.jar

    activemq的Java依赖包,课构建Queue/topic 生产、消费

Global site tag (gtag.js) - Google Analytics