`
747017186
  • 浏览: 318684 次
社区版块
存档分类
最新评论

Java之CyclicBarrier使用

 
阅读更多

来源:https://blog.csdn.net/hanchao5272/article/details/79779639

本章主要对CyclicBarrier进行学习。

 

1.CyclicBarrier简介

 

CyclicBarrier,是JDK1.5的java.util.concurrent并发包中提供的一个并发工具类。

 

所谓Cyclic即 循环 的意思,所谓Barrier即 屏障 的意思。

 

所以综合起来,CyclicBarrier指的就是 循环屏障,虽然这个叫法很奇怪,但是确能很好地表示它的作用。

 

其作用在JDK注释中是这样描述的:

 

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. 

CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. 

The barrier is called cyclic because it can be re-used after the waiting threads are released.

翻译过来,如下:

 

CyclicBarrier是一个同步辅助类,它允许一组线程相互等待直到所有线程都到达一个公共的屏障点。

在程序中有固定数量的线程,这些线程有时候必须等待彼此,这种情况下,使用CyclicBarrier很有帮助。

这个屏障之所以用循环修饰,是因为在所有的线程释放彼此之后,这个屏障是可以重新使用的。

CyclicBarrier的简单理解

 

其实,我更喜欢[人满发车]这个词来理解CyclicBarrier的作用:

 

长途汽车站提供长途客运服务。

当等待坐车的乘客到达20人时,汽车站就会发出一辆长途汽车,让这20个乘客上车走人。

等到下次等待的乘客又到达20人是,汽车站就会又发出一辆长途汽车。

CyclicBarrier的应用场景

 

CyclicBarrier常用于多线程分组计算。

 

2.CyclicBarrier方法说明

 

CyclicBarrier提供的方法有:

 

——CyclicBarrier(parties)

 

初始化相互等待的线程数量的构造方法。

 

——CyclicBarrier(parties,Runnable barrierAction)

 

初始化相互等待的线程数量以及屏障线程的构造方法。

 

屏障线程的运行时机:等待的线程数量=parties之后,CyclicBarrier打开屏障之前。

 

举例:在分组计算中,每个线程负责一部分计算,最终这些线程计算结束之后,交由屏障线程进行汇总计算。

 

——getParties()

 

获取CyclicBarrier打开屏障的线程数量,也成为方数。

 

——getNumberWaiting()

 

获取正在CyclicBarrier上等待的线程数量。

 

——await()

 

在CyclicBarrier上进行阻塞等待,直到发生以下情形之一:

 

在CyclicBarrier上等待的线程数量达到parties,则所有线程被释放,继续执行。

当前线程被中断,则抛出InterruptedException异常,并停止等待,继续执行。

其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。

其他等待的线程超时,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。

其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。

——await(timeout,TimeUnit)

 

在CyclicBarrier上进行限时的阻塞等待,直到发生以下情形之一:

 

在CyclicBarrier上等待的线程数量达到parties,则所有线程被释放,继续执行。

当前线程被中断,则抛出InterruptedException异常,并停止等待,继续执行。

当前线程等待超时,则抛出TimeoutException异常,并停止等待,继续执行。

其他等待的线程被中断,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。

其他等待的线程超时,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。

其他线程调用CyclicBarrier.reset()方法,则当前线程抛出BrokenBarrierException异常,并停止等待,继续执行。

——isBroken()

 

获取是否破损标志位broken的值,此值有以下几种情况:

 

CyclicBarrier初始化时,broken=false,表示屏障未破损。

如果正在等待的线程被中断,则broken=true,表示屏障破损。

如果正在等待的线程超时,则broken=true,表示屏障破损。

如果有线程调用CyclicBarrier.reset()方法,则broken=false,表示屏障回到未破损状态。

——reset()

 

使得CyclicBarrier回归初始状态,直观来看它做了两件事:

 

如果有正在等待的线程,则会抛出BrokenBarrierException异常,且这些线程停止等待,继续执行。

将是否破损标志位broken置为false。

3.CyclicBarrier方法练习

 

3.1.练习一

 

练习目的:

 

了解CyclicBarrier(parties)/getParties()/await()/getNumberWaiting()的基本用法。

理解循环的意义。

示例代码:

 

 //构造函数1:初始化-开启屏障的方数

CyclicBarrier barrier0 = new CyclicBarrier(2);

//通过barrier.getParties()获取开启屏障的方数

LOGGER.info("barrier.getParties()获取开启屏障的方数:" + barrier0.getParties());

System.out.println();

//通过barrier.getNumberWaiting()获取正在等待的线程数

LOGGER.info("通过barrier.getNumberWaiting()获取正在等待的线程数:初始----" + barrier0.getNumberWaiting());

System.out.println();

new Thread(() -> {

    //添加一个等待线程

    LOGGER.info("添加第1个等待线程----" + Thread.currentThread().getName());

    try {

        barrier0.await();

        LOGGER.info(Thread.currentThread().getName() + " is running...");

    } catch (InterruptedException e) {

        e.printStackTrace();

    } catch (BrokenBarrierException e) {

        e.printStackTrace();

    }

    LOGGER.info(Thread.currentThread().getName() + " is terminated.");

}).start();

Thread.sleep(10);

//通过barrier.getNumberWaiting()获取正在等待的线程数

LOGGER.info("通过barrier.getNumberWaiting()获取正在等待的线程数:添加第1个等待线程---" + barrier0.getNumberWaiting());

Thread.sleep(10);

System.out.println();

new Thread(() -> {

    //添加一个等待线程

    LOGGER.info("添加第2个等待线程----" + Thread.currentThread().getName());

    try {

        barrier0.await();

        LOGGER.info(Thread.currentThread().getName() + " is running...");

    } catch (InterruptedException e) {

        e.printStackTrace();

    } catch (BrokenBarrierException e) {

        e.printStackTrace();

    }

    LOGGER.info(Thread.currentThread().getName() + " is terminated.");

}).start();

Thread.sleep(100);

System.out.println();

//通过barrier.getNumberWaiting()获取正在等待的线程数

LOGGER.info("通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---" + barrier0.getNumberWaiting());

 

//已经打开的屏障,再次有线程等待的话,还会重新生效--视为循环

new Thread(() -> {

    LOGGER.info("屏障打开之后,再有线程加入等待:" + Thread.currentThread().getName());

    try {

        //BrokenBarrierException

        barrier0.await();

    } catch (InterruptedException e) {

        e.printStackTrace();

    } catch (BrokenBarrierException e) {

        e.printStackTrace();

    }

    LOGGER.info(Thread.currentThread().getName() + " is terminated.");

 

}).start();

System.out.println();

Thread.sleep(10);

LOGGER.info("通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---" + barrier0.getNumberWaiting());

Thread.sleep(10);

new Thread(() -> {

    LOGGER.info("屏障打开之后,再有线程加入等待:" + Thread.currentThread().getName());

    try {

        //BrokenBarrierException

        barrier0.await();

    } catch (InterruptedException e) {

        e.printStackTrace();

    } catch (BrokenBarrierException e) {

        e.printStackTrace();

    }

    LOGGER.info(Thread.currentThread().getName() + " is terminated.");

 

}).start();

Thread.sleep(10);

LOGGER.info("通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---" + barrier0.getNumberWaiting());

 

运行结果:

 

2018-04-01 13:27:55 INFO - barrier.getParties()获取开启屏障的方数:2

 

2018-04-01 13:27:55 INFO - 通过barrier.getNumberWaiting()获取正在等待的线程数:初始----0

 

2018-04-01 13:27:55 INFO - 添加第1个等待线程----Thread-0

2018-04-01 13:27:55 INFO - 通过barrier.getNumberWaiting()获取正在等待的线程数:添加第1个等待线程---1

 

2018-04-01 13:27:55 INFO - 添加第2个等待线程----Thread-1

2018-04-01 13:27:55 INFO - Thread-1 is running...

2018-04-01 13:27:55 INFO - Thread-0 is running...

2018-04-01 13:27:55 INFO - Thread-1 is terminated.

2018-04-01 13:27:55 INFO - Thread-0 is terminated.

 

2018-04-01 13:27:55 INFO - 通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---0

 

2018-04-01 13:27:55 INFO - 屏障打开之后,再有线程加入等待:Thread-2

2018-04-01 13:27:55 INFO - 通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---1

2018-04-01 13:27:55 INFO - 屏障打开之后,再有线程加入等待:Thread-3

2018-04-01 13:27:55 INFO - Thread-3 is terminated.

2018-04-01 13:27:55 INFO - Thread-2 is terminated.

2018-04-01 13:27:55 INFO - 通过barrier.getNumberWaiting()获取正在等待的线程数:打开屏障之后---0

 

从运行结果,可以更好的理解循环的意义。

 

3.2.练习二

 

练习目的:

 

熟悉reset()的用法

理解回归初始状态的意义

实例代码:

 

CyclicBarrier barrier2 = new CyclicBarrier(2);

//如果是一个初始的CyclicBarrier,则reset()之后,什么也不会发生

LOGGER.info("如果是一个初始的CyclicBarrier,则reset()之后,什么也不会发生");

barrier2.reset();

System.out.println();

 

Thread.sleep(100);

//如果是一个已经打开一次的CyclicBarrier,则reset()之后,什么也不会发生

ExecutorService executorService2 = Executors.newCachedThreadPool();

//等待两次

for (int i = 0; i < 2; i++) {

    executorService2.submit(() -> {

        try {

            barrier2.await();

            LOGGER.info("222屏障已经打开.");

        } catch (InterruptedException e) {

            //e.printStackTrace();

            LOGGER.info("222被中断");

        } catch (BrokenBarrierException e) {

            //e.printStackTrace();

            LOGGER.info("222被重置");

        }

    });

}

barrier2.reset();

 

Thread.sleep(100);

System.out.println();

//如果是一个 有线程正在等待的线程,则reset()方法会使正在等待的线程抛出异常

executorService2.submit(() -> {

    executorService2.submit(() -> {

        try {

            barrier2.await();

            LOGGER.info("333屏障已经打开.");

        } catch (InterruptedException e) {

            //e.printStackTrace();

            LOGGER.info("333被中断");

        } catch (BrokenBarrierException e) {

            LOGGER.info("在等待过程中,执行reset()方法,等待的线程抛出BrokenBarrierException异常,并不再等待");

            //e.printStackTrace();

        }

    });

});

Thread.sleep(100);

barrier2.reset();

executorService2.shutdown();

break;

 

执行结果:

 

2018-04-01 16:53:12 INFO - 如果是一个初始的CyclicBarrier,则reset()之后,什么也不会发生

 

2018-04-01 16:53:13 INFO - 222屏障已经打开.

2018-04-01 16:53:13 INFO - 222屏障已经打开.

 

2018-04-01 16:53:13 INFO - 在等待过程中,执行reset()方法,等待的线程跑出BrokenBarrierException异常,并不再等待

 

3.3.练习三

 

练习目的:

 

练习await()/await(timeout,TimeUnit)/isBroken()的使用方法

理解破损标志位broken的状态转换

实例代码:

 

CyclicBarrier barrier1 = new CyclicBarrier(3);

ExecutorService executorService = Executors.newCachedThreadPool();

//添加一个用await()等待的线程

executorService.submit(() -> {

    try {

        //等待,除非:1.屏障打开;2.本线程被interrupt;3.其他等待线程被interrupted;4.其他等待线程timeout;5.其他线程调用reset()

        barrier1.await();

    } catch (InterruptedException e) {

        LOGGER.info(Thread.currentThread().getName() + " is interrupted.");

        //e.printStackTrace();

    } catch (BrokenBarrierException e) {

        LOGGER.info(Thread.currentThread().getName() + " is been broken.");

        //e.printStackTrace();

    }

});

Thread.sleep(10);

LOGGER.info("刚开始,屏障是否破损:" + barrier1.isBroken());

//添加一个等待线程-并超时

executorService.submit(() -> {

    try {

        //等待1s,除非:1.屏障打开(返回true);2.本线程被interrupt;3.本线程timeout;4.其他等待线程被interrupted;5.其他等待线程timeout;6.其他线程调用reset()

        barrier1.await(1, TimeUnit.SECONDS);

    } catch (InterruptedException e) {

        LOGGER.info(Thread.currentThread().getName() + " is interrupted.");

        //e.printStackTrace();

    } catch (BrokenBarrierException e) {

        LOGGER.info(Thread.currentThread().getName() + " is been reset().");

        //e.printStackTrace();

    } catch (TimeoutException e) {

        LOGGER.info(Thread.currentThread().getName() + " is timeout.");

        //e.printStackTrace();

    }

});

Thread.sleep(100);

LOGGER.info("当前等待线程数量:" + barrier1.getNumberWaiting());

Thread.sleep(1000);

LOGGER.info("当前等待线程数量:" + barrier1.getNumberWaiting());

LOGGER.info("当等待的线程timeout时,当前屏障是否破损:" + barrier1.isBroken());

LOGGER.info("等待的线程中,如果有一个出现问题,则此线程会抛出相应的异常;其他线程都会抛出BrokenBarrierException异常。");

 

System.out.println();

Thread.sleep(5000);

//通过reset()重置屏障回初始状态,也包括是否破损

barrier1.reset();

LOGGER.info("reset()之后,当前屏障是否破损:" + barrier1.isBroken());

LOGGER.info("reset()之后,当前等待线程数量:" + barrier1.getNumberWaiting());

executorService.shutdown();

 

运行结果:

 

2018-04-01 17:01:16 INFO - 刚开始,屏障是否破损:false

2018-04-01 17:01:16 INFO - 当前等待线程数量:2

2018-04-01 17:01:17 INFO - pool-1-thread-1 is been broken.

2018-04-01 17:01:17 INFO - pool-1-thread-2 is timeout.

2018-04-01 17:01:17 INFO - 当前等待线程数量:0

2018-04-01 17:01:17 INFO - 当等待的线程timeout时,当前屏障是否破损:true

2018-04-01 17:01:17 INFO - 等待的线程中,如果有一个出现问题,则此线程会抛出相应的异常;其他线程都会抛出BrokenBarrierException异常。

 

2018-04-01 17:01:22 INFO - reset()之后,当前屏障是否破损:false

2018-04-01 17:01:22 INFO - reset()之后,当前等待线程数量:0

 

3.4.练习四

 

练习目的:

 

练习CyclicBarrier(int parties, Runnable barrierAction)的用法

理解屏障线程的意义

实例代码:

 

//构造器:设置屏障放开前做的事情

CyclicBarrier barrier3 = new CyclicBarrier(2, () -> {

    LOGGER.info("屏障放开,[屏障线程]先运行!");

    try {

        Thread.sleep(2000);

    } catch (InterruptedException e) {

        e.printStackTrace();

    }

    LOGGER.info("[屏障线程]的事情做完了!");

});

for (int i = 0; i < 2; i++) {

    new Thread(() -> {

        LOGGER.info(Thread.currentThread().getName() + " 等待屏障放开");

        try {

            barrier3.await();

        } catch (InterruptedException e) {

            e.printStackTrace();

        } catch (BrokenBarrierException e) {

            e.printStackTrace();

        }

        LOGGER.info(Thread.currentThread().getName() + "开始干活...干活结束");

    }).start();

}

 

运行结果:

 

2018-04-01 17:01:56 INFO - Thread-0 等待屏障放开

2018-04-01 17:01:56 INFO - Thread-1 等待屏障放开

2018-04-01 17:01:56 INFO - 屏障放开,[屏障线程]先运行!

2018-04-01 17:01:58 INFO - [屏障线程]的事情做完了!

2018-04-01 17:01:58 INFO - Thread-1开始干活...干活结束

2018-04-01 17:01:58 INFO - Thread-0开始干活...干活结束

 

4.应用场景

 

场景说明:

 

模拟多线程分组计算

有一个大小为50000的随机数组,用5个线程分别计算10000个元素的和

然后在将计算结果进行合并,得出最后的结果。

重点分析:

 

用5个线程分别计算:定义一个大小为5的线程池。

计算结果进行合并:定义一个屏障线程,将上面5个线程计算的子结果信息合并。

实例代码:

 

/**

* <p>CyclicBarrier-循环屏障-模拟多线程计算</p>

 *

 * @author hanchao 2018/3/29 22:48

 **/

public static void main(String[] args) {

    //数组大小

    int size = 50000;

    //定义数组

    int[] numbers = new int[size];

    //随机初始化数组

    for (int i = 0; i < size; i++) {

        numbers[i] = RandomUtils.nextInt(100, 1000);

    }

 

    //单线程计算结果

    System.out.println();

    Long sum = 0L;

    for (int i = 0; i < size; i++) {

        sum += numbers[i];

    }

    LOGGER.info("单线程计算结果:" + sum);

 

    //多线程计算结果

    //定义线程池

    ExecutorService executorService = Executors.newFixedThreadPool(5);

    //定义五个Future去保存子数组计算结果

    final int[] results = new int[5];

 

    //定义一个循环屏障,在屏障线程中进行计算结果合并

    CyclicBarrier barrier = new CyclicBarrier(5, () -> {

        int sums = 0;

        for (int i = 0; i < 5; i++) {

            sums += results[i];

        }

        LOGGER.info("多线程计算结果:" + sums);

    });

 

    //子数组长度

    int length = 10000;

    //定义五个线程去计算

    for (int i = 0; i < 5; i++) {

        //定义子数组

        int[] subNumbers = Arrays.copyOfRange(numbers, (i * length), ((i + 1) * length));

        //盛放计算结果

        int finalI = i;

        executorService.submit(() -> {

            for (int j = 0; j < subNumbers.length; j++) {

                results[finalI] += subNumbers[j];

            }

            //等待其他线程进行计算

            try {

                barrier.await();

            } catch (InterruptedException e) {

                e.printStackTrace();

            } catch (BrokenBarrierException e) {

                e.printStackTrace();

            }

        });

    }

 

    //关闭线程池

    executorService.shutdown();

}

 

运行结果:

 

2018-04-01 17:05:47 INFO - 单线程计算结果:27487277

2018-04-01 17:05:47 INFO - 多线程计算结果:27487277

--------------------- 

作者:hanchao5272 

来源:CSDN 

原文:https://blog.csdn.net/hanchao5272/article/details/79779639 

版权声明:本文为博主原创文章,转载请附上博文链接! 

 

该类就是等到所有的线程都到达指定的地点之后,在进行下一步的执行过程。

package thread.cyclicbarrier;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class CyclicBarrierTest {
	public static void main(String[] args) {
		ExecutorService pool = Executors.newCachedThreadPool();
		final CyclicBarrier cb = new CyclicBarrier(3);//申明的等待的线程有3个那么就立即往下执行
													//遇到下一个等待点则继续等待只要满足有3个线程就继续往下执行
													//无论在哪个等待点
		for (int i = 0; i < 3; i++) {
			final int c = i;
			pool.execute(new Runnable() {
				@Override
				public void run() {
					try {
						Thread.sleep((long)(Math.random()*1000));
						System.out.println("线程"+Thread.currentThread().getName()+"已经到达集合点1,当前已经有"+cb.getNumberWaiting()+"在等待!"
							+(cb.getNumberWaiting()+1==3?"人到齐了,一起走吧!":""));
						cb.await();//如果有线程提前到达之后,则再次等待后续的线程到达。全部到达之后则继续往下执行
						
						Thread.sleep((long)(Math.random()*1000));
						System.out.println("线程"+Thread.currentThread().getName()+"已经到达集合点2,当前已经有"+cb.getNumberWaiting()+"在等待!"
							+(cb.getNumberWaiting()+1==3?"人到齐了,一起走吧!":""));
						cb.await();//如果有线程提前到达之后,则再次等待后续的线程到达。全部到达之后则继续往下执行
						
						Thread.sleep((long)(Math.random()*1000));
						System.out.println("线程"+Thread.currentThread().getName()+"已经到达集合点3,当前已经有"+cb.getNumberWaiting()+"在等待!"
							+(cb.getNumberWaiting()+1==3?"人到齐了,一起走吧!":""));
						cb.await();//如果有线程提前到达之后,则再次等待后续的线程到达。全部到达之后则继续往下执行
					} catch (Exception e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}finally {
						
					}
				}
			});
		}
		pool.shutdown();
	}
}

 

 可以看到每隔线程到达障碍点之后就会等到其他线程,直到等待线程数量为3个,则继续往下去执行。代码当中有多个障碍点,无论哪种情况只要等待下线程数目达到3个 就会立即执行。

比如:第一个障碍点有1个线程等待,第二障碍点有2个线程等待,总的等待线程数目已经达到了3个。则:那么第一个障碍点线程立即执行,第二个障碍点线程也是立即执行。

注意:所以每次申明等待线程数目,最好是总线程数的约数。这样才不会出现死锁的现象(申明等待线程数为3,总线程数为4,这样容易出现死锁的现象,切记!)

 

 

 

 

 

  • 大小: 24.1 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics