`
meiyx
  • 浏览: 181345 次
  • 性别: Icon_minigender_2
  • 来自: 北京
社区版块
存档分类
最新评论

Activemq 结合Spring应用2

阅读更多
之前虽然谢过类似的代码,但是脑子里还有那么一点不清楚,主要消费者生产者的感念,今天工作不忙,再次复习,那个点还是没过去,请教同事一点通了,呵呵!以前的理解还是有误差的,不过发表的文章没问题!
消费者:
activemq-consumer.xml配置如下:
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

	xsi:schemaLocation="http://www.springframework.org/schema/beans    
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           
           http://activemq.apache.org/schema/core            		   
 		   http://activemq.apache.org/schema/core/activemq-core.xsd">

	<bean id="queueJmsConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://localhost:61616?wireFormat.maxInactivityDuration=0" />
		<property name="useAsyncSend" value="true" />
	</bean>	
	
	<!--  ActiveMQ destinations  -->

	<!--  使用Queue方式-->
	<amq:queue name="QUEUE0" physicalName="JMS-TEST-QUEUE0" />
	<amq:queue name="QUEUE1" physicalName="JMS-TEST-QUEUE1" />
	<amq:queue name="QUEUE2" physicalName="JMS-TEST-QUEUE2" />

	<!-- converter  -->
	<bean id="defaultMessageConverter" class="com.activemq.common.DefaultMessageConverter" />

	<!--  Message Driven POJO (MDP) -->
	<!-- consumer for queue -->
	<bean id="queueConsumer0" class="com.activemq.consumer.QueueConsumer" scope="prototype"/>
	<bean id="queueConsumer1" class="com.activemq.consumer.QueueConsumer" scope="prototype"/>
	<bean id="queueConsumer2" class="com.activemq.consumer.QueueConsumer2" scope="prototype"/>

	<!-- Message Listener for  -->
	<bean id="queueListener0"
		class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg ref="queueConsumer0" />
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="receive" />
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="defaultMessageConverter" />
	</bean>
	<bean id="queueListener1"
		class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg ref="queueConsumer1" />
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="receive" />
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="defaultMessageConverter" />
	</bean>
	<bean id="queueListener2"
		class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
		<constructor-arg ref="queueConsumer2" />
		<!--  may be other method -->
		<property name="defaultListenerMethod" value="receive" />
		<!-- custom MessageConverter define -->
		<property name="messageConverter" ref="defaultMessageConverter" />
	</bean>
	
	<!--  listener container,MDP无需实现接口 -->
	<bean id="queueListenerContainer0"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="queueJmsConnectionFactory" />
		<property name="destination" ref="QUEUE0" />
		<property name="messageListener" ref="queueListener0" />
		<property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE" />
		<property name="receiveTimeout" value="20000" />
	</bean>
	<bean id="queueListenerContainer1"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="queueJmsConnectionFactory" />
		<property name="destination" ref="QUEUE1" />
		<property name="messageListener" ref="queueListener1" />
		<property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE" />
		<property name="receiveTimeout" value="20000" />
	</bean>
	<bean id="queueListenerContainer2"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="queueJmsConnectionFactory" />
		<property name="destination" ref="QUEUE2" />
		<property name="messageListener" ref="queueListener2" />
		<property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE" />
		<property name="receiveTimeout" value="20000" />
	</bean>
</beans>
相应的QueueConsumer类代码(自定义的)
package com.activemq.consumer;



import org.apache.log4j.Logger;

import com.activemq.common.FooMessage;

/**
 * Date: 2008-8-28
 * Time: 17:10:34
 */
public class QueueConsumer {

	int i;
	
	Logger logger=Logger.getLogger(QueueConsumer.class);
	
    public void receive(FooMessage message) {
    	i++;    	
		System.out.println("*************************************** Queue : " + message.getId()+","+System.currentTimeMillis());
    	logger.debug("Queue: " + message.getId()+"-------count"+i+"------------------------------------------------>"+System.currentTimeMillis());
    	/*try {
			Thread.sleep(60000);
		} catch (InterruptedException e) {
			logger.error("time out..............................................");
			e.printStackTrace();
		}*/
	}
    
}

ok接着就可以测试了,测试类:
package com.activemq.consumer;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ConsumerTestMain {
	public static void main(String[] args) {
		BeanFactory bf = new ClassPathXmlApplicationContext("classpath:activemq-consumer.xml");
	}
}



查看消息的发送情况
启动activemq服务器,打开http://localhost:8161/admin/选择QUEUE查看
你会发现我们启动了1个消费者,再次运行测试类消费者变为2,然后群殴们启动生成者



生产者的相关配置
activemq-producer.xml
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

	xsi:schemaLocation="http://www.springframework.org/schema/beans    
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           
           http://activemq.apache.org/schema/core            		   
 		   http://activemq.apache.org/schema/core/activemq-core.xsd">	
		
	<bean id="queueJmsConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://localhost:61616?wireFormat.maxInactivityDuration=0"/>
		<property name="useAsyncSend" value="true"/>	
	</bean>	

	<!--  ActiveMQ destinations  -->

	<!--  使用Queue方式-->
	<amq:queue name="QUEUE0" physicalName="JMS-TEST-QUEUE0" />
	<amq:queue name="QUEUE1" physicalName="JMS-TEST-QUEUE1" />
	<amq:queue name="QUEUE2" physicalName="JMS-TEST-QUEUE2" />	

	<!--  Spring JmsTemplate config -->
	<bean id="queueJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory">
			<!--  lets wrap in a pool to avoid creating a connection per send -->
			<bean class="org.springframework.jms.connection.SingleConnectionFactory">
				<property name="targetConnectionFactory" ref="queueJmsConnectionFactory" />
			</bean>
		</property>
		<!-- custom MessageConverter -->
		<property name="messageConverter" ref="defaultMessageConverter" />
		<property name="receiveTimeout" value="20000"/>
	</bean>		
	
	<!-- converter  -->
	<bean id="defaultMessageConverter" class="com.activemq.common.DefaultMessageConverter" />	

	<bean id="queueMessageProducer0" class="com.activemq.producer.QueueMessageProducer">
		<property name="template" ref="queueJmsTemplate" />
		<property name="destination" ref="QUEUE0" />
	</bean>
	<bean id="queueMessageProducer1" class="com.activemq.producer.QueueMessageProducer">
		<property name="template" ref="queueJmsTemplate" />
		<property name="destination" ref="QUEUE1" />
	</bean>
	<bean id="queueMessageProducer2" class="com.activemq.producer.QueueMessageProducer">
		<property name="template" ref="queueJmsTemplate" />
		<property name="destination" ref="QUEUE2" />
	</bean>	
</beans>

生成者类:QueueMessageProducer
package com.activemq.producer;


import javax.jms.Queue;

import org.springframework.jms.core.JmsTemplate;

import com.activemq.common.FooMessage;

public class QueueMessageProducer extends Thread {

	private Queue destination;

	private String qmid;

	private JmsTemplate template;

	public Queue getDestination() {
		return destination;
	}

	public String getQmid() {
		return qmid;
	}

	public JmsTemplate getTemplate() {
		return template;
	}

	@Override
	public void run() {
		for (int i = 0; i < 1000000; i++) {
			FooMessage fMessage = new FooMessage();
			fMessage.setId(this.getQmid() + "-" + i);
			this.send(fMessage);
		}
	}

	public void send(FooMessage message) {
		this.getTemplate().convertAndSend(this.getDestination(), message);
	}

	public void setDestination(Queue destination) {
		this.destination = destination;
	}

	public void setQmid(String qmid) {
		this.qmid = qmid;
	}

	public void setTemplate(JmsTemplate template) {
		this.template = template;
	}
}
测试类:
package com.activemq.producer;


import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ProducerTestMain {
	public static void main(String[] args) {
		BeanFactory bf = new ClassPathXmlApplicationContext("classpath:activemq-producer.xml");
		QueueMessageProducer[] qms={
				(QueueMessageProducer)bf.getBean("queueMessageProducer0"),
				(QueueMessageProducer)bf.getBean("queueMessageProducer1"),
				(QueueMessageProducer)bf.getBean("queueMessageProducer2")};
		for (int i = 0; i < qms.length; i++) {			
			qms[i].setQmid(i+"");
			qms[i].start();
		}
	}
}

启动测试类把你的生产者类中的run方法的循环次数的数字写大些比如向我这里一样写成10万,对应的你启动的activemq服务器你就明白了
不明白的可以喝我沟通,我们共同学习,以防我们学习过程中的不足
另外我要向大家说下我的误解:
<!--  使用Queue方式-->  
19.    <amq:queue name="QUEUE0" physicalName="JMS-TEST-QUEUE0" />  
20.    <amq:queue name="QUEUE1" physicalName="JMS-TEST-QUEUE1" />  
21.    <amq:queue name="QUEUE2" physicalName="JMS-TEST-QUEUE2" />       一直以来我误以为这段代码我们创建了三个消费者其实是消费者的三个消费管道,消费者要通过这样的管道接受消费者发送的消息!
分享到:
评论

相关推荐

    ActiveMQ+Spring完整详解例子

    ActiveMq流程并结合Spring应用

    实战ActiveMQ集群与应用视频教程.zip

    网盘文件永久链接 1:ActiveMQ入门和消息中间件 ...6:ActiveMQ结合Spring开发 7:ActiveMQ支持的传输协议 8:ActiveMQ消息存储持久化 9:ActiveMQ的静态网络链接 10:多线程consumer访问集群 ..........

    Spring ActiveMQ安装、配置、打包服务及实例

    ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,...附件资料主要含:ActiveMQ包,安装配置文档,将服务端做成服务文档及与spring框架结合的导入即可运行实例!

    MQ之ActiveMQ.mmap

    从零基础入门到熟练掌握ActiveMQ,能够结合Spring/SpringBoot进行实际开发配置并能够 进行MQ多节点集群的部署,最后学习MQ的高级特性和高频面试题的分析。 希望通过本次的学习, 能够帮助同学们取得更大的进步,加油...

    消息中间件之ActiveMQ视频课程

    本套视频以Apache的ActiveMQ作为切入点,分为基础/实战/面试上中下三大部分,带你从零基础入门到熟练掌握ActiveMQ,能够结合Spring/SpringBoot进行实际开发配置并能够进行MQ多节点集群的部署,可以学习到MQ的高级...

    spring 实践学习案例

    spring 实践学习案例,与其它组件结合如 mybatis、jpa、dubbo、redis、mongodb、memcached、kafka、rabbitmq、activemq、shiro等 #### 软件架构 基于Java 1.8 #### Spring 版本 - 5.0.8.RELEASE #### 模块说明 - ...

    消息队列springmq

    spring mvc结合tomcat月Apache的消息队列ActiveMQ应用

    ActiveMQ.rar

    包括:多种启动Broker的方法、单独应用的开发、结合Spring的开发等 n 五:ActiveMQ的Transport 包括:多种传输协议的功能、配置和使用 六: ActiveMQ的消息存储 包括:队列和topic、KahaDB、AMQ、JDBC、MMS等 n 七:...

    spring boot 实践学习案例,与其它组件整合

    spring boot 实践学习案例,与其它组件结合如 mybatis、jpa、dubbo、redis、mongodb、memcached、kafka、rabbitmq、activemq、elasticsearch、security、shiro等 #### Spring Boot 版本 - 2.0.3.RELEASE #### 模块...

    P2P网络借贷平台项目SSH+Redis+ActiveMQ+POI+Shiro+AngularJS+Nginx+Quartz等

    3、该项目采用了struts2 hibernate spring和 spring data jpa 开源框架完成,并融入了cxf开源webservice框架的应用,而这些技术都是当下流行的技术。 4、在缓存方面运用了互联网的流行技术redis实现缓存存贮,...

    ActiveMQ入门教程

    摘要:本文主要讲述ActiveMQ的基本知识和使用方法,并简单结合spring使用ActiveMQ。 企业消息软件从80年代起就存在,它不只是一种应用间消息传递风格,也是一种集成风格。因此,消息传递可以满足应用间的通知和互相...

    springCloud

    ActiveMQ Kafka RabbitMQ RocketMQ 目前Spring Cloud Bus 支持 RabbitMQ 和 Kafka,spring-cloud-starter-bus-amqp 、spring-cloud-starter-bus-kafka RabbitMQ简介 RabbitMQ是一个开源的AMQP实现,服务器端用...

    realtime-messaging:使用 STOMP over WebSockets 在 Spring Boot 上实时传输数据

    在 Spring Boot 上使用 STOMP Over WebSockets 的实时私人消息演示该项目演示了在 Spring Boot 上使用 over WebSockets 结合和向客户端实时发送数据。 有关更详细的解释,请访问博客页面先决条件跑步安装并运行您的 ...

    JMS与Spring之二(用message listener container异步收发消息)

    SimpleMessageListenerContainer 则适用于简单的消息传递应用,不需要基于可变负载的线程、会话、连接调整。 在配置 message listener container 时,我们可以使用 JNDI 访问连接工厂和 JMS Destinations,也可以...

    Java消息服务(第2版)

    · 使用JMS时结合RESTful应用程序和Spring应用程序框架 消息传递是一个强大的范例,可以更容易地使企业应用程序的不同部分分离。《Java消息服务》第二版将迅速教会你如何运用其背后的关键技术。

    java文集

    Java 6 RowSet 使用完全剖析 结合Spring2.0和ActiveMQ进行异步消息调用 struts+hibernate增删改查(一) AXIS 布署问题 struts+hibernate增删改查(二) MySQL中如何实现Top N及M至N段的记录查询?...

    网络架构师148讲视频课程

    │ 第76节:ActiveMQ结合Spring开发.avi │ 第77节:ActiveMQ支持的传输协议.avi │ 第78节:ActiveMQ消息存储持久化.avi │ 第79节:ActiveMQ的静态网络链接.avi │ 第80节:多线程consumer访问集群.avi │ 第81节...

    java开源包2

    GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java.applet....

    java开源包1

    GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java.applet....

    java开源包11

    GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine extends java.applet....

Global site tag (gtag.js) - Google Analytics