项目中用到rocket mq的方式有多种,
第一种,严格按照时间消费的模式,这种模式需要用串行方式,生产者生产的时候,这时候生产者需要往特定的队列里有序push:
SendResult result = producer.send(msg, new MessageQueueSelector(){
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = arg.hashCode();
int index = id % mqs.size();
return mqs.get(index);
}
}, dataAsyncEvent.getDataType());
消费者也要按照顺序严格有序消费,用这个有序的监听者:
//同一队列的消息同一时刻只能一个线程消费,可保证消息在同一队列严格有序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
return ConsumeOrderlyStatus.CONSUME_SUCCESS;
}
public interface MessageListenerOrderly extends MessageListener {
/**
* 方法抛出异常等同于返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT<br>
* P.S: 建议应用不要抛出异常
*
* @param msgs
* msgs.size() >= 1<br>
* DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,默认消息数为1
* @param context
* @return
*/
public ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
final ConsumeOrderlyContext context);
}
第二种:不追求时间顺序,只要把生产出来的事件全部消费完就可以。这种可以用并行的方式处理,效率高很多:
生产者:
SendResult result = producer.send(msg);
消费者:(用此接口处理)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
public interface MessageListenerConcurrently extends MessageListener {
/**
* 方法抛出异常等同于返回 ConsumeConcurrentlyStatus.RECONSUME_LATER<br>
* P.S: 建议应用不要抛出异常
*
* @param msgs
* msgs.size() >= 1<br>
* DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,默认消息数为1
* @param context
* @return
*/
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
final ConsumeConcurrentlyContext context);
}
http://blog.csdn.net/fangletian1981/article/details/20694381
相关推荐
Rocket MQ.xmind
Rocket MQ 用户指南 v3.0.4
云运维工程师从入门到精通,6个要点掌握Rocket MQ 原理,5步教程快速入门Rocket MQ ,100+常见问题排查精解
这个视频是龙果 的rocket mq视频,讲的非常不错。分为上下两个系列。直接用txt 打开后。里边是百度云资源
主要是针对消息中间件及其应用的介绍, rocketmq是阿里巴巴开源的一款分布式的消息中间件,是实现分布式系统中解耦、异步消息、流量销锋、日志处理等。
《Rocket MQ 使用排查指南》,110页,大咖团队经典提炼,内部教程
RocketIO高速串行传输原理与实现.pdf RocketIO 高速串行传输 FPGA xilinx
1. 用户发起海量秒杀请求到秒杀业务处理系统 2. 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列 3. 下游的通知系统订阅消息队列 Rocke
demo示例
人肉,一步一步 手把手教你,因为国内目前还没有这样的文档。
一个基于Rocket MQ的不同数据库之间数据实时同步的平台 watcher为监视数据更新的windows service 配置文件: 1.\Config\producerconfig.json -- rocket mq的生产者配置,用来将从数据库查询出来的数据推送到mq以便...
rocket运维手册必看
包含了IBM的MQ初始化,发送,接收的工具类,方便极了,可直接放入到项目中。
rocketmq-all-4.3.2-bin-release和rocketmq-console
绍Xilinx公司的Virtex4 FX系列FPGA中用于解决高速串行互连问题的Rocket IO模块的基本工作原理,并通过开发板验证了该模块在高速数据传输中的可靠性。
该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。
Rocket
MQ各个版本与java JDK各个版本和IBM jar包各个版本的混合测试结果。以及对不同通道SSL加密套件的支持。
ROCKET
rocket是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。具有以下特性: 1、支持发布/订阅和点对点消息模型 2、在同一队列中有严格的顺序传递(FIFO) 3、支持pull和push两种消息模式 4、单一队列百万级消息...