通过Spring对ActiveMQ进行配置开发,发布订阅模式,支持消息的持久化。
需要Spring2.5版本以上,如果有多个订阅者,每个订阅者需要指定不同的 clientId 。
发布者的配置:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"> <!-- 配置JMS连接工厂 --> <bean id="myConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="10" /> <property name="targetConnectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- MQ地址 --> <property name="brokerURL" value="tcp://localhost:61616" /> <!-- 是否异步发送 --> <property name="useAsyncSend" value="true" /> </bean> </property> </bean> <!-- 发送消息的目的地(一个主题) --> <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 设置消息主题的名字 --> <constructor-arg index="0" value="Online.Notice.Topic" /> </bean> <!-- 配置JMS模版 --> <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="myConnectionFactory" /> <property name="defaultDestination" ref="myDestination" /> <!-- 订阅发布模式 --> <property name="pubSubDomain" value="true" /> <property name="receiveTimeout" value="10000" /> </bean> </beans>
发布者的代码:
package com.xikang.jms; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class SimpleJMSSender { public static void main(String[] args) { ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext-send.xml"); JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("myJmsTemplate"); for (int i = 0; i < 10; i++) { jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage msg = session.createTextMessage(); // 设置消息属性 msg.setStringProperty("phrCode", "C001"); // 设置消息内容 msg.setText("Hello World!"); return msg; } }); } } }
订阅者的配置:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"> <!-- 配置JMS连接工厂 --> <bean id="myConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="10" /> <!-- 接收者ID --> <property name="clientId" value="client_119" /> <property name="targetConnectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- MQ地址 --> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> </property> </bean> <!-- 发送消息的目的地(一个主题) --> <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 设置消息主题的名字 --> <constructor-arg index="0" value="Online.Notice.Topic" /> </bean> <!-- 生产消息配置 (自己定义)--> <bean id="myTopicConsumer" class="com.xikang.jms.SimpleJMSReceiver" /> <!-- 消息监听器 --> <bean id="myTopicListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg ref="myTopicConsumer" /> <!-- 接收消息的方法名称 --> <property name="defaultListenerMethod" value="receive" /> <!-- 不进行消息转换 --> <property name="messageConverter"><null/></property> </bean> <!-- 消息监听容器 --> <bean id="myListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="myConnectionFactory" /> <!-- 发布订阅模式 --> <property name="pubSubDomain" value="true"/> <!-- 消息持久化 --> <property name="subscriptionDurable" value="true"/> <property name="receiveTimeout" value="10000"/> <!-- 接收者ID --> <property name="clientId" value="client_119" /> <property name="durableSubscriptionName" value="client_119"/> <property name="destination" ref="myDestination" /> <property name="messageListener" ref="myTopicListener" /> </bean> </beans>
订阅者的代码:
package com.xikang.jms; import javax.jms.JMSException; import javax.jms.TextMessage; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.JmsException; public class SimpleJMSReceiver { public static void main(String[] args) { ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext-receive.xml"); while(true) { } } public void receive(TextMessage message) throws JmsException, JMSException { System.out.println(message.getStringProperty("phrCode")); System.out.println(message.getText()); } }
相关推荐
activemqactivemq示例代码:使用spring配置activemq的发布订阅模式。
分别实现生产者-消费者模式和发布-订阅模式,作为java编程发送消息和消费消息的基础示例。 源码主要包含如下内容: 1.spring boot配置初始化activeMQ 2.队列类型queue,生产者发送队列消息,以及消费者消费相关队列...
同时Apache ActiveMq是速度快,支持多种跨语言客户端和协议,同时配有易于使用的企业集成模式和优秀的特性,并且支持JMS1.1和J2EE1.4。具体特性见官网:http://activemq.apache.org/ 2. 什么是JMS JMS的全称是Java ...
Spring Boot Artemis集群主题一个示例项目,通过集群模式下的topic (发布-订阅),通过Apache ActiveMQ Artemis 2.4.0演示了两个Spring Boot应用程序生产者和使用者之间的异步通信。介绍Apache ActiveMQ Artemis是...
004-p2p模式+pulish-subscribe发布订阅模式+与spring集成;005-集群部署1;006-集群部署2;007-集群部署3;activemq集群配置文档.pdf;ActiveMQ(中文)参考手册.doc;ActiveMQ集群:网络连接模式(network connector)...
springboot整合activemq完整示例,以及mq界面简单配置。涵点对点消息模式、Top订阅模式
PublisherSubscriber(发布订阅者)消息模式开发流程 详细讲解企业项目中ActiveMQ使用经验 ActiveMQ与Tomcat整合 分布式ActiveMQ集群开发详解 ActiveMQ集群配置方法 在Spring下集成ActiveMQ ActiveMQ中间件实现实时...
ActiveMQ Kafka RabbitMQ RocketMQ 目前Spring Cloud Bus 支持 RabbitMQ 和 Kafka,spring-cloud-starter-bus-amqp 、spring-cloud-starter-bus-kafka RabbitMQ简介 RabbitMQ是一个开源的AMQP实现,服务器端用...
ActiveMQ基于Spring完成分布式消息队列实战 Kafka Kafka基于Zookeeper搭建高可用集群实战 kafka消息处理过程剖析 Java客户端实现Kafka生产者与消费者实例 kafka的副本机制及选举原理剖析 基于kafka实现应用...