`
j小虫
  • 浏览: 18483 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

阻塞队列

阅读更多

阻塞队列是jdk1.5新特性,本质就是一个队列,当队列为空时,消费方阻塞等待,直到队列有新的对象;队列满了时,生产方阻塞等待,直到队列有空位时;它实现了多线程的排队等待。

队列,Queue接口,有先进先出的特性,与List,Set在同一级别,都继承了Collection接口,而BlockingQueue继承了Queue。

 

Queue提供了以下方法:

     add,offer:add如果队列满了, 则抛出异常IIIegaISlabEepeplian,offer如果满则返回false

     remove,poll:移除一个元素,为空时,remove抛异常NoSuchElementException,poll返回null,可以配合queue的size作循环

 

 

BlockingQueue提供了以下方法

      put:类似add方法,但队列满了不抛异常,直接等待。

      take:类似poll方法,队列空时不返回null,直接阻塞等待。

 

Blocking队列分为4种

LinkedBlockingQueue 的容量是没有上限的(说的不准确,在不指定时容量为Integer.MAX_VALUE,不要然的话在put时怎么会受阻呢),但是也可以选择指定其最大容量,它是基于链表的队列,此队列按 FIFO(先进先出)排序元素。

ArrayBlockingQueue 在构造时需要指定容量,并可以选择是否需要公平性,如果公平参数被设置 true,等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来达到这种公平性的:即等待时间最长的线程会先操 作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队列,此队列按 FIFO(先进先出)原则对元素进行排序

PriorityBlockingQueue 是一个带优先级的队列,而不是先进先出队列。元素按优先级顺序被移除,该 队列也没有上限(看了一下源码,PriorityBlockingQueue是对PriorityQueue的再次包装,是基于堆数据结构的,而 PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞队列上put时是不会受阻的。虽然此队列逻辑上是无界的,但是由 于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),但是如果队列为空,那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外,往入该队列中的元 素要具有比较能力。

DelayQueue (基于PriorityQueue来实现的)是一个存放Delayed 元素的无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll就以移除这个元素了。此队列不允许使用 null 元素。 下面是延迟接口:

  1. public   interface  Delayed  extends  Comparable<Delayed> {  
  2.      long  getDelay(TimeUnit unit);  

 

使用阻塞队列两个显著的好处就是:多线程操作共同的队列时不需要额外的同步,另外就是队列会自动平衡负载,即那边(生产与消费两边)处理快了就会被阻塞掉,从而减少两边的处理速度差距

 

 

最后copy了一段代码示例

  1. public   class  BlockingQueueTest {  
  2.     public   static   void  main(String[] args) {  
  3.         Scanner in = new  Scanner(System.in);  
  4.         System.out.print("Enter base directory (e.g. /usr/local/jdk5.0/src): " );  
  5.         String directory = in.nextLine();  
  6.         System.out.print("Enter keyword (e.g. volatile): " );  
  7.         String keyword = in.nextLine();  
  8.   
  9.         final   int  FILE_QUEUE_SIZE =  10 ; // 阻塞队列大小   
  10.         final   int  SEARCH_THREADS =  100 ; // 关键字搜索线程个数   
  11.   
  12.         // 基于ArrayBlockingQueue的阻塞队列   
  13.         BlockingQueue<File> queue = new  ArrayBlockingQueue<File>(  
  14.                 FILE_QUEUE_SIZE);  
  15.   
  16.         //只启动一个线程来搜索目录   
  17.         FileEnumerationTask enumerator = new  FileEnumerationTask(queue,  
  18.                 new  File(directory));  
  19.         new  Thread(enumerator).start();  
  20.           
  21.         //启动100个线程用来在文件中搜索指定的关键字   
  22.         for  ( int  i =  1 ; i <= SEARCH_THREADS; i++)  
  23.             new  Thread( new  SearchTask(queue, keyword)).start();  
  24.     }  
  25. }  
  26. class  FileEnumerationTask  implements  Runnable {  
  27.     //哑元文件对象,放在阻塞队列最后,用来标示文件已被遍历完   
  28.     public   static  File DUMMY =  new  File( "" );  
  29.   
  30.     private  BlockingQueue<File> queue;  
  31.     private  File startingDirectory;  
  32.   
  33.     public  FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory) {  
  34.         this .queue = queue;  
  35.         this .startingDirectory = startingDirectory;  
  36.     }  
  37.   
  38.     public   void  run() {  
  39.         try  {  
  40.             enumerate(startingDirectory);  
  41.             queue.put(DUMMY);//执行到这里说明指定的目录下文件已被遍历完   
  42.         } catch  (InterruptedException e) {  
  43.         }  
  44.     }  
  45.   
  46.     // 将指定目录下的所有文件以File对象的形式放入阻塞队列中   
  47.     public   void  enumerate(File directory)  throws  InterruptedException {  
  48.         File[] files = directory.listFiles();  
  49.         for  (File file : files) {  
  50.             if  (file.isDirectory())  
  51.                 enumerate(file);  
  52.             else   
  53.                 //将元素放入队尾,如果队列满,则阻塞   
  54.                 queue.put(file);  
  55.         }  
  56.     }  
  57. }  
  58. class  SearchTask  implements  Runnable {  
  59.     private  BlockingQueue<File> queue;  
  60.     private  String keyword;  
  61.   
  62.     public  SearchTask(BlockingQueue<File> queue, String keyword) {  
  63.         this .queue = queue;  
  64.         this .keyword = keyword;  
  65.     }  
  66.   
  67.     public   void  run() {  
  68.         try  {  
  69.             boolean  done =  false ;  
  70.             while  (!done) {  
  71.                 //取出队首元素,如果队列为空,则阻塞   
  72.                 File file = queue.take();  
  73.                 if  (file == FileEnumerationTask.DUMMY) {  
  74.                     //取出来后重新放入,好让其他线程读到它时也很快的结束   
  75.                     queue.put(file);  
  76.                     done = true ;  
  77.                 } else   
  78.                     search(file);  
  79.             }  
  80.         } catch  (IOException e) {  
  81.             e.printStackTrace();  
  82.         } catch  (InterruptedException e) {  
  83.         }  
  84.     }  
  85.     public   void  search(File file)  throws  IOException {  
  86.         Scanner in = new  Scanner( new  FileInputStream(file));  
  87.         int  lineNumber =  0 ;  
  88.         while  (in.hasNextLine()) {  
  89.             lineNumber++;  
  90.             String line = in.nextLine();  
  91.             if  (line.contains(keyword))  
  92.                 System.out.printf("%s:%d:%s%n" , file.getPath(), lineNumber,  
  93.                         line);  
  94.         }  
  95.         in.close();  
  96.     }  

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics