本篇主要演示ActiveMQ中的Topic使用
在开始演示前先引入一些概念:
Q:什么是Topic?
A:Topic就是SUB/HUB即发布订阅模式,是SUB/hub就是一拖N的USB分线器的意思。意思就是一个来源分到N个出口。值得注意的是,此模式下,仅对有效的出口即时发送消息,如publisher发布消息Message A到A,B服务器,此时B服务器服务未开启,则仅有A服务器收到消息Message A,事后B服务器开启但乃收不到消息Message A。此时,publisher再次发布消息Message B时,A,B服务器都能收到Message B。
Q:ActiveMQ 支持哪些消息确认模式?
A:下面的表格来自《ActiveMQ in Action》原版CHAPTER 13 Tuning ActiveMQ for performance 内的节选,本人翻译的如有偏差请指出,括号内的是我个人的一些理解仅供参考。
确认模式 |
发送确认 |
描述 |
Session.AUTO_ACKNOWLEDGE |
每条消息消耗时自动发送一条应答消息到ActiveMQ代理。 |
很慢,但常常作为消息消费者的默认的处理机制。 |
Session.DUPS_OK_ACKNOWLEDGE |
允许消费者发送一条消息消耗应答消息到ActiveMQ代理 |
当达到预设限制的50%,一条确认消息将会被发回,最快的消费消息的标准方式。(最快往往意味着你要处理更多的东西,比如侦测和丢弃重发的消息) |
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE |
为每条消息消费发送一条确认消息。 |
准许主控制单独确认已授权的消息,但这会很慢。 |
optimizeAcknowledge |
准许消费者发送一条范围内的确认消息到ActiveMQ代理来确认消息消费。 |
结合Session.AUTO_ACKNOWLEDGE 一条消费消息将会在目标缓冲到65%时发送确认信息到ActiveMQ代理。这是最快的消息消费方式。(虽然作者很推崇,但我认为和上面一样存在一些问题需要处理。) |
下面首先是Publisher(发布者)的例子:
public class Publisher { public static int count = 10;// 每次生成量 private static int total = 0;// 总发送次数 private static String brokerURL = "tcp://localhost:61616";// MQ 接口地址 private static transient ConnectionFactory factory;// JMX连接工厂 private transient Connection connection;// JMX连接 private transient Session session;// JMX会话 private transient MessageProducer producer;// 消息生产这 /** * 发布者构造方法 * * @throws JMSException */ public Publisher() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); try { connection.start(); } catch (JMSException jmse) { connection.close(); throw jmse; } // 送连接获得会话,获得的会话消息确认模式详见上面介绍 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 从会话中获得生产者,由于是Topic,仅用于发布消息并未指定目标,所以为null producer = session.createProducer(null); } /** * 关闭连接 * * @throws JMSException */ public void close() throws JMSException { if (connection != null) { connection.close(); } } public static void main(String[] args) throws JMSException { Publisher publisher = new Publisher(); String[] a = new String[] { "test" };//生产出来的消息,放到哪个库存里 while (total < 100) { for (int i = 0; i < count; i++) { publisher.sendMessage(a); } total += count; System.out.println("生产了:" + count + ",总共生产了:" + total); try { Thread.sleep(1000); } catch (InterruptedException x) { } } publisher.close(); } /** * 消息发送 * * @param stocks * @throws JMSException */ protected void sendMessage(String[] stocks) throws JMSException { for (String string : stocks) { Destination destination = session.createTopic("STOCKS." + string); Message message = createStockMessage(string, session);//构造消息 System.out.println("发送: " + ((ActiveMQMapMessage) message).getContentMap() + " ,目标为: " + destination); producer.send(destination, message);//将消息发送给目标 } } /** * 消息发送模版 * * @param stock * @param session * @return * @throws JMSException */ protected Message createStockMessage(String stock, Session session) throws JMSException { MapMessage message = session.createMapMessage(); message.setString("stock", stock); message.setString("name", "商品"); message.setDouble("price", 100.00); message.setDouble("offer", 50.00); message.setBoolean("promotion", false); return message; } }
接着是Consumer
public class Consumer { private static String brokerURL = "tcp://localhost:61616"; private static transient ConnectionFactory factory; private transient Connection connection; private transient Session session; public Consumer() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public void close() throws JMSException { if (connection != null) { connection.close(); } } public static void main(String[] args) throws JMSException { Consumer consumer = new Consumer(); String[] stocks=new String[]{"test"};//数据仓库名 for (String stock : stocks) { Destination destination = consumer.getSession().createTopic("STOCKS." + stock); MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination); messageConsumer.setMessageListener(new MyListener()); } } public Session getSession() { return session; } }
最后是一个消息监听器,十分简单
public class MyListener implements MessageListener { public static int a=0; @Override public void onMessage(Message message) { System.out.println("进入监听"); try { MapMessage map = (MapMessage) message; String stock = map.getString("stock"); String name = map.getString("name"); double price = map.getDouble("price"); double offer = map.getDouble("offer"); boolean promotion = map.getBoolean("promotion"); DecimalFormat df = new DecimalFormat("#,###,###,##0.00"); System.out.println("仓库号:"+stock + ",商品名:"+name+",销售价格:" + df.format(price) + ",供应价格:" + df.format(offer) + ",是否促销:" + (promotion ? "是" : "否")+",共获得"+ ++a); } catch (Exception e) { e.printStackTrace(); } } }
使用方法:(不太喜欢传图片,自己执行代码看结果吧)
启动2个Consumer,成功启动后,启动publisher,可以看到2个Consumer正常打印了publisher传过来的所有数据和条数一一对应。
启动1个Consumer,成功启动后启动publisher,在publisher未生产完所有数据前,启动另一个Consumer,可以看到先启动的一个正常接收了所有数据,另一个只接收了他启动后publisher生产的数据。
相关推荐
一个小例子 关于activeMq的三种收发消息的方法
可用于调试MSMQ、RabbitMQ、ActiveMQ三种消息队列 其中MSMQ支持Active、Binary、XML格式(要勾选事务) RabbitMQ支持逐条接发、批量接发、RPC回调模式、新建队列、建立持久化队列、连接测试等功能。
本次以Apache的ActiveMQ作为切入点,分为基础/实战/面试上中下三大部分,将带着同学们 从零基础入门到熟练掌握ActiveMQ,能够结合Spring/SpringBoot进行实际开发配置并能够 进行MQ多节点集群的部署,最后学习MQ的...
一、JMS基本概念 二、activemq介绍及安装 ...三、activemq简单实例 四、activemq整合spring运用 五、activemq常见问题 5.1 activemq 消息传递 5.2 activemq 消息确认机制 5.3 activemq 持久化机制
java中使用消息中间件ActiveMQ的MQTT协议发布消息使用fusesource,fusesource提供三种方式实现发布消息的方式,分别是阻塞式(BlockingConnection)、回调式(CallbackConnection)和Future样式(FutureConnection)
window搭建activeMQ集群(linux系统搭建集群的方式和window的一样),还有自己写的搭建集群的文档和我自己亲手搭建的一个三个mq集群
activeMq的一个小例子activeMq的一个小例子activeMq的一个小例子activeMq的一个小例子activeMq的一个小例子activeMq的一个小例子activeMq的一个小例子activeMq的一个小例子activeMq的一个小例子activeMq的一个小例子...
本套视频以Apache的ActiveMQ作为切入点,分为基础/实战/面试上中下三大部分,带你从零基础入门到熟练掌握ActiveMQ,能够结合Spring/SpringBoot进行实际开发配置并能够进行MQ多节点集群的部署,可以学习到MQ的高级...
中间件技术 实验三 消息中间件应用开发: - CSDN博客 https://blog.csdn.net/lly1122334/article/details/80139790
JMS-activemq 实例(分ppt,eclipse工程,说明三部分) 特别有readme说明,一看就会用 spring实现方式,可运行有jar包
ActiveMQ linux安装包
activeMQ集成SpringMVC,三种方式监听
ACTIVEMQ实战部分翻译,只翻译了第一章到第四章的第三节
ActiveMQ window安装包
spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...
Apache+ActiveMQ教程.pdf JMS规范教程.pdf JMS简明教程.pdf 三份教程
ActiveMQ_实践之路(一)ActiveMQ_实践之路(二)ActiveMQ_实践之路(三)ActiveMQ_实践之路(四)
中间件技术 实验三 消息中间件应用开发: - CSDN博客 https://blog.csdn.net/lly1122334/article/details/80139790
一: ActiveMQ简介 包括:是什么、能干什么、特点;消息中间件的功能、特点、应用场景等 n 二: ActiveMQ安装和基本使用 包括:通过源码安装、基本的配置示例、启动、测试运行、关闭等 n 三:理解和掌握JMS 包括:...