`

activemq-queue-selector

阅读更多
序上一篇:http://wangxinchun.iteye.com/blog/2145958
对于consumer而言,receive消息的时候,可以根据关键字筛选自己感兴趣的消息,当然可以定义不同的Destination 队列来承载这些消息,但是一般情况下,同一个队列可以保存一个主题的消息,减少队列的个数,为此新增了consumer的receive时的过滤功能。
case如下:

消息消费者:
/**
  通过改动intended参数,我们分别获取到了50条消息
*/
public class Consumer {

    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final Boolean NON_TRANSACTED = false;
    private static final long TIMEOUT = 20000;

    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        System.out.println("\nWaiting to receive messages... will timeout after " + TIMEOUT / 1000 +"s");
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("test-queue");
            //通过改动intended参数,我们分别获取到了50条消息
            MessageConsumer consumer = session.createConsumer(destination, "intended = 'you'");
            int i = 0;
            while (true) {
                Message message = consumer.receive(TIMEOUT);
                if (message != null) {
                    if (message instanceof TextMessage) {
                        String text = ((TextMessage) message).getText();
                        System.out.println("Got " + i++ + ". message: " + text);
                    }
                } else {
                    break;
                }
            }

            consumer.close();
            session.close();

        } catch (Exception e) {
            System.out.println("Caught exception!");
        }
        finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    System.out.println("Could not close an open connection...");
                }
            }
        }
    }
}



消息生产者:
/**
 * 此producer 对队列test-queue 发送消息,设置intended 属性,值分别为me you
 */
public class Producer {

    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final Boolean NON_TRANSACTED = false;
    private static final int NUM_MESSAGES_TO_SEND = 100;
    private static final long DELAY = 100;

    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
        Connection connection = null;

        try {

            connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("test-queue");
            MessageProducer producer = session.createProducer(destination);

            for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
                TextMessage message = session.createTextMessage("Message #" + i);
                System.out.println("Sending message #" + i);
                if (i % 2 == 0) {
                    System.out.println("Sending to me");
                    message.setStringProperty("intended", "me");
                } else {
                    System.out.println("Sending to you");
                    message.setStringProperty("intended", "you");
                }
                producer.send(message);
                Thread.sleep(DELAY);
            }

            producer.close();
            session.close();

        } catch (Exception e) {
            System.out.println("Caught exception!");
        }
        finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    System.out.println("Could not close an open connection...");
                }
            }
        }
    }

}


验证:通过Producer 发送了100 条消息。
通过Consumer 设置intended 为me 收到50 条消息,通过Consumer设置intended  为you 收到50条消息,符合预期~
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics