JDK 1.6 JUC Source Code Analysis (20): Executors


Author: 大飞

 

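Overview:
  • Executors is a utility helper class in the JUC package. It provides a set of factory methods and utility methods for ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable.
Source analysis:
  • First, the factory methods for ExecutorService: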

 

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
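       From the analysis of ThreadPoolExecutor in the earlier articles, we know:

              1. This method creates a thread pool whose core pool size equals its maximum pool size and whose task queue is unbounded.
              2. Core threads do not time out by default, so the keep-alive parameters have no effect.
              3. If a worker thread dies for some reason before the pool is shut down, the pool automatically adds a new worker thread to replace it.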
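
       The first two points can be checked at runtime. The following is a minimal sketch, not part of the JDK source: the class name FixedPoolInspection and the method describe are made up for illustration. It relies on the fact, visible in the factory method above, that the returned object is a plain ThreadPoolExecutor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper class, for illustration only.
class FixedPoolInspection {

    /** Summarizes the configuration of Executors.newFixedThreadPool(nThreads). */
    static String describe(int nThreads) {
        ExecutorService service = Executors.newFixedThreadPool(nThreads);
        try {
            // The factory returns a plain ThreadPoolExecutor, so the cast succeeds.
            ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
            // A default LinkedBlockingQueue has capacity Integer.MAX_VALUE.
            boolean unbounded = pool.getQueue() instanceof LinkedBlockingQueue
                    && pool.getQueue().remainingCapacity() == Integer.MAX_VALUE;
            return "core=" + pool.getCorePoolSize()
                    + " max=" + pool.getMaximumPoolSize()
                    + " unboundedQueue=" + unbounded
                    + " coreTimeout=" + pool.allowsCoreThreadTimeOut();
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(4));
    }
}
```

       Because the cast works, callers can also reconfigure such a pool later through the ThreadPoolExecutor setters.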
 
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
       Same as the previous method, except that a custom ThreadFactory can be supplied.
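
       As an illustration of what a custom ThreadFactory is typically used for, here is a small sketch that names the worker threads and marks them as daemon threads. The class NamedThreadFactoryDemo, the helper named, and the prefix "io-worker" are assumptions made for this example; the code uses Java 8 lambda syntax for brevity.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, for illustration only.
class NamedThreadFactoryDemo {

    /** Builds a factory that names threads "<prefix>-<n>" and makes them daemon threads. */
    static ThreadFactory named(final String prefix) {
        final AtomicInteger counter = new AtomicInteger(1);
        return r -> {
            Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
            t.setDaemon(true);
            return t;
        };
    }

    /** Runs one task on a fixed pool built with the custom factory and returns the worker's name. */
    static String workerName(String prefix) {
        ExecutorService pool = Executors.newFixedThreadPool(2, named(prefix));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName("io-worker"));
    }
}
```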
 
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
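       This factory method looks similar to newFixedThreadPool(1), with one difference: the returned executor cannot be reconfigured (for example, by increasing the core pool size at runtime), because the method returns a wrapper class rather than the ThreadPoolExecutor instance itself:
    static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown(); // Shut down the pool when the wrapper is garbage collected.
        }
    }
    /**
     * Wrapper class that delegates to the inner ExecutorService and exposes only the methods defined by ExecutorService.
     */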
    static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List<Runnable> shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }
        public Future<?> submit(Runnable task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Callable<T> task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Runnable task, T result) {
            return e.submit(task, result);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            return e.invokeAll(tasks);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }
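
       The practical effect of the wrapper can be seen by comparing the two factories. This is a sketch under the assumption that "reconfigurable" simply means "can be cast to ThreadPoolExecutor"; the class and method names are invented for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper class, for illustration only.
class SingleThreadWrapperDemo {

    /** True if the executor exposes the ThreadPoolExecutor tuning methods. */
    static boolean isReconfigurable(ExecutorService service) {
        return service instanceof ThreadPoolExecutor;
    }

    static String compare() {
        ExecutorService fixed = Executors.newFixedThreadPool(1);
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            if (isReconfigurable(fixed)) {
                // The fixed pool can still be resized after creation.
                // Raise the maximum first so that core never exceeds max.
                ThreadPoolExecutor tpe = (ThreadPoolExecutor) fixed;
                tpe.setMaximumPoolSize(4);
                tpe.setCorePoolSize(4);
            }
            return "fixed=" + isReconfigurable(fixed)
                    + " single=" + isReconfigurable(single);
        } finally {
            fixed.shutdown();
            single.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(compare());
    }
}
```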
 
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
       Same as the previous method, except that a custom ThreadFactory can be supplied.
 
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
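       From the analysis of ThreadPoolExecutor in the earlier articles, we know:
              1. This method creates a thread pool with a core pool size of 0, a maximum pool size that is (effectively) unlimited, and a synchronous queue (no actual capacity) as the task queue.
              2. For each new task, if no idle thread is currently available, a new worker thread is created to handle it. By default, a worker thread that stays idle for more than 60 seconds times out and is reclaimed.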
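
       The growth behaviour described in point 2 can be observed with a small experiment: submit several tasks that all block, then read the pool size. This is only a sketch; CachedPoolGrowth and poolSizeUnderLoad are names made up for the example, and the code uses Java 8 lambda syntax.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper class, for illustration only.
class CachedPoolGrowth {

    /** Submits `tasks` blocking tasks and returns the pool size while all of them are running. */
    static int poolSizeUnderLoad(int tasks) {
        // newCachedThreadPool returns a plain ThreadPoolExecutor, so the cast succeeds.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        CountDownLatch started = new CountDownLatch(tasks);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    started.countDown();
                    try {
                        release.await(); // Keep this worker busy until released.
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await(); // Every task is now running on its own worker.
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeUnderLoad(5));
    }
}
```

       Since no worker is ever idle while the tasks are blocked, the SynchronousQueue hand-off fails each time and the pool creates one thread per task.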
 
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
       Same as the previous method, except that a custom ThreadFactory can be supplied.
 
    public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
        if (executor == null)
            throw new NullPointerException();
        return new DelegatedExecutorService(executor);
    }
       DelegatedExecutorService was covered above; this method simply wraps an ExecutorService into one that cannot be reconfigured.
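
       A short usage sketch, with invented names (UnconfigurableDemo, run): a pool is built by hand, wrapped, and handed out. Tasks still run through the wrapper, but the wrapper can no longer be cast back to ThreadPoolExecutor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, for illustration only.
class UnconfigurableDemo {

    static String run() {
        ThreadPoolExecutor raw = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        // Hide the tuning methods (setCorePoolSize and so on) behind the wrapper.
        ExecutorService locked = Executors.unconfigurableExecutorService(raw);
        try {
            Callable<Integer> task = () -> 6 * 7;
            int answer = locked.submit(task).get(); // Delegated to the raw pool.
            return "castable=" + (locked instanceof ThreadPoolExecutor)
                    + " answer=" + answer;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            locked.shutdown(); // Also delegated, so the raw pool shuts down.
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```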
 
  • Next, the factory methods for ScheduledExecutorService:
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

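       From the analysis of ScheduledThreadPoolExecutor in the earlier articles, we know:

              1. This method creates a ScheduledThreadPoolExecutor with the given (core) pool size. Because its internal task queue is unbounded, the maximum pool size is meaningless even though the class extends ThreadPoolExecutor.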
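
       A minimal usage sketch follows. The class ScheduledPoolDemo and its two methods are hypothetical; the first reads the core and maximum pool sizes of the returned executor, the second schedules a one-shot task and measures how long it actually waited.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, for illustration only.
class ScheduledPoolDemo {

    /** Returns {corePoolSize, maximumPoolSize} of a pool created by the factory. */
    static int[] sizes(int corePoolSize) {
        ScheduledThreadPoolExecutor pool =
                (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(corePoolSize);
        try {
            return new int[] { pool.getCorePoolSize(), pool.getMaximumPoolSize() };
        } finally {
            pool.shutdown();
        }
    }

    /** Schedules a one-shot task and returns the observed delay in milliseconds. */
    static long observedDelayMillis(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        try {
            long start = System.nanoTime();
            Callable<Long> task = () -> System.nanoTime();
            ScheduledFuture<Long> future =
                    scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            return TimeUnit.NANOSECONDS.toMillis(future.get() - start);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] s = sizes(2);
        System.out.println("core=" + s[0] + " max=" + s[1]);
        System.out.println("delay(ms)=" + observedDelayMillis(100));
    }
}
```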
 
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }
        Similar to newScheduledThreadPool(1) (which really looks like an enhanced Timer), but the configuration cannot be adjusted:
    static class DelegatedScheduledExecutorService
            extends DelegatedExecutorService
            implements ScheduledExecutorService {
        private final ScheduledExecutorService e;
        DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
            super(executor);
            e = executor;
        }
        public ScheduledFuture<?> schedule(Runnable command, long delay,  TimeUnit unit) {
            return e.schedule(command, delay, unit);
        }
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            return e.schedule(callable, delay, unit);
        }
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,  long period, TimeUnit unit) {
            return e.scheduleAtFixedRate(command, initialDelay, period, unit);
        }
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,  long delay, TimeUnit unit) {
            return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
        }
    }
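
       To illustrate the Timer-like usage, the sketch below runs a periodic task on a single-thread scheduled executor and waits for a few executions. The class and method names are assumptions for the example; the 10 ms period and the 5 second timeout are arbitrary values.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, for illustration only.
class SingleSchedulerDemo {

    /** True if the periodic task ran at least `wanted` times within 5 seconds. */
    static boolean firesRepeatedly(int wanted) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(wanted);
        try {
            // Run immediately, then every 10 ms, always on the same single thread.
            timer.scheduleAtFixedRate(remaining::countDown, 0, 10, TimeUnit.MILLISECONDS);
            return remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            timer.shutdownNow(); // Cancels the periodic task.
        }
    }

    /** The wrapper hides ScheduledThreadPoolExecutor, so this returns false. */
    static boolean isConfigurable() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            return timer instanceof ScheduledThreadPoolExecutor;
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("fired=" + firesRepeatedly(3));
        System.out.println("configurable=" + isConfigurable());
    }
}
```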
 
    public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
        if (executor == null)
            throw new NullPointerException();
        return new DelegatedScheduledExecutorService(executor);
    }
       This wraps a ScheduledExecutorService into one that cannot be reconfigured.
 
  • Next, the factory methods for ThreadFactory:
    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

       Returns a DefaultThreadFactory instance. This is the factory used by default when a ThreadPoolExecutor or ScheduledThreadPoolExecutor is created without an explicit ThreadFactory. Its implementation:

    static class DefaultThreadFactory implements ThreadFactory {
        static final AtomicInteger poolNumber = new AtomicInteger(1);
        final ThreadGroup group;
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null)? s.getThreadGroup() :
                                 Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
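
       The properties set by DefaultThreadFactory (the "pool-N-thread-M" name, non-daemon status, normal priority) can be read back from a thread it creates. A small sketch, with the class name DefaultFactoryDemo chosen for the example:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Hypothetical helper class, for illustration only.
class DefaultFactoryDemo {

    /** Creates (but does not start) the first thread of a fresh default factory. */
    static Thread create() {
        ThreadFactory factory = Executors.defaultThreadFactory();
        return factory.newThread(() -> { /* no-op task */ });
    }

    public static void main(String[] args) {
        Thread t = create();
        // The pool number depends on how many default factories were created before.
        System.out.println("name=" + t.getName());
        System.out.println("daemon=" + t.isDaemon());
        System.out.println("priority=" + t.getPriority());
    }
}
```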

 

    public static ThreadFactory privilegedThreadFactory() {
        return new PrivilegedThreadFactory();
    }
    static class PrivilegedThreadFactory extends DefaultThreadFactory {
        private final ClassLoader ccl;
        private final AccessControlContext acc;
        PrivilegedThreadFactory() {
            super();
            this.ccl = Thread.currentThread().getContextClassLoader();
            this.acc = AccessController.getContext();
            acc.checkPermission(new RuntimePermission("setContextClassLoader"));
        }
        public Thread newThread(final Runnable r) {
            return super.newThread(new Runnable() {
                public void run() {
                    AccessController.doPrivileged(new PrivilegedAction<Object>() {
                        public Object run() {
                            Thread.currentThread().setContextClassLoader(ccl);
                            r.run();
                            return null;
                        }
                    }, acc);
                }
            });
        }
    }

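       The factories returned by privilegedThreadFactory and defaultThreadFactory create threads with the same settings. The difference is that threads created by PrivilegedThreadFactory run with the same access control context and context class loader as the thread that called privilegedThreadFactory.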

 
  • Finally, the utility methods for Callable:
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

       Wraps a Runnable and a return value into a Callable. The adapter class returned here has also been seen before:

    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable  task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

 

    public static Callable<Object> callable(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<Object>(task, null);
    }

       An overload of the method above whose result defaults to null.
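
       Both overloads can be exercised directly. In this sketch the class CallableAdapterDemo and the method adapt are illustrative names; the Runnable appends to a StringBuilder so that each call is visible in the result.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

// Hypothetical helper class, for illustration only.
class CallableAdapterDemo {

    static String adapt() {
        StringBuilder log = new StringBuilder();
        Runnable task = () -> log.append("ran");

        // Adapter with a fixed result, and adapter whose result is null.
        Callable<String> withResult = Executors.callable(task, "done");
        Callable<Object> withNull = Executors.callable(task);
        try {
            String r1 = withResult.call(); // Runs the task, returns "done".
            Object r2 = withNull.call();   // Runs the task again, returns null.
            return log + "|" + r1 + "|" + r2;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(adapt());
    }
}
```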

 
       There are also utility methods that deal with access control and class loader settings:
    public static Callable<Object> callable(final PrivilegedAction<?> action) {
        if (action == null)
            throw new NullPointerException();
        return new Callable<Object>() {
            public Object call() { return action.run(); }};
    }

    public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
        if (action == null)
            throw new NullPointerException();
        return new Callable<Object>() {
            public Object call() throws Exception { return action.run(); }};
    }

    public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
        if (callable == null)
            throw new NullPointerException();
        return new PrivilegedCallable<T>(callable);
    }

    public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
        if (callable == null)
            throw new NullPointerException();
        return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
    }
    static final class PrivilegedCallable<T> implements Callable<T> {
        private final AccessControlContext acc;
        private final Callable<T> task;
        private T result;
        private Exception exception;
        PrivilegedCallable(Callable<T> task) {
            this.task = task;
            this.acc = AccessController.getContext();
        }
        public T call() throws Exception {
            AccessController.doPrivileged(new PrivilegedAction<T>() {
                    public T run() {
                        try {
                            result = task.call();
                        } catch (Exception ex) {
                            exception = ex;
                        }
                        return null;
                    }
                }, acc);
            if (exception != null)
                throw exception;
            else
                return result;
        }
    }

    static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> {
        private final ClassLoader ccl;
        private final AccessControlContext acc;
        private final Callable<T> task;
        private T result;
        private Exception exception;
        PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
            this.task = task;
            this.ccl = Thread.currentThread().getContextClassLoader();
            this.acc = AccessController.getContext();
            acc.checkPermission(new RuntimePermission("getContextClassLoader"));
            acc.checkPermission(new RuntimePermission("setContextClassLoader"));
        }
        public T call() throws Exception {
            AccessController.doPrivileged(new PrivilegedAction<T>() {
                    public T run() {
                        Thread t = Thread.currentThread();
                        try {
                            ClassLoader cl = t.getContextClassLoader();
                            if (ccl == cl) {
                                result = task.call();
                            } else {
                                t.setContextClassLoader(ccl);
                                try {
                                    result = task.call();
                                } finally {
                                    t.setContextClassLoader(cl);
                                }
                            }
                        } catch (Exception ex) {
                            exception = ex;
                        }
                        return null;
                    }
                }, acc);
            if (exception != null)
                throw exception;
            else
                return result;
        }
    }
 
 
       That concludes the analysis of the Executors code!
 
 