`
snake1987
  • 浏览: 71846 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

java并发学习之二:线程池(五)

    博客分类:
  • java
阅读更多
之前的线程池已实现了基本的功能:运行每一个线程,而且测试了一下,大约速度是ThreadPoolExecutor的1.5倍(当然,这是有充分的理由的,后文会提到)

之后的版本将准备是实现“优雅退出”和优化(非阻塞)空闲线程队列了,这个步骤想了很久,发现了很多的问题(包括准备的实现方法也在这里列一下):
1.初步构思了几个方法
  • void shutDown():该方法将让池不再接受任务,但会将现有的任务全部运行结束后停止
  • List<Runnable> shutDownNow():该方法会interrupt线程,然后收集未调用的任务,并进一步回收已调用的任务(“已调用”不包括运行中的,因为运行中无法人为停止,只包括已下发到线程,但线程尚未执行的),然后将这些任务返回
  • List<Runnable> shutDownAndWait():跟shutDownNow类似,区别是会等待至线程池完全关闭

2.线程及池的状态,一开始很傻气地用了boolean去实现,比如:isShutDown,isRunning等,而这些变量又无法避免地被声明为volatile,我们知道,对volatile的写和读都会比对线程的内部变量的访问占用更多的消耗,与其用多个boolean,不如用一个runState,只需要判断大小就可以了,但对volatile变量的访问只有一次,而判断大小所占用的cpu时间几乎可以忽略不计
这也是一个原则性的问题了,这里记录下来,算是积累把:
表示一个东西的状态(多个状态),尽量还是使用int,如果有同步问题,则用volatile,CAS对于单个变量的控制还是很有效的
3.怎么回收已经下发,但未执行的任务?这就牵扯到另一件事了,需要获取线程的状态(也可以是任务的执行状态,因为这里目标是回收未执行的任务),这就不得不引入了更多的volatile变量,然后这个状态变量,还必须在每次执行任务的时候进行修改,就变成了一个很大的消耗了(这是不能容忍的)
注:这里不能用Thread.getState()来获得线程的状态,该状态是用以系统监控的,并不是用来控制逻辑的(也就是说它不准确)
4.空闲线程队列如果打算重新实现,这又不可避免地需要加入更多的volatile和与程序逻辑相关的happen-before关系了,这会让本来的程序逻辑变得模糊,而且出现问题的几率变得非常大
5.因为要对每个线程进行interrupt,所以又不得不再维护一个线程队列,所以又不得不
……

发现实现当初的目标越来越麻烦了

带着这些问题,最终还是去看了ThreadPoolExecutor的实现,看看大师是怎么做的

发现ThreadPoolExecutor的性能优化只做在了减少锁的范围上,使用的是ReentrantLock,而大部分的参数都使用了volatile,目的只是为了保证可见性,几乎没有出现利用happen-before规则的代码,也就是说,在ThreadPoolExecutor的实现上,优化程度只是在锁的级别上,并没有考虑进一步的优化。而查看类的作者,发现是Doug Lea

回头想想,的确,线程池是不同于AQS的框架的,因为该线程池的实现更注重的是稳定,可复用性,灵活性,安全性。一般说来,线程池调用时间与真正任务的执行时间不是一个数量级的,所以又何必去太计较那点性能的消耗呢?
因此之前所说的“运行速度是ThreadPoolExecutor的1.5倍”的原因也就出来了:任务太简单了,所以大大提高了任务调用时间所占的比例,在现实生活中应该是很少出现这样的情况的。而又说明了另一点,每样功能的实现都是要消耗性能的(特指在并发中),所以在写要求高性能的相关的实现时,应该尽量将需求给落实了,减少功能的需求,才能得到更好的性能

这里再写一下看ThreadPoolExecutor的收获
1.它可以让你注册一个RejectedExecutionHandler,来控制你的下发被拒绝的任务
2.它允许你注册一个threadFactory,来创建线程(这样可以自由地订制自己所需要的线程)
3.里面有一个方法很值得学习的:
我们在正常逻辑中,会有一些常见场景和例外场景(像可能常见场景不需要加锁,但例外场景需要),例外场景应该用尽量少的变量来判断是否进入,而常见场景应该与例外场景完全分离(即使有方法可以重用)
举个例子
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {//常见场景
                if (runState != RUNNING || poolSize == 0)//判断是否进入例外场景
                    ensureQueuedTaskHandled(command);//例外场景,有加锁动作
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }

4.类中定义的变量,都用注释说明了被哪一个锁守卫的(也就是说使用上,必须在在这个锁lock状态下才允许修改),这让我想起了《java concurrency in practice》中对使用并发相关annotation的建议,之后再去找找网上有没有提供这个库,如果在编译期就能根据annotation检查错误那就更好了
5.使用了BlockingQueue接口的原有定义来实现:为了减少线程的切换消耗,每个线程在任务队列空了之后,再进行一定时间的等待。(虽然没有知道明确地定义,但根据之前阅读的一些文章,大概推断jvm的实现上,如果时间很短,是不会将线程切换出去的,而是用一个循环机制来实现的)
6.线程的等待与唤醒也是通过BlockingQueue来实现的,ThreadPoolExecutor只维护池的状态,并不会维护线程的任何状态信息,线程的基本动作是由BlockingQueue控制的,这其实在一定的程度上,将并发的复杂度降低了,让ThreadPoolExecutor的功能变得更简单,更容易理解,更容易扩展了
7.优化关键步骤:像线程池,最关键的步骤(也就是被调用次数最多,整个实现中消耗最大的部分)就是getTask,execute,和runTask方法了,在其他方法中,可以没必要去追求什么效率,因为调用次数太少了,像shutdown(就一次),或者addThread,都是很少的,所以为了避免错误,我们可以考虑加锁。但对于关键步骤,我们应该尽可能地优化,少加锁或者不加锁,像3所说的小技巧
8.ThreadPoolExecutor也并没有回收已下发到线程中的任务,而是将它运行完(也就是在getTask动作跟runTask动作之间,并没有什么机制来保证能及时发现关闭事件)

总的来说,之前所制定的计划——实现一个高效而功能强大的线程池——还是有点问题。
就算是JDK的ThreadPoolExecutor,也做不到这一点。每个功能的实现是应该有具体需求的,是优先考虑性能还是其他,都是要有依据的,优先一方面就得牺牲另一方面。像JDK的ThreadPoolExecutor也是为了实现可复用,稳定,全功能,牺牲了一定程度的性能。
0
0
分享到:
评论
1 楼 zq3062211015 2014-03-27  
楼主是大神,膜拜啊,不过有个地方说的不太对啊,

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  !!!其实这里就加锁了
            if (runState == RUNNING && workQueue.offer(command)) {//常见场景 
                if (runState != RUNNING || poolSize == 0)//判断是否进入例外场景 
                    ensureQueuedTaskHandled(command);//例外场景,有加锁动作 
            } 
            else if (!addIfUnderMaximumPoolSize(command)) 
                reject(command); // is shutdown or saturated 
        } 

相关推荐

Global site tag (gtag.js) - Google Analytics