`

线程池与阻塞队列

    博客分类:
  • JAVA
 
阅读更多

前言

    上一篇文章中我们将ThreadPoolExecutor进行了深入的学习和介绍,实际上我们在项目中应用的时候很少有直接应用ThreadPoolExecutor来创建线程池的,在jdkapi中 有这么一句话“但是,强烈建议程序员使用较为方便的 Executors 工厂方法Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、 Executors.newFixedThreadPool(int)(固定大小线程池)和 Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。”所以这篇文章我们继 续学习其它几种线程池。

线程池分类

newCachedThreadPool()

     创 建一个可缓存的线程池,即这个线程池是无界线程池,无界指工作线程的创建数量几乎没有限制(其实也有限制的,数目为 Interger.MAX_VALUE),这样可以灵活的往线程池中添加数据;可以进行自动线程回收指的是如果长时间没有往线程池中提交任务,即如果工作 线程空闲了指定的时间,则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。

     我们一般使用如下代码进行创建:

  1. ExecutorServiceservice = Executors.newCachedThreadPool();  

    我们点击代码进入源码: 

  1. /** 
  2.   * Creates a thread pool that creates newthreads as needed, but 
  3.   * will reuse previously constructedthreads when they are 
  4.   * available.  These pools will typically improve theperformance 
  5.   * of programs that execute manyshort-lived asynchronous tasks. 
  6.   * Calls to {@code execute} will reusepreviously constructed 
  7.   * threads if available. If no existingthread is available, a new 
  8.   * thread will be created and added to thepool. Threads that have 
  9.   * not been used for sixty seconds areterminated and removed from 
  10.   * the cache. Thus, a pool that remainsidle for long enough will 
  11.   * not consume any resources. Note thatpools with similar 
  12.   * properties but different details (forexample, timeout parameters) 
  13.   * may be created using {@linkThreadPoolExecutor} constructors. 
  14.   * 
  15.   * @return the newly created thread pool 
  16.   */  
  17.  public static ExecutorServicenewCachedThreadPool() {  
  18.      return new ThreadPoolExecutor(0,Integer.MAX_VALUE,  
  19.                                    60L,TimeUnit.SECONDS,  
  20.                                    newSynchronousQueue<Runnable>());  
  21.  }  

    看到代码有没有很熟悉,调用的上我们上一篇文章中的ThreadPoolExecutor类的构造方法,只不过核心线程数为0,同时指定一个最大线程数。

newFixedThreadPool(int)

    固定大小线程池这个很好理解,就是创建一个指定工作线程数量的线程池,如果线程达到设置的最大数,就将提交的任务放到线程池的队列中。一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。

    一般创建:

  1. ExecutorServicenewFixedThreadPool=Executors.newFixedThreadPool(5);  

    点击进入源码:

  1. /** 
  2.  * Creates a thread pool that reuses afixed number of threads 
  3.  * operating off a shared unboundedqueue.  At any point, at most 
  4.  * {@code nThreads} threads will be activeprocessing tasks. 
  5.  * If additional tasks are submitted whenall threads are active, 
  6.  * they will wait in the queue until athread is available. 
  7.  * If any thread terminates due to afailure during execution 
  8.  * prior to shutdown, a new one will takeits place if needed to 
  9.  * execute subsequent tasks.  The threads in the pool will exist 
  10.  * until it is explicitly {@linkExecutorService#shutdown shutdown}. 
  11.  * 
  12.  * @param nThreads the number of threads inthe pool 
  13.  * @return the newly created thread pool 
  14.  * @throws IllegalArgumentException if{@code nThreads <= 0} 
  15.  */  
  16. public static ExecutorServicenewFixedThreadPool(int nThreads) {  
  17.     return new ThreadPoolExecutor(nThreads,nThreads,  
  18.                                   0L,TimeUnit.MILLISECONDS,  
  19.                                   newLinkedBlockingQueue<Runnable>());  
  20. }  

    调用的依然是我们上一篇文章中的ThreadPoolExecutor类的构造方法,只不过核心线程数为和最大线程数一样都是我们人为指定的。 

newSingleThreadExecutor()

    单线程线程池,只创建唯一的线程来执行任务,如果这个线程异常结束,会有另一个取代它,保证顺序执行。

    一般创建方法:

  1. ExecutorServicenewSingleThreadExecutor = Executors.newSingleThreadExecutor();  

    点击进入源码:

  1. /** 
  2.  * Creates an Executor that uses a singleworker thread operating 
  3.  * off an unbounded queue. (Note howeverthat if this single 
  4.  * thread terminates due to a failureduring execution prior to 
  5.  * shutdown, a new one will take its placeif needed to execute 
  6.  * subsequent tasks.)  Tasks are guaranteed to execute 
  7.  * sequentially, and no more than one taskwill be active at any 
  8.  * given time. Unlike the otherwiseequivalent 
  9.  * {@code newFixedThreadPool(1)} thereturned executor is 
  10.  * guaranteed not to be reconfigurable touse additional threads. 
  11.  * 
  12.  * @return the newly createdsingle-threaded Executor 
  13.  */  
  14. public static ExecutorServicenewSingleThreadExecutor() {  
  15.     return newFinalizableDelegatedExecutorService  
  16.         (new ThreadPoolExecutor(11,  
  17.                                 0L,TimeUnit.MILLISECONDS,  
  18.                                 newLinkedBlockingQueue<Runnable>()));  
  19. }  

    调用的依然是我们上一篇文章中的ThreadPoolExecutor类的构造方法,只不过核心线程数为和最大线程数一样都是1

    介绍到这里我们发现这三个线程池调用的都是ThreadPoolExecutor的构造函数,这三个线程的区别除了核心线程数和最大线程数参数不一样外,最重要的是传入的最后一个参数即workQueue是不一样的。

newCachedThreadPool的参数为SynchronousQueue,newFixedThreadPool和newSingleThreadExecutor的参数都为LinkedBlockingQueue,其实这一种排队策略也叫阻塞队列,那接下来我们就来介绍一下常见的阻塞队列。

阻塞队列BlockingQueue

    阻塞队列顾名思义首先它是一个队列,常见的队列有“后进先出”的栈和“先进先出”的队列。多线程环境中,通过队列可以很容易实现数据共享,最经典的就是“生 产者”和“消费者”模型,这就是一个典型的阻塞队列,比如生产者生产到一定程度必须停一下,让生产者线程挂起,这就是阻塞。

    在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒) 

    java.util.concurrent 包中的BlockingQueue就是阻塞队列的接口,作为BlockingQueue的使用者,我们再也不需要关心什么时候需要阻塞线程,什么时候需要 唤醒线程,因为这一切BlockingQueue都给你一手包办了,并且它还是线程安全的。那我们现在来看下BlockingQueue接口的源码:

  1. public interfaceBlockingQueue<E> extends Queue<E> {  
  2.    
  3.     boolean add(E e); 
  4.    
  5.     boolean offer(E e);    
  6.    
  7.     void put(E e) throws InterruptedException;    
  8.    
  9.     boolean offer(E e, long timeout, TimeUnitunit)  
  10.         throws InterruptedException;  
  11.      
  12.     E take() throws InterruptedException;  
  13.      
  14.     E poll(long timeout, TimeUnit unit)  
  15.         throws InterruptedException;  
  16.      
  17.     int remainingCapacity();  
  18.      
  19.     boolean remove(Object o);  
  20.      
  21.     public boolean contains(Object o);  
  22.      
  23.     int drainTo(Collection<? super E> c);  
  24.      
  25.     int drainTo(Collection<? super E> c,int maxElements);  
  26. }  

    上面就是接口的所有方法,现在我们就介绍下这个接口中的核心方法: 

放入数据:

  1. boolean add(E e);  

    这个方法将将泛型对象加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)

  1. boolean offer(E e);  

    这个方法将将泛型对象加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)

  1. boolean offer(E e, long timeout, TimeUnitunit)throws InterruptedException;  

    这个方法可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。(本方法不阻塞当前执行方法的线程)

  1. void put(E e) throws InterruptedException;  

    这个方法把泛型对象放到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.(本方法有阻塞的功能) 

移除数据:

  1. boolean remove(Object o);  

    这个方法从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。(本方法不阻塞当前执行方法的线程)

  1. E poll(long timeout, TimeUnit unit) throws InterruptedException;  

    这个方法从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。(本方法不阻塞当前执行方法的线程)

  1. E take() throws InterruptedException;  

    这个方法是取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;(本方法有阻塞的功能)

  1. int drainTo(Collection<? super E> c);  

    这个方法是取走BlockingQueue里排在首位的对象,取不到时返回null;(本方法不阻塞当前执行方法的线程) 

  1. int drainTo(Collection<? super E> c,int maxElements);  

    这个方法是取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;(本方法不阻塞当前执行方法的线程)

    总 结一下BlockingQueue接口中的方法,这些方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不 同:第一种是抛出一个异常,第二种是返回一个特殊值(null或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。

 

 

抛出异常

特殊值

阻塞

超时

插入

add(e)

offer(e)

put(e)

offer(e,time,unit)

移除

remove()

poll()

take()

poll(time,unit)

检查

element()

peek()

不可用

不可用

BlockingQueue实现类

     1)ArrayBlockingQueue: 基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小,以便缓存队列中数据对象。并且可以指定公平性与非公平 性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。其所含的对象是以FIFO(先入先出)顺序排序的

    2)LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。其所含的对象是以FIFO(先入先出)顺序排序的。

    3)PriorityBlockingQueue: 类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都 是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。

    4)DelayQueue: 基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。 DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

    5)SynchronousQueue:一种无缓冲的等待队列,类似于无中介的直接交易,其 中LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致 LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于 ArrayBlockingQueue.

总结

    我 们这篇文章延续了上一篇文章中关于ThreadPoolExecutor线程池的一些内容,分别是newCachedThreadPool、 newFixedThreadPool、newSingleThreadExecutor,同时根据这些线程池与ThreadPoolExecutor的 关系,进而引出了阻塞队列BlockingQueue,于是我们详细介绍了接口BlockingQueue和接口中的方法,最后又介绍了接口 BlockingQueue的实现类。

分享到:
评论

相关推荐

    并发-线程池和阻塞队列

    并发-线程池和阻塞队列 并发-线程池和阻塞队列 并发-线程池和阻塞队列

    并发-线程池和阻塞队列.pdf

    讲述线程池原理,线程池使用场景和注意事项,手动创建线程池方法,注意事项,阻塞队列的相关知识

    libthread_new.rar_linux 线程池_linux任务队列_线程池

    实现了一个线程池。其中运用了队列,以避免任务丢失。在队列和线程池之间创建了一个中间夹层,以提高可移植性。当任务来时,先压入队列,然后...线程完成任务后,再去队列查询,如果有任务就去执行,没有则阻塞,等待

    基础知识五、Python实现线程池之线程安全队列

    文章目录一、线程池组成二、线程安全队列的实现三、测试逻辑3.1、测试阻塞逻辑3.2、测试读写加锁逻辑 一、线程池组成  一个完整的线程池由下面几部分组成,线程安全队列、任务对象、线程处理对象、线程池对象。其中...

    day19_阻塞队列、线程池、File类、递归.pdf

    day19_阻塞队列、线程池、File类、递归.pdf

    Python 使用threading+Queue实现线程池示例

    一、线程池 1、为什么需要使用线程池 1.1 创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率。 记创建线程消耗时间T1,执行任务消耗时间T2,销毁线程消耗时间T3,如果T1+T3&gt;T2,那...

    java线程池概念.txt

    但是如果调用了allowCoreThreadTimeOut(boolean)方法并设置了参数为true,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的阻塞队列大小为0;(这部分通过查看...

    Java并发编程相关源码集 包括多任务线程,线程池等.rar

    多个线程竞争问题、多个线程多个锁问题、创建一个缓存的线程池、多线程使用Vector或者HashTable的示例(简单线程同步问题)、PriorityBlockingQueue示例、高性能无阻塞无界队列: ConcurrentLinkedQueue、DelayQueue...

    Android图片上传队列Service

    没有网络的时候,将操作产生的本地图片(拍照,也可能是其他文件),存储起来。有网络的时候传输到文件服务器。 文件服务器只支持一个文件一个文件的传输。

    阻塞队列、线程池和四大函数式接口.md

    java进阶部分

    一个小的java Demo , 非常适合Java初学者学习阅读.rar

    数组阻塞队列ArrayBlockingQueue,延迟队列DelayQueue, 链阻塞队列 LinkedBlockingQueue,具有优先级的阻塞队列 PriorityBlockingQueue, 同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 ...

    Java NIO+多线程实现聊天室

    阻塞队列BlockingQueue,生产者消费者模式 选择器 渠道 字节缓冲区 ProtoStuff 高性能序列化 HttpClient连接池 Spring依赖注入 lombok简化POJO开发 原子指标 内置锁 竣工服务 log4j+slf4j日志 实现的功能 登录注销 ...

    深入理解Java线程池:ThreadPoolExecutor _ Idea Buffer1

    1. RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务 2. SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保

    java并发工具包详解

    2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 Synchronou sQueue 8. ...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞...

    java定时器,java定时器

    本项目用于实现动态可配置的定时任务功能,程序基于Java的定时器线程池,阻塞队列开发,定时调度时间采用cron表达式配置的方式,其中cron表达式解析工具类提取自spring。 用法极其简单,只需要将ScheduleUtils工具类...

    个人总结的深入java多线程开发

    2)一个具有线程池的工作队列 3 3)使用线程池的风险: 4 4)有效使用线程池的原则 5 二.入门 6 1)定时器 Timer—sleep()的替代类 6 2)后台线程 daemon 6 3)线程join()方法 8 4)几种多线程代码写法 10 5)一个超时自动...

    Java 动态定时器

    本Java 动态定时器基于Java的定时器线程池,阻塞队列实现,定时调度时间采用cron表达式配置的方式,其中cron表达式解析工具类提取自spring。 用法极其简单,只需要将ScheduleUtils工具类复制到项目里,然后调用...

    python队列Queue的详解

    Queue Queue是python标准库中的线程安全的队列(FIFO)实现,提供...一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。 举个栗子: import Queue q = Queue.Que

Global site tag (gtag.js) - Google Analytics