`
2277259257
  • 浏览: 498506 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

阻塞队列+线程池

 
阅读更多

阻塞队列

阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列,下图展示了如何通过阻塞队列来合作:

线程 1 往阻塞队列中添加元素,而线程 2 从阻塞队列中移除元素

从 5.0 开始,JDK 在 java.util.concurrent 包里提供了阻塞队列的官方实现。尽管 JDK 中已经包含了阻塞队列的官方实现,但是熟悉其背后的原理还是很有帮助的。

阻塞队列的实现

阻塞队列的实现类似于带上限的 Semaphore 的实现。下面是阻塞队列的一个简单实现

public class BlockingQueue {

private List queue = new LinkedList();

private int  limit = 10;

public BlockingQueue(int limit){

this.limit = limit;

}

public synchronized void enqueue(Object item)

throws InterruptedException  {

while(this.queue.size() == this.limit) {

wait();

}

if(this.queue.size() == 0) {

notifyAll();

}

this.queue.add(item);

}

public synchronized Object dequeue()

throws InterruptedException{

while(this.queue.size() == 0){

wait();

}

if(this.queue.size() == this.limit){

notifyAll();

}

return this.queue.remove(0);

}

}

必须注意到,在 enqueue 和 dequeue 方法内部,只有队列的大小等于上限(limit)或者下限(0)时,才调用 notifyAll 方法。如果队列的大小既不等于上限,也不等于下限,任何线程调用 enqueue 或者 dequeue 方法时,都不会阻塞,都能够正常的往队列中添加或者移除元素。

 

线程池

线程池(Thread Pool)对于限制应用程序中同一时刻运行的线程数很有用。因为每启动一个新线程都会有相应的性能开销,每个线程都需要给栈分配一些内存等等。

我们可以把并发执行的任务传递给一个线程池,来替代为每个并发执行的任务都启动一个新的线程。只要池里有空闲的线程,任务就会分配给一个线程执行。在线程池的内部,任务被插入一个阻塞队列(Blocking Queue),线程池里的线程会去取这个队列里的任务。当一个新任务插入队列时,一个空闲线程就会成功的从队列中取出任务并且执行它。

线程池经常应用在多线程服务器上。每个通过网络到达服务器的连接都被包装成一个任务并且传递给线程池。线程池的线程会并发的处理连接上的请求。以后会再深入有关 Java 实现多线程服务器的细节。

Java 5 在 java.util.concurrent 包中自带了内置的线程池,所以你不用非得实现自己的线程池。你可以阅读我写的java.util.concurrent.ExecutorService 的文章以了解更多有关内置线程池的知识。不过无论如何,知道一点关于线程池实现的知识总是有用的。

这里有一个简单的线程池实现:

public class ThreadPool {

  private BlockingQueue taskQueue = null;
  private List<PoolThread> threads = new ArrayList<PoolThread>();
  private boolean isStopped = false;

  public ThreadPool(int noOfThreads, int maxNoOfTasks) {
    taskQueue = new BlockingQueue(maxNoOfTasks);

    for (int i=0; i<noOfThreads; i++) {
      threads.add(new PoolThread(taskQueue));
    }
    for (PoolThread thread : threads) {
      thread.start();
    }
  }

  public void synchronized execute(Runnable task) {
    if(this.isStopped) throw
      new IllegalStateException("ThreadPool is stopped");

    this.taskQueue.enqueue(task);
  }

  public synchronized boolean stop() {
    this.isStopped = true;
    for (PoolThread thread : threads) {
      thread.stop();
    }
  }

}

(校注:原文有编译错误,我修改了下)

public class PoolThread extends Thread {

  private BlockingQueue<Runnable> taskQueue = null;
  private boolean       isStopped = false;

  public PoolThread(BlockingQueue<Runnable> queue) {
    taskQueue = queue;
  }

  public void run() {
    while (!isStopped()) {
      try {
        Runnable runnable =taskQueue.take();
        runnable.run();
      } catch(Exception e) {
        // 写日志或者报告异常,
        // 但保持线程池运行.
      }
    }
  }

  public synchronized void toStop() {
    isStopped = true;
    this.interrupt(); // 打断池中线程的 dequeue() 调用.
  }

  public synchronized boolean isStopped() {
    return isStopped;
  }
}

线程池的实现由两部分组成。类 ThreadPool 是线程池的公开接口,而类 PoolThread 用来实现执行任务的子线程。

为了执行一个任务,方法 ThreadPool.execute(Runnable r)用 Runnable 的实现作为调用参数。在内部,Runnable 对象被放入阻塞队列 (Blocking Queue) ,等待着被子线程取出队列。

一个空闲的 PoolThread 线程会把 Runnable 对象从队列中取出并执行。你可以在 PoolThread.run()方法里看到这些代码。执行完毕后,PoolThread 进入循环并且尝试从队列中再取出一个任务,直到线程终止。

调用 ThreadPool.stop()方法可以停止 ThreadPool。在内部,调用 stop 先会标记 isStopped 成员变量(为 true)。然后,线程池的每一个子线程都调用 PoolThread.stop()方法停止运行。注意,如果线程池的 execute()在 stop()之后调用,execute()方法会抛出 IllegalStateException 异常。

子线程会在完成当前执行的任务后停止。注意 PoolThread.stop() 方法中调用了 this.interrupt()。它确保阻塞在 taskQueue.dequeue() 里的 wait()调用的线程能够跳出 wait()调用(校对注:因为执行了中断 interrupt,它能够打断这个调用),并且抛出一个 InterruptedException 异常离开 dequeue()方法。这个异常在 PoolThread.run()方法中被截获、报告,然后再检查 isStopped 变量。由于 isStopped 的值是 true, 因此 PoolThread.run()方法退出,子线程终止。

分享到:
评论

相关推荐

    day19_阻塞队列、线程池、File类、递归.pdf

    day19_阻塞队列、线程池、File类、递归.pdf

    并发-线程池和阻塞队列

    并发-线程池和阻塞队列 并发-线程池和阻塞队列 并发-线程池和阻塞队列

    并发-线程池和阻塞队列.pdf

    讲述线程池原理,线程池使用场景和注意事项,手动创建线程池方法,注意事项,阻塞队列的相关知识

    阻塞队列、线程池和四大函数式接口.md

    java进阶部分

    libthread_new.rar_linux 线程池_linux任务队列_线程池

    实现了一个线程池。其中运用了队列,以避免任务丢失。在队列和线程池之间创建了一个中间夹层,以提高可移植性。当任务来时,先压入队列,然后...线程完成任务后,再去队列查询,如果有任务就去执行,没有则阻塞,等待

    Python 使用threading+Queue实现线程池示例

    一、线程池 1、为什么需要使用线程池 1.1 创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率。 记创建线程消耗时间T1,执行任务消耗时间T2,销毁线程消耗时间T3,如果T1+T3&gt;T2,那...

    Android图片上传队列Service

    没有网络的时候,将操作产生的本地图片(拍照,也可能是其他文件),存储起来。有网络的时候传输到文件服务器。 文件服务器只支持一个文件一个文件的传输。

    java线程池概念.txt

    但是如果调用了allowCoreThreadTimeOut(boolean)方法并设置了参数为true,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的阻塞队列大小为0;(这部分通过查看...

    这就是标题—— JUC.pdf

    JUC是什么 线程 进程 / 线程 ...BlockingQueue(阻塞队列) 线程池 池化技术 线程池的优势 线程池的特点 线程池三大方法 线程池七大参数 线程池四种拒绝策略 ForkJoin 异步回调 Volatile 指令重排 JMM

    【原价2300!!】尚硅谷_互联网大厂高频重点面试题视频详细讲解

    上半场,从多线程并发入手,分层递进讲解,逐步让大家掌握volatile、原子类和原子引用、CAS、ABA、Java锁机制、阻塞队列、线程池等重点;下半场,逐步过渡到JVM和GC的知识,深度讲解多种常见OOM异常和JVM参数调优,...

    2019年互联网大厂高频重点面试题(第2季)

    上半场,从多线程并发入手,分层递进讲解,逐步让大家掌握volatile、原子类和原子引用、CAS、ABA、Java锁机制、阻塞队列、线程池等重点;下半场,逐步过渡到JVM和GC的知识,深度讲解多种常见OOM异常和JVM参数调优,...

    基础知识五、Python实现线程池之线程安全队列

    文章目录一、线程池组成二、线程安全队列的实现三、测试逻辑3.1、测试阻塞逻辑3.2、测试读写加锁逻辑 一、线程池组成  一个完整的线程池由下面几部分组成,线程安全队列、任务对象、线程处理对象、线程池对象。其中...

    2019互联网面试题第2季.mmap

    上半场,从多线程并发入手,分层递进讲解,逐步让大家掌握volatile、原子类和原子引用、CAS、ABA、Java锁机制、阻塞队列、线程池等重点;下半场,逐步过渡到JVM和GC的知识,深度讲解多种常见OOM异常和JVM参数调优,...

    2019互联网大厂高频重点面试题 (第2季)脑图-完结.txt

    上半场,从多线程并发入手,分层递进讲解,逐步让大家掌握volatile、原子类和原子引用、CAS、ABA、Java锁机制、阻塞队列、线程池等重点;下半场,逐步过渡到JVM和GC的知识,深度讲解多种常见OOM异常和JVM参数调优,...

    Java并发编程相关源码集 包括多任务线程,线程池等.rar

    多个线程竞争问题、多个线程多个锁问题、创建一个缓存的线程池、多线程使用Vector或者HashTable的示例(简单线程同步问题)、PriorityBlockingQueue示例、高性能无阻塞无界队列: ConcurrentLinkedQueue、DelayQueue...

    java并发库高级应用源码--张孝祥

    java并发库thread使用,传统线程技术、定时器技术、线程互斥技术,同步通讯技术、多线程共享数据、并发库应用,线程锁技术,阻塞锁、阻塞队列,线程池等应用

    一个小的java Demo , 非常适合Java初学者学习阅读.rar

    数组阻塞队列ArrayBlockingQueue,延迟队列DelayQueue, 链阻塞队列 LinkedBlockingQueue,具有优先级的阻塞队列 PriorityBlockingQueue, 同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 ...

    深入理解Java线程池:ThreadPoolExecutor _ Idea Buffer1

    1. RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务 2. SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保

    java并发工具包详解

    2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 Synchronou sQueue 8. ...

Global site tag (gtag.js) - Google Analytics