`
he_wen
  • 浏览: 234861 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

线程池代码完全剖析(三)

阅读更多

线程池代码完全剖析(三)

一、上篇文章剖析了设计线程池的第一条思路,下面分析第二条和第三条思路

 

第二条思路:用户请求的任务,而线程池是如何分配的线程给请求用户或者说是以什么样的策略方式

 

当用户调用代码

 QueuedThreadPool tp= new QueuedThreadPool();
 tp.setMinThreads(5);
        tp.setMaxThreads(10);
        tp.setMaxIdleTimeMs(1000);
        tp.setSpawnOrShrinkAt(2);
        tp.setThreadsPriority(Thread.NORM_PRIORITY-1);
        
        tp.start();
        tp.dispatch(_job);

 

 

就开始调用任务,具体流程请看dispatch方法:

 

 public boolean dispatch(Runnable job) 
    {  
        if (!isRunning() || job==null)
            return false;

        PoolThread thread=null;
        boolean spawn=false;
            
        synchronized(_lock)
        {
            // Look for an idle thread
            int idle=_idle.size();
            if (idle>0)
                thread=(PoolThread)_idle.remove(idle-1);
            else
            {
                // queue the job
                _queued++;
                if (_queued>_maxQueued)
                    _maxQueued=_queued;
                _jobs[_nextJobSlot++]=job;
                //循环的存放待留的任务
                if (_nextJobSlot==_jobs.length)
                    _nextJobSlot=0;
                /***
                 * 由于队列是一个由数组组成的循环数组,而_nextJobSlot指针是任务加入
                 * 队列数组添加时候就++,而_nextJob指针只要从队列数组中取数据的时候
                 * ++;也就是说当加入任务的数量比取出任务的数量要快队列的数组长度时候,
                 * 就扩充数组的长度即:_nextJobSlot==_nextJob
                 */
                if (_nextJobSlot==_nextJob)
                { //扩充任务队列
                    Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
                    int split=_jobs.length-_nextJob;//未增加队列时 队列还有多少个任务
                    if (split>0)//把队列剩下的任务复制给扩展的队列中
                        System.arraycopy(_jobs,_nextJob,jobs,0,split);
                    if (_nextJob!=0)//为什么还要复制呢?不是前面的任务已经执行完,应该没有必要?
                        System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
                    
                    _jobs=jobs;
                    _nextJob=0;
                    _nextJobSlot=_queued;
                }
                  //当队列任务超出了_spawnOrShrinkAt变量值时,就要创建新的线程
                spawn=_queued>_spawnOrShrinkAt;
            }
        }
        
        if (thread!=null)
        {
            thread.dispatch(job);
        }
        else if (spawn)
        {
            newThread();
        }
        return true;
    }

 

注意有的代码不明白请看上面代码的注释:这个也是通过调式后才能理解设计者的意图

1、如果闲散集合中中有闲散线程,那么就取出一个线程处理请求的任务,代码:

          int idle=_idle.size();
            if (idle>0)
                thread=(PoolThread)_idle.remove(idle-1);

               if (thread!=null)
              {
                  thread.dispatch(job);
             }
2、如何闲散集合没有线程,那么就把请求的任务放在队列中(等待有空闲的线程,注意该队列是一个循环的队列),

      代码:  _jobs[_nextJobSlot++]=job;

 

3、如果队列中存放任务已经满了,就扩充这个队列,代码:

     if (_nextJobSlot==_nextJob)
                { //扩充任务队列
                    Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
                    int split=_jobs.length-_nextJob;//未增加队列时 队列还有多少个任务
                    if (split>0)//把队列剩下的任务复制给扩展的队列中
                        System.arraycopy(_jobs,_nextJob,jobs,0,split);
                    if (_nextJob!=0)//

                        System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
                   
                    _jobs=jobs;
                    _nextJob=0;
                    _nextJobSlot=_queued;
                }

4、如何spanw=_queued>_spawnOrShrinkAt即当前队列任务数量超过了变量限制的数目时,线程池就要新建一个 线   程;代码:

        else if (spawn)
        {
            newThread();
        }

5、说明调用ThreadPool的dispatch方法,这个方法只能是在闲散集合中有线程时候才会调用,代码:

 

       /* 这个是从闲散的数组中取出来的线程
         * */
        void dispatch(Runnable job)
        {
            synchronized (this)
            {
                _job=job;
              /*
               * 这行代码是与上面的代码 this.wait(getMaxIdleTimeMs());
               * 表示有任务要调度,所以不需要等待
               */
                this.notify();

            }
        }

 

二、主要是将ThreadPool类

 

 public class PoolThread extends Thread 
    {
    	//this是保持_job的同步,这个变量主要是等待分配任务
        Runnable _job=null;

        /* ------------------------------------------------------------ */
        PoolThread()
        {
            setDaemon(_daemon);
            setPriority(_priority);
        }
        
        /* ------------------------------------------------------------ */
        /** BoundedThreadPool run.
         * Loop getting jobs and handling them until idle or stopped.
         */
        public void run()
        {
            boolean idle=false;
            Runnable job=null;
            try
            {
                while (isRunning())
                {   
                    // Run any job that we have.
                    if (job!=null)
                    {
                        final Runnable todo=job;
                        job=null;
                        idle=false;//这个标志等任务调度完后,在放到闲散线程集合里
                        todo.run();
                    }
                    
                    synchronized(_lock)
                    {
                        // is there a queued job?
                        if (_queued>0)
                        {
                            _queued--;
                            job=_jobs[_nextJob++];
                            if (_nextJob==_jobs.length)
                                _nextJob=0;
                            continue;
                        }

                        // Should we shrink?
                        final int threads=_threads.size();
                        if (threads>_minThreads && 
                            (threads>_maxThreads || 
                             _idle.size()>_spawnOrShrinkAt))   
                        {
                        	
                            long now = System.currentTimeMillis();
                            if ((now-_lastShrink)>getMaxIdleTimeMs())
                            {
                                _lastShrink=now;
                                _idle.remove(this);
                                return;
                            }
                        }
                        /**
                         * 线程池中在刚刚在池中创建线程的时候没有任务安排
                         * 所以同时也是闲散线程的数目,同时任务完成后,该线程就要归还
                         * 给闲散集合
                         */
                        if (!idle)
                        {   
                            // Add ourselves to the idle set.
                            _idle.add(this);
                            
//                            System.out.println(_idle.size());
                            idle=true;
                        }
                    }

                    // We are idle
                    // wait for a dispatched job
                    synchronized (this)
                    {
                        if (_job==null)
                            this.wait(getMaxIdleTimeMs());
                        job=_job;
                        _job=null;
                    }
                }
            }
            catch (InterruptedException e)
            {
                Log.ignore(e);
            }
            finally
            {
                synchronized (_lock)
                {
                    _idle.remove(this);
                }
                synchronized (_threadsLock)
                {
                    _threads.remove(this);
                }
                synchronized (this)
                {
                    job=_job;
                }
                
                // we died with a job! reschedule it
                if (job!=null)
                {
                    QueuedThreadPool.this.dispatch(job);
                }
            }
        }
        
        /* ------------------------------------------------------------ 
         * 这个是从闲散的数组中取出来的线程
         * */
        void dispatch(Runnable job)
        {
            synchronized (this)
            {
                _job=job;
              /*
               * 这行代码是与上面的代码 this.wait(getMaxIdleTimeMs());
               * 表示有任务要调度,所以不需要等待
               */
                this.notify();

            }
        }
    }

 

该类是QueueThreadPool的内部类,主要是创建线程维护线程池的状态,该类的run方法,下面会详细剖析,该方法是该类的一个核心方法,非常经典。。。

 

 三、下面说第三条思路线程池中线程的生命周期

 

线程的生命周期也就是想说一下ThreadPool类中的run()方法。。。。

1、如果有一个工作任务的时候,线程就要执行该任务;

         if (job!=null)
                    {
                        final Runnable todo=job;
                        job=null;
                        idle=false;//这个标志等任务调度完后,在放到闲散线程集合里
                        todo.run();
                    }

那么这个工作任务是从什么地方来的呢?

 

来源一:就是从队列中取出任务

来源二:客户端调用dispatch方法时,线程池中还有闲散线程,于是就执行了ThreadPool中的dispatch方法,

                void dispatch(Runnable job)
        {
            synchronized (this)
            {
                _job=job;
              /*
               * 这行代码是与上面的代码 this.wait(getMaxIdleTimeMs());
               * 表示有任务要调度,所以不需要等待
               */
                this.notify();

            }
        }

然后就通知run方法中:

 

    // We are idle
                    // wait for a dispatched job
                    synchronized (this)
                    {
                        if (_job==null)
                            this.wait(getMaxIdleTimeMs());
                        job=_job;
                        _job=null;
                    }

这样就获得了一个任务

2、任务执行完后,要归还给闲散集合

                /**
                         * 线程池中在刚刚在池中创建线程的时候没有任务安排
                         * 所以同时也是闲散线程的数目,同时任务完成后,该线程就要归还
                         * 给闲散集合
                         */
                        if (!idle)
                        {  
                            // Add ourselves to the idle set.
                            _idle.add(this);
                            idle=true;
                        }

3、当队列中有任务时,队列就取出任务来执行任务

        if (_queued>0)
                        {
                            _queued--;
                            job=_jobs[_nextJob++];
                            if (_nextJob==_jobs.length)
                                _nextJob=0;
                            continue;
                        }

4、run方法中主要是while循环,只有当线程池需要缩减线程数目时,该线程的生命才会结束,即跳出循环体

       final int threads=_threads.size();
                        if (threads>_minThreads &&
                            (threads>_maxThreads ||
                             _idle.size()>_spawnOrShrinkAt))  
                        {
                         
                            long now = System.currentTimeMillis();
                            if ((now-_lastShrink)>getMaxIdleTimeMs())
                            {
                                _lastShrink=now;
                                _idle.remove(this);
                                return;
                            }
                        }

只有当线程数目大于线程池限定的最小数目并且当前线程数目大于线程池限定的最大数目或者闲散集合的数目大于线程池中变量_spawnOrShrinkAt时,同时还要满足((now-_lastShrink)>getMaxIdleTimeMs()这个要求才会让线程生命周期结束。

 

已经详细说明了三个思路。。。

 

四、下面还想说明一下,如何安全的关闭线程池

 

 /** Stop the BoundedThreadPool.
     * New jobs are no longer accepted,idle threads are interrupted
     * and stopJob is called on active threads.
     * The method then waits 
     * min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to
     * stop, at which time killJob is called.
     */
    protected void doStop() throws Exception
    {   
        super.doStop();
        
        long start=System.currentTimeMillis();
        for (int i=0;i<100;i++)
        {
            synchronized (_threadsLock)
            {
                Iterator iter = _threads.iterator();
                while (iter.hasNext())
                    ((Thread)iter.next()).interrupt();
            }
             Thread.yield();
            if (_threads.size()==0 || (_maxStopTimeMs>0 && _maxStopTimeMs < (System.currentTimeMillis()-start)))
               break;
            
            try
            {
                Thread.sleep(i*100);
            }
            catch(InterruptedException e){}
            
            
        }

        // TODO perhaps force stops
        if (_threads.size()>0)
            Log.warn(_threads.size()+" threads could not be stopped");
        /***
         * 通知那些让出CPU权限的线程
         */
        synchronized (_joinLock)
        {
            _joinLock.notifyAll();
        }
    }

 

 关闭的时候新的工作不在接收,中断闲散集合中的线程;停止正在工作任务的线程,设计者主要处理方法时根据时间和循环,来杀死工作的线程

 

五、总结

 

我认为(这个只是我个人的看法)

该停止线程池中的线程工作还是有缺点,如何优雅的关闭线程池中正在工作的线程,主要做的工作:

1、应该要知道有哪些线程正在执行

2、哪些工作的线程需要马上中断

3、哪些工作线程必须要处理完才能中断

4、必须做到这些点式非常必要的。。。。

 

反正里面的设计思想非常值得学习和思考,为什么要这样设计

 

下面有Junit测试代码和源程序,我会上传过来。。。

 

 

分享到:
评论
4 楼 he_wen 2011-04-14  
谢谢你提的建议 确实是理解有点偏差  还有想说的是,本人现在在读研究生想在六月左右找一份实习的工作:
  主要兴趣是对于高并发感兴趣,对多线程和网络的理论有深入的研究,希望有一个实践的平台

希望各位推荐推荐 谢谢
3 楼 feidian1028 2011-04-13  
/***
                * 由于队列是一个由数组组成的循环数组,而_nextJobSlot指针是任务加入
                * 队列数组添加时候就++,而_nextJob指针只要从队列数组中取数据的时候
                * ++;也就是说当加入任务的数量比取出任务的数量要快队列的数组长度时候,
                * 就扩充数组的长度即:_nextJobSlot==_nextJob
                */ 
               if (_nextJobSlot==_nextJob) 
               { //扩充任务队列 
                   Runnable[] jobs= new Runnable[_jobs.length+_maxThreads]; 
                   int split=_jobs.length-_nextJob;//未增加队列时 队列还有多少个任务 
                   if (split>0)//把队列剩下的任务复制给扩展的队列中 
                       System.arraycopy(_jobs,_nextJob,jobs,0,split); 
                   if (_nextJob!=0)//为什么还要复制呢?不是前面的任务已经执行完,应该没有必要? 
我的理解,lz这段代码的理解有问题。
1.因为是循环数组,_nextJobSlot==_nextJob的情况是_nextJobSlot比_nextJob多了一个循环,就像跑步时跑得快的要拉开跑慢的整整一圈时候的情况,通常我们叫套圈。
在这里,表示的意思是整个jobs数据已经都放满了待处理的任务,所以需要处理。
2.而下面两个数组的拷贝,我理解,只是纯粹得改变一下数组中任务的位置。
2 楼 beforezero 2011-03-18  
最烦的就是一上来就上代码或配置文件的。多少来点自己的思想啊
1 楼 sebatinsky 2011-03-18  
占个位置,先看看,,,,

相关推荐

    阿里巴巴编码规范 基础技能认证 考题分析(考题+答案).docx

    对于暂时被注释掉,后续可能恢复使用的代码片断,在注释代码上方,统一规定使用三 个斜杠(///)来说明注释掉代码的理由。 D .不要在视图模板中加入任何复杂的逻辑。 多选 4.关于分页查询,下列哪些说法符合《阿里...

    精易模块[源码] V5.15

    6、新增“类_任务栏”可以显示隐藏任何第三方窗口图标,相当于易中的(不在任务栏显示),带【实例】演示。 7、新增“类_线程池1”中的“等待”方法。 8、修复“编码_Utf8到Ansi“分配内存失败BUG,感谢易友【仁鹰】...

    《Java并发编程的艺术》源代码

    Java线程之间的通信对程序员完全透明,内存可见性问题很容易困扰Java程序员,本章试图揭开Java内存模型的神秘面纱。 第4章从介绍多线程技术带来的好处开始,讲述了如何启动和终止线程以及线程的状态,详细阐述了多...

    javaSE代码实例

    13.6.3 利用正则式对字符串进行分析 268 13.7 小结 269 第14章 集合框架——强大的对象管理器 270 14.1 Object类——所有类的超类 270 14.1.1 toString方法的重写 270 14.1.2 equals方法的意义 271 ...

    vc++ 应用源码包_6

    内部包含:mp3播放器Lrc歌词同步源程序代码分析、mp3播放器+支持歌词同步显示哦、简单音乐播放器。 mfc 解码 视频音频解码部分。 MFC_MultiSender_OVER 文件传送,多文件(超大文件)传送功能的实现,含文档。 ...

    vc++ 应用源码包_5

    内部包含:mp3播放器Lrc歌词同步源程序代码分析、mp3播放器+支持歌词同步显示哦、简单音乐播放器。 mfc 解码 视频音频解码部分。 MFC_MultiSender_OVER 文件传送,多文件(超大文件)传送功能的实现,含文档。 ...

    vc++ 开发实例源码包

    内部包含:mp3播放器Lrc歌词同步源程序代码分析、mp3播放器+支持歌词同步显示哦、简单音乐播放器。 mfc 解码 视频音频解码部分。 MFC_MultiSender_OVER 文件传送,多文件(超大文件)传送功能的实现,含文档。 ...

    最简单的爬虫-WebMagic 0.73 源码

    作为爬虫框架,它使用httpclient作为获取网页工具、使用Jsoup作为分析页面定位抓取内容、使用ExecutorService线程池作为定时增量抓取、Jdiy作为持久层框架。不熟悉这些名词的同学们可以先行百度一下这些都是什么,起...

    vc++ 应用源码包_1

    内部包含:mp3播放器Lrc歌词同步源程序代码分析、mp3播放器+支持歌词同步显示哦、简单音乐播放器。 mfc 解码 视频音频解码部分。 MFC_MultiSender_OVER 文件传送,多文件(超大文件)传送功能的实现,含文档。 ...

    vc++ 应用源码包_2

    内部包含:mp3播放器Lrc歌词同步源程序代码分析、mp3播放器+支持歌词同步显示哦、简单音乐播放器。 mfc 解码 视频音频解码部分。 MFC_MultiSender_OVER 文件传送,多文件(超大文件)传送功能的实现,含文档。 ...

    vc++ 应用源码包_3

    内部包含:mp3播放器Lrc歌词同步源程序代码分析、mp3播放器+支持歌词同步显示哦、简单音乐播放器。 mfc 解码 视频音频解码部分。 MFC_MultiSender_OVER 文件传送,多文件(超大文件)传送功能的实现,含文档。 ...

    精通ANDROID 3(中文版)1/2

    6.1.1 完全利用代码来构建UI  6.1.2 完全使用XML构建UI  6.1.3 使用XML结合代码构建UI  6.2 Android中的常见控件  6.2.1 文本控件  6.2.2 按钮控件  6.2.3 ImageView控件  6.2.4 日期和时间控件  ...

    精通Android 3 (中文版)2/2

    6.1.1 完全利用代码来构建UI  6.1.2 完全使用XML构建UI  6.1.3 使用XML结合代码构建UI  6.2 Android中的常见控件  6.2.1 文本控件  6.2.2 按钮控件  6.2.3 ImageView控件  6.2.4 日期和时间控件  ...

    ebsite for net4.0网站建设系统 v3.0 正式版.zip

    可以应用于模板里的某一个地方,比如要在模板中调用最新新闻,或今日排行榜数据等,这些都可以直接在后台创建官方已经开发好的部件,只要做简的配置即可,部件还支持模板,所以输出的html代码是完全可定义的。...

    疯狂JAVA讲义

    学生提问:构造器是创建Java对象的途径,是不是说构造器完全负责创建Java对象? 141 5.5.2 构造器的重载 142 学生提问:为什么要用this来调用另一个重载的构造器?我把另一个构造器里的代码复制、粘贴到这个构造器...

Global site tag (gtag.js) - Google Analytics