`

Java Concurrent之Executor

 
阅读更多
JDK5-Executor框架

任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。以往我们通常会

  • 将所有的任务交给一个线程,串行执行 缺点:糟糕的吞吐量 和响应速度
  • 将每个任务分配一个线程执行 缺点:资源管理复杂性 线程的频繁建立与销毁过于销毁资源

为了能够更好地控制多线程,JDK5之后提供了(java.util.concurrent)一种灵活的线程池实现作为Executor框架的一部分。任务执行的主要抽象不是 Thread而是Executor。

什么是线程池?

线程池,从字面上看,是指管理一组同构工作线程的资源池。线程池与工作队列(Work Queue)密切相关。工作线程(Work Thread)的任务很简单: 从工作队列中获取任务执行,然后返回线程池等待获取下一个任务。它比“每个任务分配一个线程”更有优势:

  • 线程池在处理多个请求时,重用线程而不是创建线程 从而很好的将多线程的创建与销毁的巨大资源消耗进行分摊
  • 线程重用而不是重新创建,在请求到来时就不用创建线程,大大提高了程序的响应性
  • 通过适当地调整线程池大小,可以创建足够多的线程以便使处理器处于忙碌状态,同时还可以防止过多的线程相互竞争资源而使应用程序内存耗尽。

JDK5类库提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用Executors中的静态工厂方法之一来创建一个线程池:

public class Executors {
 
    /**
     * 创建一个固定长度的线程池,每提交一个任务时就创建一个线程,直到达到线程池最大数量,这时线程池规模将不再
     * 变化(若某个线程发生了未预期的异常,则线程池将会补充一个新的线程)
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if nThreads <= 0
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }
    /**
     * 单线程的Executor,它创建单个工作线程来执行任务,若这个线程异常结束,会创建另一个线程来替代。
     * 能保证依照任务在任务队列中的顺序来串行执行
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue()));
    }
    /**
     * 创建一个可缓存的线程池,若线程池的当前规模超过了处理需求时,则将回收空闲的线程,
     * 当需求增加时,则可以添加新的线程,
     * 线程池的规模不受任何限制
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }
    /**
     * 创建一个固定长度的线程池而且以延迟或定时的方式来执行任务,类似于Timer
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle.
     * @param threadFactory the factory to use when the executor
     * creates a new thread.
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if corePoolSize < 0
     * @throws NullPointerException if threadFactory is null
     */
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

 测试代码:

   /**
 * 
 * @author Sonicery_D
 */
public class ConcurrentTest {
 
    @Test
    public void test01(){
        Executor threadPool = null;
        /*
         *创建一个固定大小为20的线程池 
         */
        threadPool = Executors.newFixedThreadPool(20);
        /*
         * 创建大小不受限制的可缓存的线程池
         */
        threadPool = Executors.newCachedThreadPool();
        /*
         * 创建一个单工作线程的线程池
         */
        threadPool = Executors.newSingleThreadExecutor();
        /*
         * 创建一个固定长度的线程池而且以延迟或定时的方式来执行任务的线程池
         */
        threadPool = Executors.newScheduledThreadPool(20);
        /*
         * 每个创建线程池的方法都有ThreadFactory的重载
         */
        threadPool = Executors.newFixedThreadPool(20, new ThreadFactory(){
            //控制线程产生的细节操作
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);//设置为守护线程
                thread.setPriority(Thread.MAX_PRIORITY);//设置最大优先级
                return thread;
            }
        });
        /*
         * 提交一个任务
         */
        threadPool.execute(new Runnable(){
 
            @Override
            public void run() {
                // doSomething...
            }
            
        });
        /*我们一般使用ExecutorService,方便我们更好地操作线程池*/
        ExecutorService executorService = (ExecutorService)threadPool;
        /*
         * 提交有返回值的任务
         */
        Future result = executorService.submit(new Callable(){
            @Override
            public String call() throws Exception {
                // to do something...
                return null;
            }
        });
        /*
         * 温柔地关闭线程池
         */
        executorService.shutdown();
        /*
         * 粗暴的关闭线程池
         */
        executorService.shutdownNow();
        
    }
}

 通过源码我们可以看出,Executors类中的newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool的默认实现都是:

/**
     *
     * @param corePoolSize 指定线程池中线程数量 包括空闲线程
     * @param maximumPoolSize 指定线程池允许最大线程数量
     * @param keepAliveTime  超过corePoolSize 多余的空闲线程的最大存活时间
     * @param unit keepAliveTime的时间单位 
     * @param workQueue 任务队列,被提交但未执行的任务存放容器。
     * @param threadFactory  用于生产线程池中的线程,可以控制线程产生的具体细节
     * @throws IllegalArgumentException if corePoolSize or
     * keepAliveTime less than zero, or if maximumPoolSize less than or
     * equal to zero, or if corePoolSize greater than maximumPoolSize.
     * @throws NullPointerException if workQueue is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue
                              ThreadFactory threadFactory) {
         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
     /**
     * @param handler 拒绝策略,当任务执行由于线程数量超过限制或者任务队列达到最大限度而造成阻塞 时的拒绝策略
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

 RejectedExecutionHandler handler: handler拒绝策略,当任务的数量达到线程池的实际负载能力时,该如何处理提交的任务。 JDK内置了4中拒绝策略:

  • AbortPolicy策略: 该策略会直接抛出异常
  • CallerRunsPolicy策略: 只要线程池未关闭,该策略直接在调用者线程中运行当前被放弃任务
  • DiscardOledestPolicy策略: 该策略丢弃最老的一个请求,即即将被执行的任务,并尝试再提交当前任务
  • DiscardPolicy策略: 该策略丢弃无法处理的任务,不做任何处理

所有拒绝的策略都继承了RejectedExecutionHandler,用户可以通过继承RejectedExecutionHandler扩展自己自定义的拒绝策略

ThreadPoolExecutor内部机制
  • 当线程池中的线程数量小于corePoolSize时,新建线程处理任务
  • 当线程池中的线程数量等于corePoolSize时,将任务放入workQueue中,线程池中的空闲线程从workQueue中取任务执行
  • 当workQueue中放不下新来的任务请求时,创建新线程处理;若线程池中的线程数量大于maximumPoolSize时,使用RejectedExecutionHandler 来做拒绝处理。
  • 当线程池中的线程池数量大于corePoolSize时,多余的线程的存活时间最大为keepAliveTime,若无任务可处理则自行销毁

其内部结构如下: 



 

  • 大小: 24.4 KB
分享到:
评论

相关推荐

    java.util.concurrent

    java.util.concurrent总体概览图。 收取资源分3分。...java.util.concurrent主要包括5个部分executor,colletions,locks,atomic,tools。 该图详细的列举了并发包下面的结构,包含所有接口和具体实现类。

    Java实现生产者消费者模型

    Java实现生产者消费者模型 生产者消费者模型,是一般面试题都会考的,下面介绍使用ReetrantLock实现 生产者消费者模型。 定义一个ReentrantLock锁,同时new出两个condition,一...import java.util.concurrent.Executor

    Mastering Concurrency Programming with Java 8

    you will learn how to use the most important components of the Java 8 Concurrency API: the Executor framework to execute multiple tasks in your applications, the phaser class to implement concurrent ...

    Executor,Executors,ExecutorService比较.docx

    Executors: 是java.util.concurrent包下的一个类,提供了若干个静态方法,用于生成不同类型的线程池。Executors一共可以创建下面这四类线程池: 1.newFixedThreadPool创建一个可缓存线程池,如果线程池长度超过...

    java7帮助文档

    The fork/join framework, which is based on the ForkJoinPool class, is an implementation of the Executor interface. It is designed to efficiently run a large number of tasks using a pool of worker ...

    Java 7 Concurrency Cookbook

    Java is a concurrent platform and offers a lot of classes to execute concurrent tasks inside a Java program. With each version, Java increases the functionalities offered to programmers to facilitate ...

    Java 9 Concurrency Cookbook - Second Edition

    Writing concurrent and parallel programming applications is an integral skill for any Java programmer. Java 9 comes with a host of fantastic features, including significant performance improvements ...

    tascalate-concurrent:阻塞(IO绑定)的实现可取消java.util.concurrent.CompletionStage和java.util.concurrent.ExecutorService-s的相关扩展

    在版本,工件已重命名:新名称: &lt; dependency&gt; &lt; groupId&gt;net.tascalate&lt;/ groupId&gt; &lt; artifactId&gt;net.tascalate.concurrent&lt;/ artifactId&gt; &lt; version&gt;0.9.5&lt;/ version&gt; &lt;!-- Any version above 0.8.0, the ...

    java 并发编程 多线程

    java 并发 编程 多线程 concurrent lock condition executorserice executor java.util.curcurrent.

    Java并发编程实战

    5.1.2 迭代器与Concurrent-ModificationException 5.1.3 隐藏迭代器 5.2 并发容器 5.2.1 ConcurrentHashMap 5.2.2 额外的原子Map操作 5.2.3 CopyOnWriteArrayList 5.3 阻塞队列和生产者-消费者模式 5.3.1 ...

    PriorityAsyncTask:Android Studio 库模块 - PriorityAsyncTask

    具体实现可以通过 getPriority() 指定优先级,并且此优先级仅在任务在 PriorityAsyncTask.THREAD_POOL_EXECUTOR 上执行时使用,而不是在通过 executeOnExecutor(java.util.concurrent.Executor, Object[]) 指定的...

    Java 并发编程实战

    5.1.2 迭代器与Concurrent-ModificationException 5.1.3 隐藏迭代器 5.2 并发容器 5.2.1 ConcurrentHashMap 5.2.2 额外的原子Map操作 5.2.3 CopyOnWriteArrayList 5.3 阻塞队列和生产者-消费者模式 5.3.1 ...

    thread count

    import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountDownLatchDemo { private static final int PLAYER_...

    \java超强笔记(超级经典)

    全部是txt格式的,容量小,以下内容为其中之一: 5.0新特性: 泛型: 泛型的形式: 类型&gt; &lt;E extends Numner&comparator&gt; 类名&接口,表示E继承Numner类实现comparator接口 &lt;?&gt; 泛型通配符表示任意...

    个人总结的深入java多线程开发

    看完《think in java》多线程章节,自己写的多线程文档,还结合了其他的相关网络资料。 线程 一. 线程池 1)为什么要使用线程池 2 2)一个具有线程池的工作队列 3 3)使用线程池的风险: 4 4)有效使用线程池的原则 5...

    安卓java读取网页源码-JavaAyo:java的学习和测试工程,纯java工程,直接运行main方法

    java并发编程--Executor框架(一) 怎么理解Condition JAVA细粒度锁实现的几种方式: NIO,IO 注解 [笔记] dagger2: Retrofit和ButterKnife的注解怎么实现 反射 JOOR [ClassLoader==&gt;字节码,apk] Socket==&gt;openfire==&gt;...

    线程超时死掉

    Future接口是Java标准API的一部分,在java.util.concurrent包中。Future接口是Java线程Future模式的实 现,可以来进行异步计算。 Future模式可以这样来描述:我有一个任务,提交给了Future,Future替我完成这个...

    基础技术部牛路《Java多线程入阶分享》纯干货

    Java多线程入阶干货分享 1.使用线程的经验:设置名称、响应中断、使用ThreadLocal 2.Executor:ExecutorService和Future 3.阻塞队列:put和take、offer和poll、drainTo 4.线程间通信:lock、condition、wait、notify...

Global site tag (gtag.js) - Google Analytics