- 浏览: 950914 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
这篇主要是测试PTP模式下的回复消息,具体测试代码如下:
队列生产者2(消费者回复消息):
队列消费者3消费消息,并回复消息:
消费者4,消费消费者回复消息
开启消费则3,4监听,再启动生产者2;
控制台输出
生产者2:
Connection is start...
发送消息:ActiveMq 发送的Queue消息1
发送消息:ActiveMq 发送的Queue消息2
发送消息:ActiveMq 发送的Queue消息3
发送消息:ActiveMq 发送的Queue消息4
发送消息:ActiveMq 发送的Queue消息5
send text ok.
消费者3:
3消费消息:向ActiveMq发送的Queue消息1
3消费消息:向ActiveMq发送的Queue消息2
3消费消息:向ActiveMq发送的Queue消息3
3消费消息:向ActiveMq发送的Queue消息4
3消费消息:向ActiveMq发送的Queue消息5
消费者4:
4消费者回复消息:向ActiveMq发送的Queue消息1
4消费者回复消息:向ActiveMq发送的Queue消息2
4消费者回复消息:向ActiveMq发送的Queue消息3
4消费者回复消息:向ActiveMq发送的Queue消息4
4消费者回复消息:向ActiveMq发送的Queue消息5
队列生产者2(消费者回复消息):
package mq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * Queue(点对点)方式 生存者Producer * @author donald * */ public class QueueProducer2 { private static String user = ActiveMQConnection.DEFAULT_USER; private static String password =ActiveMQConnection.DEFAULT_PASSWORD; private static String url = "tcp://192.168.126.128:61616"; private static String qname = "testQueue"; private static String replyQueueName = "replyQueue"; static { } public static void main(String[] args)throws Exception { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); // Connection 启动 connection.start(); System.out.println("Connection is start..."); // Session: 一个发送或接收消息的线程 Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); // Queue :消息的目的地;消息发送给谁. Queue destination = session.createQueue(qname); //消费者接受消息,回复消息到replyQueue队列 Queue replyQueue = session.createQueue(replyQueueName); // MessageProducer:消息发送者 MessageProducer producer = session.createProducer(destination); // 设置持久化,此处学习,实际根据项目决定 producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 构造消息,此处写死,项目就是参数,或者方法获取 sendMessage(session, producer,replyQueue); session.commit(); connection.close(); System.out.println("send text ok."); } public static void sendMessage(Session session, MessageProducer producer,Queue replyQueue) throws Exception { for (int i = 1; i <= 5; i++) {//有限制,达到1000就不行 TextMessage message = session.createTextMessage("向ActiveMq发送的Queue消息" + i); message.setJMSReplyTo(replyQueue); // 发送消息到目的地方 System.out.println("发送消息:" + "ActiveMq 发送的Queue消息" + i); producer.send(message); } } }
队列消费者3消费消息,并回复消息:
package mq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * Queue(点对点)方式 消费这Consumer * @author donald * */ public class QueueConsumer3 { private static String user = ActiveMQConnection.DEFAULT_USER; private static String password =ActiveMQConnection.DEFAULT_PASSWORD; private static String url = "tcp://192.168.126.128:61616"; private static String qname = "testQueue"; public static void main(String[] args) throws Exception{ // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一个发送或接收消息的线程 final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息发送给谁. Queue destination=session.createQueue(qname); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener(){//有事务限制 @Override public void onMessage(Message message) { try { TextMessage textMessage=(TextMessage)message; System.out.println("3消费消息:"+textMessage.getText()); MessageProducer producer = session.createProducer(message.getJMSReplyTo()); TextMessage replyMessage = session.createTextMessage(textMessage.getText()); producer.send(replyMessage); } catch (JMSException e1) { e1.printStackTrace(); } try { session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }); /* 另外一种接受方式 * while (true) { //设置接收者接收消息的时间,为了便于测试,这里谁定为100s TextMessage message = (TextMessage) consumer.receive(100000); if (null != message) { System.out.println("收到消息" + message.getText()); } else { break; } }*/ } }
消费者4,消费消费者回复消息
package mq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * Queue(点对点)方式 消费这Consumer * @author donald * */ public class QueueConsumer4 { private static String user = ActiveMQConnection.DEFAULT_USER; private static String password =ActiveMQConnection.DEFAULT_PASSWORD; private static String url = "tcp://192.168.126.128:61616"; private static String qname = "replyQueue"; public static void main(String[] args) throws Exception{ // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一个发送或接收消息的线程 final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息发送给谁. Queue destination=session.createQueue(qname); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener(){//有事务限制 @Override public void onMessage(Message message) { try { TextMessage textMessage=(TextMessage)message; System.out.println("4消费者回复消息:"+textMessage.getText()); } catch (JMSException e1) { e1.printStackTrace(); } try { session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }); /* 另外一种接受方式 * while (true) { //设置接收者接收消息的时间,为了便于测试,这里谁定为100s TextMessage message = (TextMessage) consumer.receive(100000); if (null != message) { System.out.println("收到消息" + message.getText()); } else { break; } }*/ } }
开启消费则3,4监听,再启动生产者2;
控制台输出
生产者2:
Connection is start...
发送消息:ActiveMq 发送的Queue消息1
发送消息:ActiveMq 发送的Queue消息2
发送消息:ActiveMq 发送的Queue消息3
发送消息:ActiveMq 发送的Queue消息4
发送消息:ActiveMq 发送的Queue消息5
send text ok.
消费者3:
3消费消息:向ActiveMq发送的Queue消息1
3消费消息:向ActiveMq发送的Queue消息2
3消费消息:向ActiveMq发送的Queue消息3
3消费消息:向ActiveMq发送的Queue消息4
3消费消息:向ActiveMq发送的Queue消息5
消费者4:
4消费者回复消息:向ActiveMq发送的Queue消息1
4消费者回复消息:向ActiveMq发送的Queue消息2
4消费者回复消息:向ActiveMq发送的Queue消息3
4消费者回复消息:向ActiveMq发送的Queue消息4
4消费者回复消息:向ActiveMq发送的Queue消息5
发表评论
-
Spring与ActiveMQ的集成详解二
2017-01-03 10:07 1987JMS(ActiveMQ) PTP和PUB/SUB模 ... -
Spring与ActiveMQ的集成详解一
2017-01-02 17:19 4495JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ Broker发送消息给消费者过程详解
2017-01-02 15:30 6070JMS(ActiveMQ) PTP和PUB/SUB模 ... -
ActiveMQ Server启动过程详解
2017-01-02 12:43 6415JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ消费者详解
2017-01-01 14:38 8542JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ生产者详解
2017-01-01 12:29 6764JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ会话初始化详解
2016-12-31 20:26 4686JMS(ActiveMQ) PTP和PUB/SUB模 ... -
ActiveMQ连接工厂、连接详解
2016-12-29 16:09 11864JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
基于LevelDB的高可用ActiveMQ集群
2016-12-28 18:34 4183ActiveMQ实现负载均衡+高可用部署方案:http://w ... -
ActiveMQ 目录配置文件
2016-12-28 12:47 7694下载apache-activemq-5.12.1.tar.gz ... -
Spring与ActiveMQ的集成
2016-12-27 18:09 1640JMS与MQ详解:http://www.fx114.net/q ... -
JMS(ActiveMQ) PTP和PUB/SUB模式实例
2016-12-27 09:02 3027深入浅出JMS(一)——JMS简介 :http://blog. ...
相关推荐
ActiveMQ开发实例核心内容,本人使用ActiveMQ开发的使用总结,版权所有归属本人,如果要引用请注明出处。需配合ActiveMQ开发实例系列其他资源使用
一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...
Springboot和ActiveMQ的整合实例
基于Maven的ActiveMQ的简单实例
该例子是本人写的一个关于使用springMVC搭建的activeMQ的JSM实例,希望对学校JMS的朋友有所帮助。
此实例基于Spring+JMS+ActiveMQ+Tomcat,注解的完整实例,包含jar包
ActiveMQ开发实例-2,使用源码,需配合ActiveMQ开发实例系统其他资源使用
ActiveMQ开发库文件(Release版),需配合ActiveMQ开发实例系列其他资源使用
包括1、ActiveMQ java实例 2、ActiveMQ Spring结合实例 3、代码亲测,无问题。 4、资源分5分绝对值 注意:请先安装ActiveMQ 服务。
Spring MVC + JPA + MQ + redis +activemq 集成项目实例 详细描述,集成环境搭建
Spring整合ActiveMQ超级详细实例,基于Spring+JMS+ActiveMQ+Tomcat,注解的完整实例,包含jar包,可以参考学习
Spring和ActiveMQ的整合实例源码ActiveMQSpringDemo,此实例基于Spring+JMS+ActiveMQ+Tomcat,注解的完整实例,包含jar包
ActiveMQ开发实例-3,开发工程配置说明,需配合ActiveMQ开发实例系列其他资源使用
Spring+ActiveMQ active基本例子 注解的完整实例,包含jar包
ActiveMQ开发实例-5,ActiveMQ运行包,已验证可直接使用,需配合ActiveMQ开发实例系列其他资源使用
Activemq入门实例.pdf
spring 中使用activemq,queue 、topic的同步接收,异步接收的实例,使用的是maven的项目工程!
ActiveMQ实例,包括一个P2P的和一个PubSub的案例
博客http://blog.csdn.net/poorCoder_/article/details/61192791的代码。Spring整合ActiveMQ的简单实例
ActiveMQ实例,源代码ActiveMQ实例