`
liuInsect
  • 浏览: 132004 次
  • 性别: Icon_minigender_1
社区版块
存档分类
最新评论

FutureTask 源码解析

阅读更多
为了更好的提供文章,我已经将博客迁移到了自建的博客网站上,我将更多的从源码分析的角度入手,为大家带来更多的深度文章,请大家继续关注我~!  博客地址:www.liuinsect.com
_______________________________________________________________________________

 站在使用者的角度,future是一个经常在多线程环境下使用的Runnable,使用它的好处有两个:
1. 线程执行结果带有返回值
2. 提供了一个线程超时的功能,超过超时时间抛出异常后返回。
 
那,怎么实现future这种超时控制呢?来看看代码:


 
FutureTask的实现只是依赖了一个内部类Sync实现的,Sync是AQS (AbstractQueuedSynchronizer)的子类,这个类承担了所有future的功能,AbstractQueuedSynchronizer的作者是大名鼎鼎的并发编程大师Doug Lea,它的作用远远不止实现一个Future这么简单,后面在说。
 
下面,我们从一个future提交到线程池开始,直到future超时或者执行结束来看看future都做了些什么。怎么做的。
首先,向线程池ThreadPoolExecutor提交一个future:

 ThreadPoolExecutor将提交的任务用FutureTask包装一下:


 

 
然后尝试将包装后的Future用Thread类包装下后启动,

红色标记的地方表示,当当前线程池的大小小于corePoolSize时,将任务提交,否则将该任务加入到workQueue中去,如果workQueue装满了,则尝试在线程数小于MaxPoolSize的条件下提交该任务。


 
顺便说明下,我们使用线程池时,常常看到有关有界队列,无界队列作为工作队列的字眼:使用无界队列时,线程池的大小永远不大于corePoolSize,使用有界队列时的maxPoolSize才有效,原因就在这里,如果是
无界队列,红框中的add永远为true 下方的addIfUnderMaximumPoolSize怎么也走不到了,也就不会有线程数量大于MaxPoolSize的情况
 
言归正传,看看addIfUnderCorePoolSize 中做了什么事:
new了一个Thread,将我们提交的任务包装下后就直接启动了


 
我们知道,线程的start方法会调用我们runnable接口的run方法,因此不难猜测FutureTask也是实现了Runnable接口的


 

 
 FutureTask的run()方法中是这么写:

 
innerRun方法先使用原子方式更改了一下自己的一个标志位state(用于标示任务的执行情况)
然后红色框的方法 实现回调函数call的调用,并且将返回值作为参数传递下去,放置在一个叫做result的泛型变量中,
然后future只管等待一段时间后去拿result这个变量的值就可以了。   至于怎么实现的“等待一段时间再去拿” 后面马上说明。


 
innerSet在经过一系列的状态判断后,最终将V这个call方法返回的值赋值给了result



 
说到这里,我们知道,future是通过将call方法的返回值放在一个叫做result的变量中,经过一段时间的等待后再去拿出来返回就可以了。
怎么实现这个 “等一段时间”呢?
 
要从Sync的父类AbstractQueuedSynchronizer这个类说起:
 
我们知道AbstractQueuedSynchronizer 后者的中文名字叫做 同步器,顾名思义,是用来控制资源占用的一种方式。对于FutureTask来说,“资源”就是result,线程执行的结果。思路就是通过控制对result这个资源的访问来决定是否需要马上去取得result这个结果,当超时时间未到,或者线程未执行结束时,是不能去取result的。当线程正常执行结束后,一系列的标志位会被修改,并告诉等待future执行结果的各个线程,可以来获取result了。
 
这里会涉及到 独占锁和共享锁的概念。
 
独占锁:同一时间只有一个线程获取锁。再有线程尝试加锁,将失败。 典型例子 reentrantLock
共享锁:同一时间可以有多个线程获取锁。 典型例子,本例中的FutureTask
 
为什么说他们?因为Sync本质上就是想完成一个共享锁的功能,所以Sync继承了AbstractQueuedSynchronizer 所以Sync的方法使用的是AbstractQueuedSynchronizer的共享锁的API
 
首先,我们明白,future结束有两种状态:
     1. 线程正常执行完毕,通知等待结果的主线程对应于future.get()方法。
     2. 线程还未执行完毕,等待结果的主线程已经等不到了(超时),抛出一个TimeOutException后不再等待。对应于future.get(long timeout, TimeUnit unit)
 
下面我们依次看看对于这两种状态,我们是怎么处理的:
从上图中可以得知,线程在执行完毕后会将执行的结果放到result中, 红色框中同时提到了releaseShared 方法,我们从这里进入AbstractQueuedSynchronizer 
 


 
当result已经被赋值,或者FutureTask为cancel状态时,FutureTask会尝试去释放共享锁(可以同时有多个线程调用future.get() 方法,也就是会有多个线程在等待future执行结果,而furue在执行完毕后会依次唤醒各个线程)
如果尝试成功,则开始真正的释放锁,这里是AbstractQueuedSynchronizer 比较精妙的地方, “尝试”动作都定义为抽象方法,交个各个子类去定义“尝试成功的含义” 而真正的释放则自己实现,这种复杂规则交个子类,流程交给自己的思路很值得借鉴

 
再看FutureTask的 “尝试释放”的规则:
没啥好说,怎么尝试都成功。


 
接着AbstractQueuedSynchronizer 开始了真正的释放唤醒工作:
 private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases. This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;//把头元素取出来,保持头元素的引用,防止head被更改
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//如果状态位为:需要一个信号去唤醒  注释原话:/** waitStatus value to indicate successor's thread needs unparking */
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //修改状态位
                        continue; // loop to recheck cases
                    unparkSuccessor(h);//如果修改成功,则通过头元素找到一个线程,并且唤醒它(唤醒动作是通过JNI方法去调用的)
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; // loop on failed CAS
            }
            if (h == head) // loop if head changed
                break;
        }
    }
 
循环遍历后,知道已经没有结点需要唤醒则返回,依次return后,future的run方法执行完毕。
 
 
以上是针对future线程的,我们知道,FutureTask已经将执行结果放在了result中,并且按等的先后顺序依唤醒了等待队列上的线程。
那,猜测future.get方法就不难了,对于带超时的get方法:最大的可能性就是不断的检查future的一个状态位,看它是否执行完毕,执行完则获取结果返回,否则,再阻塞自己一段时间。
对于不待超时的,就上来就先尝试获取结果,拿不到就阻塞自己,直到上述的innerSet方法唤醒它。
究竟是不是这样呢?一起来看看:



 



 
因为innerGet(long nanosTimeout) 和innerGet()流程大致相同,所以我们重点讲解innerGet(long nanosTimeout) ,在唯一一个有区别的地方说明下即可。
 
如下图所示,对于innerGet(long nanosTimeout) 方法,FutureTask采用的方法是直接加锁或者每隔一段时间尝试加锁,如果成功,则返回true,则如上图所示,直接返回result,主线程拿到执行结果。
否则,抛出超时异常。


 
对于tryAcquireShared 方法,比较简单,直接看future是否执行完毕


 
如果没有结束,则进入doAcquireSharedNanos方法:
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {

        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.SHARED);//在队列尾部增加一个结点,我的理解是,用来标明这个队列是共享者队列还是独占队列
        try {
            for (;;) {
                final Node p = node.predecessor();//拿出刚才新增结点的前一个结点:实际有效的队尾结点。
                if (p == head) {
                    int r = tryAcquireShared(arg);//尝试获取锁。
                    if (r >= 0) {//
                        setHeadAndPropagate(node, r);//返回值大于1 对于FutureTask代表任务已经被cancel了,则更改队列头部结点。
                        p.next = null; // help GC  将p结点脱离队列,帮助GC
                        return true;//返回true后 上述中可以知道当前线成会抛出超时异常 确定下会不会唤醒其他节点?
                    }
                }
                if (nanosTimeout <= 0) { //如果设置的超时时间小于等于0 则取消获取锁
                    cancelAcquire(node);
                    return false;
                }
                if (nanosTimeout > spinForTimeoutThreshold && //等待的时间必须大于一个自旋锁的周期时间
                    shouldParkAfterFailedAcquire(p, node))   //  遍历队列,找到需要沉睡的第一个节点
                    LockSupport.parkNanos(this, nanosTimeout); // 调用JNI方法,沉睡当前线程
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;   // 更新等待时间  循环遍历
                lastTime = now;   
                if (Thread.interrupted())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }
 
这样通过AQS的协作,所有调用future.get(long timeout, TimeUnit unit)的线程都会按顺序等待,直到线成执行完被唤醒或者超时时间到 主动抛出异常。
 
总结
 
至此为止FutureTask的解析已经基本结束了,可以看到。它依靠AQS的共享锁实现了对线程执行结果的访问控制。和我们通常意义上的访问控制(并发访问某个资源,获取失败时,沉睡自己等待唤醒或者超时后返回)基本是一致的,不外乎维护了一个等待资源的列表。将等待资源的线程通过链表的方式串了起来。
 
当然AQS的功能远不仅如此,它还提供了一套独占锁的API,帮助使用者实现独占锁的功能。
最常用的Reentrantlock就是使用这套API做的。
有机会的话再和大家分享下它的实现。
  • 大小: 4.8 KB
  • 大小: 5.6 KB
  • 大小: 7 KB
  • 大小: 4.5 KB
  • 大小: 16.9 KB
  • 大小: 12.9 KB
  • 大小: 4.3 KB
  • 大小: 7.3 KB
  • 大小: 7.3 KB
  • 大小: 2 KB
  • 大小: 13 KB
  • 大小: 15.2 KB
  • 大小: 15.1 KB
  • 大小: 5.8 KB
  • 大小: 7.3 KB
  • 大小: 14.1 KB
  • 大小: 23.1 KB
  • 大小: 10.1 KB
  • 大小: 5.7 KB
0
0
分享到:
评论
1 楼 China2010pan 2015-07-27  

相关推荐

    FutureTask:FutureTask原始解析与重组-源码解析

    FutureTask原始码解析 一,FutureTask是什么? FutureTask是可取消的异步的计算任务,它可以通过线程池和线程对象执行,一般来说是FutureTask用于耗时的计算。 二,FutureTask继承图 三,未来任务源码 FutureTask的...

    编程狂人第十三期

    FutureTask 源码解析 C++:在堆上创建对象,还是在栈上? 技术纵横 UPYUN:用Erlang开发的对象存储系统 iOS 7最佳实践:一个天气App案例 看看百度招iOS开发面试的问题,你能答对多少? CoconutKit:iOS开发必备的...

    java多线程编程同步器Future和FutureTask解析及代码示例

    主要介绍了java多线程编程同步器Future和FutureTask解析及代码示例,对二者进行了详细介绍,分析了future的源码,最后展示了相关实例代码,具有一定参考价值 ,需要的朋友可以了解下。

    Java并发编程原理与实战

    CountDownLatch,CyclicBarrier,Semaphore源码解析.mp4 提前完成任务之FutureTask使用.mp4 Future设计模式实现(实现类似于JDK提供的Future).mp4 Future源码解读.mp4 ForkJoin框架详解.mp4 同步容器与并发容器.mp4 ...

    Android AsyncTask 完美解析 看不懂源码你就输了

    1.简介 android.os.AsyncTask,一个执行异步操作的类,我们可以使用它来处理后台任务,并且在UI线程中... 例如{@link Executor},{@ link ThreadPoolExecutor}和{@link FutureTask}。 2.基本使用 2.1 关键API andro

    龙果 java并发编程原理实战

    第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...

    Java 并发编程原理与实战视频

    第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...

    龙果java并发编程完整视频

    第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...

    java并发编程

    第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...

    java8集合源码分析-Notes:笔记

    集合源码分析 [TOC] 0. 项目构建 0.1 版本控制 0.1.1 Git 0.2 项目管理 0.2.1 Maven 0.2.2 Gradle 1.:hot_beverage: Java 1.1 Java基础 1.1.1 算法与数据结构 字符串KMP算法 BitSet解决数据重复和是否存在等问题 ...

    高并发之——深度解析ScheduledFutureTask类

    今天,我们就一起来手撕ScheduledFutureTask类的源码,来深入理解ScheduledFutureTask类的细节。 类的层级关系 从ScheduledFutureTask类的定义可以看出,ScheduledFutureTask类是ScheduledThreadPoolExecutor类的...

Global site tag (gtag.js) - Google Analytics