`

JMS 异步传输与Spring结合实例

 
阅读更多
在Spring 2.0之前,Spring的JMS的作用局限于产生消息。这个功能(封装在 JmsTemplate 类中)当然是很好的,但是,它没有描述完整的JMS堆栈,比如像消息的 异步 产生和消耗。JMS堆栈缺少的这一部分已经被添加,Spring 2.0现在提供对消息异步消耗的完整支持。
让我们从一个例子开始。
首先我们打开ActiveMQ。从ActiveMQ的安装路径上的bin目录,那里有一个ActiveMQ.bat,双击执行即可。不过要注意必须先设置java_home环境变量。ActiveMQ默认的服务端口是61616。
然后我们开始配置Spring配置文件。我起名为spring-jms.xml

首先要配置一个ConnectionFactory代码如下
<bean id="connectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>

这里用到的ConnectionFactory是ActiveMQ提供的工厂,为了能使用这个工厂,我们必须在项目中添加以下几个jar文件:
geronimo-jms_1.1_spec-1.0.jar,
activeio-core-3.0-beta3.jar,
activemq-core-4.0.1.jar,
backport-util-concurrent-2.1.jar,
commons-logging-1.0.4.jar,
geronimo-j2ee-management_1.0_spec-1.0.jar
以上这些Jar文件都存在于ActiveMQ安装目录的lib目录下,这些可是我一个一个试验出来的,累个半死。。

然后应该配置一个Queue(我使用的是点对点方式),不过ActiveMQ只要提供一个名字就可以自动创建队列,因此这一步省了,呵呵
下 面就轮到Spring的支持类了,首先是JmsTemplate。这个类提供了大量的方法简化我们对JMS的操作。常用的有两个, org.springframework.jms.core.JmsTemplate102和 org.springframework.jms.core.JmsTemplate,这两个类分别支持JMS的1.02版本和1.1版本。现在比较常用 的还是1.02版本。配置如下
<bean id="jmsTemplate"
class="org.springframework.jms.core.JmsTemplate102">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="timeToLive" value="86400000"/>
<property name="defaultDestinationName" value="cmpp" />
<property name="messageConverter" ref="messageConverter" />
<property name="receiveTimeout" value="30000" />
</bean>

上 面的配置中用到了第一步配置的connectionFactory以及一个消息转换的类messageConverter,这个类实现了 org.springframework.jms.support.converter.MessageConverter接口,可以在消息发送之前和接 受之后进行消息类型转换。具体的看最后的实例代码。配置代码如下:

<!-- Spring JMS SimpleConverter -->
<bean id="simpleConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />
<!-- Message Converter -->
<bean id="messageConverter"
class="com.liangj.apmgt.jms.ApmgtMessageConverter">
<property name="converter">
<ref local="simpleConverter" />
</property>
</bean>


这里还配置了发送的消息的存在时间timeToLive,目标Queue的名字defaultDestinationName,接受消息超时时间receiveTimeout

配置发送代码
<!-- Message porducer -->
<bean id="producer"
class="com.liangj.apmgt.jms.DefaultApmgtMessageProducer">
<property name="jmsTemplate" ref="jmsTemplate" />
</bean>


接着配置监听器,这是Spring2.0新增的功能,配置如下:

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean
class="com.liangj.apmgt.jms.DefaultApmgtMessageListener" />
</constructor-arg>
<property name="defaultListenerMethod" value="onMessage" />
<property name="messageConverter" ref="messageConverter" />
</bean>

<!-- and this is the attendant message listener container -->
<bean id="listenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destinationName" value="${jms.destinationName.cmpp}" />
<property name="messageSelector" value="${jms.messageSelector}" />
<property name="messageListener" ref="messageListener" />
</bean>

Spring配置监听器有很多种选择,在这里我选择这回种MessageListenerAdapter方法主要是因为这个方法比较灵活。实现他只要一个很普通的java类即可,和JMS以及Spring的耦合度最低。其中方法onMessage可以随便修改方法名,只要在配置文件中对应的修改就好了。
MessageListenerAdapter还有一个功能就是如果处理方法(我这里是onMessage)返回一个非空值,它将自动返回一个响应消息。这个消息会返回给JMS Reply-To属性定义的目的地(如果存在),或者是MessageListenerAdapter设置(如果配置了)的缺省目的地;如果没有定义目的地,那么将产生一个InvalidDestinationException异常(此异常将不会只被捕获而不处理,它将沿着调用堆栈上传)。
这样我们的配置就都完成了。接下来我们来实现对应的Java文件
先是接口文件发送消息接口IApmgtMessageProducer.java

public interface IApmgtMessageProducer {
public abstract void sendMessage(ApmgtMessageData messageData);
}

接受消息接口IApmgtMessageListener.java
public interface IApmgtMessageListener {
public void onMessage(ApmgtMessageData message);
}

发消息的文件DefaultApmgtMessageProducer.java
public class DefaultApmgtMessageProducer implements IApmgtMessageProducer {

private JmsTemplate jmsTemplate;

public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}

public void sendMessage(ApmgtMessageData messageData) {
this.jmsTemplate.convertAndSend(messageData);
}
}

收消息文件DefaultApmgtMessageListener.java

public class DefaultApmgtMessageListener implements IApmgtMessageListener {
public void onMessage(ApmgtMessageData message) {
System.out.println("监听到消息:"+message);
}
}

消息转换类ApmgtMessageConverter.java
public class ApmgtMessageConverter implements MessageConverter {

private Log log = LogFactory.getLog(ApmgtMessageConverter.class);

private SimpleMessageConverter converter;

public void setConverter(SimpleMessageConverter converter) {
this.converter = converter;
}

public Object fromMessage(Message message) throws JMSException, MessageConversionException {
if (message instanceof ObjectMessage) {
ObjectMessage o_message = (ObjectMessage)message;
MessageHeader header = new MessageHeader();
header.setId(message.getLongProperty("id"));
header.setReceiver(message.getIntProperty("receiver"));
header.setSender(message.getIntProperty("sender"));
header.setSendPerson(message.getStringProperty("sendPerson"));
header.setType(message.getIntProperty("type"));
Serializable messageContent = o_message.getObject();
ApmgtMessageData<Serializable> messageData = new ApmgtMessageData<Serializable>();
messageData.setMessageContent(messageContent);
messageData.setMessageHeader(header);
return messageData;
}
return null;
}

public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
if (object instanceof ApmgtMessageData) {
ApmgtMessageData data = (ApmgtMessageData) object;
Message message = converter.toMessage(data.getMessageContent(), session);
message.setLongProperty("id", data.getMessageHeader().getId());
message.setIntProperty("receiver", data.getMessageHeader().getReceiver());
message.setIntProperty("sender", data.getMessageHeader().getSender());
message.setIntProperty("type", data.getMessageHeader().getType());
message.setStringProperty("sendPerson", data.getMessageHeader().getSendPerson());
log.info("发送消息[MessageSender]:\n" + message);
return message;
} else {
return null;
}
}

}

消息类文件 消息父类:ApmgtMessageData.java
public class ApmgtMessageData<T extends Serializable>{

protected T messageContent;

protected MessageHeader messageHeader;

public T getMessageContent() {
return this.messageContent;
}

public MessageHeader getMessageHeader() {
return this.messageHeader;
}

public void setMessageContent(T messageContent) {
this.messageContent = messageContent;
}
public void setMessageHeader(MessageHeader messageHeader) {
this.messageHeader = messageHeader;
}

}

消息属性的一个类MessageHeader.java
public class MessageHeader {

/**
* 消息ID
*/
private long id;

/**
* 消息类型
*/
private int type;

/**
* 消息发送方,发送消息的模块
*/
private int sender;

/**
* 消息接收方,接收消息的模块
*/
private int receiver;

/**
* 消息发送者,具体的用户
*/
private String sendPerson;

public MessageHeader(){
this.id = System.currentTimeMillis() ;
}

public long getId() {
return id;
}

public void setId(long id) {
this.id = id;
}

public String getSendPerson() {
return sendPerson;
}

public void setSendPerson(String sendPerson) {
this.sendPerson = sendPerson;
}

public int getReceiver() {
return receiver;
}

public void setReceiver(int receiver) {
this.receiver = receiver;
}

public int getSender() {
return sender;
}

public void setSender(int sender) {
this.sender = sender;
}

public int getType() {
return type;
}

public void setType(int type) {
this.type = type;
}

}

消息体的类ModPasswordRequest.java
public class ModPasswordRequest implements Serializable{


private static final long serialVersionUID = 1L;

/**
* 旧密码
*/
private String oldPassword;

/**
* 新密码
*/
private String newPassword;

public String getNewPassword() {
return newPassword;
}

public void setNewPassword(String newPassword) {
this.newPassword = newPassword;
}

public String getOldPassword() {
return oldPassword;
}

public void setOldPassword(String oldPassword) {
this.oldPassword = oldPassword;
}

}

消息类:ApmgtModPasswordRequest.java
public class ApmgtModPasswordRequest extends ApmgtMessageData<ModPasswordRequest> {



private static final int REQ_MODPASSWORD = 0;
private static final int INTF = 1;
private static final int APMGT = 2;

public void init(){
messageHeader = new MessageHeader();
messageContent = new ModPasswordRequest();
messageHeader.setType(REQ_MODPASSWORD);
messageHeader.setSender(INTF);
messageHeader.setReceiver(APMGT);
messageContent.setNewPassword("123456");
messageContent.setOldPassword("654321");
}

}

最后是测试类Main.java
public class Main {

public static void main(final String[] args) throws Exception {

PropertyConfigurator.configure("log4j.properties");

AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(new String[] { "spring-jms.xml" });
// ctx.registerShutdownHook();

IApmgtMessageProducer producer = (IApmgtMessageProducer)ctx.getBean("producer");

ApmgtModPasswordRequest messageData = new ApmgtModPasswordRequest();
messageData.setMessageHeader(new MessageHeader());
messageData.setMessageContent(new ModPasswordRequest());
messageData.init();

producer.sendMessage(messageData);
}
}

还有两个配置文件,第一个spring-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/s ... beans.xsd">


<bean id="propertyConfigurer"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>apmgt.properties</value>
</list>
</property>
</bean>

<!-- ####################################### -->
<!-- JMS Spring Beans -->
<!-- ####################################### -->

<!-- Jms ConnectionFactory -->
<bean id="connectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${jms.brokerURL}" />
</bean>

<!-- Spring JMS SimpleConverter -->
<bean id="simpleConverter"
class="org.springframework.jms.support.converter.SimpleMessageConverter" />

<!-- JMS Queue Template -->
<bean id="jmsTemplate"
class="org.springframework.jms.core.JmsTemplate102">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="timeToLive" value="${jms.timeToLive}"/>
<property name="defaultDestinationName" value="${jms.destinationName.cmpp}" />
<property name="messageConverter" ref="messageConverter" />
<property name="receiveTimeout" value="${jms.receiveTimeout}" />
</bean>

<!-- Message Converter -->
<bean id="messageConverter"
class="com.liangj.apmgt.jms.ApmgtMessageConverter">
<property name="converter">
<ref local="simpleConverter" />
</property>
</bean>

<!-- Message porducer -->
<bean id="producer"
class="com.liangj.apmgt.jms.DefaultApmgtMessageProducer">
<property name="jmsTemplate" ref="jmsTemplate" />
</bean>

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean
class="com.liangj.apmgt.jms.DefaultApmgtMessageListener" />
</constructor-arg>
<property name="defaultListenerMethod" value="onMessage" />
<property name="messageConverter" ref="messageConverter" />
</bean>

<!-- and this is the attendant message listener container -->
<bean id="listenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destinationName" value="${jms.destinationName.cmpp}" />
<property name="messageSelector" value="${jms.messageSelector}" />
<property name="messageListener" ref="messageListener" />
</bean>
</beans>

apmgt.properties

#jms properties
jms.brokerURL=tcp://localhost:61616
jms.receiveTimeout=3000
jms.destinationName.cmpp=cmpp
jms.messageSelector=receiver=2
#one day is 86400000 ms. 0 is means that it lives forever.
jms.timeToLive=86400000

分享到:
评论

相关推荐

    JMS-Spring

    JMS 异步传输与Spring结合实例,文件传输结合spring jms

    tomcat spring jms 异步消息传递入门实例

    tomcat spring jms 异步消息传递入门实例

    spring jms tomcat 异步消息传递入门实例

    spring jms tomcat 异步消息传递入门实例

    Spring 实现远程访问详解——jms和activemq

    本章我将通过spring jms和activemq实现单Web项目服务器间异步访问和多Web项目服务器间异步访问。 一. 简介 1. 什么是Apache ActiveMq Apache ActiveMq是最流行和最强大的开源消息和集成服务器。同时Apache ActiveMq...

    基于ActiveMQ的jms通讯

    简单的实现了jms的发送与接收,实现了异步通讯的功能 是一个与spring相结合的代码实例

    基于Spring+JMS+ActiveMQ+Tomcat整合

    基于Spring+JMS+ActiveMQ+Tomcat,做一个Spring4.1.0和ActiveMQ5.11.1整合实例,实现了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,简单实例,不包含任何业务。

    spring-int-jms:使用多层、ActiveMQ 和 Netty 的 Spring 集成项目

    该项目试图解决的主要挑战是通过调用 TCP 异步服务断开请求回复的实现,从而原始 jms 请求标头将无法传递到通过基于 TCP 的网络的消息中。 Spring Integration 示例使用网关描述了以 JMS 为中心的请求回复,但在...

    Spring集成ActiveMQ配置

    Spring 集 成ActiveMQ 配置 异步RPC框架 Missian ActiveMq-JMS简单实例使用tomcat

    Spring-Reference_zh_CN(Spring中文参考手册)

    2.4.3. 异步的JMS 2.4.4. JDBC 2.5. Web层 2.5.1. Spring MVC的表单标签库 2.5.2. Spring MVC合理的默认值 2.5.3. Portlet 框架 2.6. 其他特性 2.6.1. 动态语言支持 2.6.2. JMX 2.6 .3. 任务规划 2.6.4. 对Java 5...

    Spring 2.0 开发参考手册

    2.4.3. 异步的JMS 2.4.4. JDBC 2.5. Web层 2.5.1. Spring MVC的表单标签库 2.5.2. Spring MVC合理的默认值 2.5.3. Portlet 框架 2.6. 其他特性 2.6.1. 动态语言支持 2.6.2. JMX 2.6.3. 任务规划 2.6.4. 对...

    Spring中文帮助文档

    2.4.4. 异步的JMS 2.4.5. JDBC 2.5. Web层 2.5.1. Spring MVC合理的默认值 2.5.2. Portlet 框架 2.5.3. 基于Annotation的控制器 2.5.4. Spring MVC的表单标签库 2.5.5. 对Tiles 2 支持 2.5.6. 对JSF 1.2支持...

    Spring API

    2.4.4. 异步的JMS 2.4.5. JDBC 2.5. Web层 2.5.1. Spring MVC合理的默认值 2.5.2. Portlet 框架 2.5.3. 基于Annotation的控制器 2.5.4. Spring MVC的表单标签库 2.5.5. 对Tiles 2 支持 2.5.6. 对JSF 1.2支持...

    spring chm文档

    2.4.3. 异步的JMS 2.4.4. JDBC 2.5. Web层 2.5.1. Spring MVC的表单标签库 2.5.2. Spring MVC合理的默认值 2.5.3. Portlet 框架 2.6. 其他特性 2.6.1. 动态语言支持 2.6.2. JMX 2.6.3. 任务规划 2.6.4. 对...

    JavaEE开发的颠覆者SpringBoot实战[完整版].part3

    涵盖使用Spring Boot 进行Java EE 开发的绝大数应用场景,包含:Web 开发、数据访问、安全控制、批处理、异步消息、系统集成、开发与部署、应用监控、分布式系统开发等。 第一部分 点睛Spring 4.x 第1 章 Spring ...

    JavaEE开发的颠覆者SpringBoot实战[完整版].part2

    涵盖使用Spring Boot 进行Java EE 开发的绝大数应用场景,包含:Web 开发、数据访问、安全控制、批处理、异步消息、系统集成、开发与部署、应用监控、分布式系统开发等。 第一部分 点睛Spring 4.x 第1 章 Spring ...

    JavaEE开发的颠覆者SpringBoot实战[完整版].part1

    涵盖使用Spring Boot 进行Java EE 开发的绝大数应用场景,包含:Web 开发、数据访问、安全控制、批处理、异步消息、系统集成、开发与部署、应用监控、分布式系统开发等。 第一部分 点睛Spring 4.x 第1 章 Spring ...

    JAVA上百实例源码以及开源项目源代码

     Java数据压缩与传输实例,可以学习一下实例化套按字、得到文件输入流、压缩输入流、文件输出流、实例化缓冲区、写入数据到文件、关闭输入流、关闭套接字关闭输出流、输出错误信息等Java编程小技巧。 Java数组倒置...

    从Java走向Java+EE+.rar

    第13章 Struts和Hibernate实例——两个与登录有关的实例 166 13.1 Struts和Hibernate的开发环境配置 166 13.1.1 数据库的安装和管理 166 13.1.2 Hibernate的安装 168 13.1.3 Struts的安装 169 13.2 实例一...

    JAVA上百实例源码以及开源项目

     Java数据压缩与传输实例,可以学习一下实例化套按字、得到文件输入流、压缩输入流、文件输出流、实例化缓冲区、写入数据到文件、关闭输入流、关闭套接字关闭输出流、输出错误信息等Java编程小技巧。 Java数组倒置...

Global site tag (gtag.js) - Google Analytics