`
Donald_Draper
  • 浏览: 950981 次
社区版块
存档分类
最新评论

FutureTask解析

    博客分类:
  • JUC
阅读更多
package java.util.concurrent;
import java.util.concurrent.locks.*;

/**
 * A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the <tt>get</tt>
 * method will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled.
 *
 FutureTask是一个可取消的异步计算任务,并提供了基于Future接口实现的开始和取消
计算任务,查看计算任务状态和在计算任务结束后获取结果的方法。计算任务的结果,只有
在计算任务完成时,才能取得,如果计算任务还没完成,将会阻塞。只要计算任务完成,
计算任务就不能被取消或重新启动。
 * <p>A <tt>FutureTask</tt> can be used to wrap a {@link Callable} or
 * {@link java.lang.Runnable} object.  Because <tt>FutureTask</tt>
 * implements <tt>Runnable</tt>, a <tt>FutureTask</tt> can be
 * submitted to an {@link Executor} for execution.
 *
FutureTask可用于包装Callable和Runnable对象。由于FutureTask实现了Runnable接口,
所有可以被调到执行器,执行。
 * <p>In addition to serving as a standalone class, this class provides
 * <tt>protected</tt> functionality that may be useful when creating
 * customized task classes.
 *
 当我们创建任务线程类时为单独的类(独立任务),FutureTask的protect功能方法也许有用。

 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
 */
public class FutureTask<V> implements RunnableFuture<V> {
    /** Synchronization control for FutureTask 用于控制FutureTask的同步器*/
    private final Sync sync;
        /**
     * Creates a <tt>FutureTask</tt> that will, upon running, execute the
     * given <tt>Callable</tt>.
     *
     创建一个FutureTask,在执行时,将会执行参数Callable
     * @param  callable the callable task
     * @throws NullPointerException if callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable);
    }
    /**
     * Creates a <tt>FutureTask</tt> that will, upon running, execute the
     * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
     * given result on successful completion.
     *
     直接通过Executors执行任务,并将结果保存到result中
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }
}

FutureTask内部关联着一个同步器Sync,主要用于控制cancel,get等操作。
我们来看一下内部同步器Sync:
  /**
     * Synchronization control for FutureTask. Note that this must be
     * a non-static inner class in order to invoke the protected
     * <tt>done</tt> method. For clarity, all inner class support
     * methods are same as outer, prefixed with "inner".
     *
     控制FutureTask的同步器,由于需要调用protected的done方法,所以类必须定义为
     非静态内部类。为了清晰起见,所有内部类支持的方法,与外部类FutureTask一样,只不过
     添加了inner最为前缀。
     * Uses AQS sync state to represent run status
     */
    private final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -7828117401763700385L;

        /** State value representing that task is ready to run  准备就绪*/
        private static final int READY     = 0;
        /** State value representing that task is running 正在运行*/
        private static final int RUNNING   = 1;
        /** State value representing that task ran 运行完*/
        private static final int RAN       = 2;
        /** State value representing that task was cancelled 取消*/
        private static final int CANCELLED = 4;

        /** The underlying callable 执行线程callable*/
        private final Callable<V> callable;
        /** The result to return from get() 结果*/
        private V result;
        /** The exception to throw from get() get方法抛出的异常*/
        private Throwable exception;

        /**
         * The thread running task. When nulled after set/cancel, this
         * indicates that the results are accessible.  Must be
         * volatile, to ensure visibility upon completion.
	 线程用于执行任务。在set/cancel操作后为null,预示着任务结果可用,
	 变量必须volatile,以保证在任务执行完时,结果的可见性。
         */
        private volatile Thread runner;
        //构造任务同步器
        Sync(Callable<V> callable) {
            this.callable = callable;
        }
	//返回任务执行的状态,是运行完还是取消
	 private boolean ranOrCancelled(int state) {
            return (state & (RAN | CANCELLED)) != 0;
        }
	//是否执行结束,如果任务运行完或取消,且运行任务线程为null,即任务结束
	 boolean innerIsDone() {
            return ranOrCancelled(getState()) && runner == null;
        }
	 /**
         * Implements AQS base acquire to succeed if ran or cancelled
	 任务运行完或取消,则尝试获取共享锁成功。
         */
        protected int tryAcquireShared(int ignore) {
            return innerIsDone() ? 1 : -1;
        }
        /**
         * Implements AQS base release to always signal after setting
         * final done status by nulling runner thread.
	 在通过设置运行任务线程为null,设置任务线程状态为结束时,释放共享锁
         */
        protected boolean tryReleaseShared(int ignore) {
            runner = null;
            return true;
        }
	//判断任务状态是否为取消
	 boolean innerIsCancelled() {
            return getState() == CANCELLED;
        }
	//获取任务执行结果
        V innerGet() throws InterruptedException, ExecutionException {
	     //这个我们在AQS篇章中有讲,这里不再说
	    //如果任务线程执行结束,如果状态为取消,则抛出CancellationException
            acquireSharedInterruptibly(0);
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
	    //否则任务线程运行完,返回结果
            return result;
        }
	//超时获取任务执行结果,这个与get方法不同是,超时等待任务线程执行结束
	 V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
            if (!tryAcquireSharedNanos(0, nanosTimeout))
                throw new TimeoutException();
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }
}

Sync主要用于控制FutureTask的运行状态,状态一共有4中,准备就绪READY,
正在运行RUNNING,运行完RAN,取消CANCELLED;任务线程结束可能有两个原因
,运行完RAN或取消CANCELLED。Sync内部有一个线程runner用于执行任务,当任务线程执行结束时,runner为null。
回到FutureTask
 public boolean isCancelled() {
        return sync.innerIsCancelled();
    }
    public boolean isDone() {
        return sync.innerIsDone();
    }
    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }
    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return sync.innerGet(unit.toNanos(timeout));
    }

从上面可以看出FutureTask的isCancelled,isDone,get和超时get方法是直接委托给
内部同步器Sync的相应方法。
再看其他方法先看取消
//FutureTask
   public boolean cancel(boolean mayInterruptIfRunning) {
        //委托给内部同步器
        return sync.innerCancel(mayInterruptIfRunning);
    }

//Sync
 boolean innerCancel(boolean mayInterruptIfRunning) {
            //自旋设置任务线程运行状态为CANCELLED
            for (;;) {
                int s = getState();
                if (ranOrCancelled(s))
		    //如果任务已经执行结束,则返回false,不可取消
                    return false;
		    //AQS设置任务线程运行状态为CANCELLED
                if (compareAndSetState(s, CANCELLED))
                    break;
            }
	    //如果任务处于运行状态可以中断,任务运行线程不为null,则中断任务运行线程
            if (mayInterruptIfRunning) {
                Thread r = runner;
                if (r != null)
                    r.interrupt();
            }
	    //释放锁
            releaseShared(0);
            //做一些任务执行结束工作
            done();
            return true;
        }


//FutureTask

  /**
     * Protected method invoked when this task transitions to state
     * <tt>isDone</tt> (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     无论任务线程取消还是正常运行结束,只要线程的isDone状态为true,则调用
     此方法。默认实现不做任务事情,留给子类扩展。子类可以重写此方法,用于
     在执行完成时,调用回调接口或者执行记录工作。同时可以在 此方法中确认
     任务线程是否被取消。
     */
    protected void done() { }

从上来看取消操作,首先自旋设置任务线程运行状态为CANCELLED,
如果任务处于运行状态可以中断,任务运行线程不为null,则中断任务运行线程,
释放锁,做一些任务执行结束工作(默认为空)。

再来看run
    // The following (duplicated) doc comment can be removed once
    //
    // 6270645: Javadoc comments should be inherited from most derived
    //          superinterface or superclass
    // is fixed.
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    public void run() {
        //委托给内部同步器
        sync.innerRun();
    }

//Sync
 void innerRun() {
           //如果任务线程处理就绪状态,则设置为运行状态,否则返回
            if (!compareAndSetState(READY, RUNNING))
                return;
            runner = Thread.currentThread();
            if (getState() == RUNNING) { // recheck after setting thread
                V result;
                try {
		    //执行callable
                    result = callable.call();
                } catch (Throwable ex) {
		    //设置执行异常
                    setException(ex);
                    return;
                }
		//设置结果
                set(result);
            } else {
                releaseShared(0); // cancel
            }
        }

分别来看设置执行异常和设置结果

//设置结果
set(result);
//FutureTask
    /**
     * Sets the result of this Future to the given value unless
     * this future has already been set or has been cancelled.
     * This method is invoked internally by the <tt>run</tt> method
     * upon successful completion of the computation.
     如果结果已经被设值或任务线程被取消,则设置失败。此方在run方法成功
     完成任务时,调用。
     * @param v the value
     */
    protected void set(V v) {
        sync.innerSet(v);
    }

//Sync
void innerSet(V v) {
            for (;;) {
                int s = getState();
		//如果任务运行完,则返回
                if (s == RAN)
                    return;
                if (s == CANCELLED) {
                    // aggressively release to set runner to null,
                    // in case we are racing with a cancel request
                    // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
		//设置任务运行状态为RAN,并设值
                if (compareAndSetState(s, RAN)) {
                    result = v;
                    releaseShared(0);
                    done();
                    return;
                }
            }
        }


//设置执行异常
setException(ex);
//FutureTask

    /**
     * Causes this future to report an <tt>ExecutionException</tt>
     * with the given throwable as its cause, unless this Future has
     * already been set or has been cancelled.
     * This method is invoked internally by the <tt>run</tt> method
     * upon failure of the computation.
     * @param t the cause of failure
     设置执行异常
     */
    protected void setException(Throwable t) {
        sync.innerSetException(t);
    }

//Sync 这一步与innerSet相似,不在说
void innerSetException(Throwable t) {
            for (;;) {
                int s = getState();
                if (s == RAN)
                    return;
                if (s == CANCELLED) {
                    // aggressively release to set runner to null,
                    // in case we are racing with a cancel request
                    // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
                if (compareAndSetState(s, RAN)) {
                    exception = t;
                    releaseShared(0);
                    done();
                    return;
                }
            }
        }

再看runAndReset
 
 /**
     * Executes the computation without setting its result, and then
     * resets this Future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled.  This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     此方的功能如果任务线程正在运行,并且没有设置结果,可以重新设置任务线程为
     就绪状态,如任务线程运行异常或取消,则重置失败。这个用于任务需要多次执行的场景。
     * @return true if successfully run and reset
     */
    protected boolean runAndReset() {
	//委托给内部同步器
        return sync.innerRunAndReset();
    }

//Sync
 
 boolean innerRunAndReset() {
            //如果任务线程处于从READY切换到RUNNING失败,则返回false,即任务线程不处于就绪状态
            if (!compareAndSetState(READY, RUNNING))
                return false;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING)
		    //如果任务线程正在运行,调用callable
                    callable.call(); // don't set result
                runner = null;//重置任务线程为null
                return compareAndSetState(RUNNING, READY);//重置任务线程从RUNNING到READY
            } catch (Throwable ex) {
                setException(ex);
                return false;
            }
        }


总结:
FutureTask内部关联一个同步器Sync,Sync主要用于控制FutureTask的运行状态,状态一共有4中,准备就绪READY,正在运行RUNNING,运行完RAN,取消CANCELLED;任务线程结束可能有两个原因,运行完RAN或取消CANCELLED。Sync内部有一个线程runner用于执行任务,当任务线程执行结束时,runner为null。取消操作,首先自旋设置任务线程运行状态为CANCELLED,如果任务处于运行状态可以中断,任务运行线程不为null,则中断任务运行线程,释放锁,做一些任务执行结束工作(默认为空)。FutureTask的相关操作主要通过Sync来完成。
/**
 * A {@link Future} that is {@link Runnable}. Successful execution of
 * the <tt>run</tt> method causes completion of the <tt>Future</tt>
 * and allows access to its results.
 * @see FutureTask
 * @see Executor
 * @since 1.6
 * @author Doug Lea
 * @param <V> The result type returned by this Future's <tt>get</tt> method
 */
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
0
0
分享到:
评论
1 楼 hero001 2018-07-09  
     方式                

相关推荐

    java多线程编程同步器Future和FutureTask解析及代码示例

    主要介绍了java多线程编程同步器Future和FutureTask解析及代码示例,对二者进行了详细介绍,分析了future的源码,最后展示了相关实例代码,具有一定参考价值 ,需要的朋友可以了解下。

    FutureTask:FutureTask原始解析与重组-源码解析

    FutureTask原始码解析 一,FutureTask是什么? FutureTask是可取消的异步的计算任务,它可以通过线程池和线程对象执行,一般来说是FutureTask用于耗时的计算。 二,FutureTask继承图 三,未来任务源码 FutureTask的...

    Java FutureTask类使用案例解析

    主要介绍了Java FutureTask类使用案例解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值析,需要的朋友可以参考下

    Java并发编程原理与实战

    提前完成任务之FutureTask使用.mp4 Future设计模式实现(实现类似于JDK提供的Future).mp4 Future源码解读.mp4 ForkJoin框架详解.mp4 同步容器与并发容器.mp4 并发容器CopyOnWriteArrayList原理与使用.mp4 并发容器...

    编程狂人第十三期

    FutureTask 源码解析 C++:在堆上创建对象,还是在栈上? 技术纵横 UPYUN:用Erlang开发的对象存储系统 iOS 7最佳实践:一个天气App案例 看看百度招iOS开发面试的问题,你能答对多少? CoconutKit:iOS开发必备的...

    Android AsyncTask 完美解析 看不懂源码你就输了

    1.简介 android.os.AsyncTask,一个执行异步操作的类,我们可以使用它来处理后台任务,并且在UI线程中... 例如{@link Executor},{@ link ThreadPoolExecutor}和{@link FutureTask}。 2.基本使用 2.1 关键API andro

    高并发之——深度解析ScheduledFutureTask类

    JDK 1.5提供的ScheduledThreadPoolExecutor执行定时...从ScheduledFutureTask类的定义可以看出,ScheduledFutureTask类是ScheduledThreadPoolExecutor类的私有内部类,继承了FutureTask类,并实现了RunnableScheduledFu

    龙果 java并发编程原理实战

    第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future源码解读00:29:22分钟 | 第45节Fork/Join框架详解00:28:09分钟 | 第46节...

    Java 并发编程原理与实战视频

    第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future源码解读00:29:22分钟 | 第45节Fork/Join框架详解00:28:09分钟 | 第46节...

    龙果java并发编程完整视频

    第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future源码解读00:29:22分钟 | 第45节Fork/Join框架详解00:28:09分钟 | 第46节...

    java并发编程

    第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future源码解读00:29:22分钟 | 第45节Fork/Join框架详解00:28:09分钟 | 第46节...

    leetcode分类-JavaStudy:Java学习

    FutureTask 的使用 CompletableFuture 的使用 (划重点) CompletableService 的使用 ForkJoin 的使用 应用 基于org.apache.poi 解析excel文件 数据结构和算法 完成8大排序 跳跃表的实现 (做了部分优化 代码看起来没有...

    java8集合源码分析-Notes:笔记

    多线程:多线程生成的原因(Java内存模型与i++操作解析)] [Java 多线程:生产者消费者问题] [Java 多线程:synchronized 关键字(修饰类,方法,静态方法,代码块)] [Java 多线程:Lock 接口(接口方法分析,...

Global site tag (gtag.js) - Google Analytics