`
arlord
  • 浏览: 5066 次
最近访客 更多访客>>
文章分类
社区版块
存档分类
最新评论

ActiveMQ 入门

阅读更多
版本:5.7.0
核心jar包:
核心类:ActiveMQConnectionFactory
PooledConnectionFactory

环境:apache-activemq-5.7.0
对于MQ来说主要的两个对象是消息的发送者和消息的监听者,所以下面我们直接编写两个对象,发送消息的对象和监听消息的对象
package com.arlord;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

public class SendMessage {
    
      private static final Logger logger = Logger.getLogger(SendMessage. class);
    
          //使用TCP协议连接MQ服务器
      private static final String url = "tcp://localhost:61616";
    
          //消息队列名称
      private static final String QUEUE_NAME = "choice.queue";
    
          //消息内容 一般不建议使用String,如果是xml文件可以用StringBuffer
      protected String expectedBody = "<hello>world</hello>" ;
    
      public void sendMessage() throws JMSException{
            logger.info( "===========sendMessage start==============");
         Connection connection = null;
//ActiveMQConnectionFactory在MQ里面是一个执行对象,主要用来创建连接
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
//创建连接
         connection = (Connection)connectionFactory.createConnection();
//启动连接
         connection.start();
        //创建消息会话
         Session session = (Session)connection.createSession(false , Session.AUTO_ACKNOWLEDGE );
//
         Destination destination = session.createQueue(QUEUE_NAME );
//创建消息生产者
         MessageProducer producer = session.createProducer(destination);
//创建消息
         TextMessage message = session.createTextMessage(expectedBody );
//使用消息生产者发送消息
         producer.send(message);
         producer.close();
         session.close();
         logger.info( "===============sendMessage end==============");
     }
    
      public static void main(String[] args){
             SendMessage sndMsg = new SendMessage();
             try{
              sndMsg.sendMessage();
             } catch(Exception ex){
              System. out.println(ex.toString());
             }
            }
}


下面就编写消息的监听者
package com.arlord;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

//消息监听者要实现MessageListener 的接口
public class ReceiveMessage implements MessageListener {
    
     //连接本地的MQ服务器
     private static final String url = "tcp://localhost:61616";
    
//声明消息队列名称,注意这个队列名称要与消息发送者的队列名称一致才能监听到发送的消息
     private static final String QUEUE_NAME = "choice.queue";
    
//接收消息
     public void receiveMessage(){
          Connection connection = null;
          try {
               ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
               connection = connectionFactory.createConnection();
               connection.start();
               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
               Destination destination = session.createQueue(QUEUE_NAME);
     //这个时候要使用MessageConsumer,该类是消息的消费者
               MessageConsumer consumer = session.createConsumer(destination);
//为这个消息消费者注入监听器
               consumer.setMessageListener(this);
          } catch (JMSException e) {
               System.out.println(e.toString());
          }
         
         
     }
    
     protected void consumeMessageAndClose(Connection connection,Session session,
               MessageConsumer consumer)throws JMSException{
          for(int i = 0;i<1;){
               Message message = consumer.receive(1000);
               if(message != null){
                    i++;
                    onMessage(message);
               }
          }
          /*System.out.println("Closing conenction");
          consumer.close();
          session.close();
          connection.close();*/
     }
    
//重写接口的onMessage方法
     public void onMessage(Message message){
          try{
               if(message instanceof TextMessage){
                    TextMessage txtMsg =(TextMessage) message;
                    String msg = txtMsg.getText();
                    System.out.println("Received: " + msg);
               }
          }catch(Exception ex){
               ex.printStackTrace();
          }
         
     }
    
     public static void main(String args[]) { 
            ReceiveMessage rm = new ReceiveMessage(); 
            rm.receiveMessage(); 
    } 

}

执行步骤:
1、启动mq服务器,在下下来的包里面bin文件夹直接双击activemq.bat 启动
2、执行SendMessage
3、执行ReceiveMessage

说明:可以通过页面访问mq服务器:http://localhost:8161/admin/  点击页面的queue查看消息队列的情况
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics