`

rabbitmq 学习-12- 发送接收消息示例-1

阅读更多

这里的示例中,发送端使用 basicPublish 异步发送消息(不等待消费者的回复);接收端分别演示 basicGet 主动获取和 Consumer 订阅两种方式
接收有两种方式:http://www.rabbitmq.com/api-guide.html#getting

Retrieving individual messages(channel.basicGet)

To retrieve individual messages, use Channel.basicGet. The returned value is an instance of GetResponse, from which the header information (properties) and message body can be extracted.

Retrieving messages by subscription(Consumer)

import com.rabbitmq.client.Consumer; import com.rabbitmq.client.QueueingConsumer;

Another way to receive messages is to set up a subscription using the Consumer interface. The messages will then be delivered automatically as they arrive, rather than having to be requested proactively.

The easiest and safest supplied implementation of Consumer is the QueueingConsumer convenience class:


TestBase.java -- 基类
/**
 * @author sunjun
 * @createtime 2010-4-27 下午03:13:27
 */
public class TestBase implements ShutdownListener {

    private static Connection connection;

    static {
        ConnectionParameters parameters = new ConnectionParameters();
        parameters.setUsername("guest");
        parameters.setPassword("guest");
        parameters.setVirtualHost("/");
        // 默认情况下使用用户guest/guest,vhost=/
        ConnectionFactory factory = new ConnectionFactory(parameters);
        try {
            // 获取connection
            connection = factory.newConnection("192.168.18.24", AMQP.PROTOCOL.PORT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    protected Channel getChannel() throws IOException {
        return connection.createChannel();
    }

    protected void close(Channel channel) {
        try {
            channel.close();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    protected void printBasicProperties(BasicProperties properties) {
        System.out.println("properties.ContentEncoding: " + properties.getContentEncoding());
        System.out.println("properties.content type: " + properties.getContentType());
        System.out.println("properties.Expiration: " + properties.getExpiration());
        System.out.println("properties.type: " + properties.getType());
        System.out.println("properties.reply to: " + properties.getReplyTo());
        System.out.println("properties.appId: " + properties.getAppId());
        System.out.println("properties.classId: " + properties.getClassId());
        System.out.println("properties.className: " + properties.getClassName());
        System.out.println("properties.clusterId: " + properties.getClusterId());
    }

    @Override
    public void shutdownCompleted(ShutdownSignalException cause) {
        System.out.println("-----------shutdown information----------");
        Object reason = cause.getReason();
        System.out.println(reason);
        Object reference = cause.getReference();
        System.out.println(reference);
        System.out.println(cause.isHardError());
        System.out.println(cause.isInitiatedByApplication());
    }
}



TestSender.java
/**
 * 发送消息
 *
 * @author sunjun
 * @createtime 2010-4-27 上午11:21:05
 */
public class TestSender extends TestBase {

    /**
     * 使用channel类basicPublish发送
     *
     * @throws IOException
     */
    public void sendMsg() throws IOException {
        // 创建一个channel
        Channel channel = getChannel();

        // 设置return listener,处理不能发送或未被送达的消息,如果发送时设置 强制发送 或 立即发送 ,但是消息没有被接收到,就会回调return listener
        // Handling unroutable or undelivered messages
        // If a message is published with the "mandatory" or "immediate" flags set, but cannot be
        // delivered, the broker will return it to the sending client (via a AMQP.Basic.Return
        // command).
        channel.setReturnListener(new ReturnListener() {

            @Override
            public void handleBasicReturn(int replyCode, String replyText, String exchange,
                    String routingKey, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                System.out.println("-----------get exception reply message---------");
                System.out.println("replyCode: " + replyCode);
                System.out.println("replyText: " + replyText);
                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
                System.out.println("ContentEncoding: " + properties.getContentEncoding());
                System.out.println("content type: " + properties.getContentType());
                System.out.println("Expiration: " + properties.getExpiration());
                System.out.println("type: " + properties.getType());
                System.out.println("reply to: " + properties.getReplyTo());
                System.out.println("body: " + new String(body));
            }

        });

        // 定义一个由rabbitmq server自动生成queueName的queue, autodelete, non-durable
        // DeclareOk queueDeclare = channel.queueDeclare();
        // String queue = queueDeclare.getQueue();
        // System.out.println(queue);

        // 定义一个queue,non-autodelete, non-durable(即rabbitmq重启后会消失)
        String queueName = "test.queue";
        DeclareOk queueDeclare = channel.queueDeclare(queueName);

        // 定义一个queue,durable(即rabbitmq重启后也不会消失)
        // String queue = "test.queue";
        // DeclareOk queueDeclare = channel.queueDeclare(queue ,true);
        // qeuue名称,也就是test.queue
        System.out.println("queueName: " + queueDeclare.getQueue());
        // 消费者数量
        System.out.println("ConsumerCount: " + queueDeclare.getConsumerCount());
        // 未读取的消息数量
        System.out.println("MessageCount: " + queueDeclare.getMessageCount());

        // 定义一个exchange
        String exchange = "test.exchange";
        channel.exchangeDeclare(exchange, "direct");

        // bind queue到exchange上
        String routingKey = "test.routingkey";
        channel.queueBind(queueName, exchange, routingKey);

        // 发送一个非持久化文本消息
        String msg = "hello world!";
        // 异步发送消息,不用等待去接收消费者的回复消息
        channel.basicPublish(exchange, routingKey, MessageProperties.TEXT_PLAIN, msg.getBytes());

        System.out.println("send message success.");

        // close
        close(channel);
    }

    /**
     * main
     *
     * @param args
     */
    public static void main(String[] args) {
        try {
            new TestSender().sendMsg();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}



TestReceiver.java
/**
 * 接收消息,使用channel.basicGet接收消息
 *
 * @author sunjun
 * @createtime 2010-4-27 下午03:14:31
 */
public class TestReceiver extends TestBase {

    /**
     * 接收消息
     *
     * @throws IOException
     */
    public void receive() throws IOException {
        // 获取channel
        Channel channel = getChannel();

        // 定义一个queue,如果先启动receive端而又没有定义queue,会报错no queue 'test.queuea' in vhost
        String queue = "test.queue";
        channel.queueDeclare(queue);

        boolean noAck = true;
        // 获取到消息的计数器
        int counter = 0;
        // 循环从queue(test.queue)中获取消息
        while (true) {
            GetResponse response = channel.basicGet(queue, noAck);
            if (response != null) {
                System.out.println("----------- receive message " + (++counter) + "-----------");
                System.out.println("Message : " + new String(response.getBody()));

                Envelope envelope = response.getEnvelope();
                String exchange = envelope.getExchange();
                String routingKey = envelope.getRoutingKey();
                long deliveryTag = envelope.getDeliveryTag();

                BasicProperties properties = response.getProps();
                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
                System.out.println("MessageCount: " + response.getMessageCount());
                System.out.println("deliveryTag: " + deliveryTag);
                printBasicProperties(properties);

                System.out.println("receive success.\n\n");

                if (!noAck) // 发送回复
                    channel.basicAck(deliveryTag, false);
            }
        }
    }

    /**
     * main
     *
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        new TestReceiver().receive();
    }
}



TestReceiver2.java
/**
 * 接收消息,使用Consumer(QueueingConsumer)来取消息
 *
 * @author sunjun
 * @createtime 2010-4-27 下午03:14:31
 */
public class TestReceiver2 extends TestBase {

    /**
     * 接收消息
     *
     * @throws IOException
     */
    public void receive() throws IOException {
        // 获取channel
        Channel channel = getChannel();

        // 定义一个queue,如果先启动receive端而又没有定义queue,会报错no queue 'test.queuea' in vhost
        String queue = "test.queue";
        channel.queueDeclare(queue);

        int counter = 0;
        boolean noAck = false;

        // 定义一个消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        String consumerTag = channel.basicConsume(queue, noAck,
                queueingConsumer);

        System.out.println("receive message started.");

        // 循环取消息
        while (true) {
            Delivery nextDelivery = null;
            try {
                // 取消息
                nextDelivery = queueingConsumer.nextDelivery();
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (nextDelivery != null) {
                System.out.println("----------- receive message " + (++counter)
                        + "-----------");
                System.out.println("Message : "
                        + new String(nextDelivery.getBody()));

                Envelope envelope = nextDelivery.getEnvelope();
                String exchange = envelope.getExchange();
                String routingKey = envelope.getRoutingKey();
                long deliveryTag = envelope.getDeliveryTag();

                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
                System.out.println("deliveryTag: " + deliveryTag);

                BasicProperties properties = nextDelivery.getProperties();
                printBasicProperties(properties);

                System.out.println("receive success.\n\n");

                if (!noAck)
                    channel.basicAck(deliveryTag, false);

                // channel.basicCancel(queueingConsumer.getConsumerTag());
                // System.out.println("cancel consumer success.");

                break;
            }
        }
    }

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        TestReceiver2 test = new TestReceiver2();
        test.receive();
    }
}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics