`
uule
  • 浏览: 6305904 次
  • 性别: Icon_minigender_1
  • 来自: 一片神奇的土地
社区版块
存档分类
最新评论

ArrayBlockingQueue

 
阅读更多

 ArrayBlockingQueue

     

public interface BlockingQueue<E> extends Queue<E> {  
     
    /**
	* 添加元素
	*/
    boolean add(E e);       //将指定的元素添加到此队列中(如果立即可行),在成功时返回 true,其他情况则抛出 IllegalStateException。     
          
    void put(E e) throws InterruptedException;   //将指定元素添加到此队列中,如果没有可用空间,将一直等待(如果有必要)  
         
    boolean offer(E e);    //将指定元素插入此队列中  
      
	  	  
    /**
	* 删除元素
	*/   
    
    E take() throws InterruptedException;   //检索并移除此队列的头部,如果此队列不存在任何元素,则一直等待  
         
    E poll(long timeout, TimeUnit unit)  
        throws InterruptedException;    //检索并移除此队列的头部,如果此队列中没有任何元素,则等待指定等待的时间(如果有必要)  
                
    boolean remove(Object o);       //从队列中删除指定元素  
		
	E peek();
}  

 

Java代码  收藏代码
  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>  
  2.         implements BlockingQueue<E>, java.io.Serializable {  
  3.   
  4.     private final E[] items;  
  5.       
  6.     /** items index for next take, poll or remove */  
  7.     private int takeIndex;  
  8.       
  9.     /** items index for next put, offer, or add. */  
  10.     private int putIndex;  
  11.   
  12.     private int count;  
  13.       
  14.     /*ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象, 
  15.     * 由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue 
  16.     */  
  17.     private final ReentrantLock lock;  
  18.   
  19.     private final Condition notEmpty;  
  20.   
  21.     private final Condition notFull;  
  22.       
  23.     ...   
  24. }  

 

 /**
 *  构造函数
 */
 public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }


    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
			
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

 

Java代码  收藏代码
  1. final int inc(int i) {  
  2.     return (++i == items.length)? 0 : i;  
  3. }     
  4.   
  5. //塞数据、取数据内部主要引用这两个方法  
  6.   
  7. //塞数据:  
  8. private void insert(E x) {  
  9.     items[putIndex] = x;  
  10.     putIndex = inc(putIndex);  
  11.     ++count;  
  12.     notEmpty.signal();  
  13. }  
  14.   
  15.       
  16. //取数据:  
  17. private E extract() {  
  18.     final E[] items = this.items;  
  19.     E x = items[takeIndex];  
  20.     items[takeIndex] = null;  
  21.     takeIndex = inc(takeIndex);  
  22.     --count;  
  23.     notFull.signal();  
  24.     return x;  
  25. }  

 put()/take() 可打断,并且会等待

 

 

put()/take() 可打断,并且会等待

/**
* 放数据
*/

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
		
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
    }
	
	
public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

/**
* 取数据
*/

 public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }
	
	
public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == 0)
                return null;
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }	
	
 public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : items[takeIndex];
        } finally {
            lock.unlock();
        }
    }		
	
public boolean remove(Object o) {
        if (o == null) return false;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            for (;;) {
                if (k++ >= count)
                    return false;
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                i = inc(i);
            }

        } finally {
            lock.unlock();
        }
    }

 

从 extract()方法里可见,tabkeIndex维护一个可以提取/移除元素的索引位置,因为takeIndex是从0递增的,所以这个类是FIFO队列

putIndex维护一个可以插入的元素的位置索引。

count显然是维护队列中已经存在的元素总数。

 

ArrayBlockingQueue是JAVA5中的一个阻塞队列,能够自定义队列大小,当插入时,如果队列已经没有空闲位置,那么新的插入线程将阻塞到该队列,一旦该队列有空闲位置,那么阻塞的线程将执行插入。从队列中取数据为:take,放数据为:put

 

 

     基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。

 

  ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics