`
he_wen
  • 浏览: 234034 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

利用Executor框架执行并行任务---之基础篇

阅读更多

    本文章的主题主要是在应用程序中寻找可以并行的任务,也就是把问题分成几个子问题。

 

一、讲解Executor和Executors以及ThreadPoolExecutor

 

public interface Executor{

void execute(Runnable command) ;
 
}

 

    执行已提交的 Runnable 任务的对象。

 

  Executors是一个工厂类,它创建不同类型的线程池,主要是根据不同的参数配置。

 

下面还是主要说明ThreadPoolExecutor类中的构造方法,如:

 

public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue,  
                          ThreadFactory threadFactory,  
                          RejectedExecutionHandler handler)  

 

具体流程如下:

1)当池子大小小于corePoolSize就新建线程,并处理请求

2)当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理

3)当workQueue放不下新入的任务时,新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize就用RejectedExecutionHandler来做拒绝处理

4)另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁

 

ThreadPoolExecutor就是依靠BlockingQueue的阻塞机制来维持线程池,当池子里的线程无事可干的时候就通过workQueue.take()阻塞住。

下面分析一下创建几个经典的线程池是怎么做到的:

1、一个固定大小的线程池

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

 2、

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

 创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。

二、讲解ExecutorService

由于创建了线程池说明是一个非常复杂的程序,所以就需要对它进行管理,也就是说他们的生命周期。

ExecutorService是Executor的父类,它定义了一个灵活的执行策略以及对任务的批量处理,讲解这个接口之前,需要讲解一下Future接口和Callable接口。

public interface Future<V> 

 

 

 

boolean cancel(boolean mayInterruptIfRunning)
          试图取消对此任务的执行。
 V get()
          如有必要,等待计算完成,然后获取其结果。
 V get(long timeout, TimeUnit unit)
          如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
 boolean isCancelled()
          如果在任务正常完成前将其取消,则返回 true
 boolean isDone()
          如果任务已完成,则返回 true

 

 

 

 

 

 

 

由于是异步获得结果,重点说明get()方法。FutureTask实现了Future接口并且实现了Runnable接口,所以在执行任务的时候可以调用run方法,如:

   

     Future<V> f = cache.get(arg);
        if (f == null) {
            Callable<V> eval = new Callable<V>() {
                public V call() throws InterruptedException {
                    return c.compute(arg);
                }
            };
            FutureTask<V> ft = new FutureTask<V>(eval);
            f = ft;
            cache.put(arg, ft);
            ft.run(); // call to c.compute happens here
        }
        try {
            return f.get();

 执行任务还有另外一个方法就是委托给线程:new Thread(ft).start()

 

下面讲解ExecutorService接口:

 

由于Executor接口一次只能执行一个任务对多个任务不支持,而且对任务执行中参数的异常不可以管理以及取消提交的任务都不支持,对于以上的需求,就扩展了ExecutorService接口。

 

 

 

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();    
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

      <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

自从Executor引进了Service后,该应用就能提供一种优雅的方式关闭应用程序,并且提供一些影响关闭状态的消息。

 

ExecutorService生命周期有三个状态:运行状态、正在关闭状态、终止状态,他是刚刚创建的时候就是运行状态。

 shutdow方法允许已经被提交的任务被执行但是不接收新的任务;shutdownNow方法是立即关闭:试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

 

invokeAll方法时批量的提交任务返回一个future集合,这个方法在下面的文章能应用。。。

0
8
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics