`
Donald_Draper
  • 浏览: 950914 次
社区版块
存档分类
最新评论

ActiveMQ PTP模式实例二

阅读更多
这篇主要是测试PTP模式下的回复消息,具体测试代码如下:
队列生产者2(消费者回复消息):
package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Queue(点对点)方式  生存者Producer 
 * @author donald
 *
 */
public class QueueProducer2 {  
   private static String user = ActiveMQConnection.DEFAULT_USER;  
   private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  
   private static String url =  "tcp://192.168.126.128:61616";  
   private static String qname =  "testQueue";
   private static String replyQueueName =  "replyQueue";
   static {
	  
   }
   public static void main(String[] args)throws Exception {  
        // ConnectionFactory :连接工厂,JMS 用它创建连接  
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
       // Connection :JMS 客户端到JMS Provider 的连接  
       Connection connection = connectionFactory.createConnection();  
       // Connection 启动  
       connection.start();  
       System.out.println("Connection is start...");  
       // Session: 一个发送或接收消息的线程  
       Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);  
       // Queue :消息的目的地;消息发送给谁.  
       Queue  destination = session.createQueue(qname);  
       //消费者接受消息,回复消息到replyQueue队列
       Queue  replyQueue = session.createQueue(replyQueueName);
       // MessageProducer:消息发送者  
       MessageProducer producer = session.createProducer(destination);  
       // 设置持久化,此处学习,实际根据项目决定  
       producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
        // 构造消息,此处写死,项目就是参数,或者方法获取  
       sendMessage(session, producer,replyQueue);  
       session.commit();  
 
       connection.close();  
       System.out.println("send text ok.");  
   }  
     
   public static void sendMessage(Session session, MessageProducer producer,Queue replyQueue)  
           throws Exception {  
       for (int i = 1; i <= 5; i++) {//有限制,达到1000就不行  
           TextMessage message = session.createTextMessage("向ActiveMq发送的Queue消息" + i);  
           message.setJMSReplyTo(replyQueue);
           // 发送消息到目的地方  
           System.out.println("发送消息:" + "ActiveMq 发送的Queue消息" + i);  
           producer.send(message);  
       }  
   }  
}  


队列消费者3消费消息,并回复消息:

package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Queue(点对点)方式  消费这Consumer
 * @author donald
 *
 */
public class QueueConsumer3 {  
   private static String user = ActiveMQConnection.DEFAULT_USER;  
   private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  
   private static String url = "tcp://192.168.126.128:61616";  
   private static String qname =  "testQueue";
   public static void main(String[] args) throws Exception{  
       // ConnectionFactory :连接工厂,JMS 用它创建连接  
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
       // Connection :JMS 客户端到JMS Provider 的连接  
       Connection connection = connectionFactory.createConnection();  
       connection.start();  
       // Session: 一个发送或接收消息的线程  
       final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
       // Destination :消息的目的地;消息发送给谁.  
       Queue destination=session.createQueue(qname);  
       // 消费者,消息接收者  
       MessageConsumer consumer = session.createConsumer(destination);  
       consumer.setMessageListener(new MessageListener(){//有事务限制  
           @Override  
           public void onMessage(Message message) {  
               try {  
                   TextMessage textMessage=(TextMessage)message;  
                   System.out.println("3消费消息:"+textMessage.getText());
                   MessageProducer producer = session.createProducer(message.getJMSReplyTo());  
                   TextMessage replyMessage = session.createTextMessage(textMessage.getText());
                   producer.send(replyMessage);  
               } catch (JMSException e1) {  
                   e1.printStackTrace();  
               }  
               try {  
                   session.commit();  
               } catch (JMSException e) {  
                   e.printStackTrace();  
               }  
           }  
       });  
         
/*  另外一种接受方式 
*    while (true) { 
             //设置接收者接收消息的时间,为了便于测试,这里谁定为100s 
             TextMessage message = (TextMessage) consumer.receive(100000); 
             if (null != message) { 
                 System.out.println("收到消息" + message.getText()); 
             } else { 
                 break; 
             } 
         }*/  
   }  
}  



消费者4,消费消费者回复消息

package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Queue(点对点)方式  消费这Consumer
 * @author donald
 *
 */
public class QueueConsumer4 {  
   private static String user = ActiveMQConnection.DEFAULT_USER;  
   private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  
   private static String url = "tcp://192.168.126.128:61616";  
   private static String qname =  "replyQueue";
   public static void main(String[] args) throws Exception{  
       // ConnectionFactory :连接工厂,JMS 用它创建连接  
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
       // Connection :JMS 客户端到JMS Provider 的连接  
       Connection connection = connectionFactory.createConnection();  
       connection.start();  
       // Session: 一个发送或接收消息的线程  
       final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
       // Destination :消息的目的地;消息发送给谁.  
       Queue destination=session.createQueue(qname);  
       // 消费者,消息接收者  
       MessageConsumer consumer = session.createConsumer(destination);  
       consumer.setMessageListener(new MessageListener(){//有事务限制  
           @Override  
           public void onMessage(Message message) {  
               try {  
                   TextMessage textMessage=(TextMessage)message;  
                   System.out.println("4消费者回复消息:"+textMessage.getText());
               } catch (JMSException e1) {  
                   e1.printStackTrace();  
               }  
               try {  
                   session.commit();  
               } catch (JMSException e) {  
                   e.printStackTrace();  
               }  
           }  
       });  
         
/*  另外一种接受方式 
*    while (true) { 
             //设置接收者接收消息的时间,为了便于测试,这里谁定为100s 
             TextMessage message = (TextMessage) consumer.receive(100000); 
             if (null != message) { 
                 System.out.println("收到消息" + message.getText()); 
             } else { 
                 break; 
             } 
         }*/  
   }  
} 

开启消费则3,4监听,再启动生产者2;
控制台输出
生产者2:
Connection is start...
发送消息:ActiveMq 发送的Queue消息1
发送消息:ActiveMq 发送的Queue消息2
发送消息:ActiveMq 发送的Queue消息3
发送消息:ActiveMq 发送的Queue消息4
发送消息:ActiveMq 发送的Queue消息5
send text ok.

消费者3:
3消费消息:向ActiveMq发送的Queue消息1
3消费消息:向ActiveMq发送的Queue消息2
3消费消息:向ActiveMq发送的Queue消息3
3消费消息:向ActiveMq发送的Queue消息4
3消费消息:向ActiveMq发送的Queue消息5


消费者4:
4消费者回复消息:向ActiveMq发送的Queue消息1
4消费者回复消息:向ActiveMq发送的Queue消息2
4消费者回复消息:向ActiveMq发送的Queue消息3
4消费者回复消息:向ActiveMq发送的Queue消息4
4消费者回复消息:向ActiveMq发送的Queue消息5
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics