`

rabbitmq 学习-13- 发送接收消息示例-2

阅读更多

Basic RPC

As a programming convenience, the Java client API offers a class RpcClient which uses a temporary reply queue to provide simple RPC-style communication facilities via AMQP.

The class doesn't impose any particular format on the RPC arguments and return values. It simply provides a mechanism for sending a message to a given exchange with a particular routing key, and waiting for a response on a reply queue.

实现Rpc形式的发送与接收消息

/**
 * 使用RpcClient发送消息
 *
 * @author sunjun
 * @createtime 2010-4-27 上午11:21:05
 */
public class TestSender2 extends TestBase {

    /**
     * 发送一个消息
     *
     * @throws IOException
     */
    public void sendMsg() throws IOException {
        // 创建一个channel
        Channel channel = getChannel();

        // 设置return listener,用于监听回复(发生异常时)
        channel.setReturnListener(new ReturnListener() {

            @Override
            public void handleBasicReturn(int replyCode, String replyText,
                    String exchange, String routingKey,
                    AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                System.out
                        .println("-----------get exception reply message---------");
                System.out.println("replyCode: " + replyCode);
                System.out.println("replyText: " + replyText);
                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
                System.out.println("ContentEncoding: "
                        + properties.getContentEncoding());
                System.out.println("content type: "
                        + properties.getContentType());
                System.out.println("Expiration: " + properties.getExpiration());
                System.out.println("type: " + properties.getType());
                System.out.println("reply to: " + properties.getReplyTo());
                System.out.println("body: " + new String(body));
            }

        });

        // 定义一个queue
        String queue = "test.queue";
        channel.queueDeclare(queue);

        // 定义一个exchange
        String exchange = "test.exchange";
        channel.exchangeDeclare(exchange, "direct");

        // bind queue到exchange上
        String routingKey = "test.routingkey";
        channel.queueBind(queue, exchange, routingKey);

        RpcClient rpcClient = new RpcClient(channel, exchange, routingKey);
        try {
            // 发送消息
            String msg = "hello world!";
            byte[] result = rpcClient.primitiveCall(msg.getBytes());
            System.out.println(new String(result));
        } catch (ShutdownSignalException e) {
            e.printStackTrace();
        }

        System.out.println("send message success.");

        // close
        // close(channel);
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        TestSender2 testSender = new TestSender2();
        try {
            testSender.sendMsg();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

 


 

/**
 * 使用RpcServer接收消息,并发送回复消息
 *
 * @author sunjun
 * @createtime 2010-4-27 下午03:14:31
 */
public class TestReceiver3 extends TestBase {

    /**
     * 接收消息
     *
     * @throws IOException
     */
    public void receive() throws IOException {
        // 获取channel
        Channel channel = getChannel();

        // 定义一个queue,如果先启动receive端而又没有定义queue,会报错no queue 'test.queuea' in vhost
        String queue = "test.queue";
        channel.queueDeclare(queue);

        System.out.println("receive message started.");

        // 循环取消息
        while (true) {
            RpcServer rpcServer = new RpcServer(channel, queue) {
                @Override
                public byte[] handleCall(Delivery request,
                        BasicProperties replyProperties) {
                    // 接收消息
                    byte[] message = request.getBody();
                    // 发送返回消息
                    return super.handleCall(request, replyProperties);
                }
            };
        }

    }

    public static void main(String[] args) throws IOException {
        new TestReceiver3().receive();
    }
}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics