Java并发-ReentrantLock源码分析
在Java5.0之前,在协调对共享对象的访问时可以使用的机制只有synchronized和volatile。Java 5.0增加了一种新的机制:ReentrantLock。ReentrantLock没有使用任何内建机制,仅仅从语言层面上实现了与synchronized相同的语义,而且还提供了跟多的功能,这一切都要归功于Java并发大师Doug Lea。在Java 5.0中,Doug Lea写了一个同步框架,AbstractQueuedSynchronizer(可以参考Doug Lea的论文:http://ifeve.com/aqs)。在AbstractQueuedSynchronizer(简称AQS)中,Doug Lea提供了两种管理状态的机制:独占模式和共享模式,在java.util.concurrency包里,很多类都是依赖与AQS构建的,关于AQS的源码,请参考《Java并发-AbstractQueuedSynchronizer源码分析》。现在我们看一下ReentrantLock内容。
ReentrantLock的结构
ReentrantLock是一个可重入的互斥锁,与内置的synchronized具有相同的语义,但是功能更强大。关于显式锁的用法我们不做介绍,重点是关注它是如何实现的。如Doug Lea所建议,ReentrantLock有一个内部类继承自AQS,就是Sync,锁的行为和实现都是由这个类来完成,ReentrantLock有一个Sync的引用。ReentrantLock支持公平和非公平两种策略,这两种策略分别由Sync的子类来实现。所以ReentrantLock的结构是这样的:
(1)ReentrantLock持有一个Sync的引用,具体的行为由Sync来完成,ReentrantLock的方法仅仅是简单的调用Sync的方法;
(2)Sync继承自AQS,它实现同步器的基本功能;
(3)Sync有两个子类,NonfairSync实现非公平策略下的独占锁,公平策略由FairSync来实现。
ReentrantLock有两个构造方法,默认是非公平策略,还有一个带参数以控制公平策略,这部分代码如下:
/** * 一个可重入的互斥锁,与内置的synchronized具有相同的语义,但是功能更强大 */ public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; /** 所有的实现都有sync来实现 */ private final Sync sync; /** * 无参构造方法,使用非公平策略 */ public ReentrantLock() { sync = new NonfairSync(); } /**true 公平策略;false 非公平策略 */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }从上面的代码片段中可以看出,如果使用公平模式,则创建FairSync对象,如果使用非公平策略,则使用NonfairSync对象。关于ReentrantLock的源码分析,重点在于如何获得锁,以及如何释放锁。
实现方式
AQS属于定义的模板方法,在获取状态和释放状态时,所有继承自AQS的类都是调用AQS的acquire(int)和release(int)方法,这两个方法的逻辑都已经写好,对于不同的实现,子类只需要重写tryAcquire和tryRelease方法即可。AQS的acquire和release方法如下:
/** * 以独占模式获取,忽略中断。 * 首先调用tryAcquire尝试获取,如果失败则加入队列 * 然后再次尝试获取,直到成功。 * 此方法可用于实现Lock.lock */ public final void acquire(int arg) { //首先尝试获取,失败后加入队列 //再次尝试获取,直到成功后再返回 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /** * 释放独占的资源,此方法可用来实现Lock.unlock操作 */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }在AQS的内部维护着一个同步队列,如果当前线程获取不到状态,则进入同步队列,并挂起,等待状态可用时被唤醒,具体的请参考AQS的实现,在这里我们需要知道:
(1)AQS里维护了一个状态state,int型,通过状态可以达到线程的访问控制,AQS提供了getState(),setState(),compareAndSet()方法来操作次状态;
(2)AQS维护了一个同步队列,如果当前线程获取不到状态,则进入同步队列中等待;
对于ReentrantLock,这个状态就代表锁,由于是可重入的,state的值就代表线程持有锁的次数。所以,真正对我们重要的是tryAcquire和tryRelease的行为。此时,我们有两种方式来继续我们分析,一种是按内部类来分析,先分析Sync,在分析它的两个子类;一种是按功能来分析,先分析加锁的实现,在分析解锁的实现。由于ReentrantLock是重点,所以我还是倾向于从实现锁的角度来分析。
获取锁
ReentrantLock获取锁的代码如下:
/** * 获得锁,如果锁没有被占用,获得锁,计数为1,立即返回; * 如果锁是有当前线程占用,计数加1,返回; * 如果锁被其他线程占用,阻塞; */ public void lock() { sync.lock(); }可以看到它果然是把实现代理给了同步器,在前面关于ReentrantLock的结构中,我们已经知道了,sync要么是FairSync的实例,要么是NonfairSync的实例。那我们就分别看看公平锁和非公平锁的实现。
公平策略的锁获取
/** AQS acquire(1) */ final void lock() { acquire(1); } /** * 公平的tryAcquire(),成功获得锁,返回true,不成功,返回false,不会阻塞; * 由于是公平的,所以在锁可用时,需要判断当前线程是否是等待时间最长的 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //c==0说明没有线程获得锁,这里是公平锁的实现,还有判断是否有线程比当前线程等待的时间长 //如果当前线程就是等待时间最长的,使用CAS修改状态,成功则说明获得锁,返回true if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }//如果锁已被占用,判断是否是当前线程占用 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }前面已经说了,acquire是一个模板方法,它负责完成状态的获取,根据子类的tryAcquire来实现不同的目的,所以重点是tryAcquire。公平的意思是,当出现多线程争用时,锁倾向于将访问权授予等待时间最长的线程。上面tryAcquire包含的逻辑是:
(1)首先获得当前线程和锁的状态
(2)如果锁是可用的,但是还不能直接获取,因为实现的是公平策略,要先看看AQS的同步队列中是否有比自己等待时间还长的线程,如果没有,则使用CAS修改锁的状态,修改成功,则获得锁成功;修改失败,则说明获得锁失败;
(3)如果锁已经被占用,由于是可重入的,所以要判断锁是不是被当前线程锁占用。如果是,则将锁计数加1,返回true,表示获取锁成功;
非公平策略的锁获取
/** * 使用闯入策略来实现非公平的锁,如果闯入失败,则进入正常的获取流程 * 首先就去尝试修改状态,如果修改成功,则获得锁,如果没有成功,就去同步队列排队把 */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } /** * 调用Sync的nonfairTryAcquire(acquires) */ protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }非公平策略使用闯入来实现锁的获取,如果当前线程需要获得锁,首先就去尝试修改锁的状态,如果修改成功,则获得锁成功,修改失败,则进入同步队列中等待。公平策略和非公平策略的却别就是是否使用闯入,如果是非公平策略,线程二话不说,上来就尝试获取锁,而公平策略则是直接进入同步队列中去。在上面的代码中,tryAcquire又由nonfairTryAcquire来实现,这个方法是Sync实现的,不明白Doug Lea为什么要把非公平策略的实现放到父类。还是来看看nonfairTryAcquire方法吧:
/** * 非公平的tryLock,tryAcquire的行为由子类实现,线程不会被阻塞。首先判断状态 * 1.如果状态是0,说明锁可用,则使用CAS修改状态,如果成功,由当前线程获得,则返回true * 2.如果锁已经被占用,判断锁的拥有者是不是当前线程,如果是,设置状态,返回true,注意溢出 * 3.如果锁已经被其他线程占用,获取失败,返回false */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //如果锁可用,设置state的状态,返回 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }//如果锁已经被占用了,那么判断是不是由当前线程占用的,如果是,设置状态 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }和公平策略的tryAcquire差不多,所不同的就是当锁可用时,公平策略需要判断是否有等待时间更长的线程,而非公平策略则直接尝试修改以获得锁。
释放锁
使用unlock释放锁,公平和非公平在释放锁的行为是一样的。ReentrantLock的unlock方法:
/** * 尝试去释放锁,调用AQS的release(1),AQS.release(1)的行为: * 首先调用tryRelease(1),由于Sync重写了tryRelease;如果返回true表示锁可用 * 然后,去唤醒后继节点。 */ public void unlock() { sync.release(1); }还是调用AQS的release方法,AQS的release去调用具体的tryRelease方法。tryRelease由Sync来实现,其代码如下:
/** * 尝试释放锁,设置锁的计数。经过释放后如果锁可用返回true,否则返回false */ protected final boolean tryRelease(int releases) { int c = getState() - releases; //如果锁不是由当前线程持有,出错了,抛异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //c==0说明没有线程占用锁 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }其包含的逻辑:
(1)首先获得锁被持有的次数,减去1,在ReentrantLock中acquire和release中的所有参数都被写死为1,因为一次lock代表获取锁一次,unlock代表解锁1次;
(2)如果锁不是由当前线程持有的,抛异常;
(3)判断锁是否被持有,如果锁已经空闲,则返回true,如果锁仍然被占有,返回false。
到这里就把ReentrantLock最基本的实现原理讲明白了,可以看到最关键的实现还是由AQS来完成的。ReentrantLock还包含了tryLock()方法,tryLock是通过Sync的nonfairTryAcquire来实现的,如果锁可用则尝试获取,不论成功或者失败,都立即返回。还有可中断和超时的tryLock,具体用法参考API,其实现都是由AQS来实现的。
附录源码
/** * 一个可重入的互斥锁,与内置的synchronized具有相同的语义,但是功能更强大 */ public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; /** 所有的实现都有sync来实现 */ private final Sync sync; /** * 基本的同步器实现,继承自AQS,子类通过继承它来实现公平的和非公平的策略, * 使用AQS的state字段代表持有锁的次数 */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Lock.lock方法的实现,是一个抽象方法 */ abstract void lock(); /** * 非公平的tryLock,tryAcquire的行为由子类实现,线程不会被阻塞。首先判断状态 * 1.如果状态是0,说明锁可用,则使用CAS修改状态,如果成功,由当前线程获得,则返回true * 2.如果锁已经被占用,判断锁的拥有者是不是当前线程,如果是,设置状态,返回true,注意溢出 * 3.如果锁已经被其他线程占用,获取失败,返回false */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //如果锁可用,设置state的状态,返回 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }//如果锁已经被占用了,那么判断是不是由当前线程占用的,如果是,设置状态 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } /** * 尝试释放锁,设置锁的计数。经过释放后如果锁可用返回true,否则返回false */ protected final boolean tryRelease(int releases) { int c = getState() - releases; //如果锁不是由当前线程持有,出错了,抛异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //c==0说明没有线程占用锁 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } /** 返回锁是否由当前线程持有 */ protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } //依赖于外部类的方法 final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** 序列化 */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } /**Sync子类包括两个方法void lock(),会阻塞;tryAcquire()不会阻塞*/ /** * 非公平锁的实现 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * 使用闯入策略来实现非公平的锁,如果闯入失败,则进入正常的获取流程 * 首先就去尝试修改状态,如果修改成功,则获得锁,如果没有成功,就去同步队列排队把 */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } /** * 调用Sync的nonfairTryAcquire(acquires) */ protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * 公平锁 */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; /** AQS acquire(1) */ final void lock() { acquire(1); } /** * 公平的tryAcquire(),成功获得锁,返回true,不成功,返回false,不会阻塞; * 由于是公平的,所以在锁可用时,需要判断当前线程是否是等待时间最长的 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //c==0说明没有线程获得锁,这里是公平锁的实现,还有判断是否有线程比当前线程等待的时间长 //如果当前线程就是等待时间最长的,使用CAS修改状态,成功则说明获得锁,返回true if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }//如果锁已被占用,判断是否是当前线程占用 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } /** * 无参构造方法,使用非公平策略 */ public ReentrantLock() { sync = new NonfairSync(); } /**true 公平策略;false 非公平策略 */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } /** * 获得锁,如果锁没有被占用,获得锁,计数为1,立即返回; * 如果锁是有当前线程占用,计数加1,返回; * 如果锁被其他线程占用,阻塞; */ public void lock() { sync.lock(); } /** * 可中断的锁获取,通过AQS acquireInterruptibly(1)来实现。 * 获得锁,如果锁没有被占用,获得锁,计数为1,立即返回; * 如果锁是有当前线程占用,增加计数1,返回; * 如果锁被其他线程占用,阻塞当前线程,在西面两个条件发生之前,一直休眠: * (1)锁有当前线程获得; * (2)当前线程被中断; * 如果锁被当前线程获得,则计数为1。 * 如果当前线程在进入方法时中断状态被设置或者在等待锁期间被其他线程阻塞, * 则抛出InterruptionException,并清楚当前线程的中断状态。 * 这是一个显式中断点,应该悠闲考虑响应。 */ public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } /** 调用Sync的nonfairTryAcquire(1) */ public boolean tryLock() { return sync.nonfairTryAcquire(1); } /** 可中断的超时获取锁,超时或者中断都会终止 */ public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } /** * 尝试去释放锁,调用AQS的release(1),AQS.release(1)的行为: * 首先调用tryRelease(1),由于Sync重写了tryRelease;如果返回true表示锁可用 * 然后,去唤醒后继节点。 */ public void unlock() { sync.release(1); } /** * Returns a {@link Condition} instance for use with this * {@link Lock} instance. * * <p>The returned {@link Condition} instance supports the same * usages as do the {@link Object} monitor methods ({@link * Object#wait() wait}, {@link Object#notify notify}, and {@link * Object#notifyAll notifyAll}) when used with the built-in * monitor lock. * * <ul> * * <li>If this lock is not held when any of the {@link Condition} * {@linkplain Condition#await() waiting} or {@linkplain * Condition#signal signalling} methods are called, then an {@link * IllegalMonitorStateException} is thrown. * * <li>When the condition {@linkplain Condition#await() waiting} * methods are called the lock is released and, before they * return, the lock is reacquired and the lock hold count restored * to what it was when the method was called. * * <li>If a thread is {@linkplain Thread#interrupt interrupted} * while waiting then the wait will terminate, an {@link * InterruptedException} will be thrown, and the thread's * interrupted status will be cleared. * * <li> Waiting threads are signalled in FIFO order. * * <li>The ordering of lock reacquisition for threads returning * from waiting methods is the same as for threads initially * acquiring the lock, which is in the default case not specified, * but for <em>fair</em> locks favors those threads that have been * waiting the longest. * * </ul> * * @return the Condition object */ public Condition newCondition() { return sync.newCondition(); } /** * 查询当前线程持有锁的次数(可重入),持有次数是典型的仅仅用来测试和调试的方法 * @return 0 如果当前线程不持有锁;num,持有的次数。 */ public int getHoldCount() { return sync.getHoldCount(); } /** 判断锁是否被当前线程持有。*/ public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } /** * 查询锁是否被占用,此方法用于监视系统状态,不用于同步 */ public boolean isLocked() { return sync.isLocked(); } /** * 判断是否是公平锁 * @return 如果构造的是FairSync的实例,则返回ture */ public final boolean isFair() { return sync instanceof FairSync; } /** * 返回持有锁的线程,如果锁是可用状态则返回null。此方法实际是由 * AbstractOwnableSynchronizer.getExclusiveOwnerThread()实现 * AbstractOwnableSynchronizer里使用exclusiveOwnerThread保存当前 * 拥有状态的线程。 */ protected Thread getOwner() { return sync.getOwner(); } /** * 检查是否有线程在同步队列中等待获取锁,有AQS hasQueuedThread()实现 * 注意:注意,因为随时可能发生取消,所以返回 true 并不保证有其他线程ever acquire此锁。 * 此方法主要用于监视系统状态。 */ public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } /** * 查询给定线程是否正在等待获取此锁。 * 注意,因为随时可能发生取消,所以返回true并不保证此线程ever acquire。 * 此方法主要用于监视系统状态 */ public final boolean hasQueuedThread(Thread thread) { return sync.isQueued(thread); } /** * 返回正等待获取此锁的线程估计数。 * 该值仅是估计的数字,因为在此方法遍历内部数据结构的同时,线程的数目可能动态地变化。 * 此方法用于监视系统状态,不用于同步控制。 */ public final int getQueueLength() { return sync.getQueueLength(); } /** * 返回一个 collection,它包含可能正等待获取此锁的线程。 * 因为在构造此结果的同时实际的线程 set 可能动态地变化,所以返回的 collection 仅是尽力的估计值。 * 所返回 collection 中的元素没有特定的顺序。此方法用于加快子类的构造速度,以提供更多的监视设施。 */ protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } }转载请注明:喻红叶《Java并发-ReentrantLock源码分析》
相关推荐
主要为大家详细介绍了Java并发系列之ReentrantLock源码,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
7、深入理解 AQS 独占锁之 Reentrantlock 源码分析 (1).pdf 8、读写锁ReentrantReadWriteLock&StampLock详解.pdf 9、并发容器 (Map、List、Set) 实战及其原理.pdf 10、阻塞队列BlockingQueue 实战及其原理分析.pdf
5、Condition源码分析 6、ReentrantReadWriteLock底层实现原理 7、并发工具类CountDownLatch 、CyclicBarrier和Semaphore底层实现原理 8、线程池原理和如何使用线程池 9、ThreadLocal 为什么会内存泄漏 10、Volatile...
Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue...
java8 源码 AQS补充材料 美团技术团队《从ReentrantLock的实现看AQS的原理及应用》 测试 老钱《打通Java任督二脉--并发数据结构的基石》 ...waterstone《Java并发AQS详解》 英文论文的中文翻译: AQS作者的英文论文:
高并发编程第三阶段11讲 AtomicXXXFieldUpdater源码分析及使用场景分析.mp4 高并发编程第三阶段12讲 sun.misc.Unsafe介绍以及几种Counter方案性能对比.mp4 高并发编程第三阶段13讲 一个JNI程序的编写,通过...
高并发编程第三阶段11讲 AtomicXXXFieldUpdater源码分析及使用场景分析.mp4 高并发编程第三阶段12讲 sun.misc.Unsafe介绍以及几种Counter方案性能对比.mp4 高并发编程第三阶段13讲 一个JNI程序的编写,通过...
7、深入理解 AQS 独占锁之 Reentrantlock 源码分析 (1).pdf 8、读写锁ReentrantReadWriteLock&StampLock详解.pdf 9、并发容器 (Map、List、Set) 实战及其原理.pdf 10、阻塞队列BlockingQueue 实战及其原理分析.pdf
7、深入理解 AQS 独占锁之 Reentrantlock 源码分析 (1).pdf 8、读写锁ReentrantReadWriteLock&StampLock详解.pdf 9、并发容器 (Map、List、Set) 实战及其原理.pdf 10、阻塞队列BlockingQueue 实战及其原理分析.pdf
7、深入理解 AQS 独占锁之 Reentrantlock 源码分析 (1).pdf 8、读写锁ReentrantReadWriteLock&StampLock详解.pdf 9、并发容器 (Map、List、Set) 实战及其原理.pdf 10、阻塞队列BlockingQueue 实战及其原理分析.pdf
7、深入理解 AQS 独占锁之 Reentrantlock 源码分析 (1).pdf 8、读写锁ReentrantReadWriteLock&StampLock详解.pdf 9、并发容器 (Map、List、Set) 实战及其原理.pdf 10、阻塞队列BlockingQueue 实战及其原理分析.pdf
源码分析 AQS同步器原理(实现各种同步器)模板方法模式,继承并冲重写AQS方法 主要是加锁和解锁 java 创建线程池相关 done 不推荐使用Excutors工具类去创建默认的几种线程池。会有OOM风险...要么核心线程树可能过多...
(这部分通过查看ThreadPoolExecutor的源码分析--getTask()部分); unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性(时间单位) workQueue:一个阻塞队列,用来存储等待执行的任务...