gwj41

Building a Dynamic RabbitMQ Shovel with Docker


<div class="iteye-blog-content-contain" style="font-size: 14px">

With a static RabbitMQ shovel already built, setting up a dynamic shovel is much easier.

First, just as for the static shovel, start two RabbitMQ nodes, rabbitmq_a and rabbitmq_b. Then define the shovel on rabbitmq_a:

docker exec rabbitmq_a bash -c "rabbitmqctl set_parameter shovel my-shovel '{\"src-uri\": \"amqp://guest:guest@172.17.0.2:5672/%2f\", \"src-queue\": \"my-queue\", \"dest-uri\": \"amqp://guest:guest@172.17.0.3:5672/%2f\", \"dest-queue\": \"another-queue\", \"ack-mode\": \"on-confirm\"}'"

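Note that the double quotes inside the JSON must be escaped with a backslash ("\"), because the whole rabbitmqctl command is itself wrapped in double quotes for bash -c.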
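To see where the backslashes come from, here is a small sketch that assembles the same command in Java. The class and method names (ShovelCommand, toJson, escapeForDoubleQuotes, shellCommand, argv) are made up for illustration. The second form, which hands the JSON to docker exec as a separate argument without bash -c, is an assumption on my part that rabbitmqctl is on the PATH inside the container.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ShovelCommand {

    /** Renders a flat string-to-string map as a JSON object (the values used here need no JSON escaping). */
    public static String toJson(Map<String, String> fields) {
        return fields.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\": \"" + e.getValue() + "\"")
                .collect(Collectors.joining(", ", "{", "}"));
    }

    /** Escapes the characters that keep a special meaning inside a bash double-quoted string. */
    public static String escapeForDoubleQuotes(String s) {
        return s.replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("$", "\\$")
                .replace("`", "\\`");
    }

    /** The shovel definition from the command above, in insertion order. */
    public static Map<String, String> definition() {
        Map<String, String> d = new LinkedHashMap<>();
        d.put("src-uri", "amqp://guest:guest@172.17.0.2:5672/%2f");
        d.put("src-queue", "my-queue");
        d.put("dest-uri", "amqp://guest:guest@172.17.0.3:5672/%2f");
        d.put("dest-queue", "another-queue");
        d.put("ack-mode", "on-confirm");
        return d;
    }

    /** bash -c form: the inner command sits inside double quotes, so its own double quotes are escaped. */
    public static String shellCommand(String container, String shovelName, String json) {
        String inner = "rabbitmqctl set_parameter shovel " + shovelName + " '" + json + "'";
        return "docker exec " + container + " bash -c \"" + escapeForDoubleQuotes(inner) + "\"";
    }

    /** Argument-list form (e.g. for ProcessBuilder): no shell parses it, so the JSON is passed unescaped. */
    public static List<String> argv(String container, String shovelName, String json) {
        return List.of("docker", "exec", container,
                "rabbitmqctl", "set_parameter", "shovel", shovelName, json);
    }

    public static void main(String[] args) {
        String json = toJson(definition());
        System.out.println(shellCommand("rabbitmq_a", "my-shovel", json));
        System.out.println(argv("rabbitmq_a", "my-shovel", json));
    }
}
```

The first line printed has the same shape as the command above. The argument list could be handed to new ProcessBuilder(...) if you would rather not deal with shell quoting at all.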

Consumer class

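import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;

public class Consumer {
    // Assumption: java.util.logging; the original does not show which logging library LOGGER comes from.
    private static final Logger LOGGER = Logger.getLogger(Consumer.class.getName());
    private static ConnectionFactory factory;
    private static Connection connection;
    private static Channel channel;

    public static void main(String[] args) throws IOException, TimeoutException {
        init();
        // Manual acknowledgement: every message is acked explicitly in handleDelivery.
        boolean autoAck = false;
        // Consume from the shovel's destination queue on rabbitmq_b.
        channel.basicConsume("another-queue", autoAck, "myConsumerTag",
                new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties properties,
                                               byte[] body) throws IOException {
                        String routingKey = envelope.getRoutingKey();
                        String contentType = properties.getContentType();
                        LOGGER.info("routingKey is:" + routingKey);
                        LOGGER.info("contentType is:" + contentType);
                        long deliveryTag = envelope.getDeliveryTag();
                        // (process the message components here ...)
                        LOGGER.info("content is:" + new String(body, StandardCharsets.UTF_8));
                        // Ack only this delivery (multiple = false).
                        channel.basicAck(deliveryTag, false);
                    }
                });
    }

    // Connects to rabbitmq_b (172.17.0.3), the shovel's destination broker.
    // Connection failures are propagated instead of swallowed, so main never runs with a null channel.
    public static void init() throws IOException, TimeoutException {
        factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("172.17.0.3");
        factory.setPort(5672);
        connection = factory.newConnection();
        channel = connection.createChannel();
    }
}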
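The settings in init() describe the same broker as the dest-uri in the shovel definition: the virtual host "/" set with setVirtualHost("/") has to be percent-encoded when it appears in an AMQP URI, which is where the trailing %2f comes from. Below is a minimal sketch of that mapping using only the JDK. AmqpUri, encodeSegment, and build are hypothetical helper names, not part of the RabbitMQ client.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class AmqpUri {

    /**
     * Percent-encodes a single URI component. URLEncoder targets HTML forms and
     * turns spaces into "+", so those are mapped back to "%20".
     */
    public static String encodeSegment(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8).replace("+", "%20");
    }

    /** Builds an amqp:// URI from the same values that are passed to ConnectionFactory. */
    public static String build(String user, String password, String host, int port, String vhost) {
        return "amqp://" + encodeSegment(user) + ":" + encodeSegment(password)
                + "@" + host + ":" + port + "/" + encodeSegment(vhost);
    }

    public static void main(String[] args) {
        // prints amqp://guest:guest@172.17.0.3:5672/%2F
        System.out.println(build("guest", "guest", "172.17.0.3", 5672, "/"));
    }
}
```

URLEncoder emits uppercase hex digits (%2F), while the shovel command uses %2f. Percent-encoding is case-insensitive, so the two forms name the same virtual host.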

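Producer

// Assumption: the test class initializes `channel` with a connection to rabbitmq_a
// (172.17.0.2), the shovel's source broker; that setup is not shown in the original.
@Test
public void direct() throws IOException {
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
            .contentType("text/plain")
            .build();
    for (int i = 0; i < 3; i++) {
        byte[] messageBodyBytes = ("Dynamic Shovel Test! No." + i).getBytes(StandardCharsets.UTF_8);
        // Publish through the default exchange, so the routing key is the queue name.
        channel.basicPublish("", "my-queue", properties, messageBodyBytes);
    }
}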

 

</div>

 
