- 浏览: 97695 次
- 性别:
- 来自: 北京
最新评论
-
LUCKYZHOUSTAR:
牛逼,楼主怎么学习的呢
Jedis 与 ShardedJedis 设计 -
demoxshiroki:
总结的不错
ThreadPoolExecutor机制 -
bluky999:
bluky999 写道opentan 写道zhangfeikr ...
Jedis 与 ShardedJedis 设计 -
bluky999:
opentan 写道zhangfeikr 写道使用Sharde ...
Jedis 与 ShardedJedis 设计 -
opentan:
zhangfeikr 写道使用ShardedJedis,一台挂 ...
Jedis 与 ShardedJedis 设计
ScheduledThreadPoolExecutor实现:
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { /** * False if should cancel/suppress periodic tasks on shutdown. */ private volatile boolean continueExistingPeriodicTasksAfterShutdown; /** * False if should cancel non-periodic tasks on shutdown. */ private volatile boolean executeExistingDelayedTasksAfterShutdown = true; /** * Creates a new ScheduledThreadPoolExecutor with the given core * pool size. * * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @throws IllegalArgumentException if <tt>corePoolSize < 0</tt> */ public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); } /** * Creates a new ScheduledThreadPoolExecutor with the given * initial parameters. * * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @param threadFactory the factory to use when the executor * creates a new thread * @throws IllegalArgumentException if <tt>corePoolSize < 0</tt> * @throws NullPointerException if threadFactory is null */ public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), threadFactory); } /** * Creates a new ScheduledThreadPoolExecutor with the given * initial parameters. * * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if <tt>corePoolSize < 0</tt> * @throws NullPointerException if handler is null */ public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), handler); } }
ScheduledThreadPoolExecutor构造参数:
- corePoolSize: 任务执行线程池大小
- RejectedExecutionHandler: 任务拒绝策略,当线程池shutdown时,任务处理策略
- DelayedWorkQueue: 无界延迟队列,提交任务都加入队列中,由队列实现延迟执行功能
- MaximumPoolSize: 由于DelayedWorkQueue为无界队列,所以该值没有意义
提交任务:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); schedule(command, 0, TimeUnit.NANOSECONDS); } // Override AbstractExecutorService methods public Future<?> submit(Runnable task) { return schedule(task, 0, TimeUnit.NANOSECONDS); } public <T> Future<T> submit(Runnable task, T result) { return schedule(Executors.callable(task, result), 0, TimeUnit.NANOSECONDS); } public <T> Future<T> submit(Callable<T> task) { return schedule(task, 0, TimeUnit.NANOSECONDS); }
ExecutorService的任务提交方式都有schedule(task,0,TimeUnit. NANOSECONDS)以延迟时间为0任务实现
任务计划执行方法:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; } public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Object>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period))); delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Boolean>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay))); delayedExecute(t); return t; }
任务计划执行步骤:
- 封装任务:decorateTask()方法封装后,返回RunnableScheduledFuture
- 延迟执行:delayedExecute()延迟任务执行
/** * Specialized variant of ThreadPoolExecutor.execute for delayed tasks. */ private void delayedExecute(Runnable command) { if (isShutdown()) { reject(command); return; } // Prestart a thread if necessary. We cannot prestart it // running the task because the task (probably) shouldn't be // run yet, so thread will just idle until delay elapses. if (getPoolSize() < getCorePoolSize()) prestartCoreThread(); super.getQueue().add(command); }
延迟执行步骤:
- 线程池关闭(runState!=RUNNING),由拒绝策略决定
- 否则,线程池小于核心线程池大小,启动新线程
- 添加任务到等待队列
ScheduledThreadPoolExecutor线程池关闭
/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. If the * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has * been set <tt>false</tt>, existing delayed tasks whose delays * have not yet elapsed are cancelled. And unless the * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has * been set <tt>true</tt>, future executions of existing periodic * tasks will be cancelled. */ public void shutdown() { cancelUnwantedTasks(); super.shutdown(); } /** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution. * * <p>There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. This implementation * cancels tasks via {@link Thread#interrupt}, so any task that * fails to respond to interrupts may never terminate. * * @return list of tasks that never commenced execution. Each * element of this list is a {@link ScheduledFuture}, * including those tasks submitted using <tt>execute</tt>, which * are for scheduling purposes used as the basis of a zero-delay * <tt>ScheduledFuture</tt>. * @throws SecurityException {@inheritDoc} */ public List<Runnable> shutdownNow() { return super.shutdownNow(); }
shutdownNow()方式与ThreadPoolExecutor一样
shutdown()方法:不在接收新提交任务
- 先调用cancelUnwantedTasks(),决定未执行任务时候继续执行
- 调用父类ThreadPoolExecutor方法
/** * Cancels and clears the queue of all tasks that should not be run * due to shutdown policy. */ private void cancelUnwantedTasks() { boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); if (!keepDelayed && !keepPeriodic) super.getQueue().clear(); else if (keepDelayed || keepPeriodic) { Object[] entries = super.getQueue().toArray(); for (int i = 0; i < entries.length; ++i) { Object e = entries[i]; if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; if (t.isPeriodic()? !keepPeriodic : !keepDelayed) t.cancel(false); } } entries = null; purge(); } }
等待队列中任务处理取决于:
- executeExistingDelayedTasksAfterShutdown : 执行已经存在与延迟队列任务
- continueExistingPeriodicTasksAfterShutdown : 继续执行周期任务
ScheduledFutureTask任务实现:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** Sequence number to break ties FIFO */ private final long sequenceNumber; /** The time the task is enabled to execute in nanoTime units */ private long time; /** * Period in nanoseconds for repeating tasks. A positive * value indicates fixed-rate execution. A negative value * indicates fixed-delay execution. A value of 0 indicates a * non-repeating task. */ private final long period; /** * Creates a one-shot action with given nanoTime-based trigger time. */ ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } /** * Creates a periodic action with given nano time and period. */ ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } /** * Creates a one-shot action with given nanoTime-based trigger. */ ScheduledFutureTask(Callable<V> callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } //... }
任务继承自FutureTask,同时实现了RunnableScheduledFuture接口(任务延迟执行时间getDelay()/任务周期之执行isPeriodic())
实例变量:
- time:任务延迟执行时间
- period:任务重复执行周期;正值:固定频率执行;负值:定时延迟执行;零: 不重复执行
任务执行:
public void run() { if (isPeriodic()) runPeriodic(); else ScheduledFutureTask.super.run(); }
非周期任务:直接调用FutureTask.run()方法执行
周期任务:调用runPeriodic()周期执行
private void runPeriodic() { boolean ok = ScheduledFutureTask.super.runAndReset(); boolean down = isShutdown(); // Reschedule if not cancelled and not shutdown or policy allows if (ok && (!down || (getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isStopped()))) { long p = period; if (p > 0) time += p; else time = triggerTime(-p); ScheduledThreadPoolExecutor.super.getQueue().add(this); } // This might have been the final executed delayed // task. Wake up threads to check. else if (down) interruptIdleWorkers(); }
runPeriodic实现:
- 首先调用runAndReset(),保证任务重复执行(任务执行后,重置state=0)
- 根据runAndReset返回状态,以及线程池是否关闭或者线程池shutdown且getContinueExistingPeriodicTasksAfterShutdownPolicy()是否继续执行已有中期任务策略;将任务放入任务队列
- 否则,若是最后延迟任务,则唤醒空闲线程检查
DelayedWorkQueue实现:
包装了DelayQueue作为实现
发表评论
-
FutureTask实现分析
2012-11-15 22:27 1399FutureTask实现一个可以取消的异步计算任务。 ... -
ExecutorCompletionService实现解析
2012-11-15 22:26 1267ExecutorCompletionService解耦异步任务 ... -
ThreadPoolExecutor分析
2012-11-15 22:26 1607ThreadPoolExecutor 状态: RUNN ... -
ThreadPoolExecutor机制
2012-04-28 09:38 3457ThreadPoolExecutor作为java.util.c ... -
java concurrent包中任务执行框架分析
2012-04-26 22:43 2768提到java并发编程必然绕不过java的线程和任务接口;那么, ...
相关推荐
源码解析文件ScheduledThreadPoolExecutor
Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue...
JDK1.8源码分析 相关的原始码分析结果会以注解的形式体现到原始码中 已完成部分: ReentrantLock CountDownLatch Semaphore HashMap TreeMap LinkedHashMap ConcurrentHashMap 执行器 ...
java线程类源码Java ScheduledThreadPoolExecutor演示 java.util.concurrent ScheduledThreadPoolExecutor作为java.util.Timer类的现代替代。
NULL 博文链接:https://cywhoyi.iteye.com/blog/1939040
主要介绍了java 定时器线程池(ScheduledThreadPoolExecutor),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
今天小编就为大家分享一篇关于Java自带定时任务ScheduledThreadPoolExecutor实现定时器和延时加载功能,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run...
Timers schedule one-shot or recurring TimerTask tasks for execution. Prefer java.util.concurrent.ScheduledThreadPoolExecutor ScheduledThreadPoolExecutor for new code.
EJB at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run
源码 Java8-Resources 一、Java线程池实现原理及应用(ThreadPoolExecutor) 1.使用线程池的好处 1.1. 降低资源消耗 通过池化技术重复利用已经创建的线程,降低线程创建和销毁造成的耗损 1.2.提高响应速度,任务到达...
18.一篇文章,从源码深入详解ThreadLocal内存泄漏问题 19.并发容器之BlockingQueue 20.并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解 21.线程池ThreadPoolExecutor实现原理 22.线程池之...
3. “要使用带有ThreadFactory参数的ScheduledThreadPoolExecutor构造方法哦,这样你就可以方便的设置线程名字啦 1. 添加
java游戏服务器中,需要使用到的定时器功能 现在要讲解的是一个quartz-all-1.8.6.jar另外一个是ScheduledThreadPoolExecutor。
系统关闭采用线程池中ScheduledThreadPoolExecutor来设置计划任务。 6、用户并发选课控制,采用异步处理,并且使用缓存层Redis记录相关信息,同时采用aop编程思想,在第一次请求选课接口时,加载redis中lua脚本文件...
今天,我们就一起来手撕ScheduledFutureTask类的源码,来深入理解ScheduledFutureTask类的细节。 类的层级关系 从ScheduledFutureTask类的定义可以看出,ScheduledFutureTask类是ScheduledThreadPoolExecutor类的...
Executor框架主要由3部分组成: ...Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。 异步计算的结果 。包括Future和实现Future的FutureTask类。
(所有Class父类)Reflect类(支持C++类反射)Exception类(各类异常)Thread类 (实现了线程的创建,退出)线程池类 (实现了ThreadExecutorPool,ThreadCachedPoolExecutor,ScheduledThreadPoolExecutor)线程锁...
monitor-thread 普通JDK自带的线程池时无法实现线程池的自动切换,基于监控与上下文自动切换的需求,封住了一套taxi开头的线程池,接...2)ScheduledThreadPoolExecutor<-------->WrapTaxiScheduledThreadPoolEx
在我了解的过程中发现java实现定时任务有四种,首先是jdk自带的两个Timer,ScheduledThreadPoolExecutor,后者是jdk1.5提出的,因为这个Timer毛病着实有点多,像什么单线程,出问题了其他任务也执