`

Java信号量—Semaphore

    博客分类:
  • JVM
阅读更多

Semaphore用来控制同时访问某一资源的操作数量,或控制同时执行某个指定操作的数量。主要通过控制一组虚拟的“许可”,当需要执行操作时首先申请获取许可,如果还有剩余的许可 并且获取成功,就执行操作;如果剩余许可为0,就阻塞当前线程;操作执行完成后释放许可,排队的阻塞线程可以被唤醒重新获取许可继续执行。这里提到排队,其实就是利用AQS的队列进行排队。

 

咋一看跟CountDownLatch有点类似,都维护了一个计数器。不同的是,CountDownLatch一开始就通过await阻塞线程,其他操作不停的对计数器减1(也可以大于1),直到为0时唤醒所有线程;Semaphore是执行操作之前对计数器减1(也可以大于1),执行完成之后释放许可对计数器加1。不难看出CountDownLatch只能使用一次,计数器为0后就不能再次使用了,而Semaphore有进有出,可以一直使用。

 

但Semaphore本质上也是基于AQS实现的,只是在重写AQS的方法时稍有不同。在详细分析Semaphore具体实现之前,先看看Semaphore是如何使用的。这里依旧以游戏为例,总所周知的大型网络游戏“魔兽世界”,在高峰期登陆游戏都需要排队,为什么呢?因为服务器资源有限,如果不做限制 服务器负载达到极限就会崩溃。这里我们用Semaphore来模拟实现“魔兽世界”中的排队,这里假设同一个服务器同一时间只能同时允许10个人同时在线,但现在有20位玩家在排队上线:

Java代码  收藏代码
  1. /** 
  2.  * Created by gantianxing on 2018/1/3. 
  3.  */  
  4. public class SemaphoreTest {  
  5.    
  6.     public static void main(String[] args) {  
  7.         //假设服务器只能承受10个人同时在线  
  8.         Semaphore semaphore = new Semaphore(10,true);  
  9.         //模拟20个玩家线程  
  10.         ExecutorService executorService = Executors.newFixedThreadPool(20);  
  11.         for (int i=0;i<20;i++){  
  12.             executorService.submit(new WowPlayer(semaphore,i+""));  
  13.         }  
  14.         executorService.shutdown();  
  15.     }  
  16. }  
  17.    
  18. class WowPlayer implements Runnable{  
  19.     private Semaphore semaphore;  
  20.     private String name;  
  21.    
  22.     public WowPlayer(Semaphore semaphore,String name) {  
  23.         this.semaphore = semaphore;  
  24.         this.name = name;  
  25.     }  
  26.    
  27.     @Override  
  28.     public void run() {  
  29.         System.out.println("玩家:"+name+"开始排队");  
  30.         try {  
  31.             semaphore.acquire();//获取许可  
  32.             try {  
  33.                 System.out.println("玩家:" + name + "进入游戏");  
  34.                 Thread.sleep(new Random().nextInt(10000));//模拟每位玩家游戏时长 10秒钟以内  
  35.                 System.out.println("玩家:" + name + "离开游戏");  
  36.             }catch (Exception e){  
  37.                 //业务异常  
  38.                 e.printStackTrace();  
  39.             }finally {  
  40.                 //释放许可,最好在finally中释放  
  41.                 semaphore.release();  
  42.             }  
  43.         } catch (Exception e) {  
  44.             e.printStackTrace();  
  45.         }  
  46.    
  47.     }  
  48. }  
  49.    

 

执行main方法,打印日志如下(日志比较长,省略了部分):

 

Java代码  收藏代码
  1. -------前10个玩家不需要排队时长为0,也就是不用排队----  
  2. 玩家:1开始排队  
  3. 玩家:1进入游戏  
  4. ------省略其他8个  
  5. 玩家:9开始排队  
  6. 玩家:9进入游戏  
  7. -----到这里10个许可用完,后面需要登陆的玩家需要排队  
  8. 玩家:14开始排队  
  9. 玩家:18开始排队  
  10. 玩家:3开始排队  
  11. 玩家:11开始排队  
  12. 玩家:15开始排队  
  13. 玩家:13开始排队  
  14. 玩家:17开始排队  
  15. 玩家:10开始排队  
  16. 玩家:7开始排队  
  17. 玩家:19开始排队  
  18.    
  19. -------等到有玩家离开游戏,排队的玩家才能进入游戏  
  20.    
  21. 玩家:9离开游戏  
  22. 玩家:14进入游戏  
  23. 玩家:8离开游戏  
  24. 玩家:18进入游戏  
  25. 玩家:0离开游戏  
  26. 玩家:3进入游戏  
  27. 玩家:6离开游戏  
  28. 玩家:11进入游戏  
  29. 玩家:1离开游戏  
  30. 玩家:15进入游戏  
  31. ----省略其他日志---  
  32.    

 

可以发现前10个玩家可以直接获得“许可”,排队时间为0 登陆后直接进入游戏;后面加入的10个玩家开始排队,为了公平性这里使用了Semaphore的公平构造方法;待前10个玩家有人离开游戏后,排队的10个玩家依次进入游戏。基本用法讲解完毕,下面开始Semaphore实现原理分析:

 

Semaphore实现原理

 

前文已经提到Semaphore是基于AQS实现的,其核心内部类就是实现AQS的子类,在Semaphore中有包含了公平实现和非公平实现。前面示例中为了保证游戏的公平性,排队使用的公平队列。这里需要提一下的是“公平”固然是好事,但是会有性能损失,主要原因是:线程在排队阻塞和被唤醒时都有上下文切换开销;而非公平的的实现,在加入队列前先检查是否存在“许可”,如果有 直接获取,相对公平实现 减少部分开销。所以在不需要严格保证排队顺序的情况下,建议都使用非公平信号量。

 

在Semaphore内部类实现AQS的过程中,为了保证部分方法复用首先定义了一个公共的实现类Sync,然后又分别创建了公平实现FairSync和非公平实现NonfairSync基础自Sync类。

 

首先看Sync类的实现:

Java代码  收藏代码
  1. abstract static class Sync extends AbstractQueuedSynchronizer {  
  2.     private static final long serialVersionUID = 1192457210091910933L;  
  3.    
  4.     Sync(int permits) {  
  5.         //构造方法,用“许可”个数初始化AQS的State字段值  
  6.         setState(permits);  
  7.     }  
  8.    
  9.     final int getPermits() {  
  10.         return getState();  
  11.     }  
  12.    
  13.     //非公平 共享获取 “资源”方法,参数为尝试获取的“资源”个数  
  14.     final int nonfairTryAcquireShared(int acquires) {  
  15.         for (;;) {  
  16.             int available = getState();  
  17.             int remaining = available - acquires;  
  18.    
  19.             if (remaining < 0 ||  
  20.                     //利用自旋,原子方式修改AQS的state值  
  21.                     compareAndSetState(available, remaining))  
  22.                 return remaining;  
  23.         }  
  24.     }  
  25.    
  26.     //共享方式 释放“资源”方法  
  27.     protected final boolean tryReleaseShared(int releases) {  
  28.         for (;;) {  
  29.             int current = getState();  
  30.             int next = current + releases;  
  31.             if (next < current) // overflow  
  32.                 throw new Error("Maximum permit count exceeded");  
  33.             if (compareAndSetState(current, next))  
  34.                 return true;  
  35.         }  
  36.     }  
  37.    
  38.     //动态调整“资源个数”  
  39.     final void reducePermits(int reductions) {  
  40.         for (;;) {  
  41.             int current = getState();  
  42.             int next = current - reductions;  
  43.             if (next > current) // underflow  
  44.                 throw new Error("Permit count underflow");  
  45.             if (compareAndSetState(current, next))  
  46.                 return;  
  47.         }  
  48.     }  
  49.    
  50.     //动态清空 所有“许可”  
  51.     final int drainPermits() {  
  52.         for (;;) {  
  53.             int current = getState();  
  54.             if (current == 0 || compareAndSetState(current, 0))  
  55.                 return current;  
  56.         }  
  57.     }  
  58. }  
  59.    

 

主要方法实现都比较简单,结合给出的注释很好理解。下面接着来看非公平的实现NonfairSync,继承自上述讲的Sync类:

Java代码  收藏代码
  1. static final class NonfairSync extends Sync {  
  2.         private static final long serialVersionUID = -2694183684443567898L;  
  3.    
  4.         //构造方法,没有添加任何新操作  
  5.         NonfairSync(int permits) {  
  6.             super(permits);  
  7.         }  
  8.         //获取资源方法,直接调用Sync定义的 非公平共享获取方法  
  9.         protected int tryAcquireShared(int acquires) {  
  10.             return nonfairTryAcquireShared(acquires);  
  11.         }  
  12.     }  
  13.    

 

最后看下公平的实现FairSync,同样继承自Sync类:

Java代码  收藏代码
  1. static final class FairSync extends Sync {  
  2.         private static final long serialVersionUID = 2014338818796000944L;  
  3.    
  4.         FairSync(int permits) {  
  5.             super(permits);  
  6.         }  
  7.         protected int tryAcquireShared(int acquires) {  
  8.             for (;;) {  
  9.                 if (hasQueuedPredecessors())  
  10.                     return -1;  
  11.                 int available = getState();  
  12.                 int remaining = available - acquires;  
  13.                 if (remaining < 0 ||  
  14.                     compareAndSetState(available, remaining))  
  15.                     return remaining;  
  16.             }  
  17.         }  
  18. }  

 

tryAcquireShared与非公平的实现区别不大,只多了一个hasQueuedPredecessors方法调用,该方法是AQS中定义的方法,主要作用就是判断当前线程是否是头节点或者队列为空,如果不是就进行排队。非公平的实现里如果尝试获取到“许可”,就无需加入队列排队了,这就是根本区别,Doug Lea大神只用了一行代码就实现这个区别,不可谓不巧妙。

 

到这里Semaphore对AQS使用内部类实现讲解完毕,下面开始看下Semaphore的核心方法,这些方法就很简单了,基本都是对上述三个内部类的方法调用,这里只列出几个核心方法即可,其他方法可以自行查阅。

 

Semaphore默认构造方法:

Java代码  收藏代码
  1. public Semaphore(int permits) {  
  2.         sync = new NonfairSync(permits);  
  3. }  

 

可以看到默认是调用AQS的非公平实现,毕竟性能会好些。主要作用就是使用参数“permits”初始化AQS的state字段。

 

Semaphore带公平或非公平参数构造方法:

Java代码  收藏代码
  1. public Semaphore(int permits, boolean fair) {  
  2.         sync = fair ? new FairSync(permits) : new NonfairSync(permits);  
  3. }  

 

主要就是根据参数fair,判断是创建AQS的公平实现还是非公平实现。

 

Semaphore的获取许可方法和释放许可方法:

Java代码  收藏代码
  1. public void acquire() throws InterruptedException {  
  2.         sync.acquireSharedInterruptibly(1);  
  3. }  
  4.    
  5. public void release() {  
  6.         sync.releaseShared(1);  
  7. }  

可以看到Semaphore的获取许可方法,是调用的AQS的“共享可中断获取方法”acquireSharedInterruptibly,之后会再回调Semaphore中的tryAcquireShared方法。说明当线程在使用Semaphore时被阻塞,是可以手动被中断的。

 

另外需要注意的是Semaphore的内部类对AQS的实现是采用的“共享”方式,因为如果有足够的多的“许可”被释放,可以同时唤醒多个线程,这时典型的共享锁的运用场景。

 

总结

 

简单的总结Semaphore,就是它可以用来控制同时访问某一资源的操作数量,或控制同时执行某个指定操作的数量。有点像限流的阀门,在有些场景下可以被固定的线程池代替,比如:Executors.newFixedThreadPool(xx),但它可以比线程池的控制更加细粒度。另外Semaphore可以理解为一种共享的可中断锁。

 

http://moon-walker.iteye.com/blog/2406557

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics