转载自 http://windows9834.blog.163.com/blog/static/2734500420131195224532/
官方文档:http://activemq.apache.org/redelivery-policy.html
在事务控制里抛出异常,txManager会进行rollback处理.(在activeMQ里,消息默认会redelivery到客户端6次,如果继续异常,消息会放到deadletter queue里(ActiveMQ.DLQ))。
在以下三种情况中,消息会被再次传送给消费者:
1.在使用事务的Session中,调用rollback()方法;
2.在使用事务的Session中,调用commit()方法之前就关闭了Session;
3.在Session中使用CLIENT_ACKNOWLEDGE签收模式,并且调用了recover()方法。
可以通过设置ActiveMQConnectionFactory和ActiveMQConnection来定制想要的再次传送策略。
当消息被再次投递到客户端时的详细信息在 Message Redelivery and DLQ Handling 章节能找到详细的说明。
你可以在ActiveMQConnectionFactory 或ActiveMQConnection类中配置RedeliveryPolicy属性,用来定义重传策略的具体细节。
您可以使用Java代码,Spring配置或配置URI字符串来定义重传策略。
可用的属性
属性 | 默认值 | 说明 |
collisionAvoidanceFactor | 0.15 | 设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。 |
maximumRedeliveries | 6 | 最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。 |
maximumRedeliveryDelay | -1 | 最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 |
initialRedeliveryDelay | 1000L | 初始重发延迟时间 |
redeliveryDelay | 1000L | 重发延迟时间,当initialRedeliveryDelay=0时生效(v5.4) |
useCollisionAvoidance | false | 启用防止冲突功能,因为消息接收时是可以使用多线程并发处理的,应该是为了重发的安全性,避开所有并发线程都在同一个时间点进行消息接收处理。所有线程在同一个时间点处理时会发生什么问题呢?应该没有问题,只是为了平衡broker处理性能,不会有时很忙,有时很空闲。 |
useExponentialBackOff | false | 启用指数倍数递增的方式增加延迟时间。 |
backOffMultiplier | 5 | 重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。 |
官方文档有些参数功能说得不是很清楚,如collisionAvoidanceFactor,那只能查看源代码了,
可以查看org.apache.activemq.ActiveMQMessageConsumer类中的rollback()方法:
publicvoid rollback()throwsJMSException{
......
finalint currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
if(currentRedeliveryCount >0){
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
}else{
redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
}
......
}
先获取重发次数,如果是第一次重发则延迟时间为initialRedeliveryDelay,如果不是第一次重发则延迟时间为redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);这里的redeliveryDelay参数是前一次重发的延迟时间。
publiclong getNextRedeliveryDelay(long previousDelay){
long nextDelay;
if(previousDelay ==0){
nextDelay = redeliveryDelay;
}elseif(useExponentialBackOff && backOffMultiplier >1){
nextDelay =(long)(previousDelay * backOffMultiplier);
if(maximumRedeliveryDelay !=-1&& nextDelay > maximumRedeliveryDelay){
// in case the user made max redelivery delay less than redelivery
delay for some reason.
nextDelay =Math.max(maximumRedeliveryDelay, redeliveryDelay);
}
}else{
nextDelay = previousDelay;
}
if(useCollisionAvoidance){
/*
* First random determines +/-, second random determines how far to
* go in that direction. -cgs
*/
Random random = getRandomNumberGenerator();
double variance =(random.nextBoolean()? collisionAvoidanceFactor :
-collisionAvoidanceFactor)* random.nextDouble();
nextDelay += nextDelay * variance;
}
return nextDelay;
}
计算下一次重发延迟时间时,当启用指数倍数增长方式useExponentialBackOff=true,且增长倍数backOffMultiplier > 1时,下一次延迟时间为previousDelay * backOffMultiplier,这里的previousDelay为前一次延迟重发的时间。
collisionAvoidanceFactor这个参数spring中是不能直接注入的,注入方法为:
publicvoid setCollisionAvoidancePercent(short collisionAvoidancePercent){
this.collisionAvoidanceFactor = collisionAvoidancePercent *0.01d;
}
这里的collisionAvoidancePercent是个短整形,比如15,则方法直接计算为15%。在spring中注入方式为:
<propertyname="collisionAvoidancePercent"value="15"/>
如果useCollisionAvoidance=true,则nextDelay += nextDelay * variance;这里的variance是一个随机的正负值,Random.nextDouble()的范围是:0.0d到1.0d,当redeliveryDelay=1000L,collisionAvoidanceFactor值为0.15时,那么正负variance的范围就是:-0.15到+0.15,那么最后nextDelay的值范围就是:1000+1000*-15%~1000+1000*15%,即在850~1150范围内波动。
相关推荐
springboot整合activeMq的使用,队列,主题,消息手动确认,重发机制
springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。 里面有消息重发机制,手动确认ACK模式。 配合 producer 生产者demo使用。
ActiveMQ(包括消息生成端和andorid消息接受端),实现了点对点的消息推送,和广播消息推送,当然离线推送也实现了。
ActiveMQ队列消息过期时间设置和自动清除解决方案.docx
作为消息中间件的MQ在java开发中起着举足轻重的地位,无论是ActiveMQ、RabbitMQ、还是RokcetMQ至少要会一个,否则别说自己是java程序员。Java自学网整理了目前行业最常用的消息中间件视频供大家学习。
详细描述了ActiveMQ消息过期-时间设置和自动清除解决方案。
activeMQ的发送消息后接收者返回信息
用C#实现的ActiveMQ发布/订阅消息传送源程序
一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...
springboot整合ActiveMQ源码,适合范围消息队列入门小伙伴,对ActiveMQ消息队列不太了解,不知道如何发送消息,接收消息可以围观。
SpringBoot+ActiveMq+MQTT实现消息的发送和接收 后台消费者、生产者、消息发送接口、发送消息业务类等相关配置
springboot集成activemq实现消息接收demo
百度spring整合activemq 发现几乎都只是在xml文件配置固定的消息队列而且太麻烦。并没有根据需求进行动态生成主题和队列。本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和...
基于ActiveMQ实现简单的消息收发案例 ,详情请参见博文:http://blog.csdn.net/l1028386804/article/details/68969450
本篇文章记录centos6下ActiveMQ+Zookeeper消息中间件集群-完整部署过程,讲解十分详细,本人线上实操手册。在此分享出来,希望能帮助到有用到的朋友。
公司做了一个项目,由于JTREE面临着近40万的数据量。虽然初始化这些数据问题...每次树要加栽的时候先显示“正在加载...”,当结构从ActiveMQ进行返回的时候,再重新渲染此树。例子我打包传到我的资源里面。需要的下
本代码关于activemq-cpp的核心代码参考的chenxun2009的博客园,其他部分包括:从配置文件中读取消息通道,过滤条件等信息。
整合Spring + ActiveMQ 的朋友可以下载看一下 简单易懂
activeMQ的测试工具,用于发送和接收activeMQ消息,jar包形式的,安装完jdk之后用java -jar xxx.jar命令运行
SpringBoot快速玩转ActiveMQ消息队列,jdk8下的简要版介绍。