`

利用JMS Topic发布/订阅消息

    博客分类:
  • jms
阅读更多

2003年3月11日那一期(使用JMS Queue) 解释了如何使用Java消息服务(Java Messaging Service,JMS)Queue进行点到点的消息发送。下面这一技巧将解释如何使用JMS Topic实现发布/订阅式的消息发送。

发布/订阅式的消息发送

发布/订阅式的消息发送中,一个发布者利用一个方法调用将每条消息发送给多个预订者。介于发布者和预订者之间的是一台消息服务器。在JMS中,消息服务器被叫做“JMS提供者”。发布者发送消息到JMS提供者,预订者从JMS提供者接收消息。

下图演示了这一方案。

在JMS中,发布/订阅式的消息发送使用JMS管理的一个叫做Topic的对象来管理发布者到预订者的消息流。JMS发布者又叫做消息生产者,而JMS预订者又叫做消息消费者。消息生产者获得服务器上一个JMS Topic的引用,并向该Topic发送消息。当消息到达时,JMS提供者负责通知所有预订了该Topic的消息消费者。JMS提供者每次发送消息后(可选地)将接收到消息收据的确认。

这一过程描述如下:

使用JMS Topic的发布/订阅式消息发送在几方面类似于点到点的消息发送。以下是两种消息发送方式共同的特点:

  • 消息发送可以是面向对象的,允许将整个的对象作为消息发送。
  • 消息发送可以是事务性的。
  • 消息发送可以是同步或异步的。
  • 消息发送可以与基础的第三方产品集成。
  • 消息可以发送给在消息发送时不在运行的消息消费者(即QueueReceiver或TopicSubscriber)。
  • 消息一旦被递送到队列或主题,发送消息的函数调用就会立即返回。
  • 可以显式地或自动地确认收到消息。

发布/订阅式消息发送与点到点消息发送之间也有几点不同:

  • 发布/订阅式消息发送是一对多的,而点到点消息发送是一对一的。
  • 发布的消息只递送给Topic当前的预订者。客户只能接收到他向一个Topic预订了的那些消息。而在点到点消息发送中,永久的消息将一直在Queue中,直到它超时或者某个接收者来取走该消息。
  • 发布/订阅式消息发送中的永久消息是由“耐久的预订”提供的。JMS提供者存储由于预订者出于某种原因不可用而无法递送给预订者的消息。在下次预订者连接上的时候,这些存储的消息将会被递送给他。这确保了客户预订一个Topic之后,所有发布的消息都会递送给他,哪怕该预订者不是总在运行。如果预订不是耐久的,在预订者掉线时发布的任何消息都不会递送到预订者。

发布/订阅消息和点到点消息发送没有优劣之分,它们是相互补充的工具,各自用于不同的目的。点到点消息发送通常用于消息接收者在一个系统内有惟一的标识的情况下。发布/订阅式消息发送更多地用于一个系统中的几个代理需要知道某个事件或条件何时出现这种情况下。

JMS消息发送模型非常类似于常规的Java 2编程中的事件侦听器。点到点消息发送就像一个单播事件侦听器模型,而发布/订阅消息就像一个组播侦听器模型。传统的Java事件侦听器与JMS(不同于编程语法)之间的差别是,事件源和侦听器分别叫做消息生产者和消费者。JMS消息生产者和消费者可以运行在不同的地址空间,甚至是在不同的机器上。JMS消息发送还提供比传统的事件侦听器模型所实现的更高级别的服务。不过基本的消息发送模型是相同的。

该技巧的示例代码由三个程序组成:

  1.  
    1. 一个servlet,名字叫做PublishWeatherServlet,它向JMS Topic发布一个XML格式的天气报告。
    2. 一个命令行Java应用程序,名字叫做WeatherReceiver,它向Topic发出预订并打印接收到的XML消息。
    3. 一个GUI应用客户端,名字叫做WeatherClient,它解析并以图形方式显示XML消息中的数据。

下面是用于发布天气报告的HTML页面和Web表单、运行命令行预订者的终端会话和GUI应用的屏幕快照:

发布消息到Topic

名叫PublishWeatherServlet的servlet从HTML表单接收POST参数,并转化为XML格式,然后使用JMS Topic将产生的XML文档发布到所有的侦听器。该servlet中的大多数代码用于接收POST参数并将它们转化为XML文档。代码的有趣部分在于发布方法。该方法接收一个String参数,其中包含有将发布的XML文本。下面我们来仔细研究发布方法,看它是如何发布到JMS Topic的:

    1.Get a TopicConnectionFactory and a Topic.

                  protected void publish(String text) {              

                     TopicConnectionFactory tcf = null;
                      Topic topic = null;
          

                        try {                 
                         Context jndiContext = new InitialContext();
                         tcf = (TopicConnectionFactory)10.
                         jndiContext.lookup(
    11.
                         "java:comp/env/jms/TopicConnectionFactory");
    12.
                      topic = (Topic) jndiContext.lookup(
    13.
                         "java:comp/env/jms/Topic");
    14.
                   } catch (NamingException nameEx) {

                      System.err.println(nameEx.toString());
    16.
                   }

该代码使用Java命名和目录接口(Naming and Directory Interface,JNDI)API在JMS提供者上查找两个对象:Topic和TopicConnectionFactory。该servlet将发送消息到Topic。TopicConnectionFactory用于创建一个到JMS提供者的连接。请注意该servlet用于查找这些对象的名称。记住,J2EE应用中所有对象的JNDI API名称都应该以java:comp/env/打头

    Create a Connection, Session, and Publisher.

                TopicConnection tc = null;                       

            try {  21.
               tc = tcf.createTopicConnection();
              TopicSession ts =
                 tc.createTopicSession(
                    false, Session.AUTO_ACKNOWLEDGE);
              TopicPublisher tp =
                 ts.createPublisher(topic);

该代码使用从JNDI API得到的TopicConnectionFactory来创建TopicConnection。TopicConnection可用于创建TopicSession。用于创建TopicSession的参数告诉连接创建一个不是事务性的TopicSession,并且自动地确认消息收到。(如果消息递送是事务性的,那么在同一TopicSession中发送的所有消息将形成一个工作单元,该单元可以被提交或回滚)。然后,使用Session创建TopicPublisher,TopicPublisher充当消息发布的通道的角色。

注意,J2EE 2.0规范指出,JMS消息的事务和确认是由J2EE容器管理的。这意味着如果代码运行在容器中,那么这些参数就可以被忽略。遗憾的是,并不是所有的提供商都会按照这一要求去做。如果事务、确认或者这二者对您的应用很重要,请务必检查您的产品文档,或者自己测试这些参数的行为。这些参数应该像在容器外工作一样。

    Create and publish the message.

                   TextMessage textMessage =
                      ts.createTextMessage();

                   textMessage.setText(text);

                   tp.publish(textMessage);

                   ...

            } // End of method publish

发布消息很简单。TopicSession充当创建新的TextMessage的工厂。该代码将TextMessage的文本设置为包含将发送的XML的字符串,然后使用TopicPublisher将消息发布到Topic。

发布方法到此就介绍完了。JMS提供者负责将消息递送给所有的预订者。

Topic预订消息和接收消息

命令行程序WeatherReceiver向Topic预订消息并打印出从该Topic接收到的任何消息。为了简化,预订Topic的过程被封装在辅助类SubscriptionHelper中。WeatherReceiver类充当一个异步消息接收者,并实际执行输出操作。

  1. 向一个Topic预订消息

    以下的代码来自类SubscriptionHelper,创建对一个Topic的预订:
      

    protected TopicConnection _tc;
        ...
       
        public SubscriptionHelper(String tcfName,
                               String topicName,
                               MessageListener listener) {

           // Get references to topic connection factory
           // and topic. \
           _tc = null;

           TopicConnectionFactory tcf = null;
           Topic topic = null;

           try {
              InitialContext ic = new InitialContext();
              tcf = (TopicConnectionFactory)
                 ic.lookup(tcfName);
              topic = (Topic) ic.lookup(topicName);
           } catch (NamingException e) {
              System.err.println(e.toString());
              e.printStackTrace(System.err);
           }

           try {
              // Create a connection and so on

              // Subscribe self to topic--messages will be
              // delivered to this.onMessage()
              _tc = tcf.createTopicConnection();
              TopicSession ts =

                 _tc.createTopicSession(
                   false, Session.AUTO_ACKNOWLEDGE);
              TopicSubscriber tsub =

               ts.createSubscriber(topic);
              tsub.setMessageListener(listener);
           } catch (JMSException e) {
              System.err.println(e.toString());

              e.printStackTrace(System.err);
              close();
           }
        }

SubscriptionHelper类的大部分等同于发布者代码。它使用JNDI API来获得对Topic 和TopicConnectionFactory的引用,并创建TopicConnection和TopicSession对象。但是该类不是创建一个TopicPublisher,而是创建一个TopicSubscriber,并将TopicSubscriber的消息侦听器设置为已经传递进来的MessageListener。从此以后,每当该Topic接收到一条消息,该消息就会被递送到MessageListener的onMessage方法。因为这种方式中使用了一个回调,因此该例子演示了异步消息接收。

2.Receiving Messages

接收消息惟一的要求是类实现接口javax.jms.MessageListener。WeatherReceiver类本身是一个MessageListener。MessageListener接口只有一个方法:onMessage。WeatherReceiver的onMessage方法出现在下面:

       

    public class WeatherReceiver implements
           MessageListener {
           // Print a weather message when it is received
           public void onMessage(Message message) {
              try {
                  if (message instanceof TextMessage) {
                     TextMessage m = (TextMessage) message;
                     System.out.println(
                        "--- Received weather report");
                     System.out.println(m.getText());
                     System.out.println("----------");
                  } else {
                     System.out.println(
                        "Received message of type " +
                         message.getClass().getName());
                  }
             } catch (JMSException e) {
                System.err.println(e.toString());
                e.printStackTrace(System.err);
             }
          }
          ...
          public static void main(String[] args) {
             if (args.length != 2) {
                System.out.println(
                   "Usage: WeatherReceiver " +
                   "topicConnectionFactorName topicName");
                System.exit(1);
             }


            // Create a receiver, then set it up to listen
           // for messages on the topic. Then wait for
            // messages and print them as they come in.
            WeatherReceiver wr = new WeatherReceiver();
            SubscriptionHelper sh =
               new SubscriptionHelper(args[0], args[1], wr);
       
           // Wait for publications...
            System.out.println(
              "Waiting for publications to topic " +
              args[1]);
            sh.waitForMessages();
         }

WeatherReceiver的主方法创建一个WeatherReceiver实例和一个SubscriptionHelper实例。它向SubscriptionHelper传递应用将会使用的WeatherReceiver、Topic名称和TopicConnectionFactory(这些参数在命令行指定)。SubscriptionHelper实例创建预订。然后主方法将WeatherReceiver注册为来自Topic的消息的消费者。

onMessage方法只是在适当的地方将接收到的Messages转化为类TextMessage,并输出接收到的XML文档。

注意,在Web层使用JMS侦听器是一个坏主意。实际上,J2EE 1.3参考实现不允许这样做。服务器端JMS侦听器在EJB层被适当模型化为消息驱动Bean。

部署Web应用

有了JMS,发布/订阅式消息发送的代码就很容易。但是,部署描述文件提出了一个需要解决的问题。PublishWeatherServlet是一个使用JNDI API查找外部组件的Web组件。Web组件使用编码名称查找外部资源(比如Topics和TopicConnectionFactories)。部署描述文件必须将这些编码名称定义为资源引用或资源环境引用。下面从Web应用的部署描述文件web.xml抽取出的代码定义了servlet使用的编码名称(该代码出现在web.xml中的<welcome-file-list>之后):

   <!-- JMS topics and connection factories used -->
  <resource-env-ref>
    <resource-env-ref-name>
      jms/Topic
    </resource-env-ref-name>
    <resource-env-ref-type>
      javax.jms.Topic
    </resource-env-ref-type>
  </resource-env-ref>
  <resource-ref>
    <res-ref-name>
      jms/TopicConnectionFactory

     </res-ref-name>
    <res-type>
      javax.jms.TopicConnectionFactory
    </res-type>
    <res-auth>
      Container
    </res-auth>
  </resource-ref>

resource-env-ref块定义名称“jms/Topic”的类型为javax.jms.Topic。字符串“jms/Topic”是用于查找Topic ("java:comp/env/jms/Topic")的字符串,其中“java:comp/env/”部分删除了。产品的部署工具允许应用部署人员将这一名称映射到环境中的一个Topic。

在J2EE参考实现中,这一映射已经预先配置在文件META-INF/sun-j2ee-ri.xml中的Web档案中。该文件是Web应用的运行时部署描述文件。部署描述文件在名称和内容方面都是特定于提供商的。

resource-ref块定义了TopicConnectionFactory的名称、类型和授权模式。通常,部署人员会使用部署工具将编码名称jms/TopicConnectionFactory与平台中的TopicConnectionFactory相关联。J2EE参考实现预配置了在JNDI中命名空间中叫做jms/TopicConnectionFactory的TopicConnectionFactory。

  • 大小: 12.1 KB
  • 大小: 7.7 KB
  • 大小: 23.4 KB
分享到:
评论
1 楼 qinxcb 2011-10-26  
能否将实例代码发布一下,谢谢。

相关推荐

Global site tag (gtag.js) - Google Analytics