`
greemranqq
  • 浏览: 966200 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

ThreadPoolExecutor 源码分析(一)-- 线程工作流程

阅读更多

一、序言

       关于“池”的概念,我的理解是它是为了让我们更快的获得资源,节省时间,在我所知的所有池(线程池、连接池、常量池、缓存池、对象池等等),都是这个作用,这里我们仅仅分享线程池的相关理解。

       1.我们什么时候要用线程池?

       在JAVA 里面我们一切都是对象,线程(Thread)同样也是对象,只要是对象那么就要涉及创建、使用、回收等三个主要步骤。通常情况下,创建线程的时间 和 回收(销毁)线程的时间的开销,由JVM控制,而使用过程由我们控制。假设我们在使用时间很短,并且发生频率很高的情况下,那么线程的频繁创建和销毁就会占用大量的的时间,为了减少这种开销,我们利用线程池技术,创建一个线程池,用的时候从里面拿,不用了放回去,再空闲的时间再进行销毁,能节省时间。

        并且在一定程度上,无状态的线程是可以复用的,可以减少对象的创建。

 

 

二、功能介绍和设计:我们将分析JAVA线程池,然后写一个出来方便理解

        1.创建线程池:

 // 这是JDK 提供的此线程池的创建
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {

 

        a.corePoolSize :线程池基本线程数,因为一般池的创建分为两种,一种是初始化的时候就进行创建默认的最小连数目,还有一种是当使用的时候才创建,直到创建的数目超过基本数值的时候,后面的获取,才根据池里面的数量空闲状态进行返回,在此之前,都是一直创建,直到超过基本值。

 

        b.maximumPoolSize:线程池最大容量

 

        c.keepAliveTime:线程活动保持时间 ,线程在池里面的空闲的时间,相当于外包人员,在长时间没有接到任务的时候,就让他离职!减少负担。

 

        d.unit:时间单位,这个可以参考TimeUtil 提供了很多精确的时间类型

 

     e.workQueue:工作队列,这里负责维护多个任务调度,比如当我们进网吧的时候是需要刷卡的,当人比较多的时候需要排队,而且同时还要应对结账的人员,这里用队列的方式进行管理。

     至于BlockingQueue 的实现,我们后面再介绍。

         

     f.threadFactory:线程工厂,这里面提供了多种线程的创建方式

 

     g:RejectedExecutionHandler:饱和策略,和缓存池类似,我们不可能无限制的分配下去,当线程数量到最大值的时候,我们需要用一种策略进行处理。

       具体的处理策略我们也留到后面。

      

    我们的设计如下:

    

 

 

     2.功能体现:

     既然是线程池,里面肯定放的是线程,同时我们肯定要有执行线程的方法execute,先来看看JDK这部分主体代码:

     

/**
 * 自己实现线程池,为了方便理解,按照JDK 的进行编写
 * 
 * @author Ran
 */
public class ThreadPool {

	private volatile int corePoolSize;
	private volatile int maximumPoolSize;
	private volatile ThreadFactory threadFactory;
	private final BlockingQueue<Runnable> workQueue;
	private volatile long keepAliveTime;
	private volatile RejectedExecutionHandler handler;
	// 当前实际线程数
	private volatile int   poolSize;
	// 状态锁,主要用于对poolSize,corePoolSize,maximumPoolSize,runState,workers 更新时的锁定
	private final ReentrantLock mainLock = new ReentrantLock();
	
	// 线程的一些状态
	volatile int runState;   
	static final int RUNNING = 0;   
        // 不接受新任务了,但是已经加入队列的还会执行
	static final int SHUTDOWN = 1; 
        // 停止了,队列里面的任务也不执行了
	static final int STOP = 2; 
        // 全部停止,会关闭所有正在执行的线程 
	static final int TERMINATED = 3; 
	
	// 存放工作线程的集合
	private final HashSet<Worker> workers = new HashSet<Worker>();
	
	// 记录线程池到达的最高峰值 的线程数
	private int largestPoolSize;

	// 构造
	public ThreadPool(int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
			RejectedExecutionHandler handler) {
		if (corePoolSize < 0 || maximumPoolSize <= 0
				|| maximumPoolSize < corePoolSize || keepAliveTime < 0)
			throw new IllegalArgumentException();
		if (workQueue == null || threadFactory == null || handler == null)
			throw new NullPointerException();
		this.corePoolSize = corePoolSize;
		this.maximumPoolSize = maximumPoolSize;
		this.workQueue = workQueue;
		this.keepAliveTime = unit.toNanos(keepAliveTime);
		this.threadFactory = threadFactory;
		this.handler = handler;
	}
	
	// 执行方法
	public void execute(Runnable command) {
	        if (command == null)
	            throw new NullPointerException();
	        // 如果线程池数,超过我们的基本连接数,直接执行下面
	        // 如果当前线程数小于基本线程数,就创建并执行 ,返回
	        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
	        	// 如果创建失败再次检测,如果有正在取线程的就放进去
	            if (runState == RUNNING && workQueue.offer(command)) {
	                if (runState != RUNNING || poolSize == 0)
	                	// 如果线程池不处于运行状态,或者是第一个线程进入,执行
	                	// 确保始终有一个线程执行该任务
	                    ensureQueuedTaskHandled(command);
	                    
	            }else if (!addIfUnderMaximumPoolSize(command))
	            	// 否则就执行其他策略
	                reject(command); // is shutdown or saturated
	        }
	    }
	
	  // 添加并返回
	  private boolean addIfUnderCorePoolSize(Runnable firstTask) {
	        Thread t = null;
	        final ReentrantLock mainLock = this.mainLock;
	        mainLock.lock();
	        try {
	        	// 可以看出,当前线程数是小于基本线程数,并且线程池处于活动状态
	        	// 那么就进行创建 添加操作
	            if (poolSize < corePoolSize && runState == RUNNING)
	                t = addThread(firstTask);
	        } finally {
	            mainLock.unlock();
	        }
	        if (t == null)
	            return false;
	        //然后会启动该线程 
	        t.start();
	        return true;
	    }
	  
	    // 添加操作
	    private Thread addThread(Runnable firstTask) {
	        Worker w = new Worker(firstTask);
	        // 创建,这路的创建,如果传的null, 会new 一个,默认在DefaultThreadFactory                  // 里面 可以看到
	        Thread t = threadFactory.newThread(w);
	        if (t != null) {
	        	// 可以看出,创建的线程实际是worker 对象,里面封装了很多内容
	            w.thread = t;
	            // 然后保存进去。返回
	            workers.add(w);
	            int nt = ++poolSize;
	            if (nt > largestPoolSize)
	                largestPoolSize = nt;
	        }
	        return t;
	    }
	    
	    
	    // 排队后从新检查状态,如果处于非RUNNING 状态,会把刚才队列里面的清掉
	    // 确保有一个线程来处理这个任务(前提是addThread 要成功)
	    private void ensureQueuedTaskHandled(Runnable command) {
	        final ReentrantLock mainLock = this.mainLock;
	        mainLock.lock();
	        boolean reject = false;
	        Thread t = null;
	        try {
	        	// 重新检查状态
	            int state = runState;
	            // 如果挂了,就把刚才那个移除返回true,执行处理的策略了
	            if (state != RUNNING && workQueue.remove(command))
	                reject = true;
	            else if (state < STOP &&
	                     poolSize < Math.max(corePoolSize, 1) &&
	                     !workQueue.isEmpty())
	            	// 如果线程池禁止添加新任务 了,并且队列不为空,并且基本数未满
	                t = addThread(null);
	        } finally {
	            mainLock.unlock();
	        }
	        if (reject)
	            reject(command);
	        else if (t != null)
	            t.start();
	    }
	    // 调用拒绝执行的策略
	    void reject(Runnable command) {
	        //handler.rejectedExecution(command, this);
	    }
	    
	    // 队列添加失败(一般是满了)的时候,在满足条件的情况下,会再次创建新
	    private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
	        Thread t = null;
	        final ReentrantLock mainLock = this.mainLock;
	        mainLock.lock();
	        try {
	            if (poolSize < maximumPoolSize && runState == RUNNING)
	                t = addThread(firstTask);
	        } finally {
	            mainLock.unlock();
	        }
	        if (t == null)
	            return false;
	        t.start();
	        return true;
	    }
	    
	    
	    // 这是内部类,里面对封装了,我们对线程
	    private final class Worker implements Runnable {
	    	 // 针对每个线程任务进行控制,
	    	 private final ReentrantLock runLock = new ReentrantLock();
	         private Runnable firstTask;
	         Thread thread;

			public Worker(Runnable firstTask) {
				this.firstTask = firstTask;
			}

			@Override
			public void run() {
				
			}
	    }
}

 

 

执行方法小结:

        上面代码,估计大家看着有点乱,这里我进行描述,然后借助图例,然后再回过头进行理解,就清楚了。整个执行方法无非就坐了几件事情:

         1.当前线程数poolSize 小于 corePoolSize基本线程数,也就是说连最小、最基本的线程数都没满足,那么就会执行addIfUnderCorePoolSize方法。

         1.1 addIfUnderCorePoolSize 主要是把一个线程包装成Worker对象,会执行addThread方法,用工厂创建线程,然后保存在我们的集合里,然后执行该线程,OK 返回true,失败返回false.

 

         2.当上面条件不满足的情况,我们会看workQueue 是否满了,如果workQueue.offer(command) 成功,表示未满,就保存进队列。

         2.1成功判断如果是第一个进入的线程,poolSize == 0,那么会执行ensureQueuedTaskHandled方法,该方法会再次验证线程池处于什么状态,如果是非RUNNING了,就把刚才加入队列的线程,移除,然后执行拒绝的策略,如果remove 失败了,会创建一个线程来,保证该

 

         3. 如果workQueue.offer(command) 失败,说明队列满了,会执行addIfUnderMaximumPoolSize 方法,该方法是去判断线程池是否满了,未满的情况下 ,创建(包装)线程,并且执行,返回true, 否则返回false.

执行拒绝策略。

 

         好吧,如果你还无法理解,我还是用实际例子:

         假设我有4台电脑,准备让4个人帮我打文件:corePoolSize = 4.这时候来了3个人 3<4. 我就给他们穿上工作服(Worker包装),然后 让他们做事(addIfUnderCorePoolSize),过了一会又来2个人,这时候发现人够了,那么就把新来的放进等候室(队列)(workQueue.offer(command) ),然后人来多了,再让他们去等候间的时候,发现人满了,这是就要用你指定的策略了(RejectedExecutionHandler)。当然还有一种可能是也许第一个人进来的时候,可以add 失败,也许电脑就坏了,或者卡死了(!RUNNING),这时候就会为了确这个人有活干(ensureQueuedTaskHandled)你得重新检查一下,重新弄弄系统什么的吧!

        下面我copy 的逻辑图,给大家再理解理解,图片来源:http://ifeve.com/java-threadpool/

       

 

 

 

 3. 细节处理

     上面我们初步解释了连接池的工作原理,但是里面线程怎么工作,怎么管理,以及淘汰策略怎么完成的,这些现在进行解释:

      3.1 线程如何工作?

      我们回到worker 里面的run 方法:

      

		@Override
		public void run() {
			try {
				Runnable task = firstTask;
				firstTask = null;
				// 一直从队列里面取,知道执行完成
				while (task != null || (task = getTask()) != null) {
					runTask(task);
					task = null;
				}
			} finally {
				workerDone(this);// 退出
			

    上面主要有getTask 和 runTask ,我们分别来看看 这里面做了什么吧;

    

        Runnable getTask() {
			for (;;) {
				try {
					int state = runState;
					// 放弃队列里面任务执行,直接返回
					if (state > SHUTDOWN)
						return null;
					Runnable r;
					// 该状态,会返回已经加入队列连的线程
					if (state == SHUTDOWN) // Help drain queue
						r = workQueue.poll();
					// 如果超过的基本线程,并且allowCoreThreadTimeOut                                          // 参数允许回收
			else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
				// 获取超时,那么就会回收
				r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
					else
					// 否则就一直等待可以线程进入
					r = workQueue.take();
					if (r != null)
					return r;
					// 判断是否可以退出
					if (workerCanExit()) {
						// 如果已经停了,就中断所有工作的线程(除当                                                // 前线程)
						if (runState >= SHUTDOWN) // Wake up others
							interruptIdleWorkers();
						return null;
					}
					// Else retry
				} catch (InterruptedException ie) {
					// On interruption, re-check runState
				}
			}
		}

    

    我们可以看出,getTask主要是返回队列中的任务,过程中根据池的不同状态做不同处理,获得task之后,我我们再看看runTask 的执行。

    

	private void runTask(Runnable task) {
			final ReentrantLock runLock = this.runLock;
			runLock.lock();
			try {
			// 这段代码的锁和线程池的锁不一样,用多次判断!我不明所以。。
			// 在JDK 1.7 里面的变化挺大的,大家可以去参考里面的。
			if (runState < STOP && Thread.interrupted() && runState >= STOP)
				thread.interrupt();
				boolean ran = false;
				// 这是在任务之前执行方法,方便你重写的的。
				beforeExecute(thread, task);
				try {
					// 看出来了,还是用的Runable 的run 方法
					task.run();
					ran = true;
					// 这里也方便重写
					afterExecute(task, null);
					// 这里会记录完成任务的数量
					++completedTasks;
				} catch (RuntimeException ex) {
					if (!ran)
						afterExecute(task, ex);
					throw ex;
				}
			} finally {
				runLock.unlock();
			}
		}

 

 虽然解释得不很清楚,我相信至少能理解线程在线程池里面主要的额工作流程了。

 

 

小结:

       1.上面分析了JDK 线程池的基本实现原理,仅供参考

       2.JDK1.7 的变化挺大的,我表示很无奈,以后分析源码,还是往JDK1.8 上面靠吧,不然被淘汰了都不知道!

      3.整个流程有些在没加锁的地方,老是喜欢用多次判断的方式,因为volatile的可见性,确实可以这么做,我不得不吐槽,写得并不好,这里JDK 1.7 里面进行的大量重构!

      4.有写得不好,或者不明白的地方,欢迎大家一起提出,分享,由于篇幅和知识吸收的关系,里面的其他策略,下次分享,这里仅仅分析主要工作流程。

 

 

 

 

  • 大小: 64.4 KB
分享到:
评论
1 楼 肥啦A梦 2014-04-09  
获益不少

相关推荐

Global site tag (gtag.js) - Google Analytics