`
DavidIsOK
  • 浏览: 74065 次
社区版块
存档分类
最新评论

java线程(四):阻塞队列(BlockingQueue)

    博客分类:
  • java
阅读更多

 

1. 阻塞队列定义

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列提供了四种处理方法:

当没有空间可以用或者空间已满时下面各个方法的处理方式

方法\处理方式

抛出异常

返回布尔值

一直阻塞

超时退出

插入方法

add(e)

offer(e)

put(e)

offer(e,time,unit)

移除方法

remove()

poll()

take()

poll(time,unit)

获取头部方法

element()

peek()

   

 

2. Java里的阻塞队列

JDK7提供了6个阻塞队列。分别是ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueueDelayQueueSynchronousQueueLinkedTransferQueue,下面我们一一介绍他们的特点。

 

2.1 ArrayBlockingQueue

ArrayBlockingQueue实现了数组支持的有界阻塞队列,实现了FIFO(先进先出)排序机制。队列大小固定,之后不允许在增加数组容量,重载的构造函数允许指定公平策略来排序等待的线程。如果为真,所有阻塞的插入和删除操作都按照FIFO顺序处理,默认为假,这样可能导致等待线程顺序不公平,而且可能导致部分饿死(就是一直获取不到资源)的问题,但同时也带来了提高吞吐量的好处。

 

2.2 LinkedBlockingQueue

LinkedBlockingQueue 此队列按先进先出原则对元素排序,而且默认最大容量是最大整数值Integer.MAX_VALUE

 

2.3 PriorityBlockingQueue

    PriorityBlockingQueue一个无界阻塞队列,根据自然顺序决定队列中的元素优先级。它使用与类 PriorityQueue 相同的顺序规则,并且提供了阻塞获取操作。虽然此队列逻辑上是无界的,但是资源被耗尽时试图执行 add 操作也将失败(导致 OutOfMemoryError)。此类不允许使用 null 元素。依赖自然顺序的优先级队列也不允许插入不可比较的对象(这样做会导致抛出 ClassCastException)。

 

2.4 DelayQueue

DelayQueue是一种特殊的优先级队列,按照每个元素的延迟时间(也就是在元素可以从队列中移除的剩余时间)进行排序。可想而知,队头肯定是离到期时间最短的元素了。如果都没到期,那就没有队头,poll是会返回null的。当然peek()查询队头元素和size()查询元素总数还是可以的。

 

2.5 SynchronousQueue

   SynchronousQueue就跟我们之前的消费者和生产者类似了。此队列没有任何容量,它定义了每一个插入操作都必须等待对应的移除操作,反之亦然。

 

2.6 LinkedTransferQueuejava7):

   LinkedTransferQueue是个非常有用的类,java7中新加入的阻塞队列,我们会有一个专门的例子来展示它。也是是TransferQueue接口的实现类,而TransferQueueBlockingQueue的扩展。很多开源项目里面都用到了这类。这个队列的容量是有限的,有以下方法


A. transfer(E e)
若当前存在一个正在等待获取的消费者线程,即立刻移交之;否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,直到到有消费者线程取走该元素。(这里有点类似SynchronousQueue队列,下面会说为什么),此时要注意,在队列为空时,只要有>=1个消费者的话,就立即转交,否则如果队列不为空它会等消费者消费完了队列的元素后再移交这个元素。(其实我有点不清楚这跟进去队列有什么区别吗?我后期会去研究研究,到时贴在这里。也希望有牛人能在下面解答一下,我将在这里注明作者)

 

B. tryTransfer(E e)若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻移交对象元素e;若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作,与上面的方法就区别开了

 

C. tryTransfer(E e, long timeout, TimeUnit unit)若当前存在一个正在等待获取的消费者线程,会立即移交给它;否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉;若在指定的时间内元素e无法被消费者线程获取,则返回false同时该元素被移除。

 

D. hasWaitingConsumer()判断是否存在消费者线程。

 

E. getWaitingConsumerCount()获取所有等待获取元素的消费线程数量。

 

F. size()返回队列的元素个数,因为队列的异步特性,检测当前队列的元素个数需要逐一迭代,可能会得到一个不太准确的结果,尤其是在遍历时有可能队列发生更改,一般不用。

PS:其实transfer方法在SynchronousQueue的实现中就已存在了,只是没有做为API暴露出来。SynchronousQueue有一个特性:它本身不存在容量,只能进行线程之间的

元素传送。SynchronousQueue在执行offer操作时,如果没有其他线程执行poll,则直接返回false.线程之间元素传送正是通过transfer方法完成的。

 

 

3.关于阻塞队列的两个例子

前一个是普通阻塞队列,后一个是LinkedTransferQueue

3.1 :模拟一个股票交易系统

package com.hxw.Threads;
 
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
 
public class StockExchange {
   /**
    * 模拟一个股票交易系统,100卖方,100个买方不停执行买卖操作
    */
   public static void main(String[] args) {
      System.out.println("按下回车键停止买卖!!!!!!!!!!");
      BlockingDeque<Integer> orderQueue=new LinkedBlockingDeque<Integer>();   //构建阻塞订单队列
      Seller seller=new Seller(orderQueue);
      Thread[] sellerThread=new Thread[10];
      for (int i = 0; i < sellerThread.length; i++) {   //加入100个卖方线程
          sellerThread[i]=new Thread(seller);
          sellerThread[i].start();
      }
     
      Buyer buyer=new Buyer(orderQueue);
      Thread[] buyerThread=new Thread[10];
      for (int i = 0; i < buyerThread.length; i++) { //加入100个买方线程
          sellerThread[i]=new Thread(buyer);
          sellerThread[i].start();
      }
     
      try {
         while(System.in.read()!='\n')  //当按下回车键时
         System.out.println("所有买卖方全部禁止交易!");
         for(Thread t: sellerThread){   //让卖方线程停止
            t.interrupt();
         }
         for(Thread t: buyerThread){    //让买方线程停止
            t.interrupt();
         }
      } catch (IOException e) {
         e.printStackTrace();
      }
   }
 
}
 
class Seller implements Runnable{
 
   private BlockingDeque<Integer> orderQueue;
   private boolean shutdownRequest=false;  //设定一个关闭标志
   public Seller(BlockingDeque<Integer> orderQueue) {
      this.orderQueue=orderQueue;
   }
   @Override
   public void run() {
      while(shutdownRequest==false){ //正常情况下无限循环抛售
         Integer quantity=(int)(Math.random()*100);
         try {
            orderQueue.put(quantity);
            System.out.println("卖方 "+Thread.currentThread().getName()+"抛售了股票 "+quantity+"股");
         } catch (InterruptedException e) {
            shutdownRequest=true; //抛出异常后停止
            e.printStackTrace();
         }
      }
   }
  
}
 
class Buyer implements Runnable{
 
   private BlockingDeque<Integer> orderQueue;
   private boolean shutdownRequest=false;  //同上
   public Buyer(BlockingDeque<Integer> orderQueue) {
      this.orderQueue=orderQueue;
   }
   @Override
   public void run() {
      while(shutdownRequest==false){
         try {
            Integer quantity=orderQueue.take();
            System.out.println("买方"+Thread.currentThread().getName()+"购买了股票 "+quantity+"股");
         } catch (InterruptedException e) {
            shutdownRequest=true;
            e.printStackTrace();
         }
      }
   }
  
}

 【运行结果】:

卖方 Thread-3抛售了股票 60股

买方Thread-18购买了股票 91股

卖方 Thread-5抛售了股票 32股

卖方 Thread-4抛售了股票 36股

卖方 Thread-0抛售了股票 70股

买方Thread-13购买了股票 42股

卖方 Thread-7抛售了股票 79股

买方Thread-14购买了股票 40股

买方Thread-11购买了股票 24股

卖方 Thread-2抛售了股票 73股

买方Thread-10购买了股票 32股

.....

 

 

3.2 模拟一个为顾客创建幸运数字的机器,顾客来了机器才创建,没有顾客不会创建幸运数字

package com.hxw.Threads;
 
import java.util.Random;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
 
public class LuckyNumberGenerate {
 
   /**
    * LinkedTransferQueue 示例,模拟一个为顾客创建幸运数字的机器,顾客来了机器才创建,没有顾客不会创建幸运数字
    */ public static void main(String[] args) {
      TransferQueue<Integer> queue = new LinkedTransferQueue<Integer>();
      Thread producer = new Thread(new NumberProducer(queue));
      producer.setDaemon(true);
      producer.start();
      for (int i = 0; i < 10; i++) {
         Thread consumer = new Thread(new LuckyComsumer(queue));
         consumer.setDaemon(true);
         consumer.start();
         try {
            Thread.sleep(1794);   //平均两秒来一个顾客
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }
 
}
 
class NumberProducer implements Runnable {
   private TransferQueue<Integer> queue = null;
 
   NumberProducer(TransferQueue<Integer> queue) {
      this.queue = queue;
   }
 
   @Override
   public void run() {
      try {
         while (true) {
            if (queue.hasWaitingConsumer()) {
                System.out.println("顾客请稍等,正在创建幸运数字....");
                TimeUnit.SECONDS.sleep(1);  //创建一个幸运数字需要一秒
                queue.transfer(produce());
            }
           
         }
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
 
   }
   private Integer produce() {
      return (new Random().nextInt(100));
   }
 
}
 
class LuckyComsumer implements Runnable {
   private TransferQueue<Integer> queue = null;
 
   public LuckyComsumer(TransferQueue<Integer> queue) {
      this.queue = queue;
      System.out.println("有客来了!");
   }
 
   @Override
   public void run() {
      try {
         System.out.println("客户"+ Thread.currentThread().getName() + "获得幸运数字,幸运数字为"+queue.take()+"\n");
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
 
   }
 
}

   【运行结果】:

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-1获得幸运数字,幸运数字为31

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-2获得幸运数字,幸运数字为73

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-3获得幸运数字,幸运数字为31

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-4获得幸运数字,幸运数字为82

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-5获得幸运数字,幸运数字为81

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-6获得幸运数字,幸运数字为54

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-7获得幸运数字,幸运数字为87

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-8获得幸运数字,幸运数字为69

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-9获得幸运数字,幸运数字为61

 

有客来了!

顾客请稍等,正在创建幸运数字....

客户Thread-10获得幸运数字,幸运数字为25

 

2
1
分享到:
评论

相关推荐

    10、阻塞队列BlockingQueue实战及其原理分析.pdf

    6.JUC并发工具类在大厂的应用场景详解 (1).pdf 7、深入理解 AQS 独占锁之 Reentrantlock 源码分析 (1).pdf 8、读写锁ReentrantReadWriteLock&StampLock详解.pdf ...10、阻塞队列BlockingQueue 实战及其原理分析.pdf

    深入理解Java线程编程中的阻塞队列容器

    阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景...

    阻塞队列BlockingQueue的使用

    在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文简要介绍下BlockingQueue...

    BlockingQueue(阻塞队列)详解

    在新增的Concurrent包中,BlockingQueue...通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。

    【Java】Queue、BlockingQueue和队列实现生产者消费者模式

    BlockingQueue接口 – 阻塞队列2.1 ArrayBlockingQueue类(有界阻塞队列)2.2 LinkedBlockingQueue类(无界阻塞队列)3. 源码:BlockingQueue实现生产者消费者模式→ 输出结果截图 1. Queue接口 – 队列 public ...

    个人总结的深入java多线程开发

    9)线程阻塞的条件(重要) 14 10) Interrupt()注意事项 16 三. 线程之间协作 17 1)演示简单的消费者和生产者的例子: 17 2)管道的读写流处理方式 19 3)重要的演示死锁的问题—哲学家就餐问题 20 4)终止多线程程序的两种...

    Java可阻塞队列-ArrayBlockingQueue

     ArrayBlockingQueue是由数组支持的有界阻塞队列,次队列按照FIFO(先进先出)原则,当队列已经填满,在去增加则会导致阻塞,这种阻塞类似线程阻塞。  ArrayBlockingQueue提供的增加和取出方法总结  使用...

    RustBlockingQueue:线程安全队列,在空时阻止出队

    RustBlockingQueue是使用线程安全的阻塞队列在线程之间进行通信的工具。 请注意,Rust消息传递工具执行的操作大致相同。 这很好地说明了如何构建线程之间可以共享的数据结构。 设计: 在此设计中,有一个结构...

    Java NIO+多线程实现聊天室

    阻塞队列BlockingQueue,生产者消费者模式 选择器 渠道 字节缓冲区 ProtoStuff 高性能序列化 HttpClient连接池 Spring依赖注入 lombok简化POJO开发 原子指标 内置锁 竣工服务 log4j+slf4j日志 实现的功能 登录注销 ...

    Java并发编程实战

    5.3 阻塞队列和生产者-消费者模式 5.3.1 示例:桌面搜索 5.3.2 串行线程封闭 5.3.3 双端队列与工作密取 5.4 阻塞方法与中断方法 5.5 同步工具类 5.5.1 闭锁 5.5.2 FutureTask 5.5.3 信号量 5.5.4 栅栏 5.6...

    线程安全队列Queue

    在项目启动时,开一个单线程来专门处理巡检任务的下发给巡检服务组件。使用BlockingQueue阻塞算法。BlockingQueue作为线程容器,可以为线程同步提供有力的保障。

    Java 并发编程实战

    5.3 阻塞队列和生产者-消费者模式 5.3.1 示例:桌面搜索 5.3.2 串行线程封闭 5.3.3 双端队列与工作密取 5.4 阻塞方法与中断方法 5.5 同步工具类 5.5.1 闭锁 5.5.2 FutureTask 5.5.3 信号量 5.5.4 栅栏 5.6...

    Java容器.xmind

    阻塞队列 BlockingQueue接口 void put(E e) 将指定元素插入此队列,如果没有可用空间,则等待 E take()获取并移除此队列头部元素,如果没有可用元素,则等待 ArrayBlockingQueue 数组结构实现,有界队列,手工固定...

    Java多线程 BlockingQueue实现生产者消费者模型详解

    主要介绍了Java多线程 BlockingQueue实现生产者消费者模型详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

    Java 常见并发容器总结

    可以看做一个线程安全的 `LinkedList`,这是一个非阻塞队列。 - **`BlockingQueue`** : 这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道。 - **`...

    Java开发基于多线程和NIO实现聊天室源码+项目说明(含服务端+客户端).zip

    - 阻塞队列BlockingQueue,生产者消费者模式 - Selector - Channel - ByteBuffer - ProtoStuff 高性能序列化 - HttpClient连接池 - Spring依赖注入 - lombok简化POJO开发 - 原子变量 - 内置锁 - ...

    Chat:Java NIO+多线程实现聊天室

    Java基于多线程和NIO实现聊天室涉及到的技术点线程池ThreadPoolExecutor阻塞队列BlockingQueue,生产者消费者模式SelectorChannelByteBufferProtoStuff 高性能序列化HttpClient连接池Spring依赖注入lombok简化POJO...

    Java并发编程(学习笔记).xmind

    如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务 threadFactory 创建线程的工厂 handler 拒绝策略 unit 是一个枚举,表示 ...

    java线程池概念.txt

    //如果线程池线程大小大于核心线且且添加任务到线程失败,就把任务添加到阻塞队列 if (poolSize &gt;= corePoolSize || !addIfUnderCorePoolSize(command)) {//新建线程并启动 if (runState == RUNNING && workQueue...

Global site tag (gtag.js) - Google Analytics