`

ActiveMQ5.0实战三:使用Spring发送,消费topic和queue消息

    博客分类:
  • java
阅读更多
ActiveMQ5.0实战一: 安装配置ActiveMQ5.0
ActiveMQ5.0实战二: 基本配置

简介

实战一 , 实战二 介绍了ActiveMQ的基本概念和配置方式.

本篇将通过一个实例介绍使用spring发送,消费topic, queue类型消息的方法. 不懂topic和queue的google 之.

 

如图示, TOPIC和QUEUE分别代表一个topic和一个queue消息通道.

  1. TopicMessageProducer向topic发送消息, TopicConsumerA和TopicConsumerB则从topic消费消息.
  2. QueueMessageProducer向Queue发送消息, QueueConsumer从Queue中消费消息

Spring整合JMS

就像对orm, web的支持一样, spring同样支持jms, 为整合jms到已有的项目提供了很多便利的方法. 本篇主要讲实战, 是所以先从配置开始, spring配置jms基本上需要8个部分.

  1. ConnectionFactory. 和jms服务器的连接, 可以是外部的jms server, 也可以使用embedded ActiveMQ Broker.
  2. Destination. 有topic和queue两种方式.
  3. JmsTemplate. spring提供的jms模板.
  4. MessageConverter. 消息转换器.
  5. MessageProducer. 消息生产者.
  6. MessageConsumer. 消息消费者.
  7. MessageListener. 消息监听器
  8. MessageListenerContainer. 消息监听容器

下面以实例的方式介绍上面8个部分.

1. ConnectionFactory

<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost" />

 brokerURL是指要连接的activeMQ server的地址, activeMQ提供了多种brokerURL, 集体可参见文档.一般我们使用嵌套的ActiveMQ server. 配置如下, 这个配置使用消息的存储机制, 服务器重启也不会丢失消息.

<!--  embedded ActiveMQ Broker -->
	<amq:broker useJmx="false" persistent="true">
		<amq:persistenceAdapter>
			<amq:amqPersistenceAdapter directory="d:/amq"/>
		</amq:persistenceAdapter>
		<amq:transportConnectors>
			<amq:transportConnector uri="tcp://localhost:61616" />
                       <amq:transportConnector uri="vm://localhost:0" />
		</amq:transportConnectors>
	</amq:broker>

 2. Destination

 在实例中我们使用了两种destination

<!--  ActiveMQ destinations  -->
<!--  使用topic方式-->
<amq:topic name="TOPIC" physicalName="JMS-TEST-TOPIC" />
<!--  使用Queue方式-->
<amq:queue name="QUEUE" physicalName="JMS-TEST-QUEUE" />

 3. JmsTemplate

<!--  Spring JmsTemplate config -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory">
			<!--  lets wrap in a pool to avoid creating a connection per send -->
			<bean class="org.springframework.jms.connection.SingleConnectionFactory">
				<property name="targetConnectionFactory" ref="jmsConnectionFactory" />
			</bean>
		</property>
		<!-- custom MessageConverter -->
		<property name="messageConverter" ref="defaultMessageConverter" />
	</bean>

  4. MessageConverter

   MessageConverter实现的是org.springframework.jms.support.converter.MessageConverter接口, 提供消息的转换功能. DefaultMessageConverter的实现见附件.

<bean id="defaultMessageConverter" class="com.andyao.activemq.DefaultMessageConverter" />

  5. MessageProducer

   实例拥有两个消息生产者, 消息生产者都是POJO, 实现见附件.

<!-- POJO which send Message uses  Spring JmsTemplate -->
	<bean id="topicMessageProducer" class="com.andyao.activemq.TopicMessageProducer">
		<property name="template" ref="jmsTemplate" />
		<property name="destination" ref="TOPIC" />
	</bean>
	<bean id="queueMessageProducer" class="com.andyao.activemq.QueuMessageProducer">
		<property name="template" ref="jmsTemplate" />
		<property name="destination" ref="QUEUE" />
	</bean>

 6. MessageConsumer

 TOPIC通道有两个消息消费者, QUEUE有一个消息消费者

<!--  Message Driven POJO (MDP) -->
    <!-- consumer1 for topic a -->
    <bean id="topicConsumerA" class="com.andyao.activemq.TopicConsumerA" />
    <!-- consumer2 for topic a -->
    <bean id="topicConsumerB" class="com.andyao.activemq.TopicConsumerB" />
    <!-- consumer for queue -->
    <bean id="queueConsumer" class="com.andyao.activemq.QueueConsumer" />

  7. MessageListener

每一个消息消费者都对应一个MessageListener

<bean id="topicListenerA" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg ref="topicConsumerA" />
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="receive" />
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="defaultMessageConverter" />
	</bean>

	<bean id="topicListenerB" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg ref="topicConsumerB" />
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="receive" />
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="defaultMessageConverter" />
	</bean>

    <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg ref="queueConsumer" />
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="receive" />
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="defaultMessageConverter" />
	</bean>

 8. MessageListenerContainer

 有几个MessageListener既有几个MessageListenerContainer

<bean id="topicListenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
		<property name="destination" ref="TOPIC" />
		<property name="messageListener" ref="topicListenerA" />
	</bean>

    <bean id="topicListenerContainerB" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
		<property name="destination" ref="TOPIC" />
		<property name="messageListener" ref="topicListenerB" />
	</bean>
    
    <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
		<property name="destination" ref="QUEUE" />
		<property name="messageListener" ref="queueListener" />
	</bean>

  Summary

写spring配置文件的时候, 要把MessageProducer, MessageConsumer,MessageListener,MessageListenerContainer几个地方弄清楚:

  1. 可以有一个或者多个消息生产者向同一个destination发送消息.
  2. queue类型的只能有一个消息消费者.
  3. topic类型的可以有多个消息消费者.
  4. 每个消费者对应一个MessageListener和一个MessageListenerContainer.

 

 

 

 

分享到:
评论
46 楼 y806839048 2014-10-20  
启动activemq我试了,也不行
45 楼 y806839048 2014-10-20  
是不是要另外启动activemq
44 楼 y806839048 2014-10-20  
为什么我的两个应用中,不能实现通信
43 楼 westboy172887564 2013-04-24  
Queue支持存在多个消费者,楼主是不是最后那块写的有问题?
42 楼 zh286091487 2012-02-28  
4年前的文章,2012才享受到,悲剧,写楼主了
41 楼 xiaxiaorui2003 2012-02-22  
我的这个GenericBeanFactoryAccessor也是找不到,spring和activemq的所有jar都导入了,还是找不到
40 楼 bluethink 2011-11-20  
TestMain里面的
org.springframework.beans.factory.generic.GenericBeanFactoryAccessor;

依赖的是哪个包,我现在用的是Spring 3.0.5,并且加载了spring的所有包,还是提示
The import org.springframework.beans.factory.generic cannot be resolved,我下载的是
xbean-spring-3.5.jar 仍然找不到这个类,麻烦楼主把所有需要的jar包给列出来,谢谢
39 楼 wmj2003 2009-08-17  

package com.work.activemq;

public interface OrderNotifyInterface {
	/**
	 *  通知用户订单已经发送!生成者接口
	 * @param order
	 */
	public void notifyOrder(Order order);
	
	/**
	 * 通过topic的方式发送信息
	 * @param order
	 */
	public void notifyTopic(Order order);
}

实现如下:
package com.work.activemq;

/**
 * 用来测试mq的。
 * @author wangmingjie
 *
 */
public class OrderNotifyImpl implements OrderNotifyInterface {
	private OrderMessageProducer orderMessageProducer;
	public void setOrderMessageProducer(OrderMessageProducer orderMessageProducer) {
		this.orderMessageProducer = orderMessageProducer;
	}
	
	private TopicMessageProducer  topicMessageProducer;

	public void setTopicMessageProducer(TopicMessageProducer topicMessageProducer) {
		this.topicMessageProducer = topicMessageProducer;
	}
	
	/**
	 * 向顾客的邮箱发送订单通知,使用JMS发送.
	 */
	public void notifyOrder(Order order) {
		orderMessageProducer.send(order);
	}
	

	public void notifyTopic(Order order){
		topicMessageProducer.send(order);
	}
}
38 楼 wmj2003 2009-08-17  
package com.work.activemq;


import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.support.converter.MessageConverter;

/**
 * 订单消息转换类. 使用数据库持久化消息的时候使用。
 * <p>实现MessageConverter使得JMS的发送与接收者可以直接发送POJO而不是JMS Message.</p>
 *
 * @author wangmj
 * @see MessageConverter
 */

public class OrderMessageConverter implements MessageConverter {
	private static final Log log = LogFactory.getLog(OrderMessageConverter.class);
	/*
	 * (non-Javadoc)
	 *
	 * @see org.springframework.jms.support.converter.MessageConverter#toMessage(java.lang.Object,
	 *      javax.jms.Session)
	 */
	public Message toMessage(Object obj, Session session) throws JMSException {
		//check Type
		if (obj instanceof Order) {
			ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session.createObjectMessage();
			   HashMap<String, byte[]> map = new HashMap<String, byte[]>();  
			try{
				// POJO must implements Seralizable
				ByteArrayOutputStream bos = new ByteArrayOutputStream();
				ObjectOutputStream oos = new ObjectOutputStream(bos);
				oos.writeObject(obj);
				map.put("Order", bos.toByteArray());
				objMsg.setObjectProperty("Map", map);
			} catch (IOException e) {
				log.error("toMessage(Object, Session)", e);
			}
			return objMsg;
		} else {
			throw new JMSException("Object:[" + obj + "] is not Order");
		}

	}

	/*
	 * (non-Javadoc)
	 *
	 * @see org.springframework.jms.support.converter.MessageConverter#fromMessage(javax.jms.Message)
	 */
	@SuppressWarnings("unchecked")
	public Object fromMessage(Message msg) throws JMSException {
		if (msg instanceof ObjectMessage) {
			 HashMap<String, byte[]> map = (HashMap<String, byte[]>) ((ObjectMessage) msg).getObjectProperty("Map"); 
			try {
				// POJO must implements Seralizable
				ByteArrayInputStream bis=new ByteArrayInputStream((byte[])map.get("Order") );
				ObjectInputStream ois = new ObjectInputStream(bis);
				Object returnObject = ois.readObject();
				return returnObject;
			} catch (IOException e) {
				log.error("fromMessage(Message)", e);

			} catch (ClassNotFoundException e) {
				log.error("fromMessage(Message)", e);
			}
			
			return null;
		} else {
			throw new JMSException("Msg:[" + msg + "] is not Map");
		}
	}

}


package com.work.activemq;

import javax.jms.Topic;

import org.springframework.jms.core.JmsTemplate;

/**
 * 奇怪啊,topic方式,消息被重复消费了。每次都消费两次。
 * 一旦更换为DefaultMessageConverter.java ,控制台就看不到打印信息了。<br>
 * 但是在activemq的监控界面,可以看到消息被消费了,而是web发送信息页面也提示成功。
 * 估计是和Serializable有关系。
 * @author wangmingjie
 * @date 2009-7-26上午11:21:32
 */
public class TopicMessageProducer {
    
    private JmsTemplate template;

	private Topic destination;

	public void setTemplate(JmsTemplate template) {
		this.template = template;
	}

	public void setDestination(Topic destination) {
		this.destination = destination;
	}

	/**
	 * 发送信息
	 * @param message
	 */
	public void send(Order message) {
		template.convertAndSend(this.destination, message);
	}
}


package com.work.activemq;
/**
 * @author wangmingjie
 * @date 2009-7-26上午11:27:00
 */

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 使用多线程接受消息
 */
public class TopicConsumerA {

	ExecutorService exec = Executors.newFixedThreadPool(10);

	public void receive(final Order message) {
		//System.out.println("线程名称"+Thread.currentThread().getName());
		exec.submit((new Runnable() {
			public void run() {
				//stem.out.println(Thread.currentThread().getName());
				System.out.println("**** Topic A : " + message.getId()+message.getName());
			}
		}));
	}

}


package com.work.activemq;
/**
 * @author wangmingjie
 * @date 2009-7-26上午11:27:05
 */
public class TopicConsumerB {

	public void receive(Order message) {
		System.out.println("**** Topic B : " + message.getId()+message.getName());
	}
}


37 楼 andyao 2009-08-17  
TO: wmj2003

把你的java代码也贴出来看看
36 楼 wmj2003 2009-07-27  
我的消息每次都会重复被接受,这是为什么呢?
结果如下
13:43:43,921 INFO  [STDOUT] ****************** Topic B : ID001BUG管理需求和设计.
doc
13:43:43,921 INFO  [STDOUT] ****************** Topic B : ID001BUG管理需求和设计.
doc
13:43:43,921 INFO  [STDOUT] ************** Topic A : ID001BUG管理需求和设计.doc
13:43:43,921 INFO  [STDOUT] ************** Topic A : ID001BUG管理需求和设计.doc

=====================spring配置文件如下============================
<beans
		xmlns="http://www.springframework.org/schema/beans"
		xmlns:amq="http://activemq.org/config/1.0"
		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.org/config/1.0 http://activemq.apache.org/schema/core/activemq-core-5.0-SNAPSHOT.xsd" >
 <!-- 
    	推荐版本,使用spring的listenerContainer,消息用数据库持久化保存,服务器重启不会丢失
     -->

	<!--  embedded ActiveMQ Broker -->
	<amq:broker useJmx="false" persistent="true">
		<amq:persistenceAdapter>
			<amq:jdbcPersistenceAdapter id="jdbcAdapter" dataSource="#dataSource" createTablesOnStartup="true"
										useDatabaseLock="false"/>
			<!-- 
				Mysql can setup useDatabaseLock="true",this is defualt
				HSQLDB,MSSQL plz setup useDatabaseLock="false",
				if u setup useDatabaseLock="true",u will catch error:
				MSSQL Error Info:FOR UPDATE clause allowed only for DECLARE CURSOR 
				HSQLDB Error Info:FOR in statement [SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE]

				see http://www.nabble.com/ActiveMQ-JDBC-Persistence-with-SQL-Server-tf2022248.html#a5560296
			-->
		</amq:persistenceAdapter>
		<amq:transportConnectors>
			<amq:transportConnector uri="tcp://localhost:0"/>
		</amq:transportConnectors>
	</amq:broker>
     <!-- 连接外部的activeMQ
	<amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://localhost:61616?wireFormat.maxInactivityDuration=0" />
	  ActiveMQ connectionFactory  -->
	  
	<!--  ActiveMQ connectionFactory  连接内部的-->
	<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost"/>

	<!--  ActiveMQ destinations  使用Queue方式 -->
	<amq:queue name="destination" physicalName="org.apache.activemq.spring.Test.spring.embedded"/>
	<!--  使用topic方式-->
	<amq:topic name="TOPIC" physicalName="JMS-TEST-TOPIC" />

	<!-- The msSQL Datasource that will be used by the Broker
	<bean id="mssql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
		<property name="driverClassName" value="net.sourceforge.jtds.jdbc.Driver"/>
		<property name="url">
			<value>jdbc:jtds:sqlserver://localhost:1433/wmjqxgl;SelectMethod=cursor;charset=GBK;tds=8.0;lastupdatecount=true</value>
		</property>
		<property name="username" value="sa"/>
		<property name="password" value="sa"/>
		<property name="poolPreparedStatements" value="true"/>
	</bean>	
 	-->
	<!--  Spring JmsTemplate config -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory">
			<!--  lets wrap in a pool to avoid creating a connection per send -->
			<bean class="org.springframework.jms.connection.SingleConnectionFactory">
				<property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
			</bean>
		</property>
		<!-- custom MessageConverter -->
		<property name="messageConverter" ref="orderMessageConverter"/>
	</bean>

	<!--  OrderMessage converter  -->
	<bean id="orderMessageConverter" class="com.work.activemq.OrderMessageConverter"/>

	<!-- POJO which send Message uses  Spring JmsTemplate -->
	<bean id="orderMessageProducer" class="com.work.activemq.OrderMessageProducer">
		<property name="template" ref="jmsTemplate"/>
		<property name="destination" ref="destination"/>
	</bean>
	<!-- topic 方式信息发送者 -->
	<bean id="topicMessageProducer" class="com.work.activemq.TopicMessageProducer">
		<property name="template" ref="jmsTemplate" />
		<property name="destination" ref="TOPIC" />
	</bean>
		
    <!-- consumer1 for topic a 消息消费者 -->
    <bean id="topicConsumerA" class="com.work.activemq.TopicConsumerA" />

    <!-- consumer2 for topic a -->
    <bean id="topicConsumerB" class="com.work.activemq.TopicConsumerB" />	
    
    
    <!-- Message Listener for  -->
	<bean id="topicListenerA" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg ref="topicConsumerA" />
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="receive" />
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="orderMessageConverter" />
	</bean>

	<bean id="topicListenerB" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg ref="topicConsumerB" />
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="receive" />
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="orderMessageConverter" />
	</bean>
    
	<!--  Message Driven POJO (MDP)  通过queue的方式发送消息,一个发送一个接收-->
	<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg>
			<bean class="com.work.activemq.OrderMessageConsumer">
				<!-- <property name="mailService" ref="mailService"/>  -->
			</bean>
		</constructor-arg>
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="sendEmail"/>
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="orderMessageConverter"/>
	</bean>

	<!--  listener container,MDP无需实现接口 -->
	<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory"/>
		<property name="destination" ref="destination"/>
		<property name="messageListener" ref="messageListener"/>
	</bean>
	
	<!--  listener container,MDP无需实现接口 -->
	<bean id="topicListenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
		<property name="destination" ref="TOPIC" />
		<property name="messageListener" ref="topicListenerA" />
	</bean>

    <bean id="topicListenerContainerB" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
		<property name="destination" ref="TOPIC" />
		<property name="messageListener" ref="topicListenerB" />
	</bean>
	
	<bean id="orderNotify" class="com.work.activemq.OrderNotifyImpl">
		<property name="orderMessageProducer" ref="orderMessageProducer" />
		<property name="topicMessageProducer" ref="topicMessageProducer" />
	</bean>
	<!--  -->

	
</beans>

==================
35 楼 chinaway 2009-05-22  
谢谢博主,我的问题解决了,要想在http://127.0.0.1:8161/admin/queues.jsp中看到,必须把<!--  embedded ActiveMQ Broker --> 
    <amq:broker useJmx="false" persistent="true"> 
        <amq:persistenceAdapter> 
            <amq:amqPersistenceAdapter directory="d:/amq"/> 
        </amq:persistenceAdapter> 
        <amq:transportConnectors> 
            <amq:transportConnector uri="tcp://localhost:61616" /> 
                       <amq:transportConnector uri="vm://localhost:0" /> 
        </amq:transportConnectors> 
    </amq:broker>
这一段中的<amq:transportConnectors> 
            <amq:transportConnector uri="tcp://localhost:61616" /> 
                       <amq:transportConnector uri="vm://localhost:0" /> 
        </amq:transportConnectors>
注释掉,即使用外部的ActiveMQ服务
34 楼 chinaway 2009-05-22  
谢谢博主,可能是我运行的有问题,或者对ActiveMQ理解不够,只能看到消息发送、接收在控制台的输出,如果不单独启动ActiveMQ,连http://localhost:8161/admin都无法访问。
我的理解是Spring整合ActiveMQ后,运行TestMain后,ActiveMQ的broker会自动启动,但是为什么不能通过http://localhost:8161/admin访问。
33 楼 andyao 2009-05-21  
chinaway 写道
博主你好,
我怎么在http://127.0.0.1:8161/admin/topics.jsp中看不到JMS-TEST-TOPIC消息传递的效果呢,消息数量和名称等都看不到
附加本例子所需要的jar包
activemq-all-5.2.0.jar
commons-logging-1.1.jar
log4j-1.2.14.jar
spring2.5.5.jar
xbean-spring-2.8.jar(2.8以上)
jdk
jee

这些jar包都可以在activemq的解压包中找到


给个贴图看看
32 楼 chinaway 2009-05-20  
博主你好,
我怎么在http://127.0.0.1:8161/admin/topics.jsp中看不到JMS-TEST-TOPIC消息传递的效果呢,消息数量和名称等都看不到
附加本例子所需要的jar包
activemq-all-5.2.0.jar
commons-logging-1.1.jar
log4j-1.2.14.jar
spring2.5.5.jar
xbean-spring-2.8.jar(2.8以上)
jdk
jee

这些jar包都可以在activemq的解压包中找到
31 楼 creativity 2009-02-05  
谢谢楼主,已解决,原因是我的TestBean忘了序列化接口(我用了defaultMessageConvertor),在我加上LOG后才发现问题,实现了序列化接口就好了,谢谢!!
30 楼 andyao 2009-02-05  
http://127.0.0.1:8161/admin/这个界面中你可以查看queue和topic的信息列表

Name  topic或者queue的名字 
Number Of Pending Messages   pend消息的数目
Number Of Consumers   消费者的数目
Messages Sent   发送的消息数目
Messages Received  接收的消息数目

你要看有多少消息被接受了, 看Messages Received  条目
29 楼 creativity 2009-02-03  
楼主你好,我看了你的例子,完全可以跑通,我改了下,现有A,B2个web应用系统,都起在一个web服务器(tomcat)下,A应用作为producer,B应用consumer,现在可以看到A应用发的消息,但是在B应用怎么看到已经得到或没有得到这个消息呢?有些不明白怎么处理,我不知道我这样写对不对,请指教,配置如下:
A应用:applicationContext-activemq.xml

<amq:broker useJmx="false" persistent="true">
<amq:persistenceAdapter>
<amq:amqPersistenceAdapter directory="c:/amq"/>
</amq:persistenceAdapter>
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:0" />
</amq:transportConnectors>
</amq:broker>

<!-- 连接外部的activeMQ -->
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://localhost:61616" />

<!--  ActiveMQ destinations  -->
<!--  使用topic方式-->
<amq:topic name="TOPIC" physicalName="JMS-TEST-TOPIC" />

<!--  Spring JmsTemplate config -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<!--  lets wrap in a pool to avoid creating a connection per send -->
<bean class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="jmsConnectionFactory" />
</bean>
</property>
<!-- custom MessageConverter -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>

<!-- converter  -->
<bean id="defaultMessageConverter" class="com.me.jms.DefaultMessageConverter" />

<!-- POJO which send Message uses  Spring JmsTemplate -->
<bean id="topicMessageProducer" class="com.me.jms.TopicMessageProducer">
<property name="template" ref="jmsTemplate" />
<property name="destination" ref="TOPIC" />
</bean>
A应用producer
public class TopicMessageProducer {
   
    private JmsTemplate template;

private Topic destination;

public void setTemplate(JmsTemplate template) {
this.template = template;
}

public void setDestination(Topic destination) {
this.destination = destination;
}

public void send(TestBean message) {
System.out.println("=====> message send! ");
template.convertAndSend(this.destination, message);
}
}
A应用中的Action
public class TestAction extends ActionSupport{

private TestBean tb;
private TopicMessageProducer topicMessageProducer;

public String test(){
System.out.println("===> username:" + tb.getUserName());
System.out.println("===> username:" + tb.getPassWord());
topicMessageProducer.send(tb);

return "success";
}
...geter(),seter();
}

B应用applicationContext-activemq.xml
<amq:connectionFactory id="jmsConnectionFactory"
brokerURL="tcp://localhost:61616" />

<amq:topic name="TOPIC" physicalName="JMS-TEST-TOPIC" />

<!-- converter  -->
<bean id="defaultMessageConverter"
class="com.me.jms.DefaultMessageConverter" />

<!--  Message Driven POJO (MDP) -->
    <!-- consumer1 for topic a -->
    <bean id="topicConsumerA" class="com.me.jms.TopicConsumerA" />

    <!-- consumer2 for topic a -->
    <bean id="topicConsumerB" class="com.me.jms.TopicConsumerB" />
   
    <!-- Message Listener for  -->
<bean id="topicListenerA" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="topicConsumerA" />
<!--  may be other method -->
<property name="defaultListenerMethod" value="receive" />
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>

<bean id="topicListenerB" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="topicConsumerB" />
<!--  may be other method -->
<property name="defaultListenerMethod" value="receive" />
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>

<!--  listener container,MDP无需实现接口 -->
<bean id="topicListenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="TOPIC" />
<property name="messageListener" ref="topicListenerA" />
</bean>

    <bean id="topicListenerContainerB" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="TOPIC" />
<property name="messageListener" ref="topicListenerB" />
</bean>
B应用的consumerA和consumerB都一样
public class TopicConsumerA {
public void receive(TestBean message) {
System.out.println("************************************** Topic A userName: " + message.getUserName());
System.out.println("************************************** Topic A passWord: " + message.getPassWord());
}
}
目前是在tomcat控制台和http://127.0.0.1:8161/admin/中可以看消息已发送了,但是在tomcat控制台中和http://127.0.0.1:8161/admin/上看不到是否已经接受到了消息,我该如何查看或修改,谢谢!!!
28 楼 cys6736873 2008-10-31  
楼主能否给个 文档 链接或者下载,感激不尽啊
27 楼 cys6736873 2008-10-30  
andyao 写道
philyes 写道
A应用
<amq:broker useJmx="false" persistent="true">

<amq:persistenceAdapter>
<amq:amqPersistenceAdapter directory="d:/amq"/>
</amq:persistenceAdapter>

<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:0" />
</amq:transportConnectors>
</amq:broker>

  
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost" />

    <!--  使用Queue方式-->
    <amq:queue name="QUEUE" physicalName="JMS-TEST-QUEUE" />

<!--  Spring JmsTemplate config -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<!--  lets wrap in a pool to avoid creating a connection per send -->
<bean class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="jmsConnectionFactory" />
</bean>
</property>
<!-- custom MessageConverter -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>

<!-- converter  -->
<bean id="defaultMessageConverter" class="com.andyao.activemq.DefaultMessageConverter" />

<bean id="queueMessageProducer" class="com.andyao.activemq.QueueMessageProducer">
<property name="template" ref="jmsTemplate" />
<property name="destination" ref="QUEUE" />
</bean>
B应用
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost" />

<amq:queue name="QUEUE" physicalName="JMS-TEST-QUEUE" />
 
 
<bean id="queueConsumer" class="com.andyao.activemq.QueueConsumer" />
    <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="queueConsumer" />
<!--  may be other method -->
<property name="defaultListenerMethod" value="receive" />
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="defaultMessageConverter" />
</bean>

   <!-- converter -->
<bean id="defaultMessageConverter" class="com.andyao.activemq.DefaultMessageConverter" />
   
   
    <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="QUEUE" />
<property name="messageListener" ref="queueListener" />
</bean>
启动tomcat时,出现如下信息
信息: ActiveMQ JMS Message Broker (localhost, ID:6b184e6c407c476-1632-1221635612890-0:0) started(此应是是A应用 brokerID)
2008-9-17 15:13:34 org.springframework.web.context.ContextLoader initWebApplicationContext
......
信息: ActiveMQ JMS Message Broker (localhost, ID:6b184e6c407c476-1642-1221635632312-0:0) started
2008-9-17 15:13:53 org.apache.activemq.broker.TransportConnector start
信息: Connector vm://localhost Started(此应是是B应用 brokerID)
.......
我在通过页面得提交给一个servlet一个信息,如123,在servlet调用如下代码
String text = request.getParameter("text");
FooMessage foo = new FooMessage();
int msg = Integer.parseInt(text);
foo.setId(msg);
System.out.println("********start send*********");
WebApplicationContext   ctx=WebApplicationContextUtils.getRequiredWebApplicationContext(this.getServletContext());
QueueMessageProducer sender   =   (QueueMessageProducer)ctx.getBean( "queueMessageProducer");
sender.send(foo);

System.out.println("******end send ***********");
结果显示
********start send*********
queue send start
2008-9-17 15:14:16 org.apache.activemq.broker.TransportConnector start
信息: Connector vm://localhost Started
2008-9-17 15:14:16 org.springframework.jms.connection.SingleConnectionFactory initConnection
信息: Established shared JMS Connection: ActiveMQConnection {id=ID:6b184e6c407c476-1632-1221635612890-2:0,clientId=null,started=false}
queue send end
******end send ***********
B应用没有接收到,这是为什么,B应用的配置对吗,对的话为什么broker是两个不同的id,又为什么收不到A的消息呢,恳请楼主指点


A,B没有链接同一个activeMq

A 中改为
<amq:transportConnector uri="tcp://localhost:61616" />


B 中
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost" />

改为
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://localhost:61616" />



前面你也说到
transportConnector是定義鏈接activeMQ的brokerURL. 你可以為一個activeMQ server指定多個transportconnector, 比如vm, tcp, ssl, stomp, xmpp等.
那我A多定义几个,B是不是也有不同的brokerURL?那他们又是怎么链接同一个activeMq?

相关推荐

Global site tag (gtag.js) - Google Analytics