RocketMQ的安装部署请参考官网Quick Start
RocketMQ的简单应用请参考官网github样例
本篇介绍如何通过自定义selector实现按messageQueue定向发送和接收消息
我们先看看MessageQueueSelector接口
public interface MessageQueueSelector { MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg); }
RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上,RocketMQ默认提供了三种实现,分别是SelectMessageQueueByHash、SelectMessageQueueByMachineRoom、SelectMessageQueueByRandoom。MessageQueueSelector的select方法提供了三个入参,分别为消息队列集合、消息和扩展参数。本示例通过使用扩展参数来实现消息通道的定向发送和接收。
1、pom.xml引入rocketmq jar包
<!-- 引入rocketmq --> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency> <!-- 提供常用的lang包工具类 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.3.2</version> </dependency>
2、MessageQueueSelector接口实现
package com.lh.rocketmq.selector; import java.util.List; import org.apache.commons.lang3.math.NumberUtils; import com.alibaba.rocketmq.client.producer.MessageQueueSelector; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; /** * 通过调用 producer.send(msg, new SelectMessageQueueByExtOrg() , queueId)指定发送通道 * * @author lh * @since 2017-4-22 * @version 1.0.0 * */ public class SelectMessageQueueByExtOrg implements MessageQueueSelector { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return mqs.get(NumberUtils.toInt(arg.toString())); } }
3、producer通过自定义的MessageQueueSelector 发送消息
package com.lh.rocketmq.producer; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.remoting.exception.RemotingException; import com.lh.rocketmq.common.MqConst; import com.lh.rocketmq.selector.SelectMessageQueueByExtOrg; /** * producer通过自定义的MessageQueueSelector 发送消息 * @author lh * @since 2017-4-23 * */ public class ProducerByExtOrgSelector { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("Producer"); producer.setNamesrvAddr(MqConst.NAME_SRV_ADDR); try { producer.start(); int queueId = 0; for (int i = 0; i < 16000; i++) { queueId = i % 4; Message msg = new Message(MqConst.TOPIC_NAME, MqConst.TAG_PUSH + queueId, "key" + i, ("hello rocketmq " + i).getBytes()); SendResult result = producer.send(msg, new SelectMessageQueueByExtOrg(), queueId); System.out.println("offset=" + result.getQueueOffset() + ", msgId=" + result.getMsgId() + ", sendStatus=" + result.getSendStatus()); } } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { producer.shutdown(); } } }
mq常量类
package com.lh.rocketmq.common; /** * mq常量类 * @author lh * */ public class MqConst { /** * 服务地址 */ public static final String NAME_SRV_ADDR = "192.168.191.130:9876"; /** * 主题名称 */ public static final String TOPIC_NAME = "rocketmq-simple-demo"; /** * broker名称 */ public static final String BROKER_NAME = "localhost.localdomain"; /** * tag */ public static final String TAG_PUSH = "push"; /** * 消息定向queueId * 对应Message.getUserProperty(MqConst.MESSAGE_KEY_QUEUE_ID) */ public static final String MESSAGE_KEY_QUEUE_ID="queueId"; /** * 测试队列和tag相同的标识 */ public static final int TARGET_QUEUEID_TAG= 0; }
4、通过指定的messageQueue拉取消息
package com.lh.rocketmq.consumer; import java.nio.charset.Charset; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; import com.alibaba.rocketmq.client.consumer.PullResult; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.message.MessageQueue; import com.lh.rocketmq.common.MqConst; /** * 通过指定的messageQueue拉取消息 * @author lh * @since 2017-04-23 * */ public class PullConsumerByQueueId { public static void main(String[] args) { long startTime = System.currentTimeMillis(); DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("PullConsumer"); consumer.setNamesrvAddr(MqConst.NAME_SRV_ADDR); long offset = 0; long maxOffset = offset; try { consumer.start(); MessageQueue mq = new MessageQueue(MqConst.TOPIC_NAME, MqConst.BROKER_NAME, MqConst.TARGET_QUEUEID_TAG); do{ PullResult result = consumer.pullBlockIfNotFound(mq, null, offset, 32); List<MessageExt> msgs = result.getMsgFoundList(); if (msgs != null && msgs.size() != 0) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody(), Charset.forName("utf-8"))); } } offset = result.getNextBeginOffset(); maxOffset = result.getMaxOffset(); System.out.println("offset="+offset+", status="+result.getPullStatus()); }while(offset < maxOffset); } catch (Exception e) { e.printStackTrace(); }finally{ consumer.shutdown(); long endTime = System.currentTimeMillis(); System.out.println("PullConsumerByQueueId\t take times="+ (endTime - startTime)); } } }
附近为工程代码,有需要的同学请自行下载
相关推荐
Android自定义矩形及selector、shape的使用Android自定义矩形及selector、shape的使用
This is a change Background Or TextColor Selector support library, with which you can directly specify the Background to be displayed in different states or TextColor Layout xml, such as clicking the ...
如何自定义CheckBox的样式 1:首先在布局文件中添加CheckBox的控件配置,如: android:id="@+id/button1" style="@style/CheckBoxStyles"//这里就是用户可以自定...以上三步之后,实现CheckBox的样式自定义
自定义shape文件的简单实用,包括不同active状态下的使用selector来绘制
代码实现drawable的selector效果,不用为每个控件写selector样式; 另外,实现圆形图片处理、圆角图片处理功能
ListView Button ImageView 里应用selector选择器切换图片并保持住
在插件使用方面,file_selector插件可以方便地实现文件选择功能,支持多选和所有类型的文件,并且在选择文件后可以获取到文件的路径和其他信息。 在使用file_selector插件时,需要注意以下几点: 在Android系统中...
开发过程中使用阿里巴巴的iconfont来减小apk大小,这是做的一个Demo,里面可以用来实现selector的状态改变,非常好用的,希望对感兴趣的朋友有帮助。
Selector
在simulink上进行BusCreator和BusSelector的简单实验
基于Mina的网络通讯,分为服务端和客户端。 研究selector NIO实现时,发现了这个架构。...Mina的底层实现实际就是selector和SocketChannel。所以如果对Mina源码感兴趣的可以先去看下selector相关的例子。
支持使用selector的自定义RadioButton
本文实例讲述了Android编程使用自定义shape实现shadow阴影效果的方法。分享给大家供大家参考,具体如下: 直接上xml文件, 并且附上相应的解析: <?xml version=1.0 encoding=utf-8?> <selector xmlns:android...
用selector设置button可用和不可用的样式
一个强大的selector注入器,它可以让view自动产生selector状态,免去了你写selector的麻烦。
NULL 博文链接:https://bijian1013.iteye.com/blog/2317341
Android selector 完整demo
selenium css selector 定位详解
如何自定义seekBar的样式 应用中自定seekbar: 1:首先在布局文件中添加seekbar控件 android:layout_width="300px" android:layout_height="wrap_content" android:max="100" android:progress="0" ...