`

AQS-预备-背景

阅读更多

AbstractQueuedSynchronizer:

 

背景:

在对一个资源做同步的时候,要求每一个时刻只有有限个线程可以操作资源,对于synchronized 而言,更是严格到了只有一个线程可以操作资源,根据业务的需要,对应能够或者允许操作资源的线程可以得到继续执行,不能拿到资源的线程要进行等待(park),直到其他线程释放资源的时候,等待的线程可以尝试获取资源,拥有后可以继续执行。

 

为了记录没有拿到资源的线程,我们可以定义一个数据组或者一列表来记录,一旦有其他线程释放资源的时候,我们可以unpark这些(或者第一个)等待的线程。

一种非常糙的仅仅为了说明这种效果的实现是:

/**
	 * @author xinchun.wang
	 */
	public static class AQSDemo {
		private static List<Thread> list = Collections.synchronizedList(new ArrayList<Thread>());

		private volatile AtomicInteger state;

		public AQSDemo(int state) {
			this.state = new AtomicInteger(state);
		}

		public void acquire() {
			if (state.getAndDecrement() <= 0) {
				list.add(Thread.currentThread());
				LockSupport.park(Thread.currentThread());
			}
		}

		public void release() {
			state.incrementAndGet();
			if(list.size() >0){
				LockSupport.unpark(list.remove(0));
			}
		}
	}

测试:

public static void main(String[] args) {
		final AQSDemo demo = new AQSDemo(1);

		new Thread("A") {
			public void run() {
				try {
					Thread.sleep(100);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("Thread A entered");
				demo.acquire();
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("Thread A over");
				demo.release();

			};
		}.start();
		
		new Thread("B") {
			public void run() {
				System.out.println("Thread B entered");
				demo.acquire();
				try {
					Thread.sleep(900);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("Thread B over");
				demo.release();

			};
		}.start();
		
	}

  

 

基于这种思想,AbstractQueuedSynchronizer 提供了更为丰富并且有用的功能:

1、支持共享和独占的操作。独占模式:意味着统一时刻只有一个线程能够操纵资源,比如锁;共享模式:对应信号量或者许可的概念,共享表达的是state语义可以多个线程来修改,即在不超过一定范围的情况下,其他线程可以继续获得许可,如果成功可以继续执行。

2、公平和非公平:synchronized 是不公平的,一个线程从synchronized代码块离开后,其他线程不确定那个能够进入临界逻辑;而AbstractQueuedSynchronizer 通过FIFO队列的模型,只需要拿head节点对应的线程,即可获得等待最久的线程呢个,从而维护公平性。

3、线程的唤醒:state状态变动的时候,需要对park的线程进行唤醒。对应CountDownLatch的功能要求,当一个线程改变了其内部状态后,多个park的线程可以得到执行;对应ReentrantLock 则要求,只能一个线程得到唤醒。AbstractQueuedSynchronizer  通过Node节点,保存了线程信息,以及模式信息(独占/共享),对应独占模式,仅仅unpark一个线程,对应共享模式,则可以唤醒多个线程。

 

 

 

概述:

为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础子类必须通过定义更改此状态的受保护方法,并定义哪种状态对于此对象意味着被获取或被释放。假定这些条件之后,此类中的其他方法就可以实现所有排队和阻塞机制。子类可以维护其他状态字段,但只是为了获得同步。而使用 getState()、setState(int) 和 compareAndSetState(int, int) 方法来操作以原子方式更新的 int 值。 

 

此类支持默认的独占 模式和共享 模式之一,或者二者都支持。

处于独占模式下时,其他线程试图获取该锁将无法取得成功。

在共享模式下,多个线程获取某个锁可能(但不是一定)会获得成功。此类并不“了解”这些不同。

此类会机械地意识到当在共享模式下成功获取某一锁时,下一个等待线程(如果存在)也必须确定自己是否可以成功获取该锁。

处于不同模式下的等待线程可以共享相同的 FIFO 队列。

通常,实现子类只支持其中一种模式,但两种模式都可以在(例如)ReadWriteLock 中发挥作用。

只支持独占模式或者只支持共享模式的子类不必定义支持未使用模式的方法。 

 

 

 

使用:

子类一般通过实现以下5个方法,仅仅通过cas改变state的状态

 

为了将此类用作同步器的基础,需要适当地重新定义以下方法,这是通过使用 getState()、setState(int) 和/或 compareAndSetState(int, int) 方法来检查和/或修改同步状态来实现的: 

 

独占模式,尝试获取许可( 目前实现用到的为:ReentrantReadWriteLock,ReentrantLock)

tryAcquire(int) 

tryRelease(int) 

 

共享模式:(CountDownLatch,Semaphore,CyclicBarrier,FutureTask )

//试图在共享模式下获取对象状态。此方法应该查询是否允许它在共享模式下获取对象状态,如果允许,则获取它。

//此方法总是由执行 acquire的线程来调用。

//如果此方法报告失败,则 acquire 方法可以将线程加入队列(如果还没有将它加入队列),直到获得其他某个线程释放了该线程的信号。 

//-1 : 失败;

//0:获取成功但其后续共享模式下的获取不能成功

//1:获取成功并且其后续共享模式下的获取可能够成功,则返回正值,在这种情况下,后续等待线程必须检查可用性

tryAcquireShared(int) 

tryReleaseShared(int) 

方法将报告同步对于当前线程是否是独占的

isHeldExclusively() 

 

默认情况下,每个方法都抛出 UnsupportedOperationException。这些方法的实现在内部必须是线程安全的,通常应该很短并且不被阻塞。定义这些方法是使用此类的唯一 受支持的方式。其他所有方法都被声明为 final,因为它们无法是各不相同的。 

// 

 

 

独占同步的核心采用以下形式: 

 

 Acquire:(获取许可)

     if (!tryAcquire(arg)) {

        enqueue thread if it is not already queued;

        possibly block current thread;

     }

 

 

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

 

 

 

 Release:(释放许可)

     if (tryRelease(arg))

        unblock the first queued thread

  

  

 public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

 

 

 (共享模式与此类似,但可能涉及级联信号。) 

0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics