ArrayBlockingQueue
public interface BlockingQueue<E> extends Queue<E> { /** * 添加元素 */ boolean add(E e); //将指定的元素添加到此队列中(如果立即可行),在成功时返回 true,其他情况则抛出 IllegalStateException。 void put(E e) throws InterruptedException; //将指定元素添加到此队列中,如果没有可用空间,将一直等待(如果有必要) boolean offer(E e); //将指定元素插入此队列中 /** * 删除元素 */ E take() throws InterruptedException; //检索并移除此队列的头部,如果此队列不存在任何元素,则一直等待 E poll(long timeout, TimeUnit unit) throws InterruptedException; //检索并移除此队列的头部,如果此队列中没有任何元素,则等待指定等待的时间(如果有必要) boolean remove(Object o); //从队列中删除指定元素 E peek(); }
- public class ArrayBlockingQueue<E> extends AbstractQueue<E>
- implements BlockingQueue<E>, java.io.Serializable {
- private final E[] items;
- /** items index for next take, poll or remove */
- private int takeIndex;
- /** items index for next put, offer, or add. */
- private int putIndex;
- private int count;
- /*ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,
- * 由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue
- */
- private final ReentrantLock lock;
- private final Condition notEmpty;
- private final Condition notFull;
- ...
- }
/** * 构造函数 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = (E[]) new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
- final int inc(int i) {
- return (++i == items.length)? 0 : i;
- }
- //塞数据、取数据内部主要引用这两个方法
- //塞数据:
- private void insert(E x) {
- items[putIndex] = x;
- putIndex = inc(putIndex);
- ++count;
- notEmpty.signal();
- }
- //取数据:
- private E extract() {
- final E[] items = this.items;
- E x = items[takeIndex];
- items[takeIndex] = null;
- takeIndex = inc(takeIndex);
- --count;
- notFull.signal();
- return x;
- }
put()/take() 可打断,并且会等待
put()/take() 可打断,并且会等待 /** * 放数据 */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } } public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } } /** * 取数据 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { if (count == 0) return null; E x = extract(); return x; } finally { lock.unlock(); } } public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : items[takeIndex]; } finally { lock.unlock(); } } public boolean remove(Object o) { if (o == null) return false; final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int i = takeIndex; int k = 0; for (;;) { if (k++ >= count) return false; if (o.equals(items[i])) { removeAt(i); return true; } i = inc(i); } } finally { lock.unlock(); } }
从 extract()方法里可见,tabkeIndex维护一个可以提取/移除元素的索引位置,因为takeIndex是从0递增的,所以这个类是FIFO队列。
putIndex维护一个可以插入的元素的位置索引。
count显然是维护队列中已经存在的元素总数。
ArrayBlockingQueue是JAVA5中的一个阻塞队列,能够自定义队列大小,当插入时,如果队列已经没有空闲位置,那么新的插入线程将阻塞到该队列,一旦该队列有空闲位置,那么阻塞的线程将执行插入。从队列中取数据为:take,放数据为:put
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
相关推荐
ArrayBlockingQueue源码分析.docx
ArrayBlockingQueue源码解析__动力节点共23页.pdf.zip
并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解
Java concurrency集合之ArrayBlockingQueue_动力节点Java学院整理,动力节点口口相传的Java黄埔军校
主要介绍了java并发之ArrayBlockingQueue详细介绍的相关资料,需要的朋友可以参考下
主要介绍了详细分析Java并发集合ArrayBlockingQueue的用法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
今天小编就为大家分享一篇关于Java源码解析阻塞队列ArrayBlockingQueue常用方法,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
主要介绍了java中LinkedBlockingQueue与ArrayBlockingQueue的异同,需要的朋友可以参考下
在前面的的文章,写了一个带有缓冲区的队列,是用JAVA的Lock下的Condition实现的,但是JAVA类中提供了这项功能,是ArrayBlockingQueue, ArrayBlockingQueue是由数组支持的有界阻塞队列,次队列按照FIFO(先进先...
Java并发编程学习宝典(漫画版),Java并发编程学习宝典(漫画版)Java并发编程学习宝典(漫画版)Java并发编程学习宝典(漫画版)Java并发编程学习宝典(漫画版)Java并发编程学习宝典(漫画版)Java并发编程学习...
ArrayBlockingQueue是一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素
今天小编就为大家分享一篇关于Java源码解析阻塞队列ArrayBlockingQueue介绍,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
今天小编就为大家分享一篇关于Java源码解析阻塞队列ArrayBlockingQueue功能简介,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
数组阻塞队列ArrayBlockingQueue,延迟队列DelayQueue, 链阻塞队列 LinkedBlockingQueue,具有优先级的阻塞队列 PriorityBlockingQueue, 同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 ...
3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 Synchronou sQueue 8. 阻塞双端队列 BlockingDeque 9...
3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. ...
数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链...