orange5458

ActiveMQ (2): Programming in a Java Application


1. Preparation

  1) Start ActiveMQ.

  2) Add %ActiveMQ_HOME%\activemq-all-5.1.0.jar to the build path of the Java project.

 

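2. MQ programming is normally done through the JMS API. A plain Java application project, however, has no JNDI container to look the ConnectionFactory up from, so the code that creates the ConnectionFactory still has to be coupled to the ActiveMQ API.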

 

3. JMS architecture

  

 

JMS has two messaging models:

1) One-to-one / point-to-point

2) Publish/subscribe
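
The difference between the two models can be sketched with a small in-memory model. This is only an illustration, not the JMS API: the class names MessagingModels, SimpleQueue and SimpleTopic and the message texts are made up for this sketch. It shows that a queue message is consumed by exactly one receiver, while a topic message is copied to every current subscriber.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** In-memory model of the two JMS messaging models (hypothetical, not the JMS API). */
public class MessagingModels {

    /** Point-to-point: once one consumer takes a message, no other consumer sees it. */
    static class SimpleQueue {
        private final Queue<String> messages = new ArrayDeque<>();

        void send(String text) {
            messages.add(text);
        }

        /** Returns the next message, or null when the queue is empty. */
        String receive() {
            return messages.poll();
        }
    }

    /** Publish/subscribe: every current subscriber gets its own copy. */
    static class SimpleTopic {
        private final List<List<String>> inboxes = new ArrayList<>();

        /** Registers a subscriber and returns the inbox that collects its copies. */
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        void publish(String text) {
            for (List<String> inbox : inboxes) {
                inbox.add(text);
            }
        }
    }

    public static void main(String[] args) {
        SimpleQueue queue = new SimpleQueue();
        queue.send("order-1");
        System.out.println("consumer A: " + queue.receive()); // order-1
        System.out.println("consumer B: " + queue.receive()); // null

        SimpleTopic topic = new SimpleTopic();
        List<String> subscriberA = topic.subscribe();
        List<String> subscriberB = topic.subscribe();
        topic.publish("price-update");
        System.out.println("subscriber A: " + subscriberA); // [price-update]
        System.out.println("subscriber B: " + subscriberB); // [price-update]
    }
}
```

In real JMS code the same distinction shows up as Queue versus Topic destinations, as in the examples in section 4.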


The main interfaces used in JMS programming are:

ConnectionFactory

Connection: usually only one per application; it is thread-safe and can be reused.

Session

Queue

Topic

MessageProducer

MessageConsumer

MessageListener

 

4. Examples

1) One-to-one / point-to-point

 

QueueReceiverListener.java 

 

package com.siyuan.jms;

import java.util.Date;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

public class QueueReceiverListener implements MessageListener {
 
 private Connection connection;
 
 private String queueNameSendTo;
 
 /**
  * @return the connection
  */
 public Connection getConnection() {
  return connection;
 }

 /**
  * @param connection the connection to set
  */
 public void setConnection(Connection connection) {
  this.connection = connection;
 }

 /**
  * @return the queueNameSendTo
  */
 public String getQueueNameSendTo() {
  return queueNameSendTo;
 }

 /**
  * @param queueNameSendTo the queueNameSendTo to set
  */
 public void setQueueNameSendTo(String queueNameSendTo) {
  this.queueNameSendTo = queueNameSendTo;
 }

 public void onMessage(Message message) {
  if (message instanceof TextMessage) {
   try {
    System.out.println("Current time : " + new Date());
    System.out.println("Receive : " + ((TextMessage) message).getText());
    System.out.println("Receive JMSCorrelationID : " +  message.getJMSCorrelationID());
    
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queueSendTo = session.createQueue(queueNameSendTo);
    MessageProducer sender = session.createProducer(queueSendTo);
    TextMessage messageToSend = session.createTextMessage();
    messageToSend.setText("Nice to meet u, too.");
    messageToSend.setJMSCorrelationID(message.getJMSCorrelationID());
    sender.send(messageToSend);
    System.out.println("Send : " + messageToSend.getText());
    System.out.println("Send JMSCorrelationID : " +  messageToSend.getJMSCorrelationID());
    System.out.println("--------------------------------------------");
    sender.close();
    session.close();
   } catch (JMSException e) {
    // Report the failure; no retry is attempted
    e.printStackTrace();
   }
  }
 }

}

====================================================================

 

JMSQueueTest.java

 

package com.siyuan.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSQueueTest {
 
 public static final String QUEUE_RECEIVE_FROM_NAME = "MY.TEST.SENDER";

 public static final String QUEUE_SEND_TO_NAME = "MY.TEST.RECEIVER";
 
 /**
  * @param args
  * @throws JMSException
  */
 public static void main(String[] args) throws JMSException {
  
  ConnectionFactory connFactory
   = new ActiveMQConnectionFactory("tcp://localhost:61616");
  Connection connection = connFactory.createConnection();
  connection.start();
  
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  
  Queue queue = session.createQueue(QUEUE_RECEIVE_FROM_NAME);
  MessageConsumer messageProcessor = session.createConsumer(queue);
  QueueReceiverListener processor = new QueueReceiverListener();
  processor.setConnection(connection);
  processor.setQueueNameSendTo(QUEUE_SEND_TO_NAME);
  messageProcessor.setMessageListener(processor);
  
 }

}

 

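Note: this example simulates a message processor. It takes a request message from one queue, processes it, and sends the response message to another queue. Request and response are tied together by JMSCorrelationID. The matching request sender will be implemented in a JSP, in the post on integrating ActiveMQ with Tomcat.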

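Points to watch when coding:
(1) connection.start() must be called, otherwise no messages are delivered.
(2) When messages are received asynchronously (through a MessageListener), do not call connection.close(), or the listener stops receiving.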
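
To make the role of the correlation ID concrete, here is a minimal sketch of the requester side, again as a plain in-memory model and not JMS code. The class CorrelationSketch, its methods, and the request texts are hypothetical; only the reply text "Nice to meet u, too." comes from the listener above. The idea is that the requester remembers each outgoing request under its ID and uses the ID on the response to find the request it answers, whatever order the responses arrive in.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Models pairing responses with requests by correlation ID (hypothetical, not JMS). */
public class CorrelationSketch {

    // Requests still waiting for a response, keyed by correlation ID.
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Records an outgoing request and returns the correlation ID attached to it. */
    public String sendRequest(String requestText) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, requestText);
        return correlationId;
    }

    /**
     * Handles an incoming response. Returns "request -> response" for a known ID,
     * or null if no pending request carries this correlation ID.
     */
    public String onResponse(String correlationId, String responseText) {
        String request = pending.remove(correlationId);
        return request == null ? null : request + " -> " + responseText;
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        CorrelationSketch requester = new CorrelationSketch();
        String id1 = requester.sendRequest("Nice to meet u.");
        String id2 = requester.sendRequest("How are you?");

        // Responses may arrive in any order; the ID tells them apart.
        System.out.println(requester.onResponse(id2, "Fine, thanks."));
        System.out.println(requester.onResponse(id1, "Nice to meet u, too."));
        System.out.println(requester.onResponse("unknown-id", "stray"));
    }
}
```

In the JMS version the ID travels in the message header, which is why the listener copies it with setJMSCorrelationID(message.getJMSCorrelationID()) before sending the reply.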

 

2) Publish/subscribe

 

JMSTopicTest.java

 

package com.siyuan.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSTopicTest {

 /**
  * @param args
  * @throws JMSException
  */
 public static void main(String[] args) throws JMSException {
  // Connect to the broker running on localhost
  ConnectionFactory connFactory
   = new ActiveMQConnectionFactory("tcp://localhost:61616");
  Connection connection = connFactory.createConnection();
  connection.start();
  
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  
  Topic topic = session.createTopic("MY.TEST.TOPIC");

  MessageConsumer subscriber = session.createConsumer(topic);
  MessageProducer publisher = session.createProducer(topic);
  publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);   
  TextMessage messageToPublish = session.createTextMessage();
  messageToPublish.setText("Hello all!");
  publisher.send(messageToPublish);
  
  Message messageFromSubscribe = subscriber.receive(3000);
  System.out.println(messageFromSubscribe);
  if (messageFromSubscribe instanceof TextMessage) {
   System.out.println(((TextMessage) messageFromSubscribe).getText());
  }
  publisher.close();
  subscriber.close();
  session.close();
  connection.close();
 }

}

 

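Points to watch when coding:
(1) The subscriber must be created before the publisher sends anything. A subscriber only receives messages published after it has subscribed.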
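
The ordering rule can be illustrated with a small in-memory model of a non-durable topic. This is a sketch under stated assumptions, not the JMS API: the class LateSubscriberSketch and its methods are hypothetical. A subscriber registered before the publish call receives the message; one registered afterwards receives nothing, because the model stores no messages for later delivery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** In-memory model of a non-durable topic (hypothetical, not the JMS API). */
public class LateSubscriberSketch {

    private final List<Consumer<String>> subscribers = new ArrayList<>();

    public void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Delivers to whoever is subscribed right now; nothing is stored for later. */
    public void publish(String text) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(text);
        }
    }

    public static void main(String[] args) {
        LateSubscriberSketch topic = new LateSubscriberSketch();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        topic.subscribe(early::add);   // subscribed before the publish
        topic.publish("Hello all!");
        topic.subscribe(late::add);    // subscribed after the publish

        System.out.println("early: " + early); // early: [Hello all!]
        System.out.println("late: " + late);   // late: []
    }
}
```

This mirrors JMSTopicTest above, where session.createConsumer(topic) is called before publisher.send(...), so that subscriber.receive(3000) has something to return.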

 

5. References:

http://orange5458.iteye.com/admin/blogs/991311

JMS study notes (PDF)

 
