`

ActiveMQ学习

 
阅读更多
  • JMS概述

JMS:Java消息服务。定义了Java中访问消息中间件的接口。JMS只是接口,实现JMS接口的消息中间件称为JMS Provider,例如ActiveMQ。

PTP:Point to Point,点对点消息模型

Pub/Sub:Publish/Subscribe,发布/订阅消息模型

Queue:队列目标

Topic:主题目标

 

JMS支持两种消息传送模型

PTP:消息从一个生产者传送至一个消费者。在此传送模型中,目标是一个队列。消息首先被传送至队列目标,然后根据队列传送策略,从该队列将消息传送至向此队列进行注册的某一个消费者,一次只传送一条消息。可以向队列目标发送消息的生产者的数量没有限制,但每条消息只能发送至、并由一个消费者成功使用。如果没有已经向队列目标注册的消费者,队列将保留它收到的消息,并在某个消费者向该队列进行注册时将消息传送给该消费者。

Pub/Sub:消息从一个生产者传送至任意数量的消费者。在此传送模型中,目标是一个主题。消息首先被传送至主题目标,然后传送至所有已订阅此主题的活动消费者。可以向主题目标发送消息的生产者的数量没有限制,并且每个消息可以发送至任意数量的订阅消费者。主题目标也支持持久订阅的概念。持久订阅表示消费者已向主题目标进行注册,但在消息传送时此消费者可以处于非活动状态。当此消费者再次处于活动状态时,它将接收此信息。如果没有已经向主题目标注册的消费者,主题不保留其接收到的消息,除非有非活动消费者注册了持久订阅。

 

  • ActiveMQ安装

下载地址:http://activemq.apache.org/

解压后运行bin\activemq.bat

可以通过http://localhost:8161/admin访问控制台

注意:ActiveMQ默认采用的是jetty作为web服务器,要进控制台必须输入认证,认证信息在jetty-realm.properties文件里面。

 

  • 创建消息生产者
public class MQProducer {
	public static void main(String[] args) throws JMSException {
		// 创建链接工厂
        ConnectionFactory factory = new ActiveMQConnectionFactory("system", "manager", "tcp://localhost:61616");
        // 通过工厂创建一个连接
        Connection connection = factory.createConnection();
        // 启动连接
        connection.start();
        // 创建一个session会话
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        // 创建一个消息队列
        Destination destination = session.createQueue("my.queue");
        // 创建消息生成者
        MessageProducer producer = session.createProducer(destination);
        // 设置持久化模式
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // 创建消息并发送
        Message textMessage = session.createTextMessage("text message");
        producer.send(textMessage);
        
        connection.close();
	}
}

 

  • 创建消息消费者
public class MQConsumer implements MessageListener {
	public static void main(String[] args) throws JMSException {
		// 创建链接工厂
		ConnectionFactory factory = new ActiveMQConnectionFactory("system","manager", "tcp://localhost:61616");
		// 通过工厂创建一个连接
		Connection connection = factory.createConnection();
		// 启动连接
		connection.start();
		// 创建一个session会话
		Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		// 创建一个消息队列
		Destination destination = session.createQueue("my.queue");
		// 创建消息消费者
		MessageConsumer consumer = session.createConsumer(destination);
		consumer.setMessageListener(new MQConsumer());
	}

	@Override
	public void onMessage(Message message) {
		try {
			TextMessage text = (TextMessage) message;
			System.out.println("接收:" + text.getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}

 

  • 代码详解

ConnectionFactory:建立到ActiveMQ的连接。需要认证的信息在conf/credentials.properties文件里面。

Connection:创建一个连接。start方法开启连接,close方法关闭连接。

Session:是一个发送或接收消息的线程。一个Connection可以创建多个session。

Destination:指定生产消息目标和消费消息来源的对象。在PTP模式中,Destination被称作Queue即队列;在Pub/Sub模式,Destination被称作Topic即主题。

MessageProducer:一个由Session创建的对象,用来向Destination发送消息。

MessageConsumer:一个由Session创建的对象,用来从Destination接收消息。通常采用异步的方式接受消息,即注册一个Listener,实现onMessage方法。

Message:支持5种消息类型TextMessage、MapMessage、BytesMessage、StreamMessage、ObjectMessage。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics