`
Donald_Draper
  • 浏览: 950949 次
社区版块
存档分类
最新评论

DelayQueue解析

    博客分类:
  • JUC
阅读更多
Queue接口定义:http://donald-draper.iteye.com/blog/2363491
AbstractQueue简介:http://donald-draper.iteye.com/blog/2363608
ConcurrentLinkedQueue解析:http://donald-draper.iteye.com/blog/2363874
BlockingQueue接口的定义:http://donald-draper.iteye.com/blog/2363942
LinkedBlockingQueue解析:http://donald-draper.iteye.com/blog/2364007
ArrayBlockingQueue解析:http://donald-draper.iteye.com/blog/2364034
PriorityBlockingQueue解析:http://donald-draper.iteye.com/blog/2364100
SynchronousQueue解析上-TransferStack:http://donald-draper.iteye.com/blog/2364622
SynchronousQueue解析下-TransferQueue:http://donald-draper.iteye.com/blog/2364842
package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*;

/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * <tt>Delayed</tt> elements, in which an element can only be taken
 * when its delay has expired.  The [i]head[/i] of the queue is that
 * <tt>Delayed</tt> element whose delay expired furthest in the
 * past.  If no delay has expired there is no head and <tt>poll</tt>
 * will return <tt>null</tt>. Expiration occurs when an element's
 * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
 * than or equal to zero.  Even though unexpired elements cannot be
 * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise
 * treated as normal elements. For example, the <tt>size</tt> method
 * returns the count of both expired and unexpired elements.
 * This queue does not permit null elements.
 *
 DelayQueue是一个延时元素无界非阻塞队列,在延时队列中,如果有一个延时时间耗尽,
 则将会被消费take。队列的头部是延时时间即将过期或过期最早的元素。如果队列中没有
 过期元素,那么poll操作将会返回null。判断一个元素是否过期的标准是,调用元素的
getDelay方法,如果返回的延时时间等于或小于零,即过期。未过期的元素不能被take或poll操作从队列中移除,
将被做为队列普通元素看待。size返回的队列中的过期与未过期元素。队列不允许为null元素的存在。
 * <p>This class and its iterator implement all of the
 * [i]optional[/i] methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    //可重入锁
    private transient final ReentrantLock lock = new ReentrantLock();
    //优先级队列
    private final PriorityQueue<E> q = new PriorityQueue<E>();
      /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     这个线程用于等待队列头元素是否过期。本设计是 Leader-Follower模式的
     变种,用为最小化必须要时间的等待。当一个线程成为leader时,则将会等待
     下一个过期的元素,而其他线程等待是不确定性的。leader线程在take或poll操作
     返回后,必须唤醒其他线程,除非其他线程在过渡期成为了leader。无论任何时候,
     当一个更早过期的元素成为队头时,当前leader将会由于无效被设置为null,其他等待线程
     将会被唤醒,而不是当前leader。
     */
    private Thread leader = null;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     当一个队列头的元素可以用,或一个新线程成为leader,条件将会触发
     */
    private final Condition available = lock.newCondition();
     /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     构造为空队列
     */
    public DelayQueue() {}
  }

从上面可以看出DelayQueue内在一个优先级队列,一个available条件(ReentrantLock)用于通知一个队列头的元素可以用,或一个新线程成为leader事件。
再来看相关操作:
   
 /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }
/**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) {
        offer(e);
    }
    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

add,put,超时offer操作都是委托给offer操作,来看offer操作:

  
 /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
	//加锁
        lock.lock();
        try {
	    //委托给内部优先级队列的offer操作
            q.offer(e);
            if (q.peek() == e) {
	        //检查队头元素,如果队头元素为当前先添加的元素,则设置leader为null,唤醒等待available条件的线程
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }

    }
从方法来看:
offer操作首先加锁,入队列操作委托给内部优先级队列的offer操作;
入队列后,检查队头元素,如果队头元素为当前先添加的元素,
则设置leader为null,唤醒等待available条件的线程。
再来看take操作:
/**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     过期元素出队列,如果需要线程等待直到队列中有一个过期元素可以利用
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
	//以可中断方式获取锁
        lock.lockInterruptibly();
        try {
	    //自旋等待,直到有过期元素可以用
            for (;;) {
                E first = q.peek();//获取队头元素
                if (first == null)
                    available.await();//如果队头为null,则等待available条件
                else {
		    //如果队头元素不为空,则获取元素延时时间
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
		        //如果过期,则出队列
                        return q.poll();
                    else if (leader != null)
		        //如果未过期,且leader不为null,则等待available条件
                        available.await();
                    else {
		        //如果未过期,且leader为null,则选举当前线程为leader
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
			    //以delay为超时时间,超时等待触发available条件
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
			        //当前线程成为leader,超时等待后,take成功,则重置leader为null
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
	        //自旋结束后,如果leader为null,则队头元素不为null,触发available,唤醒等待available条件的线程
                available.signal();
            lock.unlock();
        }
    }


从take方法,可以看出,首先以可中断方式获取锁,自旋等待,直到有过期元素可以用;
自旋的过程为获取队头元素,如果队头为null,则等待available条件,
如果队头元素不为空,则获取元素延时时间,如果过期,则出队列,
如果未过期,且leader不为null,则等待available条件,
如果未过期,且leader为null,则选举当前线程为leader,
以元素的delay为超时时间,超时等待触发available条件,
超时时间过后触发available条件,最后判断当前线程是否为leader
如果当前线程成为leader,超时等待后,take成功,则重置leader为null;
在自旋结束后,如果leader为null,则队头元素不为null,触发available,唤醒等待available条件的线程。

再看超时poll:
 
 /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
		        //这一点与take不同,take为等待,poll为超时等待
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    if (nanos <= 0)
                        return null;
                    if (nanos < delay || leader != null)
		         //这一点与take不同,take为等待,poll为超时等待,如果nanos小于元素的延时时间,等待时间为超时等待时间nanos
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
			    //如果nanos大于元素的延时时间,在等待元素时间过期后,再等待nanos-delay+timeLeft(元素剩余等待时间)
                            long timeLeft = available.awaitNanos(delay);
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

超时poll与take逻辑上基本一致,不同的是在等待available条件上,take为等待,
而超时poll为超时等待。
再看poll操作:
  /**
     * Retrieves and removes the head of this queue, or returns <tt>null</tt>
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or <tt>null</tt> if this
     *         queue has no elements with an expired delay
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
	    //如果队列为空,或队列头元素为过期,则返回null,否则返回队头的过期元素
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }


再看peek操作:
/**
     * Retrieves, but does not remove, the head of this queue, or
     * returns <tt>null</tt> if this queue is empty.  Unlike
     * <tt>poll</tt>, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     *
     查看队列头元素,不会移除元素,如果队列为空,返回null,如果没有元素过期,
     则将会返回下一个即将过期的元素
     * @return the head of this queue, or <tt>null</tt> if this
     *         queue is empty.
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
	    //委托给内部优先级队列,返回队头元素
            return q.peek();
        } finally {
            lock.unlock();
        }
    }


再看remove操作:
   
 /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     委托给内部优先级队列,无论元素是否过期,只要元素相等,则移除
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

peek和remove操作直接委托给内部优先级队列。
drainTo操作:
 /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (;;) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                c.add(q.poll());
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                c.add(q.poll());
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

drainTo操作是有peek和poll操作协作完成。


clear:

/**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }


remainingCapacity:
  
 /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     *
     为整数最大值
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }


size:
 public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }



总结:
DelayQueue内在一个优先级队列PriorityQueue,用于存放元素,一个available条件(ReentrantLock)用于通知一个队列头的元素可以用,或一个新线程成为leader事件。
     offer操作首先加锁,入队列操作委托给内部优先级队列的offer操作;入队列后,检查队头元素,如果队头元素为当前先添加的元素,则设置leader为null,唤醒等待available条件的线程。add,put,超时offer操作都是委托给offer操作。
     take操作首先以可中断方式获取锁,自旋等待,直到有过期元素可以用;自旋的过程为获取队头元素,如果队头为null,则等待available条件,如果队头元素不为空,则获取元素延时时间,如果过期,则出队列,如果未过期,且leader不为null,则等待available条件,
如果未过期,且leader为null,则选举当前线程为leader,以元素的delay为超时时间,超时等待触发available条件,超时时间过后触发available条件,最后判断当前线程是否为leader,如果当前线程成为leader,超时等待后,take成功,则重置leader为null;在自旋结束后,如果leader为null,则队头元素不为null,触发available,唤醒等待available条件的线程。超时poll与take逻辑上基本一致,不同的是在等待available条件上,take为等待,而超时poll为超时等待。poll操作为如果队列为空,或队列头元素为过期,则返回null,否则返回队头的过期元素。
    peek和remove,clear,size操作直接委托给内部优先级队列。drainTo操作是有peek和poll操作协作完成。


附:
//Delayed
package java.util.concurrent;

import java.util.*;

/**
 * A mix-in style interface for marking objects that should be
 * acted upon after a given delay.
 *
 Delayed是一个用于标记一个线程或动作在多少延时后,被执行的迷你接口。
 * <p>An implementation of this interface must define a
 * <tt>compareTo</tt> method that provides an ordering consistent with
 * its <tt>getDelay</tt> method.
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

有了前面队列文章的分析,LinkedBlockingDeque就没有什么值得看的,贴出源码,一看定义,就知道什么意思了,LinkedBlockingDeque就不在说了。
//LinkedBlockingDeque
package java.util.concurrent;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on
 * linked nodes.
 *
 * <p> The optional capacity bound constructor argument serves as a
 * way to prevent excessive expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * deque above capacity.
 *
 * <p>Most operations run in constant time (ignoring time spent
 * blocking).  Exceptions include {@link #remove(Object) remove},
 * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link
 * #removeLastOccurrence removeLastOccurrence}, {@link #contains
 * contains}, {@link #iterator iterator.remove()}, and the bulk
 * operations, all of which run in linear time.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.6
 * @author  Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>,  java.io.Serializable {

    /*
     * Implemented as a simple doubly-linked list protected by a
     * single lock and using conditions to manage blocking.
     *
     * To implement weakly consistent iterators, it appears we need to
     * keep all Nodes GC-reachable from a predecessor dequeued Node.
     * That would cause two problems:
     * - allow a rogue Iterator to cause unbounded memory retention
     * - cause cross-generational linking of old Nodes to new Nodes if
     *   a Node was tenured while live, which generational GCs have a
     *   hard time dealing with, causing repeated major collections.
     * However, only non-deleted Nodes need to be reachable from
     * dequeued Nodes, and reachability does not necessarily have to
     * be of the kind understood by the GC.  We use the trick of
     * linking a Node that has just been dequeued to itself.  Such a
     * self-link implicitly means to jump to "first" (for next links)
     * or "last" (for prev links).
     */

    /*
     * We have "diamond" multiple interface/abstract class inheritance
     * here, and that introduces ambiguities. Often we want the
     * BlockingDeque javadoc combined with the AbstractQueue
     * implementation, so a lot of method specs are duplicated here.
     */

    private static final long serialVersionUID = -387911632671998426L;

    /** Doubly-linked list node class */
    static final class Node<E> {
        /**
         * The item, or null if this node has been removed.
         */
        E item;

        /**
         * One of:
         * - the real predecessor Node
         * - this Node, meaning the predecessor is tail
         * - null, meaning there is no predecessor
         */
        Node<E> prev;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head
         * - null, meaning there is no successor
         */
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

    /**
     * Pointer to first node.
     * Invariant: (first == null && last == null) ||
     *            (first.prev == null && first.item != null)
     */
    transient Node<E> first;

    /**
     * Pointer to last node.
     * Invariant: (first == null && last == null) ||
     *            (last.next == null && last.item != null)
     */
    transient Node<E> last;

    /** Number of items in the deque */
    private transient int count;

    /** Maximum number of items in the deque */
    private final int capacity;

    /** Main lock guarding all access */
    final ReentrantLock lock = new ReentrantLock();

    /** Condition for waiting takes */
    private final Condition notEmpty = lock.newCondition();

    /** Condition for waiting puts */
    private final Condition notFull = lock.newCondition();

    /**
     * Creates a {@code LinkedBlockingDeque} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this deque
     * @throws IllegalArgumentException if {@code capacity} is less than 1
     */
    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }
}








Exchanger为双向队列模式同步队列的,这个我们抽时间在研究:
我们把Exchanger的JavaDoc放在这里

package java.util.concurrent;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.LockSupport;

/**
 * A synchronization point at which threads can pair and swap elements
 * within pairs.  Each thread presents some object on entry to the
 * {@link #exchange exchange} method, matches with a partner thread,
 * and receives its partner's object on return.  An Exchanger may be
 * viewed as a bidirectional form of a {@link SynchronousQueue}.
 * Exchangers may be useful in applications such as genetic algorithms
 * and pipeline designs.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an {@code Exchanger}
 * to swap buffers between threads so that the thread filling the
 * buffer gets a freshly emptied one when it needs it, handing off the
 * filled one to the thread emptying the buffer.
 * <pre>{@code
 * class FillAndEmpty {
 *   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
 *   DataBuffer initialEmptyBuffer = ... a made-up type
 *   DataBuffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.isFull())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.isEmpty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ...}
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }
 * }</pre>
 *
 * <p>Memory consistency effects: For each pair of threads that
 * successfully exchange objects via an {@code Exchanger}, actions
 * prior to the {@code exchange()} in each thread
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * those subsequent to a return from the corresponding {@code exchange()}
 * in the other thread.
 *
 * @since 1.5
 * @author Doug Lea and Bill Scherer and Michael Scott
 * @param <V> The type of objects that may be exchanged
 */
public class Exchanger<V> 


  • 大小: 39.1 KB
  • 大小: 24.2 KB
  • 大小: 45.3 KB
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics