`
raymond.chen
  • 浏览: 1418038 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

CyclicBarrier的使用

 
阅读更多

CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。

 

CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。

 

public class CyclicBarrierTest {
	private static CyclicBarrier barrier;
	private static int threadCount = 2; //每次同步执行的线程数
	
	public static void main(String[] args) {
		barrier = new CyclicBarrier(threadCount, ()->{
			//如果某个线程内部抛异常(中断、失败、超时),Barrier会被标识为broken,则不会执行到这里
			System.out.println("所有线程已成功处理完,此处进行统一处理!");
			
			//重置屏障,开始新一轮同步等待处理
			barrier.reset();
		});
		
		while(true){
			new Writer(barrier, 3000).start();
			try {
				TimeUnit.MILLISECONDS.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	static class Writer extends Thread{
		private CyclicBarrier barrier;
		private long timeout;
		
		public Writer(CyclicBarrier barrier, long timeout){
			this.barrier = barrier;
			this.timeout = timeout;
		}
		
		@Override
		public void run() {
			try {
				int sleep = ThreadLocalRandom.current().nextInt(2000);
				System.out.println(Thread.currentThread().getName() + ": " + sleep);
				TimeUnit.MILLISECONDS.sleep(sleep);
				
				//await:告诉CyclicBarrier本线程已经到达同步点,然后当前线程被阻塞
				barrier.await(timeout, TimeUnit.MILLISECONDS);
				
			} catch (InterruptedException|TimeoutException|BrokenBarrierException ex) {
				System.out.println(Thread.currentThread().getName() + ":" + ex.toString());
			}

			if(barrier.isBroken()){
				System.out.println("CyclicBarrier已Broken");
			}else{
				System.out.println(Thread.currentThread().getName() + ": 所有线程已成功处理完,本线程继续往下执行");
			}
		}
	}
	
}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics