`

java队列

    博客分类:
  • java
 
阅读更多

在JDK 5之前LinkedList就已经存在,而且本身实现都是一种双向队列。所以到了JDK 5以后就将LinkedList同时实现Deque接口,这样LinkedList就又属于Queue的一部分了。

通常情况下Queue都是靠链表结构实现的,但是链表意味着有一些而外的引用开销,如果是双向链表开销就更大了。所以为了节省内存,一种方式就是使用固定大小的数组来实现队列。在这种情况下队列的大小是固定,元素的遍历通过数组的索引进行,很显然这是一种双向链表的模型。ArrayDeque就是这样一种实现。

另外ArrayBlockingQueue也是一种数组实现的队列,但是却没有改造成双向,仅仅实现了BlockingQueue的模型。理论上和ArrayDeque一样也应该容易改造成双向的实现。

PriorityQueue和PriorityBlockingQueue实现了一种排序的队列模型。这很类似与SortedSet,通过队列的Comparator接口或者Comparable元素来排序元素。这种情况下元素在队列中的出入就不是按照FIFO的形式,而是根据比较后的自然顺序来进行。

CocurrentLinkedQueue是一种线程安全却非阻塞的FIFO队列,这种队列通常实现起来比较简单,但是却很有效。在接下来的章节会详细的描述它。

SynchronousQueue是一种特别的BlockingQueue,它只是把一个add/offer操作的元素直接移交给remove/take操作。也就是说它本身不会缓存任何元素,所以严格意义上说来讲并不是一种真正的队列。此队列维护一个线程列表,这些线程等待从队列中加入元素或者移除元素。简单的说,至少有一个remove/take操作时add/offer操作才能成功,同样至少有一个add/offer操作时remove/take操作才能成功。这是一种双向等待的队列模型,出队列等待加入等列,而入队列又等待出队列。这种队列的好处在于能够最大线程的保持吞吐量却又是线程安全的。所以对于一个需要快速处理的任务队列,SynchronousQueue是一个不错的选择。

 

BlockingQueue还有一种实现DelayQueue,这种实现允许每一个元素(Delayed)带有一个延时时间,当调用take/poll的时候会检测队列头元素这个时间是否<=0,如果满足就是说已经超时了,那么此元素就可以被移除了,否则就会等待。特别说明的是这个头元素应该是最先被超时的元素(这个时间是绝对时间)。这个类设计很巧妙,被用于ScheduledFutureTask来进行定时操作。希望后面会开辟一个章节讲讲这里面的想法。实在不行在讲线程池部分肯定会提到这个。

notify丢失通知问题

假设线程A因为某种条件在条件队列中等待,同时线程B因为另外一种条件在同一个条件队列中等待,也就是说线程A/B都被同一个Conditon.await()挂起,但是等待的条件不同。现在假设线程B的线程被满足,线程C执行一个notify操作,此时JVM从Conditon.await()的多个线程(A/B)中随机挑选一个唤醒,不幸的是唤醒了A。此时A的条件不满足,于是A继续挂起。而此时B仍然在傻傻的等待被唤醒的信号。也就是说本来给B的通知却被一个无关的线程持有了,真正需要通知的线程B却没有得到通知,而B仍然在等待一个已经发生过的通知。

如果使用notifyall,则能够避免此问题。notifyall会唤醒所有正在等待的线程,线程C发出的通知线程A同样能够收到,但是由于对于A没用,所以A继续挂起,而线程B也收到了此通知,于是线程B正常被唤醒。

 

既然notifyall能够解决单一notify丢失通知的问题,那么为什么不总是使用notifyall替换notify呢?

假设有N个线程在条件队列中等待,调用notifyall会唤醒所有线程,然后这N个线程竞争同一个锁,最多只有一个线程能够得到锁,于是其它线程又回到挂起状态。这意味每一次唤醒操作可能带来大量的上下文切换(如果N比较大的话),同时有大量的竞争锁的请求。这对于频繁的唤醒操作而言性能上可能是一种灾难。

如果说总是只有一个线程被唤醒后能够拿到锁,那么为什么不使用notify呢?所以某些情况下使用notify的性能是要高于notifyall的。

如果满足下面的条件,可以使用单一的notify取代notifyall操作:

相同的等待者,也就是说等待条件变量的线程操作相同,每一个从wait放回后执行相同的逻辑,同时一个条件变量的通知至多只能唤醒一个线程。

也就是说理论上讲在put/take中如果使用sinallAll唤醒的话,那么在清单2 中的notFull.singal就是多余的。

ArrayBlockingQueue 原理

 

在没有介绍ArrayBlockingQueue原理之前可以想象下,一个数组如何实现Queue的FIFO特性。首先,数组是固定大小的,这个是毫无疑问的,那么初始化就是所有元素都为null。假设数组一段为头,另一端为尾。那么头和尾之间的元素就是FIFO队列。

    1. 入队列就将尾索引往右移动一个,新元素加入尾索引的位置;
    2. 出队列就将头索引往尾索引方向移动一个,同时将旧头索引元素设为null,返回旧头索引的元素。
    3. 一旦数组已满,那么就不允许添加新元素(除非扩充容量)
    4. 如果尾索引移到了数组的最后(最大索引处),那么就从索引0开始,形成一个“闭合”的数组。
    5. 由于头索引和尾索引之间的元素都不能为空(因为为空不知道take出来的元素为空还是队列为空),所以删除一个头索引和尾索引之间的元素的话,需要移动删除索引前面或者后面的所有元素,以便填充删除索引的位置。
    6. 由于是阻塞队列,那么显然需要一个锁,另外由于只是一份数据(一个数组),所以只能有一个锁,也就是同时只能有一个线程操作队列。

有了上述几点分析,设计一个可阻塞的数组队列就比较容易了。

image

上图描述的ArrayBlockingQueue的数据结构。首先有一个数组E[],用来存储所有的元素。由于ArrayBlockingQueue最终设置为一个不可扩展大小的Queue,所以这里items就是初始化就固定大小的数组(final类型);另外有两个索引,头索引takeIndex,尾索引putIndex;一个队列的大小count;要支持阻塞就必须需要一个锁lock和两个条件(非空、非满),这三个元素都是不可变更类型的(final)。

由于只有一把锁,所以任何时刻对队列的操作都只有一个线程,这意味着对索引和大小的操作都是线程安全的,所以可以看到这个takeIndex/putIndex/count就不需要原子操作和volatile语义了。

清单1 描述的是一个可阻塞的添加元素过程。这与前面介绍的消费者、生产者模型相同。如果队列已经满了就挂起等待,否则就插入元素,同时唤醒一个队列已空的线程。对比清单2 可以看到是完全相反的两个过程。这在前面几种实现生产者-消费者模型的时候都介绍过了。

 

清单1 可阻塞的添加元素

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    final E[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        try {
            while (count == items.length)
                notFull.await();
        } catch (InterruptedException ie) {
            notFull.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        insert(e);
    } finally {
        lock.unlock();
    }
}

 

 清单2 可阻塞的移除元素

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        try {
            while (count == 0)
                notEmpty.await();
        } catch (InterruptedException ie) {
            notEmpty.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        E x = extract();
        return x;
    } finally {
        lock.unlock();
    }
}

需要注意到的是,尽管每次加入、移除一个元素使用的都是signal()通知,而不是signalAll()通知。我们参考上一节中notify替换notifyAll的原则:每一个await醒来的动作相同,每次最多唤醒一个线程来操作。显然这里符合这两种条件,因此使用signal要比使用signalAll要高效,并且是可靠的。

image上图描述了take()/put()的索引位置示意图。

一开始takeIndex/putIndex都在E/0位置,然后每加入一个元素offer/put,putIndex都增加1,也就是往后边移动一位;每移除一个元素poll/take,takeIndex都增加1,也是往后边移动一位,显然takeIndex总是在putIndex的“后边”,因为当队列中没有元素的时候takeIndex和putIndex相等,同时当前位置也没有元素,takeIndex也就是无法再往右边移动了;一旦putIndex/takeIndex移动到了最后面,也就是size-1的位置(这里size是指数组的长度),那么就移动到0,继续循环。循环的前提是数组中元素的个数小于数组的长度。整个过程就是这样的。可见putIndex同时指向头元素的下一个位置(如果队列已经满了,那么就是尾元素位置,否则就是一个元素为null的位置)。

 

比较复杂的操作时删除任意一个元素。清单3 描述的是删除任意一个元素的过程。显然删除任何一个元素需要遍历整个数组,也就是它的复杂度是O(n),这与根据索引从ArrayList中查找一个元素的复杂度O(1)相比开销要大得多。参考声明的结构图,一旦删除的是takeIndex位置的元素,那么只需要将takeIndex往“右边”移动一位即可;如果删除的是takeIndex和putIndex之间的元素怎么办?这时候就从删除的位置i开始,将i后面的所有元素位置都往“左”移动一位,直到putIndex为止。最终的结果是删除位置的所有元素都“后退”了一个位置,同时putIndex也后退了一个位置。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics