直接上代码:
发送:
public static void main(String[] args) { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS // Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // MessageProducer:消息发送者 MessageProducer producer; // TextMessage message; // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "failover:(tcp://localhost:61616,tcp://10.1.1.123:61616)"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("FirstQueue"); // 得到消息生成者【发送者】 producer = session.createProducer(destination); // 设置不持久化,此处学习,实际根据项目决定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 构造消息,此处写死,项目就是参数,或者方法获取 sendMessage(session, producer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } public static void sendMessage(Session session, MessageProducer producer) throws Exception { for (int i = 1; i <= SEND_NUMBER; i++) { TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i); // 发送消息到目的地方 System.out.println("发送消息:" + "ActiveMq 发送的消息" + i); producer.send(message); } }
接收:
public static void main(String[] args) { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "failover:(tcp://localhost:61616,tcp://10.1.1.123:61616)"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("FirstQueue"); consumer = session.createConsumer(destination); while (true) { // 设置接收者接收消息的时间,为了便于测试,这里谁定为100s TextMessage message = (TextMessage) consumer.receive(100000); if (null != message) { System.out.println("收到消息" + message.getText()); } else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } }
基于Spring的ActiveMQ:
XML文件:
<bean id="topicListenConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://127.0.0.1:61616" /> <property name="clientID" value="clientId_007" /> </bean> <bean id="deviceListener" class="net.coc.activemq.listener.DeviceMessageListener"> <property name="deviceAdapter" ref="deviceAdapter"></property> </bean> <bean id="deviceTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="Assets.Device.Topic" /> </bean> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="topicListenConnectionFactory" /> <property name="pubSubDomain" value="true" /> <property name="receiveTimeout" value="10000" /> <property name="destination" ref="deviceTopic" /> <property name="subscriptionDurable" value="true" /> <property name="clientId" value="deviceListenerclientId_007" /> <property name="durableSubscriptionName" value="deviceListenerclientId_007" /> <property name="messageListener" ref="deviceListener" /> </bean>
Java:
public class DeviceMessageListener implements MessageListener { private DeviceAdapter deviceAdapter; public DeviceAdapter getDeviceAdapter() { return deviceAdapter; } public void setDeviceAdapter(DeviceAdapter deviceAdapter) { this.deviceAdapter = deviceAdapter; } @Override public void onMessage(Message message) { TextMessage textMsg = (TextMessage) message; try { JSONObject msg = JSONObject.fromObject(textMsg.getText()); deviceAdapter.execute(msg); deviceAdapter.finish(); } catch (JMSException e) { e.printStackTrace(); throw new RuntimeException(e); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } } }
deviceAdapter 是具体解析消息的处理类,这个需要根据各自的需求自己定制;
相关推荐
该demo主要用于activeMQ初学,队列消息监听和Topic消息监听
本教程旨在帮助activeMQ初学者入门,通过本示例,能完全理解activeMQ的基本概念,为分布式应用打下基础。 本示例中,使用maven管理,完美解决各种依赖问题,不需要自行配置,导入项目等待eclipse自行下载jar包后即可...
基于springboot构建消息队列通信demo,针对kafka、activemq初学者,安装部署好activemq和kafka后,修改application.yml 。启动应用即可测试,可帮助快速了解kafka、activemq 两者在 Queue topic producer consumer ...
此文档专门针对ActiveMQ初学者,内容清晰易懂,适合所有开发人员用
ActiveMQ初学时写的小demo,可以直接导入eclipse运行,压缩包内包含window版的ActiveMQ(内含有ActiveMQ完整包activemq-all-5.12.1.jar)
ActiveMQ是Apache开发的开源消息中间件,相比重量级收费的IBM 的消息中间件MQ, ActiveMQ更适合于初学者!
ActiveMQ 开发例子初学者可以看看
用于ACtiveMq 配置插件配置使用,配置介绍等,适合初学者
NULL 博文链接:https://zhaoshijie.iteye.com/blog/2090954
该资源是spring与activemq的基础整合,适合消息中间件的初学者
虽然ActiveMQ目前已经不是开发时的主要消息中间件, 但是对于简单使用JMS的场景而言, ActiveMQ仍然是一个比较成熟、稳定的框架,可以供初学者、小微企业快速上手。 虽然Apache ActiveMQ目前有多种安装方式, 使用...
此代码为自己找的一个activemq的小实例,集成spring直接以单元测试的形式展示各个调用情景,初学好帮手
使用c# winfrom 做了一个ActiveMQ一个服务端多个客户端的小程序,希望可以帮到初学者
使用ActiveMQ类使用官方例程编写简单的例子,带有头文件和类库,方便初学者
适合不同的应聘者:这份面试专题资料既适合有一定经验的ActiveMQ开发人员,也适合初学者和想要了解ActiveMQ的人使用。初学者可以从基础知识开始学习,逐渐掌握更多的知识,而有经验的开发人员可以通过这份面试专题...
本资源为百度云盘的链接地址,包括ActiveMQ从入门到精通的视频教程,资料文档及实例代码。适合初学者入门上手。
此demo结合了网上纷乱的资源进行讲解。少量操作即可直接跑通,适合初学者进行学习。所需jar包在demo中也都具有。
使用C# winfrom做了一个ActiveMQ一个服务端对应一个客户端的即时收发消息的小程序,注释写得很详细,希望能帮到初学者。
ActiveMQ的队列模式,初学ActiveMQ的,一个很好的体验、入门,代码注释很清楚,JMS的java使用步骤
这是我学习消息队列的总结而编写的activemq的使用代码,使用SpringBoot集成activemq以及简化使用代码,让初学者更容易接受,同时让高级开发看起来更加舒服