`

Sping整合ActiveMQ(二.Spring 对JMS的支持 )

阅读更多

Sping整合ActiveMQ(二.Spring 对JMS的支持 )

Spring提供了一个用于简化JMS API使用的抽象框架,用户利用Spring使用JMS可以不用关心connection factory,session等资源的管理.类似于对JDBC的抽象,Spring提供了一个JmsTemplate类,抽象了操作JMS都必须做的公共步骤,留下了callback方法给用户去实现,如提供消息的真正内容等.

本文主要讲解一下怎么使用Spring来发送消息,接受消息和异步接受消息(MessageListener).

一,发送消息:

1,通过JmsTemplate的send方法和提供一个MessageCreator的实现的最简单应用:

JAVA类(用配置文件中配置的默认的queue):

  1. public class JMSsenderBean {  
  2.     private JmsTemplate jmsTemplate;  
  3. public void simpleSend() {  
  4.         this.jmsTemplate.send(new MessageCreator() {  
  5.             public Message createMessage(Session session) throws JMSException {  
  6.                 return session.createTextMessage("hello queue world");  
  7.             }  
  8.         });  
  9.     }  

配置:(本例在weblogic上配置了JMS服务)

  1. <beans>  
  2.   
  3.     <bean id="JMSsenderBean" class="com.test.spring.jms.JMSsenderBean">  
  4.         <property name="jmsTemplate">  
  5.             <ref local="jmsTemplate"></ref>  
  6.         </property>  
  7.         <property name="queue">  
  8.             <ref local="destination1"></ref>  
  9.         </property>  
  10.     </bean>  
  11.   
  12.     <bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">  
  13.         <property name="jndiName">  
  14.             <value>jms/jmsconf</value>  
  15.         </property>  
  16.         <property name="jndiTemplate">  
  17.             <ref local="jndiTemplate"></ref>  
  18.         </property>  
  19.     </bean>  
  20.   
  21.     <bean id="destination1" class="org.springframework.jndi.JndiObjectFactoryBean">  
  22.         <property name="jndiName">  
  23.             <value>jms/jmsq1</value>  
  24.         </property>  
  25.         <property name="jndiTemplate">  
  26.             <ref local="jndiTemplate"></ref>  
  27.         </property>  
  28.     </bean>  
  29.   
  30.     <bean id="destination" class="org.springframework.jndi.JndiObjectFactoryBean">  
  31.         <property name="jndiName">  
  32.             <value>jms/jmsq</value>  
  33.         </property>  
  34.         <property name="jndiTemplate">  
  35.             <ref local="jndiTemplate"></ref>  
  36.         </property>  
  37.     </bean>  
  38.   
  39.     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
  40.         <property name="connectionFactory">  
  41.             <ref local="connectionFactory" />  
  42.         </property>  
  43.         <property name="defaultDestination">  
  44.             <ref local="destination" />  
  45.         </property>  
  46.     </bean>  
  47.   
  48.     <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">  
  49.         <property name="environment">  
  50.             <props>  
  51.                 <prop key="java.naming.factory.initial">  
  52.                     weblogic.jndi.WLInitialContextFactory  
  53.                 </prop>  
  54.                 <prop key="java.naming.provider.url">  
  55.                     t3://localhost:7001  
  56.                 </prop>  
  57.                 <!--  
  58.                     <prop key="java.naming.security.authentication"> weblogic </prop>  
  59.                     <prop key="java.naming.security.credentials"> security </prop>  
  60.                 -->  
  61.             </props>  
  62.         </property>  
  63.     </bean>  
  64.   
  65. </beans>  

2,在发送的时候指定Queue:(配置同上)

  1. public void withQSend() {  
  2.   
  3.     this.jmsTemplate.send(queue, new MessageCreator() {  
  4.         public Message createMessage(Session session) throws JMSException {  
  5.             return session.createTextMessage("hello queue world to jmsq1");  
  6.         }  
  7.     });  
  8. }  

3,通过JmsTemplate的convertAndSend方法和提供一个MessageConverter的实现来将传入的对象转成message:

  1.     public void convertAndSend(MsgObject object) {  
  2.         this.jmsTemplate.setMessageConverter(new MyMessageConverter());  
  3.          
  4.         this.jmsTemplate.convertAndSend(object);  
  5.     }  
  6.   
  7. public class MyMessageConverter implements MessageConverter{  
  8.   
  9.     public Object fromMessage(Message message) throws JMSException, MessageConversionException {  
  10.         TextMessage msg = (TextMessage)message;  
  11.         MsgObject obj = new MsgObject();  
  12.         obj.setName("from message");  
  13.         return obj;  
  14.     }  
  15.   
  16.     public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {  
  17.         MsgObject obj = (MsgObject)object;  
  18.         return session.createTextMessage("from Object MsgObject " + obj.getName() + "  " + obj.getAge());  
  19.     }  
  20. }  

4,实现SessionCallback,利用JmsTemplate的execute方法来对session进行操作,可以发送和接受消息.

  1.     public void sessionCbkSend() {  
  2.   
  3.         this.jmsTemplate.execute(new MySessionCallback(this.jmsTemplate));  
  4.     }  
  5.   
  6.    
  7.   
  8. public class MySessionCallback implements SessionCallback{  
  9.     private JmsTemplate jmsTemplate;  
  10.   
  11.     public MySessionCallback(JmsTemplate jmsTemplate) {  
  12.          
  13.         this.jmsTemplate = jmsTemplate;  
  14.     }  
  15.   
  16.     public Object doInJms(Session session) throws JMSException {  
  17.         jmsTemplate.send(new MessageCreator() {  
  18.             public Message createMessage(Session session) throws JMSException {  
  19.                 System.out.println("..............");  
  20.                 return session.createTextMessage("message from session back");  
  21.             }  
  22.         });  
  23.         System.out.println("..............");  
  24.         return null;  
  25.     }  
  26. }  

二,接受消息

  1. public void receiveMsg() {  
  2.     TextMessage msg = (TextMessage)this.jmsTemplate.receive();  
  3.     try {  
  4.         System.out.println(msg.getText());  
  5.     } catch (JMSException e) {  
  6.         e.printStackTrace();  
  7.     }         
  8. }  

三,异步接受消息:实现MessageListener,配置listenerContainer,默认情况下,Spring容器启动后,Listener就会启动.

  1. public class ExampleListener implements MessageListener {  
  2.     public void onMessage(Message message) {  
  3.         if (message instanceof TextMessage) {  
  4.             try {  
  5.                 System.out.println(((TextMessage) message).getText());  
  6.                  
  7.                 try {  
  8.                     Thread.sleep(5000);  
  9.                 } catch (InterruptedException e) {  
  10.                     e.printStackTrace();  
  11.                 }  
  12.                  
  13.             } catch (JMSException ex) {  
  14.                 throw new RuntimeException(ex);  
  15.             }  
  16.         } else {  
  17.             throw new IllegalArgumentException("Message must be of type TextMessage");  
  18.         }  
  19.   
  20.     }  
  21. }  

配置

  1. <bean id="messageListener" class="com.test.spring.jms.ExampleListener">  
  2. </bean>  
  3.   
  4. <bean id="jmsContainer"  
  5.     class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  6.     <property name="connectionFactory" ref="connectionFactory" />  
  7.     <property name="destination" ref="destination" />  
  8.     <property name="messageListener" ref="messageListener" />  
  9. </bean>  

通过listenerContainer的stop和shutdown方法停止服务.

        jmsContainer.stop();
        jmsContainer.shutdown();

对listener的事务管理:

如果是本地事务,只需要设置listenerContainer的sessionTransacted就可以了.

  1. <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  2. ...  
  3. <property name="sessionTransacted" value="true"/>  
  4. </bean>  

如果在onMessage中还有对其他资源,如数据库的操作,需要使用JTA来控制全局事务.

  1. <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>  
  2.   
  3. <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  4.   
  5. ...  
  6.   
  7.   <property name="transactionManager" ref="transactionManager" />  
  8.   
  9. </bean> 
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics