`
whitesock
  • 浏览: 478556 次
  • 性别: Icon_minigender_1
  • 来自: 大连
社区版块
存档分类
最新评论

Inside AbstractQueuedSynchronizer (2)

阅读更多

Inside AbstractQueuedSynchronizer (1)

Inside AbstractQueuedSynchronizer (2)

Inside AbstractQueuedSynchronizer (3)

Inside AbstractQueuedSynchronizer (4)

 

3 AbstractQueuedSynchronizer

 

3.1 Inheritance

    AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer。AbstractOwnableSynchronizer继承自Object并且实现了Serializable接口,它只有一个成员变量private transient Thread exclusiveOwnerThread以及对应的getter/setter方法。该成员变量用于保存当前拥有排他访问权的线程。需要注意的是,该成员变量没有用volatile关键字修饰。


3.2 State

    AbstractQueuedSynchronizer用一个int(private volatile int state)来保存同步状态,以及对应的getter/setter/compareAndSetState方法。Java6新增了一个AbstractQueuedLongSynchronizer,它用一个long来保存同步状态,貌似目前没有被java.util.concurrent中的其它synchronizer所使用。对于ReentrantLock,state为0则意味着锁没有被任何线程持有;否则state保存了持有锁的线程的重入次数。

 

3.3 WaitQueue

    WaitQueue是AbstractQueuedSynchronizer的核心,它用于保存被阻塞的线程。它的实现是"CLH" (Craig, Landin, and Hagersten) lock queue的一个变种。

 

    标准的CLH lock queue通常被用来实现spin lock,它通过TheadLocal变量pred引用队列中的前一个节点(Node本身没有指向前后节点的引用),以下是标准的CLH lock queue的一个参考实现:

public class ClhSpinLock {
    private final ThreadLocal<Node> pred;
    private final ThreadLocal<Node> node;
    private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());

    public ClhSpinLock() {
        this.node = new ThreadLocal<Node>() {
            protected Node initialValue() {
                return new Node();
            }
        };

        this.pred = new ThreadLocal<Node>() {
            protected Node initialValue() {
                return null;
            }
        };
    }

    public void lock() {
        final Node node = this.node.get();
        node.locked = true;
        Node pred = this.tail.getAndSet(node);
        this.pred.set(pred);
        while (pred.locked) {}
    }

    public void unlock() {
        final Node node = this.node.get();
        node.locked = false;
        this.node.set(this.pred.get());
    }

    private static class Node {
        private volatile boolean locked;
    }
}

    其逻辑并不复杂:对于lock操作,只需要通过一个CAS操作即可将当前线程对应的节点加入到队列中,并且同时获得了predecessor节点的引用,然后就是等待predecessor释放锁;对于unlock操作,只需要将当前线程对应节点的locked成员变量设置为false。unlock方法中的this.node.set(this.pred.get())主要目的是重用predecessor上的Node对象,这是对GC友好的一个优化。如果不考虑这个优化,那么this.node.set(new Node())也是可以的。跟那些TAS(test and set) spin lock和TTAS(test test and set) spin lock相比,CLH spin lock主要是解决了cache-coherence traffic的问题:每个线程在busy loop的时候,并没有竞争同一个状态,而是只判断其对应predecessor的锁定状态。如果你担心false sharing问题,那么可以考虑将锁定状态padding到cache line的长度。此外,CLH spin lock通过FIFO的队列保证了锁竞争的公平性。

 

    AbstractQueuedSynchronizer的静态内部类Node维护了一个FIFO的等待队列。跟CLH不同的是,Node中包含了指向predecessor和sucessor的引用。predecessor引用的作用是为了支持锁等待超时(timeout)和锁等待回退(cancellation)的功能。sucessor的作用是为了支持线程阻塞:在Inside AbstractQueuedSynchronizer (1) 中提到过,AbstractQueuedSynchronizer通过LockSupport实现了线程的block/unblock,因此需要通过successor引用找到后续的线程并将其唤醒(CLH spin lock因为是spin,所以不需要显式地唤醒)。此外,Node中还包括一个volatile int waitStatus成员变量用于控制线程的阻塞/唤醒,以及避免不必要的调用LockSupport的park/unpark方法。需要注意的是,虽然AbstractQueuedSynchronizer在绝大多数情况下是通过LockSupport进行线程的阻塞/唤醒,但是在特定情况下也会使用spin lock,static final long spinForTimeoutThreshold = 1000L这个静态变量设定了使用spin lock的一个阈值。

 

    对WaitQueue进行enqueue操作相关的代码如下:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

    addWaiter方法中的那个if分支,其实是一种优化,如果失败那么会调用enq方法进行enqueue。需要注意的是,以上代码中对next引用的设定是在enqueue成功之后进行的。这样做虽然没有并发问题,但是在判断一个node是否有sucessor时,不能仅仅通过next == null来判断,因为enqueue和设置next引用这两个步骤不是一个原子操作。

 

    对WaitQueue进行dequeue操作相关的代码如下:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

    setHead方法没有用到锁,也没有使用CAS,这样没有并发问题?没有,因为这个方法只会被持有锁的线程所调用,此时只需要将head指向持有锁的线程对应的node即可。

 

4
3
分享到:
评论
4 楼 east_java 2012-12-12  
LZ,看这篇文章http://www.iteye.com/topic/1114231,觉得是否有道理?
3 楼 lixieinstein 2012-05-23  
yxs_815 写道
lixieinstein 写道
您好,看的您的文章很受启发。
但是有一点不太理解,请问ClhSpinLock中的ThreadLocal<Node> pred有什么作用呢?
在lock时可以通过tail来获取前任节点,然后判断其状态来决定是否自选等待,那为什么还需要一个本地线程的pred来储存前任节点呢?在unlock时,只需要将当前节点的locked状态修改为false就可以释放后继结点,this.node.set(this.pred.get()); 貌似没有实质性的作用呀?

和 lixieinstein 的想法一致,感觉用AtomicReference<Node> pred, 就行了,连ThreadLocal<Node> node都没必要吧? 这样锁的实现应该不可以重入。

谢谢回复,:D ,不可以重入,如果要重入就要判断当前线程是否持锁线程,然后还要对加锁次数和释放锁次数进行操作。这个例子让我对CLH队列有了很好的理解,呵呵
2 楼 yxs_815 2012-05-21  
lixieinstein 写道
您好,看的您的文章很受启发。
但是有一点不太理解,请问ClhSpinLock中的ThreadLocal<Node> pred有什么作用呢?
在lock时可以通过tail来获取前任节点,然后判断其状态来决定是否自选等待,那为什么还需要一个本地线程的pred来储存前任节点呢?在unlock时,只需要将当前节点的locked状态修改为false就可以释放后继结点,this.node.set(this.pred.get()); 貌似没有实质性的作用呀?

和 lixieinstein 的想法一致,感觉用AtomicReference<Node> pred, 就行了,连ThreadLocal<Node> node都没必要吧? 这样锁的实现应该不可以重入。
1 楼 lixieinstein 2012-05-03  
您好,看的您的文章很受启发。
但是有一点不太理解,请问ClhSpinLock中的ThreadLocal<Node> pred有什么作用呢?
在lock时可以通过tail来获取前任节点,然后判断其状态来决定是否自选等待,那为什么还需要一个本地线程的pred来储存前任节点呢?在unlock时,只需要将当前节点的locked状态修改为false就可以释放后继结点,this.node.set(this.pred.get()); 貌似没有实质性的作用呀?

相关推荐

Global site tag (gtag.js) - Google Analytics