java代码:
package com.yanzhi.system; import com.yanzhi.tools.C; import com.yanzhi.tools.Global; import com.yanzhi.tools.StringUtils; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.pool.PooledConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.connection.SingleConnectionFactory; import javax.jms.*; import java.sql.Timestamp; import java.util.List; /** * Created by xiaoyunlian on 2016/2/14. * activemq消费者实例 */ public class ConsumerApp implements MessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerApp.class); public static void start(){ try { //连接 ActiveMQConnectionFactory targetConnectionFactory = new ActiveMQConnectionFactory(); targetConnectionFactory.setBrokerURL(Global.getBrokerURL()); targetConnectionFactory.setTrustAllPackages(true); SingleConnectionFactory connectionFactory = new SingleConnectionFactory(); connectionFactory.setTargetConnectionFactory(targetConnectionFactory);//根据applicationContext.xml文件配置连接 Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //队列 ActiveMQQueue yanzhiQueueDestination = (ActiveMQQueue) session.createQueue("testQueue1,testQueue2,testQueue3"); ConsumerApp consumerMessageListener = new ConsumerApp(); MessageConsumer consumer = session.createConsumer(yanzhiQueueDestination); consumer.setMessageListener(consumerMessageListener); System.err.println("~~~~~~~~~~~~~~~~~~~~~~ active MQ 消费者监听器 启动成功~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); }catch (Exception e){ e.printStackTrace(); } } @Override public void onMessage(Message msg) { try { Destination dest = msg.getJMSDestination(); if (msg instanceof TextMessage) { ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage)msg; String name = activeMQTextMessage.getDestination().getPhysicalName(); TextMessage message = (TextMessage) msg; System.err.println("队列:"+name+"接收者接到一个String消息:"+message.getText()); } else if (msg instanceof MapMessage) { ActiveMQMapMessage activeMQMapMessage = (ActiveMQMapMessage) msg; String destinationName = activeMQMapMessage.getDestination().getPhysicalName(); } else if (msg instanceof StreamMessage) { StreamMessage message = (StreamMessage) msg; System.out.println("------Received StreamMessage------"); System.out.println(message.readString()); System.out.println(message.readBoolean()); System.out.println(message.readLong()); } else if (msg instanceof ObjectMessage) { ActiveMQObjectMessage activeMQObjectMessage = (ActiveMQObjectMessage)msg; String destinationName = activeMQObjectMessage.getDestination().getPhysicalName(); if (Global.getPkQuene().equals(destinationName)){ ObjectMessage objectMessage = (ObjectMessage) msg; Object object = objectMessage.getObject(); if (object instanceof List){ // do something } } if (Global.getFaceValueReportQuene().equals(destinationName)){ // do something } if (Global.getRegUserQuene().equals(destinationName)){ // do something } } else if (msg instanceof BytesMessage) { System.out.println("------Received BytesMessage------"); BytesMessage message = (BytesMessage) msg; byte[] byteContent = new byte[1024]; int length = -1; StringBuffer content = new StringBuffer(); while ((length = message.readBytes(byteContent)) != -1) { content.append(new String(byteContent, 0, length)); } System.out.println(content.toString()); } else { System.out.println(msg); } } catch (JMSException e) { LOGGER.error("error {}", e); } } }
调用上面的start()方法,即可启动消息队列的消费者。
相关推荐
springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。 里面有消息重发机制,手动确认ACK模式。 配合 producer 生产者demo使用。
动态创建ActiveMQ消费者
使用SpringBoot开发的ActiveMQ生产者/消费者的demo。相关内容可以看这里: https://my.oschina.net/noryar/blog/1575003
ActiveMQ整合Spring(多消费者)
activemq生产者和消费者案例代码.zipactivemq生产者和消费者案例代码.zip
am_spring_consumer ActiveMQ 消费者实例
springboot整合 activeMq 生产者 发送消息 包含队列模式点对点发送消息 以及 主题模式一对多发送消息 这是生产者的demo producer; 需要配合消费者的demo consumer 使用
先到官网下载activemq,启动后,就可以直接运行项目文件了
activeMQ实现生产者和消费者的代码
activemq linux下的多线程客户端,在windows下同样适用,包含生产消息客户端,和消费者消息客户端,以及生产者消息多线程客户端
java整合activemq的demo,生产者和消费者两个方法。结合自带的工具http://192.168.1.106:8161。来查看消息传递情况
2.在项目中,我们为消息的生产者和发布者分别注册了两个消费者和订阅者,当有消息到达activeMQ时,消费者和订阅者会自动获取对应的消息,其中两个消费者会轮流消费消息,而两个订阅者会同时订阅所有消息;...
activeMQ 详细教程与源码(包含消费者与生产者),下载可根据文档修改配置文件后进行测试
ActiveMQ集群及生产者和消费者Java代码
使用activeMQ实现生产者消费者
activemq通过IP或特征码授权插件
一套完整的activemq生产者、消费者。同shh整合开发完整的项目。注释清楚,包括activemq的消息重试,重复消费等解决方案。可供新手学习使用。
ActiveMQ 不仅实现了 JMS 规范中定义的所有特性,也额外提供了一些特有且有用的特性。...当消费者可用时,队列会把消息发给每一个消费者,但每个消息只会发给一个消费者,队列使用轮询算法向每一个消费者分发消息。
ActiveMQ 的安装与使用,ActiveMQ与spring整合,生产都、消费者、测试类等。
使用ActiveMQ作为jms服务器,创建生成者和消费者对象。