`
Donald_Draper
  • 浏览: 950917 次
社区版块
存档分类
最新评论

Spring与ActiveMQ的集成

阅读更多
JMS与MQ详解:http://www.fx114.net/qa-48-91234.aspx
JMS(ActiveMQ) PTP和PUB/SUB模式实例:http://donald-draper.iteye.com/blog/2347445
ActiveMQ连接工厂、连接详解:http://donald-draper.iteye.com/blog/2348070
ActiveMQ会话初始化:http://donald-draper.iteye.com/blog/2348341
ActiveMQ生产者:http://donald-draper.iteye.com/blog/2348381
ActiveMQ消费者:http://donald-draper.iteye.com/blog/2348389
ActiveMQ启动过程详解:http://donald-draper.iteye.com/blog/2348399
ActiveMQ Broker发送消息给消费者过程详解:http://donald-draper.iteye.com/blog/2348440
Spring与ActiveMQ的集成:http://donald-draper.iteye.com/blog/2347638
Spring与ActiveMQ的集成详解一:http://donald-draper.iteye.com/blog/2348449
Spring与ActiveMQ的集成详解二:http://donald-draper.iteye.com/blog/2348461
在前文中我们讲过ActiveMQ的PTP和PUB/SUB模式实例,今天我们来看一下ActiveMQ与Spring的集成,Spring为4.0.4,ActiveMQ为5.12.1,在做下面的测试之间,要先添加ActiveMQ的用户:
如下:
修改ActiveMQ安装目录下的配置文件夹下的jetty.xml
[root@zabbix conf]# vim jetty.xml 

 <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
        <property name="name" value="BASIC" />
        <property name="roles" value="user,admin" />
        <!-- set authenticate=false to disable login -->
        <property name="authenticate" value="true" />
    </bean>
    <bean id="adminSecurityConstraint" class="org.eclipse.jetty.util.security.Constraint">
        <property name="name" value="BASIC" />
        <property name="roles" value="admin" />
         <!-- set authenticate=false to disable login -->
        <property name="authenticate" value="true" />
    </bean>

保证authenticate属性为true;
再添加用户信息,修改jetty-realm.properties文件,我的如下:
[root@zabbix conf]# more jetty-realm.properties 
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements.  See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License.  You may obtain a copy of the License at
## 
## http://www.apache.org/licenses/LICENSE-2.0
## 
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------

# Defines users that can access the web (console, demo, etc.)
# username: password [,rolename ...]
#用户名:密码,角色
admin: admin, admin
user: 123456, user
[root@zabbix conf]# 


重启ActiveMQ,即可;

需要引入的jar包,



注意不要直接引入activemq-all-5.12.1,这种方式容易产生包冲突;

1.我们先来测试JMSTemplate,操纵ActiveMQ的方式
ActiveMQ属性配置文件activemq.properties
# ActiveMQ settings
activemq.brokerURL=tcp://192.168.126.128:61616
activemq.userName=user
activemq.password=123456
activemq.queueName=testQueue
activemq.topicName=testTopic

 
引入属性文件:
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">  
		<property name="locations">  
			<list>  
                 <value>/WEB-INF/classes/jdbc.properties</value>  
                 <value>/WEB-INF/classes/redis.properties</value>
                 <value>/WEB-INF/classes/activemq.properties</value>
                 
            </list>  
        </property>  
</bean> 

ActiveMQ配置文件activemq-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans	 xmlns="http://www.springframework.org/schema/beans" 
		   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		   xmlns:p="http://www.springframework.org/schema/p" 
		   xmlns:tx="http://www.springframework.org/schema/tx"
		   xmlns:context="http://www.springframework.org/schema/context"
		   xsi:schemaLocation="
			http://www.springframework.org/schema/beans 
			http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
			http://www.springframework.org/schema/tx 
			http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
			http://www.springframework.org/schema/context
			http://www.springframework.org/schema/context/spring-context-3.0.xsd
			   ">
         <!-- 配置JMS连接工厂 -->  
        <bean id="connectionFactoryMQ"  class="org.apache.activemq.ActiveMQConnectionFactory">  
            <property name="brokerURL" value="${activemq.brokerURL}" />  
            <property name="userName" value="${activemq.userName}" />  
            <property name="password" value="${activemq.password}" />  
        </bean>  
        
    <!-- 消息类型转换 -->  
    <bean id="msgConverter" class="com.activemq.help.MsgConverterHelper"/>
    <!-- 配置Jms模板 -->  
        <!-- 发送消息的目的地(队列) -->  
        <bean id="testQueue"  class="org.apache.activemq.command.ActiveMQQueue">  
            <!-- 设置消息队列的名字 -->  
            <constructor-arg index="0" value="${activemq.queueName}" />  
        </bean>  
        <!-- 配置Jms模板  -->  
        <bean id="jmsQueueTemplate"  class="org.springframework.jms.core.JmsTemplate">  
            <property name="connectionFactory" ref="connectionFactoryMQ" />  
            <property name="defaultDestination" ref="testQueue" />  
            <!-- 接收消息时的超时时间 -->
            <!--<property name="receiveTimeout" value="10000" />  --> 
            <!-- 消息类型转换 -->  
        	<property name="messageConverter" ref="msgConverter"></property> 
        </bean>           
        <!-- 发送消息的目的地(主题) -->  
        <bean id="testTopic"  class="org.apache.activemq.command.ActiveMQTopic">  
            <!-- 设置消息队列的名字 -->  
            <constructor-arg index="0" value="${activemq.topicName}" />  
        </bean>  
        <!-- 配置TopicJms模板  -->  
        <bean id="jmsTopicTemplate"  class="org.springframework.jms.core.JmsTemplate">  
            <property name="connectionFactory" ref="connectionFactoryMQ" />  
            <property name="defaultDestination" ref="testTopic" />  
            <!-- 配置是否为发布订阅者模式,默认为false -->  
            <property name="pubSubDomain" value="true"/>  
            <!-- 接收消息时的超时时间 -->
            <!--<property name="receiveTimeout" value="10000" />  -->  
             <!-- 消息类型转换 -->  
        	<property name="messageConverter" ref="msgConverter"></property>
        </bean>  
</beans>			 


引用配置文件:
<import resource="activemq-context.xml" />


新建订单实体类:

package com.enity;

import java.io.Serializable;

public class Order implements Serializable{
	
	/**
	 * 
	 */
	private static final long serialVersionUID = -343247274477730446L;
	private Integer id;//订单id
	private Double amount;
	private Integer goodsId;//商品id
	private Integer goodsAmount;//商品数量
	private Integer shopId;//店铺名
	public Integer getId() {
		return id;
	}
	public void setId(Integer id) {
		this.id = id;
	}
	public Double getAmount() {
		return amount;
	}
	public void setAmount(Double amount) {
		this.amount = amount;
	}
	public Integer getGoodsId() {
		return goodsId;
	}
	public void setGoodsId(Integer goodsId) {
		this.goodsId = goodsId;
	}
	public Integer getGoodsAmount() {
		return goodsAmount;
	}
	public void setGoodsAmount(Integer goodsAmount) {
		this.goodsAmount = goodsAmount;
	}
	public Integer getShopId() {
		return shopId;
	}
	public void setShopId(Integer shopId) {
		this.shopId = shopId;
	}
	public String toString(){
		return "订单id:"+this.id+","+"金额(元):"+this.amount+","+"商品id:"+
	          this.shopId+","+"商品数量:"+this.goodsAmount+","+"店铺id:"+this.shopId;
	}
	
}

队列消息生产者:

package com.activemq.service.imp;

import java.util.Date;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import com.activemq.service.QueueProducerService;
import com.enity.Order;
/**
 * 生产Queue
 * @author donald
 * @date 2016-12-27
 * @time 上午10:19:27
 */
@Service
public class QueueProducerServiceImp implements QueueProducerService{
	private static final Logger log = LoggerFactory.getLogger(QueueProducerServiceImp.class);
	@Resource(name="jmsQueueTemplate")
	JmsTemplate jmsTemplate;
	@Resource(name="testQueue")
	Destination testQueue;
	/**
	 * 发送队列消息
	 */
	public void send() {
		MessageCreator messageCreator = new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				TextMessage message = session.createTextMessage();
				message.setText("QueueProducerService发送消息:" + new Date());
				return message;
			}
		};
		jmsTemplate.send(this.testQueue, messageCreator);
		log.info("========向ActiveMq testQueue发送信息");
	}
	/**
	 * 发送并转换队列消息
	 */
	@Override
	public void convertAndSend() {
		Order order = new Order();
		order.setId(1);
		order.setAmount(150.62);
		order.setGoodsId(15);
		order.setGoodsAmount(2);
		order.setShopId(5656);
		jmsTemplate.convertAndSend(this.testQueue, order);  
	}
}


package com.activemq.service;

public interface QueueProducerService {
	public void send();
	public void convertAndSend();
}


队列消息消费者:

package com.activemq.service.imp;

import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import com.activemq.service.QueueConsumerService;
import com.enity.Order;
/**
 * Queue消费
 * @author donald
 * @date 2016-12-27
 * @time 上午10:16:58
 */
@Service
public class QueueConsumerServiceImp implements QueueConsumerService{
	private static final Logger log = LoggerFactory.getLogger(QueueConsumerServiceImp.class);
	@Resource(name="jmsQueueTemplate")
	JmsTemplate jmsTemplate;
	/**
	 * 接受队列消息
	 */
	public String receive() {
		String result = null;
		TextMessage message = (TextMessage) jmsTemplate.receive();
		try {
			log.info("======QueueConsumerService收到消息:" + message.getText());
			result = message.getText();
		} catch (JMSException e) {
			e.printStackTrace();
		}
		return result;
	}
	/**
	 * 接受并转换队列消息
	 */
	@Override
	public Object receiveAndConvert() {
		Order order = (Order)jmsTemplate.receiveAndConvert();
		return order;
	}
}

package com.activemq.service;

public interface QueueConsumerService {
	public String receive();
	public Object receiveAndConvert();
}


订阅主题消息发布者:

package com.activemq.service.imp;


import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import com.activemq.service.TopicPublisherService;
import com.enity.Order;

@Service
public class TopicPublisherServiceImp implements TopicPublisherService {
	private static final Logger log = LoggerFactory
			.getLogger(TopicPublisherServiceImp.class);
	@Resource(name = "jmsTopicTemplate")
	JmsTemplate jmsTemplate;
	@Resource(name = "testTopic")
	Destination testTopic;
    /**
     * 发送订阅主题消息
     */
	public void send() {
		MessageCreator messageCreator = new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				ObjectMessage message = session.createObjectMessage();
				Order order = new Order();
				order.setId(1);
				order.setAmount(150.62);
				order.setGoodsId(15);
				order.setGoodsAmount(2);
				order.setShopId(5656);
				// 我们也可以将Object转换为Json String,作为TextMessage来传送,
				//在消费再反Json String 为Obejct
				message.setObject(order);
				return message;
			}
		};
		jmsTemplate.send(this.testTopic, messageCreator);
		log.info("========向ActiveMq testTopic发送订单信息");
	}
	/**
	 * 发送并转换订阅主题消息
	 */
	@Override
	public void convertAndSend() {
		Order order = new Order();
		order.setId(1);
		order.setAmount(150.62);
		order.setGoodsId(15);
		order.setGoodsAmount(2);
		order.setShopId(5656);
		jmsTemplate.convertAndSend(this.testTopic, order);  
	}
}

package com.activemq.service;

public interface TopicPublisherService {
	public void send();
	public void convertAndSend();
}



订阅主题消息订阅者:

package com.activemq.service.imp;

import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import com.activemq.service.TopicSubscriberService;
import com.enity.Order;
@Service
public class TopicSubscriberServiceImp implements TopicSubscriberService{
	private static final Logger log = LoggerFactory.getLogger(TopicSubscriberServiceImp.class);
	@Resource(name="jmsTopicTemplate")
	JmsTemplate jmsTemplate;
	/**
	 * 接受订阅主题消息
	 */
	public Order receive() {
		ObjectMessage objMessage = (ObjectMessage) jmsTemplate.receive();
		Order order = null;
		try {
			order = (Order)objMessage.getObject();
			log.info("==========TopicSubscriberService收到订单信息:"+ order.toString());
		} catch (JMSException e) {
			e.printStackTrace();
		}
		return order;
	}
	/**
	 * 接受并转换订阅主题消息
	 */
	@Override
	public Object receiveAndConvert() {
		Order order = (Order)jmsTemplate.receiveAndConvert();
		return order;
	}
}

package com.activemq.service;

import com.enity.Order;

public interface TopicSubscriberService {
	public Order receive();
	public Object receiveAndConvert();
}



消息转化器:

package com.activemq.help;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;

import util.JsonUtil;

import com.enity.Order;
/**
 * 消息转换器
 * @author donald
 * @date 2016-12-27
 * @time 上午11:46:06
 */
public class MsgConverterHelper implements MessageConverter{
	private static final Logger log = LoggerFactory.getLogger(MsgConverterHelper.class);
	@Override
	public Object fromMessage(Message mess) throws JMSException,
			MessageConversionException {
		Order order  = null;
		if(!(mess instanceof TextMessage)) {  
            throw new MessageConversionException("Message is not TextMessage");  
        }  
		else{
			TextMessage tMess = (TextMessage)mess;
			order = JsonUtil.fromJson(tMess.getText(), Order.class);
			log.info("=====转换JSON字符串为订单信息=======");
		}
		return order;
	}

	@Override
	public Message toMessage(Object obj, Session session) throws JMSException,
			MessageConversionException {
		TextMessage textMessage = null;  
		 if (!(obj instanceof Order)) {  
	            throw new MessageConversionException("Object is not Order");  
	       }  
		 else {
			    textMessage = session.createTextMessage();  
			    Order order = (Order) obj;  
		        textMessage.setText(JsonUtil.toJson(order));  
		        log.info("=====转换订单信息为JSON字符串======");
		 }
		return textMessage;
	}  
  
   
}  


测试主类:
package com.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import com.activemq.service.QueueConsumerService;
import com.activemq.service.QueueProducerService;
import com.activemq.service.TopicPublisherService;
import com.activemq.service.TopicSubscriberService;
import com.controller.base.BaseController;
import com.enity.Order;

import util.JsonUtil;

/**
 * 测试PTP&PUB/SUB
 * @author donald
 * @date 2016-12-27
 * @time 上午11:03:57
 */
@Controller
@RequestMapping(value="/activemq")
public class ActiveMqController extends BaseController{
	private static final Logger log = LoggerFactory.getLogger(ActiveMqController.class);
	@Autowired
	private QueueProducerService queueProducerService;
	@Autowired
	private QueueConsumerService queueConsumerService;
	@Autowired
	private TopicPublisherService topicPublisherService;
	@Autowired
	private TopicSubscriberService topicSubscriberService;
     
	/**
	 * 发送队列消息
	 * @return
	 */
	@RequestMapping("/qsend")
	@ResponseBody
	public String queueSend(){
		queueProducerService.send();
		return "queueProducerService send message ok!";
	}
	/**
	 * 接受队列消息
	 * @return
	 */
	@RequestMapping("/qrecevie")
	@ResponseBody
	public String queueRecevie(){
		String message = queueConsumerService.receive();
		return message;
	}
	/**
	 * 发送队列消息(消息转化器)
	 * @return
	 */
	@RequestMapping("/qsendConver")
	@ResponseBody
	public String queueSendConver(){
		queueProducerService.convertAndSend();
		return "queueProducerService convertAndSend message ok!";
	}
	/**
	 * 接受队列消息(消息转化器)
	 * @return
	 */
	@RequestMapping("/qrecevieConver")
	@ResponseBody
	public String queueRecevieConver(){
		Order order  = (Order) queueConsumerService.receiveAndConvert();
		return JsonUtil.toJson(order);
	}
	/**
	 * 发送订阅主题消息
	 * @return
	 */
	@RequestMapping("/tsend")
	@ResponseBody
	public String topicSend(){
		topicPublisherService.send();
		return "topicPublisherService send order info ok!";
	}
	/**
	 * 接受订阅主题的消息
	 * @return
	 */
	@RequestMapping("/trecevie")
	@ResponseBody
	public String topicRecevie(){
		Order order = topicSubscriberService.receive();
		return JsonUtil.toJson(order);
	}
	/**
	 * 发送订阅主题消息(消息转化器)
	 * @return
	 */
	@RequestMapping("/tsendConver")
	@ResponseBody
	public String topicSendConver(){
		topicPublisherService.convertAndSend();
		return "topicPublisherService convertAndSend order info ok!";
	}
	/**
	 * 接受订阅主题的消息(消息转化器)
	 * @return
	 */
	@RequestMapping("/trecevieConver")
	@ResponseBody
	public String topicRecevieConver(){
		Order order = (Order) topicSubscriberService.receiveAndConvert();
		return JsonUtil.toJson(order);
	}
}


package com.controller.base;

public abstract class BaseController {
  
}

启动应用测试,我们用RestClient测试:

访问http://localhost:8080/test/activemq/qsend

RestClient ReponseBody 显示:
queueProducerService send message ok!
控制台输出:
[ INFO] 2016-12-27 16:51:11  QueueProducerServiceImp:42 :========向ActiveMq testQueue发送信息

访问http://localhost:8080/test/activemq/qrecevie

RestClient ReponseBody 显示:
QueueProducerService发送消息:Tue Dec 27 16:51:11 CST 2016
控制台输出:
[ INFO] 2016-12-27 16:51:31  QueueConsumerServiceImp:32 :======QueueConsumerService收到消息:QueueProducerService发送消息:Tue Dec 27 16:51:11 CST 2016


访问http://localhost:8080/test/activemq/qsendConver

RestClient ReponseBody 显示:
queueProducerService convertAndSend message ok!
控制台输出:
[ INFO] 2016-12-27 16:53:50  MsgConverterHelper:50 :=====转换订单信息为JSON字符串======

访问http://localhost:8080/test/activemq/qrecevieConver

RestClient ReponseBody 显示:
{"id":1,"amount":150.62,"goodsId":15,"goodsAmount":2,"shopId":5656}
控制台输出:
[ INFO] 2016-12-27 16:55:42  MsgConverterHelper:34 :=====转换JSON字符串为订单信息=======

访问http://localhost:8080/test/activemq/tsend

RestClient ReponseBody 显示:
topicPublisherService send order info ok!
控制台输出:
[ INFO] 2016-12-27 16:56:46  TopicPublisherServiceImp:48 :========向ActiveMq testTopic发送订单信息

访问http://localhost:8080/test/activemq/trecevie

RestClient ReponseBody 显示:
{"id":1,"amount":150.62,"goodsId":15,"goodsAmount":2,"shopId":5656}
控制台输出:
[ INFO] 2016-12-27 16:58:30  TopicSubscriberServiceImp:27 :==========TopicSubscriberService收到订单信息:订单id:1,金额(元):150.62,商品id:5656,商品数量:2,店铺id:5656

访问http://localhost:8080/test/activemq/tsendConver

RestClient ReponseBody 显示:
topicPublisherService convertAndSend order info ok!
控制台输出:
[ INFO] 2016-12-27 17:06:37  MsgConverterHelper:50 :=====转换订单信息为JSON字符串======

访问http://localhost:8080/test/activemq/trecevieConver

RestClient ReponseBody 显示:
{"id":1,"amount":150.62,"goodsId":15,"goodsAmount":2,"shopId":5656}
控制台输出:
[ INFO] 2016-12-27 17:09:41  MsgConverterHelper:34 :=====转换JSON字符串为订单信息=======

上面的测试只能,进行手动接受队列和订阅主题消息,如何自动接受队列和订阅主题消息,这就要用到MessageListener、org.springframework.jms.listener.DefaultMessageListenerContainer,
实现MessageListener并配置DefaultMessageListenerContainer的消息监听器messageListener即可

2.消息监听器

队列消息监听器:

package com.activemq.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Queue监听器
 * @author donald
 * @date 2016-12-27
 * @time 上午10:15:57
 */
public class QueueConsumerMessageListener implements MessageListener {
	private static final Logger log = LoggerFactory.getLogger(QueueConsumerMessageListener.class);
	public void onMessage(Message msg) {
		if (msg instanceof TextMessage) {
			TextMessage textMessage = (TextMessage) msg;
			try {
				log.info("--队列 MessageListener收到信息:"+ textMessage.getText());
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
}


订阅主题消息监听器:


package com.activemq.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.enity.Order;

/**
 * Topic 监听器
 * @author donald
 * @date 2016-12-27
 * @time 上午10:16:02
 */
public class TopicSubscriberMessageListener implements MessageListener {
	private static final Logger log = LoggerFactory.getLogger(TopicSubscriberMessageListener.class);
	public void onMessage(Message msg) {
		if (msg instanceof ObjectMessage) {
			ObjectMessage objMessage = (ObjectMessage) msg;
			try {
				Order order = (Order)objMessage.getObject();
				log.info("--订阅者 MessageListener收到订单信息:"+ order.toString());
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
}


ActiveMQ配置文件activemq-context.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans	 xmlns="http://www.springframework.org/schema/beans" 
		   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		   xmlns:p="http://www.springframework.org/schema/p" 
		   xmlns:tx="http://www.springframework.org/schema/tx"
		   xmlns:context="http://www.springframework.org/schema/context"
		   xsi:schemaLocation="
			http://www.springframework.org/schema/beans 
			http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
			http://www.springframework.org/schema/tx 
			http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
			http://www.springframework.org/schema/context
			http://www.springframework.org/schema/context/spring-context-3.0.xsd
			   ">
         <!-- 配置JMS连接工厂 -->  
        <bean id="connectionFactoryMQ"  class="org.apache.activemq.ActiveMQConnectionFactory">  
            <property name="brokerURL" value="${activemq.brokerURL}" />  
            <property name="userName" value="${activemq.userName}" />  
            <property name="password" value="${activemq.password}" />   
        </bean>  
        <!-- 消息类型转换 -->  
        <bean id="msgConverter" class="com.activemq.help.MsgConverterHelper"/>
        <!-- 发送消息的目的地(队列) -->  
        <bean id="testQueue"   class="org.apache.activemq.command.ActiveMQQueue">  
            <!-- 设置消息队列的名字 -->  
            <constructor-arg index="0" value="${activemq.queueName}" />  
        </bean>  
        <!-- 配置Jms模板  -->  
        <bean id="jmsQueueTemplate"  class="org.springframework.jms.core.JmsTemplate">  
            <property name="connectionFactory" ref="connectionFactoryMQ" />  
            <property name="defaultDestination" ref="testQueue" />  
            <!-- 接收消息时的超时时间 -->
            <!--<property name="receiveTimeout" value="10000" />  --> 
            <!-- 消息类型转换 -->  
        	<property name="messageConverter" ref="msgConverter"></property>  
        </bean>  
        <!-- 消息监听方式 -->  
        <bean id="queueConsumMesListener" class="com.activemq.listener.QueueConsumerMessageListener"/> 
        <bean id="testMsgQueuelistenerContainer"  
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
            <property name="connectionFactory" ref="connectionFactoryMQ" />  
            <property name="destination" ref="testQueue" />  
            <property name="messageListener" ref="queueConsumMesListener" />  
            <property name="receiveTimeout" value="10000" />  
        </bean>  
        <!-- 发送消息的目的地(主题) -->  
        <bean id="testTopic"  class="org.apache.activemq.command.ActiveMQTopic">  
            <!-- 设置消息队列的名字 -->  
            <constructor-arg index="0" value="${activemq.topicName}" />  
        </bean>  
        <!-- 配置TopicJms模板  -->  
        <bean id="jmsTopicTemplate"  class="org.springframework.jms.core.JmsTemplate">  
            <property name="connectionFactory" ref="connectionFactoryMQ" />  
            <property name="defaultDestination" ref="testTopic" />  
            <!-- 配置是否为发布订阅者模式,默认为false -->  
            <property name="pubSubDomain" value="true"/>  
            <!-- 接收消息时的超时时间 -->
            <!--<property name="receiveTimeout" value="10000" />  --> 
            <!-- 消息类型转换 -->  
        	<property name="messageConverter" ref="msgConverter"></property>  
        </bean>  
        <!-- 消息监听方式 -->
        <bean id="topicSubMesListener" class="com.activemq.listener.TopicSubscriberMessageListener"/>  
        <bean id="testMsgTopiclistenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
            <property name="connectionFactory" ref="connectionFactoryMQ" />  
            <property name="destination" ref="testTopic" />  
            <property name="messageListener" ref="topicSubMesListener" />  
            <property name="pubSubDomain" value="true" />  
            <property name="receiveTimeout" value="10000" />  
        </bean>  
</beans>			


启动应用,测试消息监听器:

访问http://localhost:8080/test/activemq/tsend

RestClient ReponseBody 显示:
topicPublisherService send order info ok!
控制台输出:
[ INFO] 2016-12-27 17:20:25  TopicSubscriberMessageListener:26 :--订阅者 MessageListener收到订单信息:订单id:1,金额(元):150.62,商品id:5656,商品数量:2,店铺id:5656
[ INFO] 2016-12-27 17:20:25  TopicPublisherServiceImp:48 :========向ActiveMq testTopic发送订单信息

访问http://localhost:8080/test/activemq/qsend

RestClient ReponseBody 显示:
queueProducerService send message ok!
控制台输出:
[ INFO] 2016-12-27 16:51:11  QueueProducerServiceImp:42 :========向ActiveMq testQueue发送信息
[ INFO] 2016-12-27 17:20:41  QueueConsumerMessageListener:23 :--队列 MessageListener收到信息:QueueProducerService发送消息:Tue Dec 27 17:20:41 CST 2016
[ INFO] 2016-12-27 17:20:41  QueueProducerServiceImp:42 :========向ActiveMq testQueue发送信息
  • 大小: 35.5 KB
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics