`
109735215
  • 浏览: 30665 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

【Hello world】(一)========ActiveMQ

阅读更多
【前言】

在 Java 里有 JMS 的多个实现。其中 apache 下的 ActiveMQ 就是不错的选择。还有一个比较热的是 RabbitMQ (是 erlang 语言实现的)。这里示例下使用 ActiveMQ.

JMS

用 ActiveMQ 最好还是了解下 JMS

JMS 公共 点对点域 发布/订阅域
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopicSession
MessageProducer QueueSender TopicPublisher
MessageConsumer QueueReceiver TopicSubscriber

JMS 定义了两种方式:Quere(点对点);Topic(发布/订阅)。

ConnectionFactory 是连接工厂,负责创建Connection。

Connection 负责创建 Session。

Session 创建 MessageProducer(用来发消息) 和 MessageConsumer(用来接收消息)。

Destination 是消息的目的地。

apache-activemq-5.3.0。http://activemq.apache.org/download.html,解压,然后双击 bin/activemq.bat。运行后,可以在http://localhost:8161/admin观察。也有 demo,http://localhost:8161/demo

把 activemq-all-5.3.0.jar 加入 classpath

发送消息的代码:

package com.mq.send;
import java.util.Date;

import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 

import javax.jms.*; 


public class SendTest {
	public static void main(String[] args) throws JMSException {
		// ConnectionFactory :连接工厂,JMS 用它创建连接 
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( 
                        ActiveMQConnection.DEFAULT_USER, 
                        ActiveMQConnection.DEFAULT_PASSWORD, 
                        "tcp://localhost:61616"); 
        //JMS 客户端到JMS Provider 的连接 
        Connection connection = connectionFactory.createConnection(); 
        connection.start(); 
        // Session: 一个发送或接收消息的线程 
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 
        // Destination :消息的目的地;消息发送给谁. 
        // 获取session注意参数值my-queue是Query的名字 
        Destination destination = session.createQueue("my-queue"); 
        // MessageProducer:消息生产者 
        MessageProducer producer = session.createProducer(destination); 
        //设置不持久化 
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
        //发送一条消息 
        for (int i = 0; i < 1000; i++) {
			
        	sendMsg(session, producer); 
		}
        session.commit(); 
        connection.close(); 
	}
	
	 public static void sendMsg(Session session, MessageProducer producer) throws JMSException { 
         //创建一条文本消息 
         TextMessage message = session.createTextMessage("Hello ActiveMQ!"+new Date().getTime()); 
         System.out.println("Hello ActiveMQ!"+new Date().getTime());
         //通过消息生产者发出消息 
         producer.send(message); 
        
 } 
}
接受消息:

package com.mq.send;

import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 

import javax.jms.*; 

/** 
* 消息的消费者(接受者) 
* 
* @author leizhimin 2009-8-12 11:41:33 
*/ 
public class JmsReceiver { 
        public static void main(String[] args) throws JMSException { 
                // ConnectionFactory :连接工厂,JMS 用它创建连接 
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( 
                                ActiveMQConnection.DEFAULT_USER, 
                                ActiveMQConnection.DEFAULT_PASSWORD, 
                                "tcp://localhost:61616"); 
                //JMS 客户端到JMS Provider 的连接 
                Connection connection = connectionFactory.createConnection(); 
                connection.start(); 
                // Session: 一个发送或接收消息的线程 
                Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 
                // Destination :消息的目的地;消息发送给谁. 
                // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 
                Destination destination = session.createQueue("my-queue"); 
                // 消费者,消息接收者 
                MessageConsumer consumer = session.createConsumer(destination); 
                while (true) { 
                        TextMessage message = (TextMessage) consumer.receive(1000); 
                        if (null != message) 
                                System.out.println("收到消息:" + message.getText()); 
                        else 
                                break; 
                } 
                session.close(); 
                connection.close(); 
        } 
}



分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics