`
zhuhui_zj
  • 浏览: 36187 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

并发容器分析(二)--ArrayBlockingQueue

阅读更多

一、简介

    BlockingQueue接口定义了一种阻塞的FIFO queue,每一个BlockingQueue都有一个容量,让容量满时往BlockingQueue中添加数据时会造成阻塞,当容量为空时取元素操作会阻塞。ArrayBlockingQueue是对BlockingQueue的一个数组实现,它使用一把全局的锁并行对queue的读写操作,同时使用两个Condition阻塞容量为空时的取操作和容量满时的写操作。

二、具体实现

    ArrayBlockingQueue底层定义如下:

 

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    // 使用循环数组来实现queue,初始时takeIndex和putIndex均为0
    private final E[] items;
    private transient int takeIndex;
    private transient int putIndex;
    private int count;

    // 用于并发的锁和条件
   private final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;

    /**
     * 循环数组
     * Circularly increment i.
     */
    final int inc(int i) {
        return (++i == items.length)? 0 : i;
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];
        // 分配锁及该锁上的condition
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

  ...
}

   ArrayBlockingQueue的取操作:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
       // 激发notFull条件
        notFull.signal();
        return x;
    }

     /**
        * condition的await的语义如下:
     * 与condition相关的锁以原子方式释放,并禁用该线程
     * 方法返回时,线程必须获得与该condition相关的锁
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
            	  // 等待notEmpty的条件
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }

  ...
}

   ArrayBlockingQueue的写操作:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
            	  // 等待notFull条件
           while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(o);
        } finally {
            lock.unlock();
        }
    }

  ...
}

    注意:ArrayBlockingQueue在读写操作上都需要锁住整个容器,因此吞吐量与一般的实现是相似的,适合于实现“生产者消费者”模式。

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics