`

rocketmq 发送/接收消息简单demo

阅读更多
rocketmq 发送/接收消息简单demo

(1)配置连接ip
producer.setNamesrvAddr("192.168.1.133:9876");

(2)发送消息 TestProducer.java
public class TestProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("192.168.1.133:9876");
        producer.start();
       
        long start=System.currentTimeMillis();

        for (int i = 0; i < 10; i++)
            try {
                {
                    Message msg = new Message("TopicTest1",
                        "TagA",
                        "key113",
                        ("Hello world "+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);


            } catch (Exception e) {
                e.printStackTrace();
            }
        long end=System.currentTimeMillis();
        System.out.println(end-start+"**");
        producer.shutdown();
    }
}

(3)接收消息(Push) ConsumerTest.java
public class ConsumerTest {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ProducerGroupName_C");
        consumer.setNamesrvAddr("192.168.1.133:9876");
      
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest1", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                System.out.println(new String(msgs.get(0).getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}



分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics