`
沉沦的快乐
  • 浏览: 55807 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

并发编程之AbstractQueuedSynchronizer原理剖析

阅读更多

    简介

       在java同步锁中,除了synchronized关键字之外,还有很多更灵活的显式锁可供选择,比如ReentrantLock,ReentrantReadWriteLock等。甚至如果jdk中的锁不能满足你的需求,你还可以自己来定义自己的显式锁。显式锁都是实现Lock接口,需要自己实现比如lock(),unlock(),trylock()等方法。而lock接口方法的实现基本上都是基于AbstractOwnableSynchronizer(AQS)抽象类。可以说AQS是java的锁中最重要也最基础的类之一。

      AQS是一个半成品的抽象类。它封装好了线程如何等待锁和如何释放锁的规则。比如多个线程竞争锁,没有获得锁的线程将放到FIFO中排队,并且以自旋的方式去尝试获得锁。但是一个线程什么情况下获得锁和释放锁,需要自己去定义。简单的说AQS帮你实现了框架上的规则,但是框架下面的更具体的规则需要自己来实现。举个不恰当的例子——一个吃饭的游戏:一群人吃饭,但是只有一个饭桶,只有获得饭桶的人才能从饭吃桶里吃到饭,每次只有一个人能获得饭桶,没有抢到饭桶的人就去排队。AQS实现了这样一个规则:如果没有人获得饭桶,那么大家一起抢饭桶。谁先抢到谁就获得这个饭桶。其他没有获得饭桶的人就去排队,后面参加进来的人就排在队伍末尾。但是大家并不是老老实实呆在队伍里不动,也不是等着别人把饭桶让给你。而是队伍里的每个人都在关注着饭桶是否被前面得到的人释放了,并一遍一遍的去尝试着抢饭桶,直到抢到饭桶(获得锁)或者自己累了(异常退出)退出抢饭桶的行列,或者他妈喊它回家吃饭了(中断或取消)。这个抢饭桶规则就是AQS给你实现的规则。但是什么情况下被认定为抢到饭桶,这个规则需要你来补充,比如饭桶释放之后传给队列里排第一个的人,或者传给队列里排队最久人,获得传给队列里年龄最小的人等等。这也是就AQS获得锁的过程。如果你得到了饭桶,吃饱了饭就要释放饭桶,让给其他人,这就是释放锁的过程。下面具体讲讲AQS具体的原理。

 

队列里的节点定义

   没有抢到锁的线程将会放到一个队列里面。这个队列是给双向队列,所以节点node的属性包含前驱节点和后记节点,以及排队的线程,同时还包含一些判断获得释放锁需要的元素。下表示Node的所有属性

属性 描述
Node prev

前驱节点,比如某些锁中判断当前节点的线程能否获得锁的一个条件是

前一个节点是头节点。

Node next 后继节点。
Thread thread 竞争锁的线程。
Node nextWaiter

存储condition队列中的后继节点。

1.SHARED:表示该节点线程处于共享模式等待。

2.EXCLUSIVE:表示该节点线程处于排他模式等待。

int waitStatus

表示节点的状态。其中包含的状态有:

CANCELLED,值为1,表示当前的线程被取消;

SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark

CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;

PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;

值为0,表示当前节点在sync队列中,等待着获取锁。

 

AQS的属性及描述

属性名 属性描述
Node head FIFO队列头节点
Node tail FIFO队列尾节点
volatile int state 状态。该属性是AQS管理锁的最重要的属性。可以用它来标记当前锁的状态,也可以充当线程重入的计数器。同时在读写锁中,他的高16位和低16位都可以表示不同的含义。因此对这个字段的灵活应用是定制自定义锁的关键。通过CAS的方法来实现对state的同步操作,进而实现锁的获得与释放。
Thread exclusiveOwnerThread 当前拥有排他锁的线程。该字段一般和state联合起来使用,以确定锁的状态。一般state=0时exclusiveOwnerThread为null,表示没有线程获得锁。同时state不等于0时exclusiveOwnerThread也可为null,比如读写锁的读锁。

 

排他模式锁获得锁的方法

 

     上面抢饭桶的例子中只有1个人能获得饭桶,这其实是一种排他锁。前面介绍了AQS提供了半成品的获得锁的方法:一是它已经定义好的一套规则框架,比如轮询,排队等等。二是需要自定义的具体规则,比如公平与非公平等。

排他模式获得锁的实现方法

对应的方法如下:

1.protected boolean tryAcquire(int arg)。这个接口是需要自己实现的具体锁规则。比如ReentrantLock的tryAcquire的规则如下:

 

final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
        ReentrantLock定义的规则如下:如果state=0,那么通过compareAndSetState(0, acquires)方法先修改state,如果compareAndSetState的返回值为TRUE,说明state修改成功并且成功获得了锁。这时候其他线程调用compareAndSetState(0, acquires)一定返回false,state也不会修改。然后把exclusiveOwnerThread设置为当前线程,表示当前线程独占了锁。ReentrantLock是线程可重入,所以虽然state不等于0,但是exclusiveOwnerThread等于当前线程,那么也应该获得锁,这时候state充当计数器,state=c + acquires。因为执行这个操作的是同一个线程,所以不需要CAS进行同步。
2.public final void acquire(int arg)。这个方法就是AQS帮你实现了的规则。代码如下:
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
     这段代码先调用自定义的tryAcquire(arg)方法。如果该方法返回为true,那么该方法执行并退出。表示获得锁。如果tryAcquire返回false,那么执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。首先先执行addWaiter(Node.EXCLUSIVE), arg)。这个方法是往把当前线程封装成一个node,然后挂在队列末尾。

 

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
     addWaiter(Node mode)方法也是通过compareAndSetTail方法来实现队列同步的。addWaiter再性能上做了优化。在大部分情况下队列都不会为空,所以先按照队列不为空的情况进行处理把当前node挂到队列末尾。如果当前队列为空或者执行compareAndSetTail同步失败,那么在执行完整的把当前节点挂到队列末尾的的方法enq(node)。

 

 

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                Node h = new Node(); // Dummy header
                h.next = node;
                node.prev = h;
                if (compareAndSetHead(h)) {
                    tail = node;
                    return h;
                }
            }
            else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    enq(node)方法进行自旋,直到node成功挂到了队列的末尾,期间还处理了队列为空时需要创建头节点的情况。

 

     addWaiter方法执行完并创建node完成之后,那么再执行acquireQueued(final Node node, int arg)方法:

 

final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }
     该方法通过自旋方式去获得锁,要么获得锁返回,要么被中断退出,要么继续自旋等待锁。首先判断当前节点的前驱节点是不是头节点,如果是,则执行自定义的tryAcquire(arg)。如果返回为true,则把当前节点设为头节点并返回。如果p == head && tryAcquire(arg)为false,则执行中断检查shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()。方法shouldParkAfterFailedAcquire(p, node) 是检查并更新没有获得锁的node的状态并返回改节点的线程是否应该阻塞。原代码如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
	    do {
		node.prev = pred = pred.prev;
	    } while (pred.waitStatus > 0);
	    pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE. Indicate that we
             * need a signal, but don't park yet. Caller will need to
             * retry to make sure it cannot acquire before parking. 
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        } 
        return false;
    }
        该方法实现以下逻辑。首先如果前一个节点的状态为SIGNAL,那么表示当前节点的线程可以安全地去获得许可;如果一个节点的状态大于0(大于0的只有一个值1,即取消了),然后往head方向找到一个waitStatus小于等于0的node A,并把当前节点的前驱节点设置为node A,node A的后继节点设置为当前node。即把当前node往head方向连续的的cancel节点摘掉,然后返回false,即当前线程不阻塞;如果前驱节点的状态等于0或-3(-2是共享模式才用的状态),表示当前线程需要一个signal触发当前线程去获得许可,但是不是在这次自旋中去获得许可。所以需要把前驱节点的状态设置为signal,以便下次自旋的时候去获得许可,同时返回false。如果shouldParkAfterFailedAcquire返回true,即当前线程能够安全的去获得许可,那么执行parkAndCheckInterrupt()方法:
 
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
      该方法执行LockSupport.park(this);取获取许可,如果获取不到则一直等待,直到获得一个许可或中断,然后返回Thread.interrupted(),Thread.interrupted()方法会返回线程的中断状态并reset中断状态。
     至此AQS定义的一个完成的获得锁的规则结束了。现在总结下如何AQS是如何实现获得锁的:首先需要自己实现tryAcquire方法,然后acquire方法会调用tryAcquire方法去获得锁,如果得不到则为当前线程创建一个node,加到队列末尾。然后acquireQueued方法自旋,直到当前线程获得锁或中断退出。在自旋过程中,当前线程会处于阻塞状态,直到被唤醒获得锁。

排他模式释放锁的实现方法

     某人获得饭桶后,他吃饱了饭,就需要释放饭桶让给其他人,如果一直抱着饭桶不放,就造成了死锁。现在再来看看释放锁的原理。
   1.boolean tryRelease(int arg),这是需要自己实现的释放锁的规则,同样以ReentrantLock为例子看看它是怎么实现的:
protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
       如果当前线程并没有获得锁调用该方法会抛出IllegalMonitorStateException异常,另外ReentrantLock是可重入的,如果state还充当了计数器的作用,只有当state等于0时才释放锁。
      2.public final boolean release(int arg),这个方法是AQS定义好的释放锁规则。
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
      如果当前线程释放锁,并且头结点存在且状态不为0,那么需要对头结点的后继节点进行唤醒操作,调用方法unparkSuccessor():
private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling. It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0); 

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
     如果头结点状态小于0,则先清除头结点状态,置为0,那么唤醒头结点的下一个节点。如果该节点不存在,或者改节点状态是取消状态,那么从队列末尾开始找,找到最后一个状态没有取消的节点,然后调用LockSupport.unpark(s.thread)方法释放许可,唤醒该节点的线程。为什么只对头结点的后继节点释放许可呢?因为每次只有一个线程获得锁,前面在获得锁acquireQueued方法中说了,只有当前节点的前驱节点是头节点才能获得锁,所以只需要唤醒头结点的线程就可以了。

共享模式锁

    前面举的吃饭的例子中,只有一个人能够获得饭桶吃饭,但是如果这个饭桶足够大,可以允许多个人同时获得饭桶。那么上面讲的获得锁和释放锁就满足不了,因为前面讲的是独占锁。下面来讲讲共享锁,即多个线程获得同一个锁。共享锁的一个典型的例子是读锁。

   共享模式获得锁的实现方法

 1.protected int tryAcquireShared(int arg),该方法是共享锁需要自己实现的接口,以读写锁的读锁例子举例:

 

protected final int tryAcquireShared(int unused) {
            
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            if (!readerShouldBlock(current) &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != current.getId())
                    cachedHoldCounter = rh = readHolds.get();
                rh.count++;
                return 1;
            }
            return fullTryAcquireShared(current);
        }

      关于读写锁,请参见并发编程之读写锁ReentrantReadWriteLock实现。这里不再赘述。

 

 2.public final void acquireShared(int arg) 。这个方法是AQS实现的共享锁规则:

 

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

   如果当前线程没有获得共享锁,则调用doAcquireShared(arg)方法:

 

 

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

     该方法的主要功能是先创建一个共享模式的node,加到队列的末尾,然后判断当前节点的前驱节点是否是头节点,是的话则调用tryAcquireShared方法。如果tryAcquireShared的返回值大于等于0,则表示获得了共享锁,然后调用setHeadAndPropagate方法,setHeadAndPropagate方法先把当前节点设为为头节点,调用doReleaseShared方法

 

 

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         * Propagation was indicated by caller,
         * or was recorded (as h.waitStatus) by a previous operation
         * (note: this uses sign-check of waitStatus because
         * PROPAGATE status may transition to SIGNAL.)
         * and
         * The next node is waiting in shared mode,
         * or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0) { 
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

 private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases. This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue; // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; // loop on failed CAS
            }
            if (h == head) // loop if head changed
                break;
        }
    }

         doReleaseShared方法实现的功能如下:遍历队列中的所有节点,如果节点状态为signal,把改siganl状态置为0,并调用unparkSuccessor(h);方法把该节点的后继节点线程唤醒;如果该节点状态为0,则把状态设置为PROPAGATE。

 

    共享模式释放锁的实现方法

1.protected final boolean tryReleaseShared(int unused)。该方法是需要自己实现的共享模式释放锁方法。下面是读写锁的读锁的实现例子:

 

 protected final boolean tryReleaseShared(int unused) {
            HoldCounter rh = cachedHoldCounter;
            Thread current = Thread.currentThread();
            if (rh == null || rh.tid != current.getId())
                rh = readHolds.get();
            if (rh.tryDecrement() <= 0)
                throw new IllegalMonitorStateException();
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

     关于读写锁,请参见并发编程之读写锁ReentrantReadWriteLock实现。这里不再赘述。

 

2.releaseShared(int arg)。该方法是AQS实现的方法:

 

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

        调用自定义的tryReleaseShared方法成功,则调用doReleaseShared()。

 

 其他获得锁、释放锁方法

     除了前面介绍几个获得独占锁和排他锁的基本接口之外,AQS还提供了可中断接口acquireInterruptibly(int arg)、acquireSharedInterruptibly(int arg)和超时的接口tryAcquireNanos(int arg, long nanosTimeout)、doAcquireNanos(int arg, long nanosTimeout)、tryAcquireSharedNanos(int arg, long nanosTimeout)、doAcquireSharedNanos(int arg, long nanosTimeout)。这些中断接口和超时基本是只是在前面的基本方法中增加了中断处理逻辑和超时判断逻辑。这里不在详细描述。

 
 
 
    
   
 
 

 

 

 

 

 

 

       

分享到:
评论

相关推荐

    龙果 java并发编程原理实战

    龙果 java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四...

    Java并发编程原理与实战

    线程之间通信之join应用与实现原理剖析.mp4 ThreadLocal 使用及实现原理.mp4 并发工具类CountDownLatch详解.mp4 并发工具类CyclicBarrier 详解.mp4 并发工具类Semaphore详解.mp4 并发工具类Exchanger详解.mp4 ...

    Java 并发编程原理与实战视频

    java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四个...

    Java并发编程实战

    Java并发编程实战 本书深入浅出地介绍了Java线程和并发,是一本完美的Java并发参考手册。书中从并发性和线程安全性的基本概念出发,介绍了如何使用类库提供的基本并发构建块,用于避免并发危险、构造线程安全的类及...

    龙果java并发编程完整视频

    第35节线程之间通信之join应用与实现原理剖析00:10:17分钟 | 第36节ThreadLocal 使用及实现原理00:17:41分钟 | 第37节并发工具类CountDownLatch详解00:22:04分钟 | 第38节并发工具类CyclicBarrier 详解00:11:52...

    java并发编程

    第35节线程之间通信之join应用与实现原理剖析00:10:17分钟 | 第36节ThreadLocal 使用及实现原理00:17:41分钟 | 第37节并发工具类CountDownLatch详解00:22:04分钟 | 第38节并发工具类CyclicBarrier 详解00:11:52...

    Java并发包源码分析(JDK1.8)

    Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包...对每个类的核心源码进行详细分析,笔记详细,由浅入深,层层深入,带您剖析并发编程原理

    java锁机制基类AbstractQueuedSynchronizer从设计到实现到应用

    从并发概念、场景分析出发,依次引出锁、等待队列等概念,直至分析清楚java锁机制实现的原理。并以java锁机制实现基类AbstractQueuedSynchronizer的实现为例,从类(核心属性、方法)设计思路,到对关键代码做注释...

Global site tag (gtag.js) - Google Analytics