ActiveMQ入门示例
ActiveMQ有两种模式,点对点和发布/订阅模式,点对点中消息只能被一个消费者消费,而发布订阅中,消息可以被一群消费者消费,很好理解。下面的例子是点对点的
安装ActiveMQ很简单就不说了,客户端使用API只需添加以下依赖:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.1</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>
代码:
Sender.java:
package cc.lixiaohui.test.jms.activemq.p2p; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; import cc.lixiaohui.test.jms.activemq.Constants; public class Sender { //发送间隔 private long interval = 1 * 1000; private ConnectionFactory factory; private Connection conn; private Destination dest; private Session session; private MessageProducer producer; // 单线程池负责发送 private ExecutorService worker = Executors.newSingleThreadExecutor(); private volatile boolean stop = false; private static final Logger logger = Logger.getLogger(Sender.class); public Sender(String brokenURL, String user, String passwd, String queueName) throws JMSException { factory = new ActiveMQConnectionFactory(user, passwd, brokenURL); conn = factory.createConnection(); conn.start(); session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); dest = session.createQueue(queueName); producer = session.createProducer(dest); } public void setInterval(long l) { interval = l; } public void start() { worker.submit(new SendTask()); } public synchronized void stop() { stop = true; worker.shutdown(); } private TextMessage randomMsg() { String uuid = UUID.randomUUID().toString(); TextMessage msg = null; try { msg = session.createTextMessage(uuid);//把uuid作为消息 msg.setJMSCorrelationID(uuid); } catch (JMSException e) { e.printStackTrace(); } return msg; } private class SendTask implements Runnable { public void run() { logger.info("Send task begin..."); while (!stop) { try { // 发送 TextMessage msg = randomMsg(); producer.send(msg); session.commit(); // commit后消息才会发送到服务端 logger.info("Send text message : " + msg.getText()); // 间隔 Thread.sleep(interval); } catch (Exception e) { e.printStackTrace(); break; } } logger.info("Send task finished..."); } } public static void main(String[] args) throws JMSException { Sender sender = new Sender(Constants.URL, Constants.USER, Constants.PASSWD, Constants.DEFAULT_QUEUE); sender.start(); } }
Reciever.java:
package cc.lixiaohui.test.jms.activemq.p2p; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; import cc.lixiaohui.test.jms.activemq.Constants; public class Reciever { public static final int RECIEVE_MODE_SYNC = 0; public static final int RECIEVE_MODE_ASYNC = 1; private ConnectionFactory factory; private Connection conn; private Destination dest; private Session session; private MessageConsumer consumer; private ExecutorService worker = Executors.newSingleThreadExecutor(); private volatile boolean stop = false; private long interval = 3 * 1000; private static final Logger logger = Logger.getLogger(Reciever.class); // 同步/异步接收模式,默认同步 private int mode = RECIEVE_MODE_SYNC; public Reciever(String brokenURL, String user, String passwd, String queueName) throws JMSException { factory = new ActiveMQConnectionFactory(user, passwd, brokenURL); conn = factory.createConnection(); conn.start(); session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); dest = session.createQueue(queueName); consumer = session.createConsumer(dest); } public void setInterval(long l) { interval = l; } public void setMode(int mode) { this.mode = mode; } public void start() throws JMSException { if (mode == RECIEVE_MODE_ASYNC) {// 异步 logger.info("Recieved task begin in async mode..."); // 由activemq组件回调 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { handleRecievedMessage(message); try { session.commit(); } catch (JMSException e) { e.printStackTrace(); stop = true; } } }); } else if (mode == RECIEVE_MODE_SYNC) {// 同步, 由于另起线程, 这里也不阻塞 worker.submit(new RecieveTask()); } } private void handleRecievedMessage(Message recievedMsg) { if (recievedMsg instanceof TextMessage) { TextMessage msg = (TextMessage) recievedMsg; try { logger.info("Recieved message : " + msg.getText()); } catch (JMSException e) { e.printStackTrace(); } } } public void stop() { stop = true; } private class RecieveTask implements Runnable { public void run() { logger.info("Recieved task begin in sync mode..."); while (!stop) { try { Message msg = consumer.receive(); handleRecievedMessage(msg); session.commit(); Thread.sleep(interval); } catch (Exception e) { e.printStackTrace(); break; } } logger.info("Recieve task finished..."); } } public static void main(String[] args) throws JMSException { Reciever reciever = new Reciever(Constants.URL, Constants.USER, Constants.PASSWD, Constants.DEFAULT_QUEUE); reciever.setMode(RECIEVE_MODE_ASYNC); reciever.start(); } }
测试结果:
1.Reciever在ASYNC模式,注意看日志时间,可以看到Sender一旦了消息,Reciever就会接受到消息(忽略网络)
2.Reciever在ASYNC模式下,Sender每秒发一个消息,而接收者每3秒接收一个消息:可以看到Reciever接受的时间是和Sender发送的时间是无联系的。
相关推荐
Spring整合JMS(activeMQ)的示例,开发环境为eclipse+maven
这是一个activemq应用的简单示例代码,使用maven搭建的.适合刚刚开始学习activemq的程序员
ActiveMQ简单入门示例,采用点对点的通信方式
使用activeMQ实现生产者消费者
memcached 和 activeMQ 的入门级示例代码,JAVA eclipse工程
activeMQ消息中间件,简单的的生产者和消费者测试消息传递。
本教程旨在帮助activeMQ初学者入门,通过本示例,能完全理解activeMQ的基本概念,为分布式应用打下基础。 本示例中,使用maven管理,完美解决各种依赖问题,不需要自行配置,导入项目等待eclipse自行下载jar包后即可...
005-集群部署1;006-集群部署2;007-集群部署3;activemq集群配置文档.pdf;ActiveMQ(中文)参考手册.doc;ActiveMQ集群:网络连接模式(network connector)详解.docx;ActiveMQ集群:网络连接模式(network connector)...
Apache ActiveMQ入门 如何运行应用程序: 提炼 跑步 bin/activemq start 启动 JMS术语 提供者,即面向消息的中间件或充当代理的应用程序,示例之一是Apache ActiveMQ,RabitMQ,Hive MQTT,IBM MQ,JBoss ...
ActiveMQ 入门该项目包括许多示例配置,显示了各种 ActiveMQ 高可用性场景。 其中包括:主从、经纪人网络和主从经纪人网络。 该项目还包括一个简单的 JMS 队列(点对点)消息生产者和消息消费者。 笔记: 所有说明都...
简单的Java消息服务示例我发现JMS入门有点复杂。 甚至O'Reilly的“ Java Messaging Service”之类的好书也没有提供完整的项目示例。 在这里,我提供了第2章中的聊天示例,其中包含完整的构建系统和说明。建造生成...
dubbo集群架构中用到的技术activemq,redis,fastdfs的入门代码示例
在此示例中,我们将使用两个容器,一个容器作为EnMasse实例运行,另一个容器作为运行Camel路由的代理的客户端。 此快速入门要求首先部署和运行EnMasse。 要将EnMasse安装到OpenShift或Kubernetes中,请遵循。 该...
Spring Boot,Camel和ActiveMQ快速入门 此快速入门展示了如何将Spring-Boot应用程序连接到A-MQ xPaaS消息代理,以及如何使用OpenShift在两条骆驼路线之间使用JMS消息传递。 建筑 这个例子可以用 mvn clean install ...
Spring Boot,Camel和ActiveMQ快速入门本快速入门介绍了如何使用OpenShift将Spring-Boot应用程序连接到A-MQ xPaaS消息代理,以及如何在两条骆驼路线之间使用JMS消息传递。建造这个例子可以用mvn clean install在...
xml java系统源码 Polaris【北极星】企业级云原生微服务框架 Polaris【北极星】企业级云原生微服务框架 ① Polaris【北极星】企业级云原生微服务框架文档 ...除上述《精进版》功能外,涉及到指南篇里的ActiveMQ、M
友情提示:因为提供了50000+行示例代码,所以艿艿替换注释了所有Maven模块。胖友可以根据自己的需要,修改立即。一个涵盖六个主流技术栈的正经仓库:作为一个热爱深夜撸码的18岁头发茂密的可爱小男孩,希望大佬能够...
5个目标文件,演示Address EJB的实现,创建一个EJB测试客户端,得到名字上下文,查询jndi名,通过强制转型得到Home接口,getInitialContext()函数返回一个经过初始化的上下文,用client的getHome()函数调用Home接口...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...