java代码
package com.yanzhi.system; import com.yanzhi.test.TestObject; import com.yanzhi.tools.C; import com.yanzhi.tools.Global; import com.yanzhi.tools.StringUtils; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.*; import org.apache.activemq.pool.PooledConnectionFactory; import org.springframework.jms.connection.SingleConnectionFactory; import org.springframework.jms.core.JmsTemplate; import javax.jms.*; import java.io.Serializable; import java.util.Date; /** * Created by xiaoyunlian on 2016/2/24. */ public class MQProducer { public static Connection connection; public Connection getConnection() { if (connection == null) { connection = getConnectionObject(); } return connection; } public static Connection getConnectionObject() { try { //连接 ActiveMQConnectionFactory targetConnectionFactory = new ActiveMQConnectionFactory(); targetConnectionFactory.setBrokerURL(Global.getBrokerURL()); targetConnectionFactory.setTrustAllPackages(true); SingleConnectionFactory connectionFactory = new SingleConnectionFactory(); connectionFactory.setTargetConnectionFactory(targetConnectionFactory);//根据applicationContext.xml文件配置连接 Connection connection = connectionFactory.createConnection(); connection.start(); return connection; } catch (Exception e) { e.printStackTrace(); } return null; } public Session session; public Session getSession() { try { if (session == null) { Connection connection = getConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } return session; } catch (JMSException e) { e.printStackTrace(); } return session; } public ActiveMQQueue getActiveMQQueue() { try { Session session = getSession(); return (ActiveMQQueue) session.createQueue("testQueue1,testQueue2,testQueue3"); } catch (JMSException e) { e.printStackTrace(); } return null; } public ActiveMQDestination getActiveMQDestination(String destinationName) { return getDestination(destinationName, getActiveMQQueue()); } public MessageProducer messageProducer; /** * 获取生产者 * * @return */ public MessageProducer getMessageProducer(String destinationName) { Session session = getSession(); ActiveMQDestination activeMQDestination = getActiveMQDestination(destinationName); try { if (messageProducer == null) { messageProducer = session.createProducer(activeMQDestination); } return messageProducer; } catch (JMSException e) { e.printStackTrace(); } return messageProducer; } /** * 获取某个队列:默认获取临时队列templateQueue * * @return */ public static ActiveMQDestination getDestination(String destinationName, Destination yanzhiQueueDestination) { ActiveMQDestination destination = (ActiveMQDestination) yanzhiQueueDestination; ActiveMQDestination[] destinations = destination.getCompositeDestinations(); if (StringUtils.isBlank(destinationName)) { return null; } ActiveMQDestination mqDestination = null; for (ActiveMQDestination activeMQDestination : destinations) { String name = activeMQDestination.getPhysicalName(); if (destinationName.equals(name)) { mqDestination = activeMQDestination; break; } } return mqDestination; } public static void sendMessage(String msgType, Session session, MessageProducer producer) { try { // 发送文本消息 if (C.ACTIVEMQ_MSG_TYPE_TEXT.equalsIgnoreCase(msgType)) { String textMsg = "~~~~~~~~~~~~~~测试消息 ActiveMQ Text Message!~~~~~~~~~~~~~~" + new Date() + "," + AppicationManager.getServerIP(); TextMessage msg = session.createTextMessage(); msg.setText(textMsg); producer.send(msg); } // 发送Map消息 if (C.ACTIVEMQ_MSG_TYPE_MAP.equalsIgnoreCase(msgType)) { MapMessage msg = session.createMapMessage(); msg.setBoolean("boolean", true); msg.setShort("short", (short) 0); msg.setLong("long", 123456); msg.setString("MapMessage", "ActiveMQ Map Message!"); producer.send(msg); } // 发送流消息 if (C.ACTIVEMQ_MSG_TYPE_STREAM.equalsIgnoreCase(msgType)) { String streamValue = "ActiveMQ stream Message!"; StreamMessage msg = session.createStreamMessage(); msg.writeString(streamValue); msg.writeBoolean(false); msg.writeLong(1234567890); producer.send(msg); } // 发送对象消息 if (C.ACTIVEMQ_MSG_TYPE_OBJECT.equalsIgnoreCase(msgType)) { TestObject object = new TestObject(); object.setName("对象名称"); object.setType(1); object.setFaceValue(45678); ObjectMessage msg = session.createObjectMessage(); msg.setObject(object); producer.send(msg); } // 发送字节消息 if (C.ACTIVEMQ_MSG_TYPE_BYTES.equalsIgnoreCase(msgType)) { String byteValue = "字节消息"; BytesMessage msg = session.createBytesMessage(); msg.writeBytes(byteValue.getBytes()); producer.send(msg); } } catch (Exception e) { e.printStackTrace(); } } /** * 发送对象消息 * * @param session * @param producer * @param object */ public static void sendObjectMessage(Session session, MessageProducer producer, Object object) { try { ObjectMessage msg = session.createObjectMessage(); msg.setObject((Serializable) object); producer.send(msg); } catch (Exception e) { e.printStackTrace(); } } }
测试代码:
在单元测试中放入如下代码:
MQProducer mqProducer = new MQProducer(); MessageProducer producer = mqProducer.getMessageProducer("testQueue1"); MQProducer.sendObjectMessage(mqProducer.getSession(),producer,recordList);
其中,Global.getBrokerURL()的值是:tcp://192.168.199.149:61616?wireFormat.maxInactivityDuration=0&connectionTimeout=0&keepAlive=true
相关推荐
springboot整合 activeMq 生产者 发送消息 包含队列模式点对点发送消息 以及 主题模式一对多发送消息 这是生产者的demo producer; 需要配合消费者的demo consumer 使用
使用SpringBoot开发的ActiveMQ生产者/消费者的demo。相关内容可以看这里: https://my.oschina.net/noryar/blog/1575003
activemq生产者和消费者案例代码.zipactivemq生产者和消费者案例代码.zip
activeMQ实现生产者和消费者的代码
使用ActiveMQ类使用官方例程编写简单的例子,带有头文件和类库,方便初学者
先到官网下载activemq,启动后,就可以直接运行项目文件了
一套完整的activemq生产者、消费者。同shh整合开发完整的项目。注释清楚,包括activemq的消息重试,重复消费等解决方案。可供新手学习使用。
activemq生产者,消费者api,以及activemq整合spring配置
springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。 里面有消息重发机制,手动确认ACK模式。...配合 producer 生产者demo使用。
java整合activemq的demo,生产者和消费者两个方法。结合自带的工具http://192.168.1.106:8161。来查看消息传递情况
ActiveMQ集群及生产者和消费者Java代码
activeMQ 详细教程与源码(包含消费者与生产者),下载可根据文档修改配置文件后进行测试
2.在项目中,我们为消息的生产者和发布者分别注册了两个消费者和订阅者,当有消息到达activeMQ时,消费者和订阅者会自动获取对应的消息,其中两个消费者会轮流消费消息,而两个订阅者会同时订阅所有消息;...
使用activeMQ实现生产者消费者
activemq linux下的多线程客户端,在windows下同样适用,包含生产消息客户端,和消费者消息客户端,以及生产者消息多线程客户端
*生产者启动程序,并发送消息给amq服务器(broker) */ public class Producer { /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { //加载spring...
自己写的ActiveMQ简单demo,包括生产者、消费者之间发送消息、持久化到文件和持久化到数据库,期中持久化需要修改activemq.xml文件
Spring整合ActiveMQ实现队列和主题发布订阅通信、一个完整的DEMO
ActiveMQ 的安装与使用,ActiveMQ与spring整合,生产都、消费者、测试类等。
消息生产者发送工作消息到 JMS 队列,消费者从这个队列中接收消息并处理。点对点模式不需要生产者和消费者同时在线。队列会一直保留收到的消息,直到有消费者把它消费掉。当消费者可用时,队列会把消息发给每一个...