`

ActiveMQ消息选择器与异步接收

 
阅读更多
一、消息选择器
   消息选择器:过滤消息属性与设置条件相等的消息进行消费。语义与sql一致。
  
   private final String selector_1 = "sex='w'";
   this.consumer = session.createConsumer(destination, selector_1);
   


二、消息异步接收
   消息异步接收 :当消息到达时,ActiveMQ主动通知消费端,可以注册一个MessageListener类实现onMessage方法,监听MQ送达消息

示例:

public class Producer {
	// 建立connectionFactory工厂对象
	private ActiveMQConnectionFactory connectionFactory;
	// 连接对象
	private Connection connection;
	// session对象
	private Session session;
	// 生产者
	private MessageProducer producer;

	public Producer() {
		this.connectionFactory = new ActiveMQConnectionFactory();
		try {
			this.connection = connectionFactory.createConnection("fu", "fu");
			this.connection.start();
			//参一:开启事务,参二,手工签收
			this.session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
			this.producer = session.createProducer(null);

		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public void send() throws Exception {
		Destination destination = this.session.createQueue("first");

		MapMessage map = this.session.createMapMessage();
		//设置消息
		map.setString("name", "zs");
		map.setString("age", "40");
		//设置消息属性
		map.setStringProperty("sex", "m");

		MapMessage map1 = this.session.createMapMessage();
		map1.setString("name", "ls");
		map1.setString("age", "20");
		map1.setStringProperty("sex", "w");
		
		MapMessage map2 = this.session.createMapMessage();
		map2.setString("name", "ww");
		map2.setString("age", "35");
		map2.setStringProperty("sex", "w");
		//参一目标,参二数据,参三非持久化,参四做优先及,参五失效时间
		this.producer.send(destination, map, DeliveryMode.PERSISTENT, 2, 1000*10);
		this.producer.send(destination, map1, DeliveryMode.PERSISTENT, 2, 1000*10);
		this.producer.send(destination, map2, DeliveryMode.PERSISTENT, 9, 1000*10);

		//提交事务
        this.session.commit();
        //关闭连接
        this.connection.close();
	}

	public static void main(String[] args) throws Exception {
		Producer p = new Producer();
		p.send();
	}

}




public class Comsumer {

	// private final String selector_0 = "age>30";
	// 消息过滤的不是消息本身,而是过滤消息附带的某些属性
	private final String selector_1 = "sex='w'";

	// 建立connectionFactory工厂对象
	private ActiveMQConnectionFactory connectionFactory;
	// 连接对象
	private Connection connection;
	// session对象
	private Session session;
	// 生产者
	private MessageConsumer consumer;
	// 目标地址
	private Destination destination;

	public Comsumer() {
		this.connectionFactory = new ActiveMQConnectionFactory();
		try {
			this.connection = connectionFactory.createConnection("fu", "fu");
			this.connection.start();
			this.session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
			this.destination = this.session.createQueue("first");
			// 消息过滤的不是消息本身,而是过滤消息附带的某些属性
			this.consumer = session.createConsumer(destination, selector_1);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public void recever() throws Exception {
		// 消息异步接收:当消息到达时,ActiveMQ主动通知消费端,可以注册一个MessageListener类实现onMessage方法,监听MQ送达消息
		this.consumer.setMessageListener(new Listener());

	}

	class Listener implements MessageListener {

		public void onMessage(Message message) {
			try {
				if (message instanceof TextMessage) {

				} else if (message instanceof MapMessage) {
					MapMessage m = (MapMessage) message;
					System.out.println(m.toString());
					System.out.println(m.getString("name"));
					System.out.println(m.getString("age"));
					// 手工签收消息
					m.acknowledge();
				}

			} catch (JMSException e) {
				e.printStackTrace();
			}
		}

	}

	public static void main(String[] args) throws Exception {
		Comsumer c = new Comsumer();
		c.recever();
	}
}


   
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics