JUC代码浅析[6]——基于AQS的CyclicBarrier
CyclicBarrier是一种同步机制允许一组线程相互等待,等到所有线程都到达一个屏障点才退出await方法,它没有直接实现AQS而是借助ReentrantLock来实现的同步机制。它是可循环使用的,而CountDownLatch是一次性的,另外它体现的语义也跟CountDownLatch不同,CountDownLatch减少计数到达条件采用的是release方式,而CyclicBarrier走向屏障点(await)采用的是Acquire方式,Acquire是会阻塞的,这也实现了CyclicBarrier的另外一个特点,只要有一个线程中断那么屏障点就被打破,所有线程都将被唤醒(CyclicBarrier自己负责这部分实现,不是由AQS调度的),这样也避免了因为一个线程中断引起永远不能到达屏障点而导致其他线程一直等待。屏障点被打破的CyclicBarrier将不可再使用(会抛出BrokenBarrierException)除非执行reset操作。
下面是一个使用的例子
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
barrier = new CyclicBarrier(N,
new Runnable()
{
public void
run() {
mergeRows(...);
}
});
for (int i = 0; i < N; ++i)
new Thread(new Worker(i)).start();
waitUntilDone();
}
}
这个例子的场景是每个Worker线程处理完一行数据后,等待其他线程到达屏障点。当所有线程都到达屏障点时进行一次合并操作(mergeRows方法),然后每个Worker线程继续处理下一行数据.如此反复。
下面的代码片段说明了如何实现这种机制,整个过程为
1首先检查屏障点有没有打破,如果打破了就直接抛出异常退出。
2检查当前线程是否被中断了,如果中断了就打破屏障点,并抛出异常退出。
3检查当前线程是否最后一个到达屏障点,如果是的话就设置下一轮屏障,并唤醒所有线程,如果需要的话执行barrierCommand,退出。
4.以上都不满足的话,循环等待在trip这个condition上。如果设置了等待超时时间,那么只要有一个线程等待超时后就会打破屏障点唤醒所有线程,并抛出异常TimeoutException退出,其他线程可能会抛出BrokenBarrierException异常。所以如果不设置超时时间并且最后一个线程由于某种原因不能到达屏障点,那么所有线程都一直死等待了,因此才需要上面几步的检查。
整个过程还有很多逻辑来处理执行屏障点任务引发的异常、park线程超时引发的中断异常和超时异常等
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted,
or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we
had not
// been interrupted, so this interrupt is
deemed to
// "belong" to subsequent
execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
CyclicBarrier对外API只有7个:
l
public CyclicBarrier(int parties) 创建一个CyclicBarrier,指定参与者数量
l
public CyclicBarrier(int parties,
Runnable barrierAction)创建一 个CyclicBarrier,指定参与者数量。并且指定所有参与者到达屏障点之后需要执行的动作。
l
public int await() throws
InterruptedException, BrokenBarrierException在所 有参与者到达屏障点之前一直等待(也就是说等待所有参与者都调用了await方法才往下执行)
l
public int await(long timeout, TimeUnit
unit)
throws InterruptedException,BrokenBarrierException,TimeoutException 跟await()方法一样,只是指定了等待超时时间,具体实现请看上面的分析
l
public boolean isBroken()屏障点是否被打破
l
public void reset()重置屏障点(屏障点打破之后只能调用这个方法才可重新使用)
l
public int getNumberWaiting()处于等待状态的参与者的个数,也就是调用了await方法的参与者 个数
分享到:
相关推荐
狂神说JUC代码
Java 多线程与并发(10_26)-JUC锁_ 锁核心类AQS详解
juc 的aqs介绍。
主要介绍了java CyclicBarrier的相关资料,文中示例代码非常详细,帮助大家更好的理解和学习,感兴趣的朋友可以了解下
JUC代码收集,java高并发多线程学习
Java 并发编程中的 JUC(java.util.concurrent)库以及其核心组件 AQS(AbstractQueuedSynchronizer)在构建高性能、可伸缩性的多线程应用方面具有重要的地位。 AQS 是 JUC 中的核心组件,它提供了一个框架,让...
个人学习的时候对JUC做的笔记,包括所有代码,学习视频是尚硅谷周阳老师的JUC,需要有一定的基础才能看懂,这是个人笔记,不一定适用于所有人,谢谢!
为了学习JUC,AQS是基础中的基础,所以我们首先深入了解下AQS。 一、锁的介绍 为了了解AQS的源码,我们需要先大概下锁中的一些功能 1.1 乐观锁/悲观锁 乐观锁与悲观锁是一种广义上的概念,体现了看待线程同步的不同...
JUC是什么 线程 进程 / 线程 线程状态 wait / sleep 并发 / 并行 Lock 使用Lock锁 可重入锁 公平锁 / 非公平锁 Synchronized / Lock 线程通讯 wait()、notify()和notifyAll() 虚假唤醒 Condition 定制化通信 多线程...
JUC代码演示 Java多线程并发 源于B站https://www.bilibili.com/video/BV1Kw411Z7dF/?p=32&spm_id_from=333.1007.top_right_bar_window_history.content.click课程自我项目记录。 适合有一定基础的朋友。
本资源描述了Java并发常见的问题AQS的加锁解锁的过程
juc学习代码。。。。
juc并发编程脑图以及相关示例代码
【尚硅谷】大厂必备技术之JUC并发编程视频 配套资料,自己根据视频整理 pdf 课件,和代码 视频地址:...
ReentrantLock Lock 加锁过程源码分析图,AQS 源码分析
详细阐述了ReentrantLock通过AQS获取锁到释放锁的过程,附有关键方法的源码及注释
juc笔记:这些知识点是怎么积累进来的呢,下面我以JUC的学习过程为例子来讲解我的知识框架图记录过程, 首先我们知道,JUC就是java.util.concurrent包,俗称java并发包,那首先我们要知道java并发包是用来干嘛 的...
用于定义类似于线程的自定义子系统,包括线程池,异步 IO 和轻量级任务框架;包含下载地址
从JUC中的AQS引入,讲解Java volatile与AQS锁内存可见性
juc入门案例演示代码