`

jms 学习笔记2

 
阅读更多

JMS 异步消息传输,客户端将消息发给消息中介,可以保证消息被可靠的投递,即使在服务器服务中断,当服务恢复正常时,仍然可以恢复消息队列.  之前学习了,JMS简单送消息,但是为了方便操作以及代码的简化,Spring提供的消息转化器来简化消息发送和接收时的转化过程:

消息转化器接口: org.springframework.jms.support.converter.MessageConverter ,些接口简单明了,只有两个方法:将消息转化成对象和将圣像转化为消息,下面是个简单的例子:

package com.kesn.jms.converter;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;

import com.kesn.jms.entity.Motorist;

/**
 * Spring 提供的消息转化器
 * @author LEPING LI
 */
public class MotoristConverter implements MessageConverter {

 /**
  * 将消息报转化为对象
  */
 public Object fromMessage(Message message) throws JMSException,
   MessageConversionException {
  if(!(message instanceof MapMessage)){
   throw new MessageConversionException("Message is not a MapMessage!");
  }
  MapMessage map=(MapMessage)message;
  Motorist m=new Motorist();
  m.setName(map.getString("name"));
  m.setEmail(map.getString("email"));
  m.setAge(map.getString("age"));
  return m;
 }

 /**
  * 将对象转化为消息
  */
 public Message toMessage(Object obj, Session session) throws JMSException,
   MessageConversionException {
  if(!(obj instanceof Motorist)){
   throw new MessageConversionException("obj is not a Motorits!");
  }
  Motorist m=(Motorist)obj;
  MapMessage message=session.createMapMessage();
  message.setString("name", m.getName());
  message.setString("email", m.getEmail());
  message.setString("age", m.getAge());
  return message;
 }

}

将消息转化器置入JmsTemplate

 

<!-- JMS模板 -->
 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="messageConverter" ref="motoristConverter"/>
 </bean>

 

如此设置,有发送消息和接收消息时使用相关方法可以消除消息的转化过程

package com.kesn.jms.service.impl;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;

import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;

import com.kesn.jms.entity.Motorist;
import com.kesn.jms.service.JmsTemplateReceiveService;
/**
 * 使用JmsTemplate 发送消息
 * @author LEPING LI
 *
 */
public class KesnJmsTemplateMessageReceiver implements JmsTemplateReceiveService{
 
 private JmsTemplate jmsTemplate;
 public void setJmsTemplate(JmsTemplate jmsTemplate) {
  this.jmsTemplate = jmsTemplate;
 }
 private Destination destination;
 public void setDestination(Destination destination) {
  this.destination = destination;
 }

 @Override
 public Motorist receiveMessage() {
//  try {
//   MapMessage message=(MapMessage) jmsTemplate.receive(destination);
//   Motorist m=new Motorist();
//   m.setName(message.getString("name"));
//   m.setEmail(message.getString("email"));
//   m.setAge(message.getString("age"));
//   return m;
//  } catch (JmsException e) {
//   e.printStackTrace();
//  }
//  return null;
  return (Motorist)jmsTemplate.receiveAndConvert(destination);
 }
}

package com.kesn.jms.service.impl;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import com.kesn.jms.entity.Motorist;
import com.kesn.jms.service.JmsTemplateSendService;
/**
 * 使用JmsTemplate 发送消息
 * @author LEPING LI
 *
 */
public class KesnJmsTemplateMessageSender implements JmsTemplateSendService{
 private JmsTemplate jmsTemplate;
 public void setJmsTemplate(JmsTemplate jmsTemplate) {
  this.jmsTemplate = jmsTemplate;
 }
 private Destination destination;
 public void setDestination(Destination destination) {
  this.destination = destination;
 }
 
 @Override
 public void sendMessage(final Motorist m) {
//  jmsTemplate.send(destination, new MessageCreator(){
//   @Override
//   public Message createMessage(Session session) throws JMSException {
//    MapMessage message=session.createMapMessage();
//    message.setString("name",m.getName());
//    message.setString("email",m.getEmail());
//    message.setString("age", m.getAge());
//    return message;
//   }
//  });
  jmsTemplate.convertAndSend(destination, m);
 }
}

在发送和接收的消息时,JmsTemplate 方法自动应该消息转化器

 

 

以上都是采用注入JmsTemplate 的方式来使用操作消息,和其它模板一样,Spring也提供了JmsTemplate 对应的支持类 JmsGatewaySupport ,通过继承该类,可能通过getJmsTemplate 方法获取到JmsTemplate 模板

 

通常的编程模式我们都更倾向于基于Pojo的编程,EJB2中MDB消息驱动Bean必须实现 javax.ejb.MessageDrivenBean 接口,EJB3不再需要实现该接口,但是还是要去实现一个监听器接口:MessageListener ,更像POJO,但还是POJO,Spring2中提供的消息驱动Bean采用了EJB3类型的方式,我们称Spring消息驱动Pojo为MDP ,下面是一个简单的例子:

 

创建一个消息驱动Pojo

package com.kesn.jms.mdp;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.apache.log4j.Logger;

import com.kesn.jms.entity.Motorist;
/**
 * 创建消息监听器
 * @author LEPING LI
 *
 */
public class MarkingMdp implements MessageListener {
 private Logger log=Logger.getLogger(MarkingMdp.class);
 @Override
 public void onMessage(Message message) {
  MapMessage map=(MapMessage)message;
  Motorist m=new Motorist();
  try {
   m.setName(map.getString("name"));
   m.setEmail(map.getString("email"));
   m.setAge(map.getString("age"));
   log.info("name:"+m.getName());
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }
}

上面MDP就是实现一个MessageListener 监听器接口,了队列和主题有消息时,系统为自动调用MDP的onMessage方法来处理消息

但是JMS是如何自动调用onMessage方法来处理消息呢?消息监听的处理方式是这样的:消息监听器容器一个用于查看JMS目标,等待消息的特 殊Bean,一量消息到达,就会获取消息,并通过onMessage方法来处理消息;Spring提供的 消息监听器容器有以下三种:

SimpleMessageListenerContainer 最简单的消息监听容器,处理有限数据量的消息,不支持事务

DefaultMessageListenerContainer 支持事务

ServerSessionMessageListenerConatainer  支持更强大的功能,支持事务,允许动态管理JMS会话

 

消息监听器有Spring中的配置如下:

<!-- 消息监听器 -->
 <bean id="markingMdp" class="com.kesn.jms.mdp.MarkingMdp"/>
 <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="destination" ref="kesnQDestination"/>
  <property name="messageListener" ref="markingMdp"/>
 </bean>

 

如果需要事务可以使用:DefaultMessageListenerConatiner 配置如下

<!-- 消息监听器 -->
 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="destination" ref="kesnQDestination"/>
  <property name="messageListener" ref="markingMdp"/>
  <property name="transactionManager" ref="jmsTransactionManager"/>
 </bean>
 
 <!-- JMS事务管理器 -->
 <bean  id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
  <property name="connectionFactory" ref="connectionFactory"/>
 </bean>

 

 

下面学习一下如何用Spring创建一个纯POJO的MDP

上面的方式或多或不都使用了一定的JMS API,如何实现一个更POJO的MDP呢,Spring提供了一适配器 MessageListenerAdpater ,是一个MessageListener ,可能委派Bean和指定的方法,所谓适配器简单的说就是包装目标对象,增强其功能;我们将采用spring 配置的方式对之前的MDP(或是一个简单的POJO)经过适配器包装后形成新的监听器,配置如下:

<!-- 纯POJO MDP Bean -->
    <bean id="markingPojoMdp" class="com.kesn.jms.pojomdp.MarkingMdp"/>
   
    <bean id="purePojoMdp" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="markingPojoMdp"/>
        <property name="defaultListenerMethod" value="handleMessage"/>
    </bean>
   
    <!-- 配置监听器容器 -->
    <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="kesnQDestination"/>
        <property name="messageListener" ref="purePojoMdp"/>
    </bean>

 

 

通过在监控器中转入消息转化器,可以简单化消息接收的发送代码

<bean id="purePojoMdp" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="markingPojoMdp"/>
        <property name="defaultListenerMethod" value="handleMessage"/>
        <property name="messageConverter" ref="motoristConverter"/>
    </bean>

 

接收消息的代码可以转化为:

public void handleMessage(Motorist m) {
        log.info("name:"+m.getName());
    }

 

至些我们建立的MDP已经是完成基于POJO的了

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics