前言
在日常java开发过程中使用线程池一般都是通过Executors提供的静态方法创建线程池,但目前还没有提供使用DelayQueue(延迟队列)作为任务队列的线程池创建方法。在笔者另一篇博客中《DelayQueue--阅读源码从jdk开始》,有个场景需要使用DelayQueue实现定时的页面发布功能,在那次实现过程中使用DelayQueue的take方法获取到任务后再放入线程池,由于这里是串行take,如果在同一时刻有多个任务需要被执行,这时势必有有延迟,虽然延迟不多,但不是最佳实现方案。
通过前一篇对ThreadPoolExecutor总结(点这里),我们可以直接使用ThreadPoolExecutor的构造方法构造自定义的线程池,使用DelayQueue作为“任务队列”即可。
使用DelayQueue创建线程池
这个步骤很简单,只要理解了ThreadPoolExecutor构造方法的各个参数即可(对各个参数的详细讲解见上一篇文章):
DelayQueue queue = new DelayQueue<>();//延迟队列 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3,10,1000l, TimeUnit.MILLISECONDS,queue);
之后,只需调用ThreadPoolExecutor的execute提交任务即可。
创建延迟任务类
我们知道ThreadPoolExecutor的execute方法,需要一个实现了Runnable接口的对象,那么这个任务类必须是实现Runnable接口;并且最终这个对象要能放到DelayQueue中,这个任务类必须实现Delayed接口。最终这个任务类实现如下:
public class TaskInfo implements Delayed,Runnable { //任务id private int id; //业务类型 private int type; //业务数据 private String data; //执行时间 private long excuteTime; public TaskInfo(int id, int type, String data, long excuteTime) { this.id = id; this.type = type; this.data = data; this.excuteTime = TimeUnit.NANOSECONDS.convert(excuteTime, TimeUnit.MILLISECONDS)+System.nanoTime(); } public int getId() { return id; } public void setId(int id) { this.id = id; } public int getType() { return type; } public void setType(int type) { this.type = type; } public String getData() { return data; } public void setData(String data) { this.data = data; } public long getExcuteTime() { return excuteTime; } public void setExcuteTime(long excuteTime) { this.excuteTime = excuteTime; } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.excuteTime- System.nanoTime() , TimeUnit.NANOSECONDS); } @Override public int compareTo(Delayed o) { TaskInfo msg = (TaskInfo)o; return this.excuteTime>msg.excuteTime?1:( this.excuteTime<msg.excuteTime?-1:0); } @Override public void run() { System.out.println("run task:"+id); } }
初始化核心线程
上面已经创建好任务类了,也许大家会觉得直接new TaskInfo(),并且调用ThreadPoolExecutor的execute方法提交任务就行,如下:
//创建任务 TaskInfo t1 = new TaskInfo(1,1,"任务1",8000); TaskInfo t2 = new TaskInfo(2,2,"任务2",8000); //提交任务 threadPoolExecutor.execute(t1); threadPoolExecutor.execute(t2);
通过前一篇文章的分析,在线程池刚初始化时,由于核心线程数为0,此时执行execute提交任务,任务不会进入延迟队列,而是直接执行,就无法满足业务需求(任务被提前执行了)。正确做法是在线程初始化完成后,先调用prestartAllCoreThreads方法,先创建好核心线程,即:
threadPoolExecutor.prestartAllCoreThreads();
完成示例代码:
public class ThreadPoolExecutorTest { private static ExecutorService es = Executors.newFixedThreadPool(3);//3个线程的线程池 public static void main(String[] args){ DelayQueue queue = new DelayQueue<>();//延迟队列 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3,10,1000l, TimeUnit.MILLISECONDS,queue); threadPoolExecutor.prestartAllCoreThreads();//初始化核心线程 TaskInfo t1 = new TaskInfo(1,1,"任务1",8000); TaskInfo t2 = new TaskInfo(2,2,"任务2",8000); TaskInfo t3 = new TaskInfo(3,3,"任务3",9000); TaskInfo t4 = new TaskInfo(4,4,"任务4",5000); TaskInfo t5 = new TaskInfo(5,5,"任务5",5000); TaskInfo t6 = new TaskInfo(6,6,"任务6",6000); TaskInfo t7 = new TaskInfo(7,7,"任务7",7000); TaskInfo t8 = new TaskInfo(8,8,"任务8",10000); threadPoolExecutor.execute(t1); threadPoolExecutor.execute(t2); threadPoolExecutor.execute(t3); threadPoolExecutor.execute(t4); threadPoolExecutor.execute(t5); threadPoolExecutor.execute(t6); threadPoolExecutor.execute(t7); threadPoolExecutor.execute(t8); } }
执行main方法,可以发现任务是按时延迟执行的,而且如果在同一刻如果有多个任务需要执行,这时也可以利用线程池并行执行,进一步降低延迟。
另外大家也可以注释掉threadPoolExecutor.prestartAllCoreThreads();这句,验证下如果不初始化核心线程会有什么后果。
心灵鸡汤
有的程序员觉得整天实现一些简单的功能没有技术含量,如果你觉得某项工作没有技术含量,那只是你自己把它做得没有技术含量,认真的写好自己的每一行代码,不停的去完善,它就会成为有技术含量的工作。想想达芬奇画鸡蛋的故事。
摘自--《天星老师语录》
相关推荐
DelayQueue的使用以及注意事项,这里需要由BlockingQueue的基本知识,一般的Queue的使用方法poll(),take(),drainTo()和offer(),put()这些应该懂。
DelayQueue、Redis结合使延迟、定时任务使用 DelayQueue、Redis结合使延迟、定时任务使用 DelayQueue、Redis结合使延迟、定时任务使用 源代码下载
java使用DelayQueue延迟队列和Redis缓存实现订单自动取消功能
学习视频,可以丰富java知识。能够获得更多的专业技能
主要为大家详细介绍了Java多线程并发开发之DelayQueue使用示例,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
DelayQueue是一个无界阻塞队列,只有在延迟期满时,才能从中提取元素。这篇文章主要介绍了springboot执行延时任务-DelayQueue的使用,需要的朋友可以参考下
基于DelayQueue的简单的定时任务队列.zip Quick Start class Main { public static void main(String[] args) { // 初始化任务队列 JobScheduler scheduler = new JobScheduler("default"); // 向队列中提交任务...
local delayQueue implemented by JDK & two kinds of distributed delayQueue based redis 1. 基本介绍 RedisSynDelayQueue 基于redis,并发情况下会加分布式锁,单线程场景(syn=false)性能较好, 并发场景性能较...
具体代码参考
除了具有很好的并发性的Collections,java.util.concurrent还引入了其他一些预先构建的组件,它们可帮助您调整和执行多线程应用程序中的线程。
数组阻塞队列ArrayBlockingQueue,延迟队列DelayQueue, 链阻塞队列 LinkedBlockingQueue,具有优先级的阻塞队列 PriorityBlockingQueue, 同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 ...
Agenda • Methodology and Process • Tools of Load Test • Tuning Components in the Software Stack > Operating System > Java Virtual Machine > Application Container > Application Architecture ...
延迟队列,顾名思义它是一种带有延迟功能的消息队列。 那么,是在什么场景下我才需要这样的队列...3.每个业务都要维护一个自己的扫表逻辑。 当业务越来越多时,发现扫表部分的逻辑会重复开发,但是非常类似 延时队列能
4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 Synchronou sQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 ...
4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque ...
延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10...
基于String构建自己的锁管理器、 Collections构建不可修改的集合对象、 CopyOnWriteArrayList的应用场景、 可堵塞队列的功能及行为、 使用API实现压力测试、 CyclicBarrier与Exchanger、 线程池的意义及自定义注意...
4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque ...
Java并发编程常见知识点源码集锦,涉及到对象锁,Executors多任务线程框架,线程池等... ConcurrentLinkedQueue、DelayQueue示例、自定义的线程拒绝策略、自定义线程池(使用有界队列)、自定义线程池(使用无界队列)。。。
4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque ...