`
liyuandong
  • 浏览: 329769 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

JMS简明学习教程(二)

    博客分类:
  • JMS
阅读更多

 实战篇

本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/Linyufa/archive/2009/07/24/4375670.aspx

 

 前面对JMS概念的作了一个基本介绍,下面我们看一个具体的例子程序

 

Pub/sub方式的消息传递的例子:

l    HelloPublisher.java

 

package com.jms.test;

 

import java.util.Hashtable;

 

import javax.jms.JMSException;

import javax.jms.Session;

import javax.jms.TextMessage;

import javax.jms.Topic;

import javax.jms.TopicConnection;

import javax.jms.TopicConnectionFactory;

import javax.jms.TopicPublisher;

import javax.jms.TopicSession;

import javax.naming.Context;

import javax.naming.InitialContext;

import javax.naming.NamingException;

 

/**

 * pub/sub方式的消息发送程序

 */

public class HelloPublisher {

   

    TopicConnection topicConnection;// JMS连接,属于Pub/Sub方式的连接

    TopicSession topicSession; //JMS会话,属于Pub/Sub方式的会话

    TopicPublisher topicPublisher; //消息发布者

    Topic topic; //主题

   

    public HelloPublisher(String factoryJNDI, String topicJNDI)

           throws JMSException, NamingException {

       Hashtable<String, String> env = new Hashtable<String, String>();

       //设置好连接JMS容器的属性,不同的容器需要的属性可能不同,需要查阅相关文档

       env.put(Context.INITIAL_CONTEXT_FACTORY,

              "org.jnp.interfaces.NamingContextFactory");

       env.put(Context.PROVIDER_URL, "localhost:1099");

       env.put("java.naming.rmi.security.manager", "yes");

       env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");

 

       //创建连接JMS容器的上下文(context)

       Context context = new InitialContext(env);

 

       //通过连接工厂的JNDI名查找ConnectionFactory

       TopicConnectionFactory topicFactory =

           (TopicConnectionFactory) context.lookup(factoryJNDI);

 

       //用连接工厂创建一个JMS连接

       topicConnection = topicFactory.createTopicConnection();

 

       //通过JMS连接创建一个Session

       topicSession = topicConnection.createTopicSession(false,

              Session.AUTO_ACKNOWLEDGE);

 

       //通过上下文查找到一个主题(topic)

       topic = (Topic) context.lookup(topicJNDI);

 

       //用session来创建一个特定主题的消息发送者

       topicPublisher = topicSession.createPublisher(topic);

    }

   

 

    /**

     * 发布一条文本消息

     * @param msg 待发布的消息

     * @throws JMSException

     */

    public void publish(String msg) throws JMSException {

       //用session来创建一个文本类型的消息

       TextMessage message = topicSession.createTextMessage();

       message.setText(msg);//设置消息内容

       topicPublisher.publish(topic, message);//消息发送,发送到特定主题

    }

 

    public void close() throws JMSException {

       topicSession.close();//关闭session

       topicConnection.close();//关闭连接

    }

 

    public static void main(String[] args)

       throws JMSException, NamingException {

       HelloPublisher publisher =

           new HelloPublisher("ConnectionFactory", "topic/testTopic");

       try {

           for (int i = 1; i < 11; i++) {

              String msg = "Hello World no. " + i;

              System.out.println("Publishing message: " + msg);

              publisher.publish(msg);

           }

           publisher.close();//session和connection用完之后一定记得关闭

       } catch (Exception ex) {

           ex.printStackTrace();

       }

    }

}

程序在控制台输出:

Publishing message: Hello World no. 1

Publishing message: Hello World no. 2

Publishing message: Hello World no. 3

Publishing message: Hello World no. 4

Publishing message: Hello World no. 5

Publishing message: Hello World no. 6

Publishing message: Hello World no. 7

Publishing message: Hello World no. 8

Publishing message: Hello World no. 9

Publishing message: Hello World no. 10

 

 

l        HelloSubscriber.java

 

package com.jms.test;

 

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageListener;

import javax.jms.Session;

import javax.jms.TextMessage;

import javax.jms.Topic;

import javax.jms.TopicConnection;

import javax.jms.TopicConnectionFactory;

import javax.jms.TopicSession;

import javax.jms.TopicSubscriber;

import javax.naming.Context;

import javax.naming.InitialContext;

import javax.naming.NamingException;

 

/**

 * pub/sub方式下的消息接收器。注意,这个消息接收器可以与上面的消息发送器可以工作

* 在不同的JVM中,只要保证它们各自能够连通JMS容器(JMS Provider)

 *

 */

public class HelloSubscriber implements MessageListener {

    TopicConnection topicConnection;

    TopicSession topicSession;

    TopicSubscriber topicSubscriber;

    Topic topic;

 

    public HelloSubscriber(String factoryJNDI, String topicJNDI)

           throws JMSException, NamingException {

Hashtable<String, String> env = new Hashtable<String, String>();

       //设置好连接JMS容器的属性,不同的容器需要的属性可能不同,需要查阅相关文档

       env.put(Context.INITIAL_CONTEXT_FACTORY,

              "org.jnp.interfaces.NamingContextFactory");

       env.put(Context.PROVIDER_URL, "localhost:1099");

       env.put("java.naming.rmi.security.manager", "yes");

       env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");

       Context context = new InitialContext();

 

       TopicConnectionFactory topicFactory =

           (TopicConnectionFactory) context.lookup(factoryJNDI);

       //创建连接

       topicConnection = topicFactory.createTopicConnection();

       topicSession = topicConnection.createTopicSession(false,

              Session.AUTO_ACKNOWLEDGE);//创建session

       topic = (Topic) context.lookup(topicJNDI);//查找到主题

       //用session创建一个特定queue的消息接收者

       topicSubscriber = topicSession.createSubscriber(topic);

       //注册监听,这里设置的监听是自己,因为本类已经实现了MessageListener接口,

       //一旦queueReceiver接收到了消息,就会调用本类的onMessage方法

       topicSubscriber.setMessageListener(this);

       System.out.println("HelloSubscriber subscribed to topic: "

              + topicJNDI);

       topicConnection.start();//启动连接,这时监听器才真正生效

    }

 

    public void onMessage(Message msg) {

       try {

           if (msg instanceof TextMessage) {

              //把Message 转型成 TextMessage 并提取消息内容

              String msgTxt = ((TextMessage) msg).getText();

              System.out.println("HelloSubscriber got message: " +

                  msgTxt);

           }

       } catch (JMSException ex) {

           System.err.println("Could not get text message: " + ex);

           ex.printStackTrace();

       }

    }

 

    public void close() throws JMSException {

       topicSession.close();

       topicConnection.close();

    }

 

    public static void main(String[] args) {

       try {

           new HelloSubscriber("TopicConnectionFactory",

              "topic/testTopic");

       } catch (Exception ex) {

           ex.printStackTrace();

       }

    }

}

程序在控制台输出:

HelloSubscriber subscribed to topic: topic/testTopic

HelloSubscriber got message: Hello World no. 1

HelloSubscriber got message: Hello World no. 2

HelloSubscriber got message: Hello World no. 3

HelloSubscriber got message: Hello World no. 4

HelloSubscriber got message: Hello World no. 5

HelloSubscriber got message: Hello World no. 6

HelloSubscriber got message: Hello World no. 7

HelloSubscriber got message: Hello World no. 8

HelloSubscriber got message: Hello World no. 9

HelloSubscriber got message: Hello World no. 10

 

 

 

P2P方式下的消息传递

l         HelloQueue.java

 

package com.jms.test;

 

import javax.naming.Context;

import javax.naming.InitialContext;

import javax.naming.NamingException;

import javax.jms.QueueConnectionFactory;

import javax.jms.QueueConnection;

import javax.jms.QueueSession;

import javax.jms.QueueSender;

import javax.jms.Queue;

import javax.jms.TextMessage;

import javax.jms.Session;

import javax.jms.JMSException;

 

import java.util.Hashtable;

 

public class HelloQueue {

    QueueConnection queueConnection; //queue方式的JMS连接

    QueueSession queueSession; //queue会话

    QueueSender queueSender; //queue消息发送者

    Queue queue; //消息队列

 

    public HelloQueue(String factoryJNDI, String topicJNDI)

            throws JMSException, NamingException {

        //连接JMS Provider的环境参数

        Hashtable<String, String> props = new Hashtable<String, String>();

        props.put(Context.INITIAL_CONTEXT_FACTORY,

                "org.jnp.interfaces.NamingContextFactory");

        //JMS provider的主机和端口

        props.put(Context.PROVIDER_URL, "localhost:1099");

        props.put("java.naming.rmi.security.manager", "yes");

        props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");

        Context context = new InitialContext(props);

       

        //lookup到连接工厂

        QueueConnectionFactory queueFactory =

            (QueueConnectionFactory) context.lookup(factoryJNDI);

        queueConnection = queueFactory.createQueueConnection();//创建连接

        queueSession = queueConnection.createQueueSession(false,

                Session.AUTO_ACKNOWLEDGE);//创建会话

 

        queue = (Queue) context.lookup(topicJNDI);//lookup到特定的消息队列

 

        queueSender = queueSession.createSender(queue);//创建队列消息的发送者

 

    }

 

    public void send(String msg) throws JMSException {

        TextMessage message = queueSession.createTextMessage();

        message.setText(msg);

        queueSender.send(queue, message);

    }

 

    public void close() throws JMSException {

        queueSession.close();

        queueConnection.close();

    }

 

    public static void main(String[] args) {

        try {

            HelloQueue queue = new HelloQueue("ConnectionFactory",

                    "queue/testQueue");

            for (int i = 11; i < 21; i++) {

                String msg = "Hello World no. " + i;

                System.out.println("Hello Queue Publishing message: " + msg);

                queue.send(msg);

            }

            queue.close();

        } catch (Exception ex) {

            System.err.println("An exception occurred " +

"while testing HelloPublisher25: " + ex);

            ex.printStackTrace();

        }

    }

}

 

程序在控制台输出:

 

Hello Queue Publishing message: " Hello World no. 11

Hello Queue Publishing message: " Hello World no. 12

Hello Queue Publishing message: " Hello World no. 13

Hello Queue Publishing message: " Hello World no. 14

Hello Queue Publishing message: " Hello World no. 15

Hello Queue Publishing message: " Hello World no. 16

Hello Queue Publishing message: " Hello World no. 17

Hello Queue Publishing message: " Hello World no. 18

Hello Queue Publishing message: " Hello World no. 19

Hello Queue Publishing message: " Hello World no. 20

 

 

 

l        HelloRecvQueue.java

 

package com.jms.test;

 

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageListener;

import javax.jms.Session;

import javax.jms.TextMessage;

import javax.jms.Queue;

import javax.jms.QueueConnection;

import javax.jms.QueueConnectionFactory;

import javax.jms.QueueSession;

import javax.jms.QueueReceiver;

import javax.naming.Context;

import javax.naming.InitialContext;

import javax.naming.NamingException;

 

public class HelloRecvQueue implements MessageListener {

    QueueConnection queueConnection;

    QueueSession queueSession;

    QueueReceiver queueReceiver;

    Queue queue;

 

    public HelloRecvQueue(String factoryJNDI, String topicJNDI)

            throws JMSException, NamingException {

        Context context = new InitialContext();

        QueueConnectionFactory queueFactory =

            (QueueConnectionFactory) context.lookup(factoryJNDI);

        queueConnection = queueFactory.createQueueConnection();

        queueSession = queueConnection.createQueueSession(false,

                Session.AUTO_ACKNOWLEDGE);

        queue = (Queue) context.lookup(topicJNDI);

 

        queueReceiver = queueSession.createReceiver(queue);

        queueReceiver.setMessageListener(this);

        System.out.println("HelloReceQueue receiver to queue: " + topicJNDI);

        queueConnection.start();

    }

 

    public void onMessage(Message m) {

        try {

            String msg = ((TextMessage) m).getText();

            System.out.println("HelloReceQueue got message: " + msg);

        } catch (JMSException ex) {

            System.err.println("Could not get text message: " + ex);

            ex.printStackTrace();

        }

    }

 

    public void close() throws JMSException {

        queueSession.close();

        queueConnection.close();

    }

 

    Public ovid main(String[] args) {

    new HelloRecvQueue();

}

}

 

 

 

程序在控制台输出:

 

HelloReceQueue got message: Hello World no. 11

HelloReceQueue got message: Hello World no. 12

HelloReceQueue got message: Hello World no. 13

HelloReceQueue got message: Hello World no. 14

HelloReceQueue got message: Hello World no. 15

HelloReceQueue got message: Hello World no. 16

HelloReceQueue got message: Hello World no. 17

HelloReceQueue got message: Hello World no. 18

HelloReceQueue got message: Hello World no. 19

HelloReceQueue got message: Hello World no. 20

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics