- 浏览: 130461 次
- 性别:
- 来自: ...
文章分类
最新评论
随着多线程基础总结的增多,却明显的感觉知道的越来越少,好像转了一圈又回到了什么都不懂的起点。不过还是试着介绍一下队列的并发实现,努力尽快的驱散迷雾。队列这个数据结构已经很熟悉了,利用其先进先出的特性,多数生产消费模型的首选数据结构就是队列。对于有多个生产者和多个消费者线程的模型来说,最重要是他们共同访问的Queue是线程安全的。JDK中提供的线程安全的Queue的实现还是很丰富的:ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,DelayQueue,ConcurrentLinkedQueue等等,多数情况下使用这些数据结构编写并发程序足够了。ArrayBlockingQueue的实现之前的总结中已经有介绍,所以这次是分析一下LinkedBlockingQueue和ConcurrentLinkedQueue的源码实现。
首先从简单的开始,先看看LinkedBlockingQueue线程安全的实现。之所以介绍它是因为其实现比较典型,对比ArrayBlokcingQueue使用一个ReentrantLock和两个Condition维护内部的数组来说,它使用了两个ReentrantLock,并且分别对应一个Condition来实现对内部数据结构Node型变量的维护。
这里仅仅展示部分源码,主要的方法在后面的分析中列出。分析之前明确一个最基本的概念。天天念叨着编写线程安全的类,什么是线程安全的类?那就是类内共享的全局变量的访问必须保证是不受多线程形式影响的。如果由于多线程的访问(改变,遍历,查看)而使这些变量结构被破坏或者针对这些变量操作的原子性被破坏,则这个类的编写不是线程安全的。
明确了这个基本的概念就可以很好的理解这个Queue的实现为什么是线程安全的了。在LinkedBlockingQueue的所有共享的全局变量中,final声明的capacity在构造器生成实例时就成了不变量了。而final声明的count由于是AtomicInteger类型的,所以能够保证其操作的原子性。剩下的final的变量都是初始化成了不变量,并且不包含可变属性,所以都是访问安全的。那么剩下的就是Node类型的head和last两个可变量。所以要保证LinkedBlockingQueue是线程安全的就是要保证对head和last的访问是线程安全的。
首先从上面的源码可以看到insert(E x),extract()是真正的操作head,last来入队和出对的方法,但是由于是私有的,所以不能被直接访问,不用担心线程的问题。实际入队的公开的方法是put(E e),offer(E e)和offer(E e, long timeout, TimeUnit unit)。put(...)方法与offer(...)都是把新元素加入到队尾,所不同的是如果不满足条件put会把当前执行的线程扔到等待集中等待被唤醒继续执行,而offer则是直接退出,所以如果是需要使用它的阻塞特性的话,不能直接使用poll(...)。
put(...)方法中加入元素的操作使用this.putLock来限制多线程的访问,并且使用了可中断的方式:
代码段(1)是阻塞操作,代码段(2)是count递增和唤醒等待的操作。两者之间的insert(e)才是入队操作,其实际是操作的队尾引用last,并且没有牵涉到head。所以设计两个锁的原因就在这里!因为出队操作take(),poll()实际是执行extract()仅仅操作队首引用head。增加了this.takeLock这个锁,就实现了多个不同任务的线程入队的同时可以进行出对的操作,并且由于两个操作所共同使用的count是AtomicInteger类型的,所以完全不用考虑计数器递增递减的问题。假设count换成int,则相应的putLock内的count++和takeLock内的count--有可能相互覆盖,最终造成count的值被腐蚀,故这种设计必须使用原子操作类。
我之前说过,保证类的线程安全只要保证head和last的操作的线程安全,也就是保证insert(E x)和extract()线程安全即可。那么上面的put方法中的代码段(1)放在a,b之间,代码段(2)放在c,d之间不是更好?毕竟锁的粒度越小越好。单纯的考虑count的话这样的改变是正确的,但是await()和singal()这两个方法执行时都会检查当前线程是否是独占锁的那个线程,如果不是则抛出java.lang.IllegalMonitorStateException异常。而这两段代码中包含notFull.await()和notFull.signal()这两句使得(1),(2)必须放在lock保护块内。这里说明主要是count本身并不需要putLock或者takeLock的保护,从
可以看出count的访问是不需要任何锁的。而在put等方法中,其与锁机制的混用很容易造成迷惑。最后put中的代码d的作用主要是一个低位及时通知的作用,也就是队列刚有值试图获得takeLock去通知等待集中的出队线程。因为c==0意味着count.getAndIncrement()原子递增成功,所以count > 0成立。类似作用的代码:
在take和poll中也有出现,实现了高位及时通知。
分析完了put,对应的offer,take,poll方法都是类似的实现。下面看看遍历队列的操作:
这个方法很简单主要是要清楚一点:这个操作执行时不允许其他线程再修改队首和队尾,所以使用了fullyLock去获取putLock和takeLock,只要成功则可以保证不会再有修改队列的操作。然后就是安心的遍历到最后一个元素为止了。
另外在offer(E e, long timeout, TimeUnit unit)这个方法中提供了带有超时的入队操作,如果一直不成功的话,它会尝试在timeout的时间内入队:
其内部循环使用notFull.awaitNanos(nanos)方法反复的计算剩余时间的大概值用于实现延时功能。nanos<=0则放弃尝试,直接退出。
整体而言,LinkedBlockingQueue的实现还是很清晰的。相对于后面要介绍的ConcurrentLinkedQueue来说,它属于简单的实现。这些看似复杂的数据结构的实现实质都是多线程的基础的综合应用。就好像数学中千变万化的难题其实都是基础公式的组合一样,如果有清晰的基础认知,还是能找到自己分析的思路的。本来是想从mina中找找类似的实现,不过很遗憾的是它好像仅仅实现了一个非线程安全的循环队列,然后在其基础上使用synchronized进行封装成线程安全的Queue。
首先从简单的开始,先看看LinkedBlockingQueue线程安全的实现。之所以介绍它是因为其实现比较典型,对比ArrayBlokcingQueue使用一个ReentrantLock和两个Condition维护内部的数组来说,它使用了两个ReentrantLock,并且分别对应一个Condition来实现对内部数据结构Node型变量的维护。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -6903933977591709194L; /** * 节点数据结构 */ static class Node<E> { /** The item, volatile to ensure barrier separating write and read */ volatile E item; Node<E> next; Node(E x) { item = x; } } /** 队列的容量 */ private final int capacity; /** 持有节点计数器 */ private final AtomicInteger count = new AtomicInteger(0); /** 头指针 */ private transient Node<E> head; /** 尾指针 */ private transient Node<E> last; /** 用于读取的独占锁*/ private final ReentrantLock takeLock = new ReentrantLock(); /** 队列是否为空的条件 */ private final Condition notEmpty = takeLock.newCondition(); /** 用于写入的独占锁 */ private final ReentrantLock putLock = new ReentrantLock(); /** 队列是否已满的条件 */ private final Condition notFull = putLock.newCondition(); private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } private void insert(E x) { last = last.next = new Node<E>(x); } private E extract() { Node<E> first = head.next; head = first; E x = first.item; first.item = null; return x; } private void fullyLock() { putLock.lock(); takeLock.lock(); } private void fullyUnlock() { takeLock.unlock(); putLock.unlock(); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } ... }
这里仅仅展示部分源码,主要的方法在后面的分析中列出。分析之前明确一个最基本的概念。天天念叨着编写线程安全的类,什么是线程安全的类?那就是类内共享的全局变量的访问必须保证是不受多线程形式影响的。如果由于多线程的访问(改变,遍历,查看)而使这些变量结构被破坏或者针对这些变量操作的原子性被破坏,则这个类的编写不是线程安全的。
明确了这个基本的概念就可以很好的理解这个Queue的实现为什么是线程安全的了。在LinkedBlockingQueue的所有共享的全局变量中,final声明的capacity在构造器生成实例时就成了不变量了。而final声明的count由于是AtomicInteger类型的,所以能够保证其操作的原子性。剩下的final的变量都是初始化成了不变量,并且不包含可变属性,所以都是访问安全的。那么剩下的就是Node类型的head和last两个可变量。所以要保证LinkedBlockingQueue是线程安全的就是要保证对head和last的访问是线程安全的。
首先从上面的源码可以看到insert(E x),extract()是真正的操作head,last来入队和出对的方法,但是由于是私有的,所以不能被直接访问,不用担心线程的问题。实际入队的公开的方法是put(E e),offer(E e)和offer(E e, long timeout, TimeUnit unit)。put(...)方法与offer(...)都是把新元素加入到队尾,所不同的是如果不满足条件put会把当前执行的线程扔到等待集中等待被唤醒继续执行,而offer则是直接退出,所以如果是需要使用它的阻塞特性的话,不能直接使用poll(...)。
put(...)方法中加入元素的操作使用this.putLock来限制多线程的访问,并且使用了可中断的方式:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //----------------a putLock.lockInterruptibly();//随时保证响应中断 //--------b try { //*****************************(1)********************************* try { while (count.get() == capacity) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to a non-interrupted thread throw ie; } //*****************************end********************************* insert(e);//真正的入队操作 //********************(2)********************** c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); //******************end********************** } finally { putLock.unlock(); } //-------------------------c if (c == 0) //---------------d signalNotEmpty(); }
代码段(1)是阻塞操作,代码段(2)是count递增和唤醒等待的操作。两者之间的insert(e)才是入队操作,其实际是操作的队尾引用last,并且没有牵涉到head。所以设计两个锁的原因就在这里!因为出队操作take(),poll()实际是执行extract()仅仅操作队首引用head。增加了this.takeLock这个锁,就实现了多个不同任务的线程入队的同时可以进行出对的操作,并且由于两个操作所共同使用的count是AtomicInteger类型的,所以完全不用考虑计数器递增递减的问题。假设count换成int,则相应的putLock内的count++和takeLock内的count--有可能相互覆盖,最终造成count的值被腐蚀,故这种设计必须使用原子操作类。
我之前说过,保证类的线程安全只要保证head和last的操作的线程安全,也就是保证insert(E x)和extract()线程安全即可。那么上面的put方法中的代码段(1)放在a,b之间,代码段(2)放在c,d之间不是更好?毕竟锁的粒度越小越好。单纯的考虑count的话这样的改变是正确的,但是await()和singal()这两个方法执行时都会检查当前线程是否是独占锁的那个线程,如果不是则抛出java.lang.IllegalMonitorStateException异常。而这两段代码中包含notFull.await()和notFull.signal()这两句使得(1),(2)必须放在lock保护块内。这里说明主要是count本身并不需要putLock或者takeLock的保护,从
public int size() { return count.get(); }
可以看出count的访问是不需要任何锁的。而在put等方法中,其与锁机制的混用很容易造成迷惑。最后put中的代码d的作用主要是一个低位及时通知的作用,也就是队列刚有值试图获得takeLock去通知等待集中的出队线程。因为c==0意味着count.getAndIncrement()原子递增成功,所以count > 0成立。类似作用的代码:
if (c == capacity) signalNotFull();
在take和poll中也有出现,实现了高位及时通知。
分析完了put,对应的offer,take,poll方法都是类似的实现。下面看看遍历队列的操作:
public Object[] toArray() { fullyLock(); try { int size = count.get(); Object[] a = new Object[size]; int k = 0; for (Node<E> p = head.next; p != null; p = p.next) a[k++] = p.item; return a; } finally { fullyUnlock(); } }
这个方法很简单主要是要清楚一点:这个操作执行时不允许其他线程再修改队首和队尾,所以使用了fullyLock去获取putLock和takeLock,只要成功则可以保证不会再有修改队列的操作。然后就是安心的遍历到最后一个元素为止了。
另外在offer(E e, long timeout, TimeUnit unit)这个方法中提供了带有超时的入队操作,如果一直不成功的话,它会尝试在timeout的时间内入队:
for (;;) { ...//入队操作 if (nanos <= 0) return false; try { nanos = notFull.awaitNanos(nanos); } catch (InterruptedException ie) { notFull.signal(); // propagate to a non-interrupted thread throw ie; } }
其内部循环使用notFull.awaitNanos(nanos)方法反复的计算剩余时间的大概值用于实现延时功能。nanos<=0则放弃尝试,直接退出。
整体而言,LinkedBlockingQueue的实现还是很清晰的。相对于后面要介绍的ConcurrentLinkedQueue来说,它属于简单的实现。这些看似复杂的数据结构的实现实质都是多线程的基础的综合应用。就好像数学中千变万化的难题其实都是基础公式的组合一样,如果有清晰的基础认知,还是能找到自己分析的思路的。本来是想从mina中找找类似的实现,不过很遗憾的是它好像仅仅实现了一个非线程安全的循环队列,然后在其基础上使用synchronized进行封装成线程安全的Queue。
发表评论
文章已被作者锁定,不允许评论。
-
一道位操作的趣味编程题
2010-03-14 10:50 2080看到一道很有意思的编程题:大厅里有64盏灯,每盏灯都编 ... -
一道字符串截取的编程题
2010-03-11 10:52 2272最近接触到一道字符串截取的编程题:编写一个截取字符串的 ... -
一道多线程趣味热身题
2010-02-28 18:01 1912保持对知识点或者技术的熟悉度对于程序员至关重要,要学会 ... -
疑似Google多线程面试题的Java实现
2010-02-24 17:39 4903来到一个完全陌生的地方,即将一切从新开始,内心兴奋又忐 ... -
Mina的线程池实现分析(2)
2010-02-10 17:31 4516分析了I/O事件的存储,下面看看多个Worker同时工 ... -
Mina的线程池实现分析(1)
2010-02-10 17:28 11571线程池是并发应用中,为了减少每个任务调用的开销增强性能 ... -
多线程基础总结十一--ConcurrentLinkedQueue
2010-02-03 17:52 12835ConcurrentLinkedQueue充分使用了a ... -
LinkedBlockingQueue应用--生产消费模型简单实现
2010-01-29 20:45 8132之前介绍时LinkedBlockingQueue提到了 ... -
号称放倒一片的一道J2SE基础题的个人理解
2010-01-23 14:07 2788近日无意中看到一道Java基础题,号称在接受测试的10 ... -
多线程基础总结九--Mina窥探(1)
2010-01-21 23:46 5393一直以来的多线程的基础总结都是脱离应用的,但是要说多线 ... -
多线程基础总结八--ReentrantReadWriteLock
2010-01-15 23:22 7506说到ReentrantReadWriteLock,首先 ... -
多线程基础总结七--ReentrantLock
2010-01-09 23:17 7672之前总结了部分无锁机制的多线程基础,理想的状态当然是利 ... -
关于atomic问题的一点理解
2009-12-30 16:42 2434之前看到一个帖子是关于atomic使用的,当时没有仔细 ... -
多线程基础总结六--synchronized(2)
2009-12-18 18:45 1866早在总结一时,我就尽量的把synchronized的重点 ... -
多线程基础总结五--atomic
2009-12-17 19:46 3545在简单介绍java.util.c ... -
多线程基础总结四--ThreadLocal
2009-12-16 19:48 2715说到ThreadLocal,首先 ... -
多线程基础总结三--volatile
2009-12-15 20:09 2521前面的两篇总结简 ... -
多线程基础总结二--Thread
2009-12-12 23:27 2663对于Thread来说 ... -
多线程基础总结一--synchronized(1)
2009-12-12 23:23 3066最近写关于并发的小应 ... -
由destory-method引发的IOC容器设计的思考
2009-12-07 16:51 1678第一次读Spring的源 ...
相关推荐
linkedblockingqueue测试程序
【2018最新最详细】并发多线程教程,课程结构如下 1.并发编程的优缺点 2.线程的状态转换以及基本操作 3.java内存模型以及happens-before规则 4.彻底理解synchronized 5.彻底理解volatile 6.你以为你真的了解final吗...
LinkedBlockingQueue 首先 LinkedBlockingQueue 是一个 “可选且有界” 的阻塞队列实现,你可以根据需要指定队列的大小。 接下来,我将创建一个 LinkedBlockingQueue ,它最多可以包含100个元素: BlockingQueue...
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
JAVA线程基本学习, JAVA多线程的特性= 线程池: 本质上是一个对象池, 用来管理线程资源. 在任务执行前, 需要从线程池中拿出线程来执行. 在任务执行完成之后, 需要把线程放回线程池. 线程池好处: 降低资源的消耗...
LinkedBlockingQueuejava.pdf
单向链表源码 博文链接:https://uule.iteye.com/blog/1561753
多线程集合IOand so onJava中的集合?1.核心接口:Iterator(迭代器接口)Iterable(可迭代接口) -> Collection(集合接口)-> List(列表,线性表接口) :ArrayList、LinkedList-> Set(元素不重复的集合接口):HashSet、...
JDK容器学习之Queue:LinkedBlockingQueue
NULL 博文链接:https://xiongjiajia.iteye.com/blog/2325943
│ 高并发编程第二阶段04讲、多线程的休息室WaitSet详细介绍与知识点总结.mp4 │ 高并发编程第二阶段05讲、一个解释volatile关键字作用最好的例子.mp4 │ 高并发编程第二阶段06讲、Java内存模型以及CPU缓存不一致...
│ 高并发编程第二阶段04讲、多线程的休息室WaitSet详细介绍与知识点总结.mp4 │ 高并发编程第二阶段05讲、一个解释volatile关键字作用最好的例子.mp4 │ 高并发编程第二阶段06讲、Java内存模型以及CPU缓存不一致...
并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解
主要介绍了详细分析Java并发集合LinkedBlockingQueue的用法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
主要介绍了java中LinkedBlockingQueue与ArrayBlockingQueue的异同,需要的朋友可以参考下
NULL 博文链接:https://kanpiaoxue.iteye.com/blog/2101309
1. 编写程序,使用两个线程,一个队列, 其中一个线程从键盘读取数据,放入到队列中,直到读取的数据是字符串quit则结束,线程的任务就是循环读取数据直到... (b) 必须使用java.util.concurrent.LinkedBlockingQueue.
LinkedBlockingQueue :由容器/列表支持的有界阻塞队列 ConcurrentRingBuffer :由片支持的有界无锁队列 安装 go get - u github . com / theodesp / blockingQueues 用法 非阻塞API queue , _ := NewArrayBlock