`

JMS发布订阅模式

    博客分类:
  • JMS
 
阅读更多
方式一:
package com.deppon.test04.jms.topicpublish;
发布者:
import java.util.Properties;

import javax.jms.DeliveryMode; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.Session; 
import javax.jms.Topic; 
import javax.jms.TopicConnection; 
import javax.jms.TopicConnectionFactory; 
import javax.jms.TopicPublisher; 
import javax.jms.TopicSession; 
import javax.jms.TopicSubscriber; 
import javax.naming.Context; 
import javax.naming.InitialContext; 
 
public class SimplePublisher { 
 
    private TopicPublisher producer; 
    private TopicSession session; 
    private TopicConnection connection; 
    private boolean isOpen = true; 
     
    public SimplePublisher() throws Exception{ 
       
        Properties props = new Properties();
//        props.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
//        props.setProperty(Context.PROVIDER_URL,"tcp://localhost:61616");
        props.setProperty("connectionFactoryNames","TopicCF");
        props.setProperty("queue.queue1","jms.queue1");
        props.setProperty("topic.topic1","jms.topic1");
        javax.naming.Context context = new InitialContext(props);
       
//        Context context = new InitialContext(); 
        TopicConnectionFactory connectionFactory = (TopicConnectionFactory)context.lookup("TopicCF"); 
        connection = connectionFactory.createTopicConnection(); 
        connection.setClientID("OK111"); 
        session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 
        Topic topic = (Topic)context.lookup("topic1"); 
        producer = session.createPublisher(topic);//non durable 
        producer.setDeliveryMode(DeliveryMode.PERSISTENT); 
        connection.start(); 
         
    } 
     
     
    public boolean send(String text) { 
        if(!isOpen){ 
            throw new RuntimeException("session has been closed!"); 
        } 
        try{ 
            Message message = session.createTextMessage(text); 
            producer.send(message); 
            return true; 
        }catch(Exception e){ 
            return false; 
        } 
    } 
     
    public synchronized void close(){ 
        try{ 
            if(isOpen){ 
                isOpen = false; 
            } 
            session.close(); 
            connection.close(); 
        }catch (Exception e) { 
            // 
        } 
    } 
     


订阅者:
package com.deppon.test04.jms.topicpublish;

import java.util.Properties;

import javax.jms.Session; 
import javax.jms.Topic; 
import javax.jms.TopicConnection; 
import javax.jms.TopicConnectionFactory; 
import javax.jms.TopicSession; 
import javax.jms.TopicSubscriber; 
import javax.naming.Context; 
import javax.naming.InitialContext; 
   
public class SimpleSubscriber { 
 
    private TopicConnection connection; 
    private TopicSession session; 
    private TopicSubscriber consumer; 
     
    private boolean isStarted; 
     
    public SimpleSubscriber(String clientId) throws Exception{ 
       
        Properties props = new Properties();
        props.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        props.setProperty(Context.PROVIDER_URL,"tcp://localhost:61616");
        props.setProperty("connectionFactoryNames","TopicCF");
        props.setProperty("queue.queue1","jms.queue1");
        props.setProperty("topic.topic1","jms.topic1");
        javax.naming.Context context = new InitialContext(props);
       
//        Context context = new InitialContext(); 
        TopicConnectionFactory connectionFactory = (TopicConnectionFactory)context.lookup("TopicCF");
        connection = connectionFactory.createTopicConnection(); 
        connection.setClientID(clientId); 
        session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 
        Topic topic = (Topic)context.lookup("topic1"); 
        consumer = session.createDurableSubscriber(topic, "Test-subscriber"); 
        consumer.setMessageListener(new TopicMessageListener()); 
    } 
     
     
    public synchronized boolean start(){ 
        if(isStarted){ 
            return true; 
        } 
        try{ 
            connection.start(); 
            isStarted = true; 
            return true; 
       }catch(Exception e){ 
            return false; 
        } 
    } 
     
    public synchronized void close(){ 
        isStarted = false; 
        try{ 
            session.close(); 
            connection.close(); 
        }catch(Exception e){ 
            // 
        } 
    } 


监听消息类:
package com.deppon.test04.jms.topicpublish;

import javax.jms.Message;
import javax.jms.MessageListener;

public class TopicMessageListener implements MessageListener {

    MessageListener paramMessageListener;

    public MessageListener getParamMessageListener() {
        return paramMessageListener;
    }

    public void setParamMessageListener(MessageListener paramMessageListener) {
        this.paramMessageListener = paramMessageListener;
    }

    @Override
    public void onMessage(Message paramMessage) {
//        this.paramMessageListener = paramMessage.get;
        // TODO Auto-generated method stub
        System.out.println("AAAAAAAAA BBBBB");
        System.out.println("AAAAAAAAA CCCCC");
    }
   
}
测试类:
package com.deppon.test04.jms.topicpublish;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SimpleTestMain { 
      /**
     * @param args
     */ 
    public static void main(String[] args) throws Exception{ 
               
        SimpleSubscriber consumer = new SimpleSubscriber("TestClientId");
       
//        consumer.close();
        consumer.start(); 
         
        SimplePublisher productor = new SimplePublisher(); 
       
        for(int i=0; i<10; i++){ 
            productor.send("message content:" + i); 
        } 
        productor.close(); 
        //consumer.close(); 
    } 
     
 


方式二:

发布者:
package com.deppon.test04.jms.topicpublish;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class SendTopic {
    private static final int SEND_NUMBER = 5;
    public static void sendMessage(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 1; i <=SEND_NUMBER; i++) {
            TextMessage message = session
                    .createTextMessage("ActiveMq发送的消息" + i);
            //发送消息到目的地方
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }
  
    public static void main(String[] args) {
        // ConnectionFactory:连接工厂,JMS用它创建连接
        ConnectionFactory connectionFactory;
        // Connection:JMS客户端到JMS Provider的连接
        Connection connection = null;
        // Session:一个发送或接收消息的线程
        Session session;
        // Destination:消息的目的地;消息发送给谁.
        Destination destination;
        // MessageProducer:消息发送者
        MessageProducer producer;
        // TextMessage message;
        //构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            //构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            //启动
            connection.start();
            //获取操作连接
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //获取session注意参数值FirstTopic是一个服务器的topic(与queue消息的发送相比,这里是唯一的不同)
            destination = session.createTopic("FirstTopic");
            //得到消息生成者【发送者】
            producer = session.createProducer(destination);
            //设置不持久化,此处学习,实际根据项目决定
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //构造消息,此处写死,项目就是参数,或者方法获取
            sendMessage(session, producer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}

订阅者:
package com.deppon.test04.jms.topicpublish;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ReceiveTopic implements Runnable {
    private String threadName;

    ReceiveTopic(String threadName) {
         this.threadName = threadName;
    }

    public void run() {
         // ConnectionFactory:连接工厂,JMS用它创建连接
         ConnectionFactory connectionFactory;
         // Connection:JMS客户端到JMS Provider的连接
         Connection connection =null;
         // Session:一个发送或接收消息的线程
         Session session;
         // Destination:消息的目的地;消息发送给谁.
         Destination destination;
         //消费者,消息接收者
         MessageConsumer consumer;
         connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616");
         try {
               //构造从工厂得到连接对象
               connection = connectionFactory.createConnection();
               //启动
               connection.start();
               //获取操作连接,默认自动向服务器发送接收成功的响应
               session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
               //获取session注意参数值FirstTopic是一个服务器的topic
               destination = session.createTopic("FirstTopic");
               consumer = session.createConsumer(destination);
               while (true) {
                    //设置接收者接收消息的时间,为了便于测试,这里设定为100s
                    TextMessage message = (TextMessage) consumer
                                .receive(100 * 1000);
                    if (null != message) {
                          System.out.println("线程"+threadName+"收到消息:" + message.getText());
                    } else {
                          continue;
                    }
               }
         } catch (Exception e) {
               e.printStackTrace();
         } finally {
               try {
                    if (null != connection)
                          connection.close();
               } catch (Throwable ignore) {
               }
         }
    }

    public static void main(String[] args) {
          //这里启动3个线程来监听FirstTopic的消息,与queue的方式不一样三个线程都能收到同样的消息
         ReceiveTopic receive1=new ReceiveTopic("thread1");
         ReceiveTopic receive2=new ReceiveTopic("thread2");
         ReceiveTopic receive3=new ReceiveTopic("thread3");
         Thread thread1=new Thread(receive1);
         Thread thread2=new Thread(receive2);
         Thread thread3=new Thread(receive3);
         thread1.start();
         thread2.start();
         thread3.start();
    }
}
参考:
http://activemq.apache.org/jndi-support.html
http://shift-alt-ctrl.iteye.com/blog/1915240
http://coderbase64.iteye.com/blog/2081937
分享到:
评论

相关推荐

    JMS IBM MQ 订阅模式

    JMS 操作IBM MQ,实现订阅发布模式!包含完整的JAR包以及测试文件。

    在spring boot中使用jms集成IBM-MQ和TLQ,包含普通队列和主题订阅两种模式,并实现按需加载

    ##################################1、工程说明##################...3) 实现了普通队列消息发送与监听,实现了基于TOPIC的消息发布与订阅 4) IBM-MQ无需提前创建主题,TongLink需要提前创建主题以及对应的虚拟队列;

    javax.jms-3.1.2.jar.zip

    Java消息服务(JMS Java Message Services)提供了点对点模式(Point-to-Point Queue)和发布-订阅模式(Publish-Subscribe Topics).JMS 开发所需的jar包,以及源代码

    JMS的一个非常好的demo

    自己写的一个JMS的demo,包含点对点,发布/订阅的例子,以及DeliverMode的测试,还有持久的订阅者,解决topic模式下消息接收者离线后接收不到离线时错过的消息的问题。

    JMS 教程 - 消息队列、消息服务

    本教程描述了消息服务的概念和一些应用事例,说明了 JMS 的基本概念和结构,并就消息服务的发布/订阅、点对点模式编写了简单代码,最后讨论的JMS的一些高级问题,如食物、可靠性、可恢复性等。

    JMS pubsub实例

    JMS pubsub实例 提供学习 希望有帮助 (*^__^*) 嘻嘻……

    Message-Driven Bean 培训PPT

    通过JMS 介绍讲解MDB的使用,分别讲解的P2P和发布/订阅等模式的消息通信,并附有参考价值的代码。 1、JMS(Java Message Service) 2、PTP (点对点) 3、Pub/sub (发布/订阅) 4、MDB 的生命周期

    品优购_day13_SpringJms_V1.31

    品优购电商系统开发第13章消息中间件解决方案JMS 课程目标目标1:理解消息中间件、JMS等概念目标2:掌握JMS点对点与发布订阅模式的收发

    基于JMS体系结构的消息服务技术的应用研究

    根据企业应用程序间进行消息服务的实际需要,深入研究Java消息服务(Java Message Service,JMS)技术,结合其中的"发布/订阅式"和"点对点式"两种消息收发模式,提出了一个基于JMS体系结构的全新的消息服务模型,并针对股票...

    JMS+ActiveMQ 完整样例代码

    完整的JMS 与 ActiveMQ 消息提供者的样例代码,对点对点模式、发布订阅者模式都有用例,有一个简单的聊天器,适合初学者

    Spring 实现远程访问详解——jms和activemq

    同时Apache ActiveMq是速度快,支持多种跨语言客户端和协议,同时配有易于使用的企业集成模式和优秀的特性,并且支持JMS1.1和J2EE1.4。具体特性见官网:http://activemq.apache.org/ 2. 什么是JMS JMS的全称是Java ...

    基于JMS的混合安全消息模型在煤矿应急系统中的应用

    为了解决集团式煤矿企业特殊的上报和下发应急信息的业务要求,根据JMS的点对点和发布/订阅2种消息传递模式,提出了一种分布式混合安全消息传递模型。该模型不仅解决了单个消息传递模式不能解决集团式煤矿企业特殊业务...

    ActiveMQ是Apache软件基金会所研发开源的消息中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信

    ActiveMQ是Apache软件基金会所研发开源的消息中间件,为应用程序提供高效的、可扩展的、稳定的... ActiveMQ提供了两种消息模式:点对点模式(Queue)、发布订阅模式(Topic),这两种模式基本上可以覆盖大部分的需求了

    Spring boot整合消息队列RabbitMQ实现四种消息模式(适合新手或者开发人员了解学习RabbitMQ机制)

    在业务逻辑的异步处理,系统解耦,分布式通信以及控制高并发的场景下,消息队列有着...并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持更多的消息路由和消息模式。

    TIBCO_RV__VS__IBM_MQ和JMS消息中间件的对比分析

    TIBCO Rendezvous(或称为TIBCO RV)产品是一种中间件,它具有发布/订阅(Publish/Subscribe)、基于主题寻址(Subject-Based Addressing) 和自定义数据信息(Self-Describing Data Messages)等专利技术功能,使不同应用...

    ActiveMQ in Action最新版

    股票信息系统使用了发布/订阅模式。发布者会广播股票的价格信息到每一个感兴趣的订阅者。消息被发送到成为“主题”的地址,然后客户端均从这个地址接收消息。 任务队列系统使用了点对点模式。消息生产者发送工作消息...

    《JAVA消息服务》PDF]

    构建应用程序使用点至点和发布与订阅消息传递模式使用情况,如交易和持久订阅的功能,使应用程序的可靠消息内实施企业JavaBeans(EJB)的使用REST风格的应用程序和消息驱动bean使用的JMS Spring应用框架 消息是一个...

    RocketMQ_原理解析

    支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递 支持拉(pull)和推(push)两种消息模式 单一队列百万消息的堆积能力 支持多种消息协议,如 JMS、MQTT 等 ...

    基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能

    本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向...并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。AMQP作为比JMS更加高级的消息协议,支持更多的消息路由和消息模式

Global site tag (gtag.js) - Google Analytics