最近需要用到activemq的topic发布订阅功能
activemq可以到官网下载,下载完成后启动很简单,bin/activemq start
到时候可以访问管理控制台,密码是admin/admin,
http://192.168.91.128:8161/admin/topics.jsp, 128是我的部署activemq的ip地址,用的是redhat6.4
mq broker搭建好之后,我们首先需要开发publish程序,这里我们通过tomcat的jndi来完成,
新建一个web项目,然后META-INF里面创建context.xml,内容如下,
<Context antiJARLocking="true"> <Resource name="jms/ConnectionFactory" auth="Container" type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" brokerURL="tcp://192.168.91.128:61616" brokerName="LocalActiveMQBroker" useEmbeddedBroker="false"/> <Resource name="jms/topic/MyTopic" auth="Container" type="org.apache.activemq.command.ActiveMQTopic" factory="org.apache.activemq.jndi.JNDIReferenceFactory" physicalName="MY.TEST.FOO"/> </Context>
完成之后写个servlet来获取jnid中配置的factory和topic信息,代码如下
InitialContext initCtx = new InitialContext(); Context envContext = (Context) initCtx.lookup("java:comp/env"); ConnectionFactory connectionFactory = (ConnectionFactory) envContext.lookup("jms/ConnectionFactory"); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer((Destination) envContext.lookup("jms/topic/MyTopic")); Message testMessage = session.createMessage(); testMessage.setStringProperty("testKey", "testValue111"); producer.send(testMessage);
deploy应用之前,需要将activemq-all-5.9.0.jar放入到tomcat/lib下面,启动tomcat,接着部署刚才的 servlet应用,启动tomcat后topic不会被自动创建,然后可以访问这个servlet,会发现topic会被创建,可以观察控制台
接着是消费者subscriber,
下面是普通的main方法来消费topic
package com.activemqtest; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Reciever { protected Topic queue; protected String queueName = "MY.TEST.FOO"; protected String url = "tcp://192.168.91.128:61616"; protected int ackMode = Session.AUTO_ACKNOWLEDGE; public static void main(String[] args) { Reciever rec = new Reciever(); try { rec.run(); } catch (Exception e) { e.printStackTrace(); } } public void run() throws JMSException { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url); TopicConnection connection = (TopicConnection) connectionFactory .createTopicConnection(); connection.start(); MessageConsumer consumer = null; Session session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); queue = session.createTopic(queueName); consumer = session.createConsumer(queue); System.out.println(" Waiting for message (max 5) "); for (int i = 0; i < 3; i++) { Message message = consumer.receive(); processMessage(message); } System.out.println("Closing connection"); consumer.close(); session.close(); connection.close(); } public void processMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println("Received: " + text); } else { System.out.println(message.getStringProperty("testKey")); } } catch (Exception e) { e.printStackTrace(); } } }
总结:
Topics
In JMS a Topic implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.
可以发现topic的消费者需要active,你不能等到publish完成之后,你启动subscriber,这样是接受不到消息的,但是queue的话不一样,不一定active才可以接受消息
另外有个问题是,tomcat只是一个servlet容器,并没有实现很多的j2ee标准比如jms,所以里面是没有javax.jms下面这些package的,需要单独下载J2ee.jar
相关推荐
一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...
ActiveMQ环境搭建及实例详解的源码 ActiveMQ环境搭建及实例详解的源码
欢迎下载ActiveMQ Topic 实例!
spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...
message queue,也就是消息队列,也就是处理消息的,也就是处理JMS的,本文介绍了在Linux环境搭建activeMQ的过程。
Apache ActiveMQ Queue Topic 详解 教程 加入代码解释说明
在网上找了很多的topic持久化的Demo做了很多的测试,现把熟肉呈上。
activemq 虚拟topic配置,可以将一个 topic转发为多个队列和多个topic或者将一个队列转发为多个topic和多个队列
window搭建activeMQ集群(linux系统搭建集群的方式和window的一样),还有自己写的搭建集群的文档和我自己亲手搭建的一个三个mq集群
本书适合入门学习,所有...2.Eclipse开发环境安装搭建 3.用Bundle对应用组件模块化 4.Camel的应用和例子 Bundle之间的消息传递, Errorhandling 负载均衡 。。。。。。 5.Camel利用ActiveMQ来实现对消息的机制的支撑
NULL 博文链接:https://768992698.iteye.com/blog/2343328
ActiveMQ-Topic订阅发布模式:参考博文:http://blog.csdn.net/ABAP_Brave/article/details/71211334
windows搭建activemq单机版,里面包含有mq文件,只需按照压缩包文档的介绍进行操作即可。
activemq的topic队列模式的maven,spring的demo
Spring 和 activemq 搭建JMS开发系统示例,学习此示例可以让你快速了解JMS,这种方式对服务器没有依赖。
apache-activemq-5.13.0-bin.tar.gz,处理分布式事务
activemq5.8版本搭建master/slave架构所需要的nfsv4的指导文档
文章目录ActiveMQ简介1、ActiveMQ简介2、ActiveMQ下载SpringBoot整合ActiveMQ1、新建SpringBoot项目2、项目结构3、相关配置信息4、ActiveMQ配置类Queue队列模式1、队列生产者2、队列消费者3、测试效果Topic模式1、...
基于zookeeper+activemq+leveldb搭建 高性能 高可用的消息队列
SpringBoot+ActiveMQ-发布订阅 Topic 主题 * 消息消费者(订阅方式)消费该消息 * 消费生产者将发布到topic中,同时有多个消息消费者(订阅)消费该消息 * 这种方式和点对点方式不同,发布到topic的消息会被所有...