在实际业务场景中,有许多要用到延时消息或消费的功能,最常见的是:下订单后,半小时或指定时间段内如果没有付款,就取消订单。如果使用定时任务轮询的话,不太合适,一来定时任务有一个时间间隔,同时也会导致单线程消息的速度跟不上。
对于这类,常用的解决方案如下:
- 定时任务轮询
优点:简单方便,实现快速,如果使用得当,可支持分支式集群环境
缺点:轮询的时间间隔及排除处理的方式,会导致触发不及时,而且在量非常大的场景,表现越差
实现代码:略
- 阻塞队列:DelayQueue
优点:JDK自带,稳定可靠
缺点:考虑复杂场景时,实现较为复杂,另外,如果消息量较多时,内存成为瓶颈,分布式环境实现也会比较复杂。
核心代码:略
- 时间轮:HashedWheelTimer
优点:Netty自带功能,稳定可靠
缺点:同上DelayQueue。
核心代码:
import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; import java.util.concurrent.TimeUnit; public class HashedWheelTimerTest { static class MyTimerTask implements TimerTask { boolean flag; public MyTimerTask(boolean flag) { this.flag = flag; } public void run(Timeout timeout) throws Exception { System.out.println("要去数据库删除订单了。。。。"); this.flag = false; } } public static void main(String[] argv) { MyTimerTask timerTask = new MyTimerTask(true); Timer timer = new HashedWheelTimer(); timer.newTimeout(timerTask, 5, TimeUnit.SECONDS); timer.newTimeout(timerTask, 12, TimeUnit.SECONDS); int i = 1; while (timerTask.flag) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(i + "秒过去了"); i++; } timer.stop(); } }
- MQ延时消费:RocketMQ/RabbitMQ等
优点:使用MQ自带的延时功能,方便可靠。
缺点:要搭建并维护MQ体系。
核心代码:不同MQ的具体代码不一样,略。
- Redis队列消费
PS:以下redis的相关代码基于redisson,如果使用jedis或lettuce,逻辑和原理相同,只是调用的组件接口不一样。
自实现redis延时队列
优点:基于redis,稳定性的性能可以达到一个比较好的平稳
缺点:要考虑特殊情况(如redis消息丢失等),并自己实现相应的消息收发逻辑。
核心代码:
import org.redisson.Redisson; import org.redisson.api.RScoredSortedSet; import org.redisson.api.RedissonClient; import org.redisson.client.codec.StringCodec; import org.redisson.config.Config; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * 使用redis实现的延时队列 */ public class RedisDelayQueue { public static final String EXPIRED_QUEUE_KEY = "queue:delay:list"; public static void main(String[] args) { DelayTaskProducer producer = new DelayTaskProducer(); long now = System.currentTimeMillis(); producer.produce("消息1:dataId=123", now + TimeUnit.SECONDS.toMillis(5)); producer.produce("消息2:dataId=45", now + TimeUnit.SECONDS.toMillis(15)); producer.produce("消息3:dataId=234", now + TimeUnit.SECONDS.toMillis(75)); producer.produce("消息4:dataId=534", now + TimeUnit.SECONDS.toMillis(55)); //创建多个消息实例 // for(int i=0;i<10;i++){ new DelayTaskConsumer().start(); // } } public static class DelayTaskProducer { /*** * 将要延时处理的消息放到redis中 * @param msg 具体消息内容 * @param expiredAt 指定在哪个时间点到期消费 */ public void produce(String msg, long expiredAt) { RedissonClient redissonClient = createClient(1); RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(EXPIRED_QUEUE_KEY); set.add(expiredAt, msg); } } public static class DelayTaskConsumer { private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); public void start() { scheduledExecutorService.scheduleWithFixedDelay(new DelayTaskHandler(), 1, 200, TimeUnit.MILLISECONDS); } } public static class DelayTaskHandler implements Runnable { @Override public void run() { RedissonClient redissonClient = createClient(1); RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(EXPIRED_QUEUE_KEY); Double firstScore = set.firstScore(); if (firstScore == null || firstScore > System.currentTimeMillis()) { return;//如果队列为空,则时间还没到,则不执行 } String firstMsg = set.takeFirst(); System.out.println("开始消费消息:" + firstMsg); } } protected static RedissonClient createClient(int db) { Config config = new Config(); config.setCodec(new StringCodec()); config.useSingleServer() .setAddress("redis://172.18.12.34:6379") .setPassword("beta1234") .setConnectionPoolSize(500) .setIdleConnectionTimeout(10000) .setTimeout(3000) .setConnectTimeout(30000) .setRetryAttempts(3) .setRetryInterval(1000) .setDnsMonitoringInterval(-1) .setPingConnectionInterval(10000) .setDatabase(db); return Redisson.create(config); } }
基于redis的key过期广播
优点:触发及时可靠,逻辑简单
缺点:要防止数据丢失后重新加载的情况;开启服务端相应广播事件队列;对redis存在性能消耗等。
核心代码:使用redis的key过期事件监听,要开启redis服务,具体开启方式见:https://www.iteye.com/blog/hellohank-2524409,消息收发的示例代码如下:
import com.alibaba.fastjson.JSON; import org.redisson.api.RBucket; import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; import org.redisson.api.listener.MessageListener; import org.redisson.client.codec.StringCodec; import java.util.concurrent.TimeUnit; /** * 基于redis的key过期事件广播队列监听。所以,需要开启redis服务的这个功能。具体看:https://www.iteye.com/blog/hellohank-2524409 */ public class RedisDelayQueueWithExpiredListener extends RedisDelayQueue{ public static void main(String[] args) { int db = 2; RedissonClient redissonClient = createClient(db); RTopic topic = redissonClient.getTopic("__keyevent@" + db + "__:expired", new StringCodec()); topic.addListener(String.class, new MessageListener() { @Override public void onMessage(CharSequence channel, Object msg) { System.out.println("onMessage:" + channel + "; Thread: " + Thread.currentThread().toString()); System.out.println(msg); } }); String key = "test_expire_listen"; Object value = "val"; RBucket bucket = redissonClient.getBucket(key); bucket.set(JSON.toJSONString(value), 5, TimeUnit.SECONDS); bucket.expire(6,TimeUnit.SECONDS); } }
相关推荐
1. 什么是延时队列? 2. 如何实现一个高效的延时队列? 3. DelayQueue的实现原理 4. RabbitMQ实现延时队列的基本原理 5. Redis实现延时队列的基本原理 ...6. 时间轮(Time Wheel) ...7. 几种方案的对比
redis的sorted set实现延时队列
springboot整合RabbitMQ实现延时队列的两种方式 教程及源码。参考博客:https://blog.csdn.net/qq_29914837/article/details/94070677
更多免费相关资料《完整版》https://ke.qq.com/course/417774?flowToken=1013299 手把手教你实现分布式延时队列 1. 服务器事件分类以及定时事件如何处理 2 常见定时器实现以及如何选择 3. 分布式定时器该如何实现
rabbitmq延时队列和四种交换机模式下队列的简单实现,需要自己配置一下属性文件。
该示例通过 rabbitmq_delayed_message_exchange 插件实现自定义延时时间的延时队列。 示例是纯净的,只引入了需要的架包 启动示例时,请确保MQ已经安装了延时插件(附件里带有插件及安装说明)以及示例的MQ相关的配置...
主要介绍了一口气说出Java 6种延时队列的实现方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
springboot+rabbitmq实现延时队列,包括消息发送和消费确认,消费者端使用策略模式处理业务
rabbitMq+erlang+延时队列插件完整安装包(正确的版本对应)
主要给大家介绍了java利用delayedQueue实现本地的延迟队列的相关资料,文中介绍的非常详细,相信对大家具有一定的参考价值,需要的朋友们下面来一起看看吧。
redis延时队列
整个延迟队列由4个部分组成: 1. JobPool用来存放所有Job的元信息。 2. DelayBucket是一组以时间为维度的有序队列,用来存放所有需要延迟的Job(这里只存放Job Id)。 3. Timer负责实时扫描各个Bucket,并将delay...
Serve 基于Swoole Server 编写的消息队列消费系统
由redis支持的优先级队列队列,为eggjs构建。
多层时间轮,可根据配置的时间轮大小参数以及插入任务的相对时间,动态地创建不同层次的时间轮实例(这里的多层时间轮采用了...引入了延时队列以减少空轮询,将时间轮的推进与任务的提交执行隔离开,提升模型的效率。
java使用DelayQueue延迟队列和Redis缓存实现订单自动取消功能
将整个Redis当做消息池,以kv形式存储消息 使用ZSET做优先队列,按照score维持优先级
使用RabbitMQ+延迟队列实现分布式事务的最终一致性方案,demo以典型的订单+库存系统为例
延时队列我在项目里是怎么实现的?.doc
主要介绍了JAVA 实现延迟队列的方法,文中讲解非常详细,供大家参考和学习,感兴趣的朋友可以了解下