- 浏览: 478692 次
- 性别:
- 来自: 大连
文章分类
最新评论
-
龘龘龘:
TrueBrian 写道有个问题,Sample 1中,为了控制 ...
What's New on Java 7 Phaser -
龘龘龘:
楼主总结的不错。
What's New on Java 7 Phaser -
TrueBrian:
有个问题,Sample 1中,为了控制线程的启动时机,博主实际 ...
What's New on Java 7 Phaser -
liguanqun811:
不知道楼主是否对zookeeper实现的分布式锁进行过性能测试 ...
Distributed Lock -
hobitton:
mysql的get lock有版本限制,否则get lock可 ...
Distributed Lock
1 Overview
Java 7的并发包中推出了Phaser,其功能跟CyclicBarrier和CountDownLatch有些重叠,但是提供了更灵活的用法,例如支持动态调整注册任务的数量等。本文在Phaser自带的示例代码基础上进行一下简单的分析。
2 Glossary
2.1 Registration
Phaser支持通过register()和bulkRegister(int parties)方法来动态调整注册任务的数量,此外也支持通过其构造函数进行指定初始数量。在适当的时机,Phaser支持减少注册任务的数量,例如 arriveAndDeregister()。单个Phaser实例允许的注册任务数的上限是65535。
2.2 Arrival
正如Phaser类的名字所暗示,每个Phaser实例都会维护一个phase number,初始值为0。每当所有注册的任务都到达Phaser时,phase number累加,并在超过Integer.MAX_VALUE后清零。arrive()和arriveAndDeregister()方法用于记录到 达,arriveAndAwaitAdvance()方法用于记录到达,并且等待其它未到达的任务。
2.3 Termination
Phaser支持终止。Phaser终止之后,调用register()和bulkRegister(int parties)方法没有任何效果,arriveAndAwaitAdvance()方法也会立即返回。触发终止的时机是在protected boolean onAdvance(int phase, int registeredParties)方法返回时,如果该方法返回true,那么Phaser会被终止。默认实现是在注册任务数为0时返回true(即 return registeredParties == 0;)。此外,forceTermination()方法用于强制终止,isTerminated()方法用于判断是否已经终止。
2.4 Tiering
Phaser支持层次结构,即通过构造函数Phaser(Phaser parent)和Phaser(Phaser parent, int parties)构造一个树形结构。这有助于减轻因在单个的Phaser上注册过多的任务而导致的竞争,从而提升吞吐量,代价是增加单个操作的开销。
3 Sample Usage
3.1 Sample 1
在有些场景下,我们希望控制多个线程的启动时机:例如在并发相关的单元测试中,有时需要控制线程的启动时机,以期获得最大程度的并发,通常我们会使用CountDownLatch,以下是使用Phaser的版本。
import java.util.concurrent.Phaser; public class PhaserTest1 { public static void main(String args[]) { // final int count = 5; final Phaser phaser = new Phaser(count); for(int i = 0; i < count; i++) { System.out.println("starting thread, id: " + i); final Thread thread = new Thread(new Task(i, phaser)); thread.start(); } } public static class Task implements Runnable { // private final int id; private final Phaser phaser; public Task(int id, Phaser phaser) { this.id = id; this.phaser = phaser; } @Override public void run() { phaser.arriveAndAwaitAdvance(); System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id); } } }
以上例子中,由于线程是在一个循环中start,因此start的时机有一定的间隔。本例中这些线程实际开始工作的时机是在所有的线程都调用了phaser.arriveAndAwaitAdvance()之后。
此外,如果留心arriveAndAwaitAdvance()方法的签名,会发现它并没有抛出InterruptedException,实际上,即使 当前线程被中断,arriveAndAwaitAdvance()方法也不会返回,而是继续等待。如果在等待时希望可中断,或者可超时,那么需要使用以下 方法:
awaitAdvance(arrive()) // 等效于arriveAndAwaitAdvance() awaitAdvanceInterruptibly(int phase) awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
3.2 Sample 2
有些时候我们希望只有在某些外部条件满足时,才真正开始任务的执行,例如:
import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.concurrent.Phaser; public class PhaserTest2 { public static void main(String args[]) throws Exception { // final Phaser phaser = new Phaser(1); for(int i = 0; i < 5; i++) { phaser.register(); System.out.println("starting thread, id: " + i); final Thread thread = new Thread(new Task(i, phaser)); thread.start(); } // System.out.println("Press ENTER to continue"); BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); reader.readLine(); phaser.arriveAndDeregister(); } public static class Task implements Runnable { // private final int id; private final Phaser phaser; public Task(int id, Phaser phaser) { this.id = id; this.phaser = phaser; } @Override public void run() { phaser.arriveAndAwaitAdvance(); System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id); } } }
以上例子中,只有当用户按下回车之后,任务才真正开始执行。需要注意的是,arriveAndDeregister()方法不会被阻塞,并且返回到达时的phase number(arrive方法也是如此)。
3.3 Sample 3
CyclicBarrier支持barrier action, Phaser同样也支持。不同之处是Phaser的barrier action需要改写onAdvance方法来进行定制。
import java.util.concurrent.Phaser; public class PhaserTest3 { public static void main(String args[]) throws Exception { // final int count = 5; final int phaseToTerminate = 3; final Phaser phaser = new Phaser(count) { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("====== " + phase + " ======"); return phase >= phaseToTerminate || registeredParties == 0; } }; // for(int i = 0; i < count; i++) { System.out.println("starting thread, id: " + i); final Thread thread = new Thread(new Task(i, phaser)); thread.start(); } } public static class Task implements Runnable { // private final int id; private final Phaser phaser; public Task(int id, Phaser phaser) { this.id = id; this.phaser = phaser; } @Override public void run() { do { try { Thread.sleep(500); } catch(InterruptedException e) { // NOP } System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id); phaser.arriveAndAwaitAdvance(); } while(!phaser.isTerminated()); } } }
本例中的barrier action只是简单地打印了一条信息,此外在超过指定的迭代次数后终止了Phaser。
3.4 Sample 4
在Smaple 3的例子中,主线程在其它工作线程结束之前已经终止。如果希望主线程等待这些工作线程结束,除了使用Thread.join()之外,也可以尝试以下的方式:
import java.util.concurrent.Phaser; public class PhaserTest4 { public static void main(String args[]) throws Exception { // final int count = 5; final int phaseToTerminate = 3; final Phaser phaser = new Phaser(count) { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("====== " + phase + " ======"); return phase == phaseToTerminate || registeredParties == 0; } }; // for(int i = 0; i < count; i++) { System.out.println("starting thread, id: " + i); final Thread thread = new Thread(new Task(i, phaser)); thread.start(); } // phaser.register(); while (!phaser.isTerminated()) { phaser.arriveAndAwaitAdvance(); } System.out.println("done"); } public static class Task implements Runnable { // private final int id; private final Phaser phaser; public Task(int id, Phaser phaser) { this.id = id; this.phaser = phaser; } @Override public void run() { while(!phaser.isTerminated()) { try { Thread.sleep(500); } catch(InterruptedException e) { // NOP } System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id); phaser.arriveAndAwaitAdvance(); } } } }
如果希望主线程在特定的phase结束之后终止,那么可以在主线程中调用下述方法:
public static void awaitPhase(Phaser phaser, int phase) { int p = phaser.register(); // assumes caller not already registered while (p < phase) { if (phaser.isTerminated()) { break; // ... deal with unexpected termination } else { p = phaser.arriveAndAwaitAdvance(); } } phaser.arriveAndDeregister(); }
需要注意的是,awaitPhase方法中的if (phaser.isTerminated()) 分支里需要能够正确处理Phaser终止的情况。否则由于在Phaser终止之后, phaser.register()和arriveAndAwaitAdvance()方法均返回负值,那么上述方法可能陷入死循环。
3.5 Sample 5
以下对Phaser进行分层的例子:
import java.util.concurrent.Phaser; public class PhaserTest6 { // private static final int TASKS_PER_PHASER = 4; public static void main(String args[]) throws Exception { // final int phaseToTerminate = 3; final Phaser phaser = new Phaser() { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("====== " + phase + " ======"); return phase == phaseToTerminate || registeredParties == 0; } }; // final Task tasks[] = new Task[10]; build(tasks, 0, tasks.length, phaser); for (int i = 0; i < tasks.length; i++) { System.out.println("starting thread, id: " + i); final Thread thread = new Thread(tasks[i]); thread.start(); } } public static void build(Task[] tasks, int lo, int hi, Phaser ph) { if (hi - lo > TASKS_PER_PHASER) { for (int i = lo; i < hi; i += TASKS_PER_PHASER) { int j = Math.min(i + TASKS_PER_PHASER, hi); build(tasks, i, j, new Phaser(ph)); } } else { for (int i = lo; i < hi; ++i) tasks[i] = new Task(i, ph); } } public static class Task implements Runnable { // private final int id; private final Phaser phaser; public Task(int id, Phaser phaser) { this.id = id; this.phaser = phaser; this.phaser.register(); } @Override public void run() { while (!phaser.isTerminated()) { try { Thread.sleep(200); } catch (InterruptedException e) { // NOP } System.out.println("in Task.run(), phase: " + phaser.getPhase() + ", id: " + this.id); phaser.arriveAndAwaitAdvance(); } } } }
需要注意的是,TASKS_PER_PHASER的值取决于具体的Task实现。对于Task执行时间很短的场景(也就是竞争相对激烈),可以考虑使用较小的TASKS_PER_PHASER值,例如4。反之可以适当增大TASKS_PER_PHASER。
评论
Sample 1中,为了控制线程的启动时机,博主实际上是通过线程之间相互等待来实现的,即等待所有的线程都start后,再大家一起执行。这种情形据我了解应该是使用CyclicBarrier而不是CountDownLatch啊?望博主指教一下
这个确实好像搞错了!
Sample 1中,为了控制线程的启动时机,博主实际上是通过线程之间相互等待来实现的,即等待所有的线程都start后,再大家一起执行。这种情形据我了解应该是使用CyclicBarrier而不是CountDownLatch啊?望博主指教一下
发表评论
-
Understanding the Hash Array Mapped Trie
2012-03-30 10:36 0mark -
Atomic Bit Operation in Linux Kernel
2012-02-08 00:27 1980Linux Kernel支持atomic bit operat ... -
A Hierarchical CLH Queue Lock
2012-01-14 19:01 2111A Hierarchical CLH Queue Lock ( ... -
Inside AbstractQueuedSynchronizer (4)
2012-01-08 17:06 3465Inside AbstractQueuedSynchroniz ... -
Inside AbstractQueuedSynchronizer (3)
2012-01-07 23:37 4595Inside AbstractQueuedSynchroniz ... -
Inside AbstractQueuedSynchronizer (2)
2012-01-07 17:54 6304Inside AbstractQueuedSynchroniz ... -
Inside AbstractQueuedSynchronizer (1)
2012-01-06 11:04 7888Inside AbstractQueuedSynchroniz ... -
Code Optimization
2011-10-14 00:11 1558当前开发人员在进行编码的时候,可能很少关注纯粹代码级别的优化了 ... -
Distributed Lock
2011-08-02 22:02 91331 Overview 在分布式系统中,通常会 ... -
Sequantial Lock in Java
2011-06-07 17:00 21671 Overview Linux内核中常见的同步机 ... -
Feature or issue?
2011-04-26 22:23 121以下代码中,为何CglibTest.intercept ... -
Bloom Filter
2010-10-19 00:41 50191 Overview Bloom filt ... -
Inside java.lang.Enum
2010-08-04 15:40 64041 Introduction to enum J ... -
Open Addressing
2010-07-07 17:59 33951 Overview Open addressi ... -
JLine
2010-06-17 09:11 10949Overview JLine 是一个用来处理控 ... -
ID Generator
2010-06-14 14:45 1633关于ID Generator,想 ... -
inotify-java
2009-07-22 22:58 82101 Overview 最近公 ... -
Perf4J
2009-06-11 23:13 84301 Overview Perf4j是一个用于计算 ... -
Progress Estimator
2009-02-22 19:37 1485Jakarta Commons Cookbook这本书 ... -
jManage
2008-12-22 00:40 39141 Overview 由于项目需要, 笔者开发了一个 ...
相关推荐
Java Virtual Machine Support for Non-Java Languages: Java SE 7 introduces a new JVM instruction that simplifies the implementation of dynamically typed programming languages on the JVM. Garbage-First...
of how to use the new Java 7 Phaser class to synchronize tasks divided into phases. Chapter 4, Thread Executors will teach the readers to delegate the thread management to executors. They allow ...
Get to grips with exciting new concurrency features of Java 7, including the Phaser Class and the Fork/Join Framework Successfully delegate thread management to executors Customize some of the most ...
主要介绍了java多线程之Phaser的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
phaser-inspector, Phaser检查器插件允许你检查Phaser游戏 Phaser检查器插件将重新设计 [UPDATE] Phaser检查器插件重新设计和重构,以使它的成为更好的插件。 [UPDATE] Phaser检查器插件支持RenderTexture上的。 ...
使用 Phaser HTML5 game framework 製做的刮刮樂範例程式
java7在并发编程方面,带来了很多令人激动的新功能,这将使你的应用程序具备更好的并行任务性能。 《Java 7并发编程实战手册》是Java 7并发编程的实战指南,介绍了Java 7并发API中大部分重要而有用的机制。全书分为9...
最新版本的phaser文件
Phaser的伟大的开源游戏开发框架,使用HTML5创建游戏。这是一款利用Phaser.js制作仿Nokia经典的贪吃蛇游戏。
Phaser3.22最新版
phaser.min.js,免费,实用,提供phaser.min.js,前端JS
富士施乐(xerox) phaser3155驱动软件,英文版
phaser开发最新版本库文件
phaser2.4.4版本 html5游戏编写利器
Xerox富士施乐Phaser 3200MFP激光打印机驱动For Vista-64。
用 phaser 3 游戏引擎开发的 《围住神经猫》 代码来源于: https://github.com/ganlvtech/phaser-catch-the-cat 我提供的是基于此代码的修改版,修改内容:支持手机端全屏操作。 (原代码在PC浏览器中显示正常,...
Node.js上的相位器 ...安装并需要phaser和@geckos.io/phaser-on-nodejs 。 确保在服务器上以无头模式使用Phaser { type: Phaser.HEADLESS } require ( '@geckos.io/phaser-on-nodejs' ) const Phase
基于phaser的H5小游戏趣味反弹球,包括小球和障碍物的创建、障碍物强度和小球数量升级,单H5文件结构简单易懂,适合独立小游戏开发的快速上手
phaser HTML5 2D游戏引擎框架
富士施乐3116驱动是由富士施乐官方专门为富士施乐xeroxphaser3116型号打印机打造的打印机驱动程序,如果您打印机无法正常打印或者不能连接电脑,您可以下载phaser3116驱动,即可快速帮你解决这类问题。富士施乐...