`

线程池

 
阅读更多

 

个人学习笔记,如有错误欢迎指正。。

 

一 线程池的介绍

      如果需要线程来执行的任务,可以使用 new Thread 来创建线程,Thread.start()方法执行线程,线程执行完之后,不可再利用。还有其它任务需要线程来执行,需要重复上述步骤。

        这样就形成,一个任务创建一条线程来执行,对于大量任务来说,频繁创建和销毁线程都需要耗费系统资源,这种情况可以使用线程池来提升性能减少开销。

 

       线程池中的线程创建完成后,可以重复的执行不同的任务(Runnable或Callable),实现了线程的重复利用,减少了创线程的数量。

       线程池实现的简要示意:

public class ThreadPool {
	private final BlockingQueue<Runnable> queue ;
	public ThreadPool (int threadNum){
		this.queue = new ArrayBlockingQueue<Runnable>(10);
		for(int i=0;i<threadNum;i++){
			new WorkerThread().start();
		}
	}
	public void execute(Runnable runnable) throws Exception{
		this.queue.put(runnable);
	}
	private final class WorkerThread extends Thread{
		@Override
		public void run() {
			// TODO Auto-generated method stub
			try{
				queue.take().run();//不断的从任务队列中取出任务来执行
			}catch(Exception e){}
		}
		
	}
}

 

 

 线程池的构建器:

  

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
          用给定的初始参数和默认的线程工厂及处理程序创建新的 ThreadPoolExecutor

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
          用给定的初始参数创建新的 ThreadPoolExecutor

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
          用给定的初始参数创建新的 ThreadPoolExecutor

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
          用给定的初始参数创建新的 ThreadPoolExecutor

 

 

 

参数说明:

    corePoolSize 核心线程数 就是线程池中创建多少条线程来处理任务。

workQueue 任务队列,用于存放需要处理的任务。线程从这个任务队列中取出需要处理的任务。

maximumPoolSize 最大线程数,当任务积压时(任务队列已满,再也存不下更多的任务时),最多创建多少条线程来来处理任务。

keepAliveTime 线程空闲超时时间,当实际线程数大于  corePoolSize ,且线程空闲时间超过 keepAliveTime ,则销毁该线程,用于保持实际线程数==  corePoolSize

 threadFactory 线程工厂,用于创建线程 。

 handler 当线程池不再理处理更的线程时,对于当于任务调用handler.rejectedExecution(command, this)来处理。

线程的创建过程:

成员变量     private volatile int   poolSize; 表示当前实际线程数据。

    execute(Runnable command)方法被调用的时候:

如果 poolSize<corePoolSize ,则创建新的线程来运行 Runnable任务,同时poolSize++;

如果 poolSize>=corePoolSize,则将 Runnable任务存入工作队列workQueue,当前实际运行的线程数==corePoolSize,这些线程运行完当前任务后,会从 workQueue.take()出任务来执行。

如果 workQueue已满,任务无法再加入队列,如果maximumPoolSize> poolSize,则创那建一条线程来运行当前任务并poolSize++; 如果 poolSize== maximumPoolSize,则当前已经创建了最大的线程数,不能再创建线程了。 对于当前的任务则调用handler.rejectedExecution(command, this);线程池提供了四种 handler:

(未指定时默认使用ThreadPoolExecutor.AbortPolicy

 

 

static class ThreadPoolExecutor.AbortPolicy
          它将抛出 RejectedExecutionException.
static class ThreadPoolExecutor.CallerRunsPolicy
          它直接在 execute 方法的调用线程中运行任务,当前执行的execute方法会直接调用Runnable.run方法,直至任务执行完成,excecute方法才会返回;如果执行程序已关闭,则会丢弃该任务。
static class ThreadPoolExecutor.DiscardOldestPolicy
          它放弃最旧的未处理请求,即将工作队列头上的任务舍弃,然后重试 execute(当前任务);如果执行程序已关闭,则会丢弃该任务。
static class ThreadPoolExecutor.DiscardPolicy
          默认情况下它将放弃当前任务。

 

二  exectue方法源码分析

exectue方法源码分析:

 

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {//poolSize < corePoolSize,调用addIfUnderCorePoolSize创那建新的线程来运行任务
            if (runState == RUNNING && workQueue.offer(command)) {//poolSize = corePoolSize//任务加入工作队列
                if (runState != RUNNING || poolSize == 0)//
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command)) //poolSize < maximumPoolSize时增加线程来运行任务,
                reject(command); //当poolSize == maximumPoolSize时,调用handler.rejectedExecution(command, this);
        }
    }

 

 

 

 

 现在看一下线程池中的线程对象,是如何运行任务的:

    private final class Worker implements Runnable {

        private final ReentrantLock runLock = new ReentrantLock();

        private Runnable firstTask;

        Thread thread;

        Worker(Runnable firstTask) {//第一次运行的任务
            this.firstTask = firstTask;
        }

        /**
         * Runs a single task between before/after methods.
         */
        private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                
                boolean ran = false;
                beforeExecute(thread, task);//扩展点,运行任务之前调用
                try {
                    task.run();//运行任务
                    ran = true;
                    afterExecute(task, null);;//扩展点,运行完任务后调用
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }

        /**
         * Main run loop
         */
        public void run() {//线程执行入口
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {//无线循环,获取任务
                    runTask(task);//运行任务
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }
    }
     Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;//线程STOP或TERMINATED状态,返回NULL来退出当前线程
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)//实际线程数>corePoolSize或 允许线程销毁(通过 public void allowCoreThreadTimeOut(boolean value)方法设定)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);//线程空闲超时时间内未得到任务,则返回null,用于当前线程退出,如果得到任务则运行
                else
                    r = workQueue.take();//队列有任务则取出,没有任务则阻塞
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }

 

三 线程池的扩展点:

protected void beforeExecute(Thread t, Runnable r) { }

--任务运行之前调用

protected void afterExecute(Runnable r, Throwable t) { }

--任务运行之后调用。

 protected void terminated() { }

--线程池终止结束

 

四 线程池的四种状态:

RUNNING:接收新的任务并处理队列任务

SHUTDOWN:不接收新的任务,但处理队列任务

STOP:不接收新的任务,不处理队列任务,并中断所有进行中的任务。

TERMINATED:与STOP相同并所有线程结束

 

五 线程池的关闭:

1.shutdown()方法

调用shutdown()方法之后,更改线程池的状态为 shutdown并设置中断标志等操作后就返回了,这时线程池并没有立即停止运行:exectue对于新入的任务调用 reject(command)来拒绝任务,各个线程需要把工作队列中的任务都处理完成,才能退出。

 

  源码分析:

    

    public void shutdown() {
        

	SecurityManager security = System.getSecurityManager();
	if (security != null)
            security.checkPermission(shutdownPerm);

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (security != null) { // Check if caller can modify our threads
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            }

            int state = runState;
            if (state < SHUTDOWN)
                runState = SHUTDOWN;//更改状态为SHUTDOWN

            try {
                for (Worker w : workers) {
                    w.interruptIfIdle();//如果线程没有运行任务,则设置线程中断标志
                }
            } catch (SecurityException se) { // Try to back out
                runState = state;
                // tryTerminate() here would be a no-op
                throw se;
            }

            tryTerminate(); // 如果线程数为0,并任务队列为空则结束
        } finally {
            mainLock.unlock();
        }
    }

  void interruptIfIdle() {
            final ReentrantLock runLock = this.runLock;
            if (runLock.tryLock()) {//线程未执行任务,则设定线程中断标志
                try {
                    thread.interrupt();
                } finally {
                    runLock.unlock();
                }
            }
        }

//线程在shutdown状态下的运行情况及对中断的响应
Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // 如果shutdown了,则直接取任务队列中的任务,队列为空则返回null,而不是调用TASK(没有任务时等待)
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();//线程被唤醒后,检测到中断标志,抛出异常InterruptedException,从而进入下一个循环
                if (r != null)//如果有工作任务,则继续执行任务
                    return r;
                if (workerCanExit()) {//如果没有工作任务,再次判断工人队列为空,则当前线程可以退出
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;//退出当前线程
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }


 

 

 2. List<Runnable> shutdownNow()

    shutdownNow方法运行后,更改线程池状态为STOP,并将所有线程设置中断,工作队列清空,返回未处理的任务列表。这时线程池未必已停止,因为可能有正在运行中的线程,需要线程都退出之后,线程池才真正停止。

源码分析:

public List<Runnable> shutdownNow() {
	SecurityManager security = System.getSecurityManager();
	if (security != null)
            security.checkPermission(shutdownPerm);

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (security != null) { // Check if caller can modify our threads
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            }

            int state = runState;
            if (state < STOP)
                runState = STOP;//设置STOP状态

            try {
                for (Worker w : workers) {
                    w.interruptNow();//所有线程设置中断标志
                }
            } catch (SecurityException se) { // Try to back out
                runState = state;
                // tryTerminate() here would be a no-op
                throw se;
            }

            List<Runnable> tasks = drainQueue();//清空任务队列
            tryTerminate(); // 如果当前没有线程运行,则试着完全停止
            return tasks;//返回未处理的任务列表
        } finally {
            mainLock.unlock();
        }
    }

//Stop状态下线程的运行情况及对中断的响应
  Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)//如果是stop状态则直接返回null从而退出线程
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();//take方法会响应中断,抛出异常,并从循环开始再次执行,从而退出
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }
 
  

 

上述两个停止方法 shutDown() 和 shutDownNow()被调用后,线程池未必立即停止运行,那什么时候线程池已经停止运行了呢? awaitTermination方法返回真,则已停止。

源码分析:

 

   

 public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runState == TERMINATED)
                    return true;//线程池已停止立即返回真
                if (nanos <= 0)//等待时间已用完或设定的等待时间>=0,则立即返回假
                    return false;
                nanos = termination.awaitNanos(nanos);//阻塞等待线程池停止,如果已停止则被唤醒,超时也被唤醒
            }
        } finally {
            mainLock.unlock();
        }
    }

 

 六 可获取线程任务执行结果的方法 sumit(Callable task)

    方法 public <T> Future<T> submit(Callable<T> task) ,提交一个任务让线程池执行,并返回Future对象用于获取任务的执行结果。submit()方法执行后会立即返回,不会等待任务执行完成。 可以通过isDone()判断线程是否执行完成。get()方法获取结果(任务未执完则阻塞)

   future可用方法:

 

boolean cancel(boolean mayInterruptIfRunning)
          试图取消对此任务的执行。
 V

get()
          如果任务未完成,则阻塞,直至任务完成获取其结果。

    异常:

 

CancellationException - 如果计算被取消。ExecutionException - 如果计算抛出异常。InterruptedException - 如果当前的线程在等待时被中断。

 

 V

get(long timeout, TimeUnit unit)

         如果任务未完成,则阻塞指定时间,如指定时间内任务完成,则获取其结果。

      异常:

CancellationException - 如果计算被取消。
ExecutionException - 如果计算抛出异常。
InterruptedException - 如果当前的线程在等待时被中断。
TimeoutException - 如果等待超时。
<!-- ========= END OF CLASS DATA ========= -->
 boolean isCancelled()
          如果在任务正常完成前将其取消,则返回 true
 boolean isDone()
          如果任务已完成,则返回 true

 

 

 ThreadPoolExecutor 线程池 submit 方法 将Callable对象转为Runnable对象并调用 execute(Runnable)方法来执行任务。

源码分析:

 

 public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);//将Callable对象转为Runnable对象,调用execute(Runnable)方法执行任务
        return ftask;
    }


 

  RunnableFuture 类实现了Future,提供了get()等方法。

  RunnableFuture 类实现了Runnable,线程池通过execute方法调用它的run()方法来执行任务。

  主要使用了AbstractQueuedSynchronizer实现阻塞及唤醒。

  源码分析看看run方法做了什么:

  

   	    void innerRun() {//run方法直接调用这个方法
            if (!compareAndSetState(0, RUNNING))//如果state为0,则改为Running,state初始化状态为0,如果不为0,则说明该任务被(可能其它线程)调用过了,不能再调用立即返回。
                return;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING) // 重新检查线程状态
                    innerSet(callable.call());//callable.call()调用任务,结束后调用innerSet设置结果
                else
                    releaseShared(0); // cancel
            } catch (Throwable ex) {
                innerSetException(ex);
            }
        }
    }    
    
  void innerSet(V v) {
	 for (;;) {
		int s = getState();
		if (s == RAN)//任务状态为已完成则直接返回
		    return;
                if (s == CANCELLED) {
                    releaseShared(0);//任务取消,释放线程共锁
                    return;
                }
		if (compareAndSetState(s, RAN)) {//将state状态变为已完成
                    result = v;//结果赋值
                    releaseShared(0);//调用tryReleaseShared方法,tryReleaseShared方法返回真,唤醒被阻塞的线程
                    done();
		    return;
                }
            }
    }
    
     protected boolean tryReleaseShared(int ignore) {
            runner = null;
            return true;//返回真,唤醒调用GET方法阻塞线程返回任务结果
        }
        
   }
    
    //get结果方法直接调用本方法
      V innerGet() throws InterruptedException, ExecutionException {
            acquireSharedInterruptibly(0);//方法内部调用tryAcquireShared,如果任务未完成,则阻塞,直至被唤醒,唤醒时任务已完成了,才能顺序执行,返回结果或抛出异常
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }
        //方法返回-1,则表示任务未完成,线程阻塞,等待唤醒
        //方法返回1,表示任务已完成,程序顺序执行
        protected int tryAcquireShared(int ignore) {
            return innerIsDone()? 1 : -1;
        }
  其它submit方法:
 
Future<?>

submit(Runnable task)
          提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。

         该 Future.get()方法阻塞直至任务完成,返回null. 

        (关心的是任务何时完成,并不关心任务的结果)

<T> Future<T>
submit(Runnable task, T result)
          提交一个 Runnable 任务用于执行,并返回一个 Future,该 Future.get()方法阻塞直至任务完成,返回方法参数中的result

 

 七 批量执行任务并返回结果的方法

 

 

<T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
          执行给定的任务,阻塞直至所有任务完成时,返回保持任务状态和结果的 Future 列表。Future.get()会立即返回结果,因为任务已执行完成了。 
<T> List<Future<T>>

invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)
          执行给定的任务,阻塞直至所有任务在超时前执行完成,返回所有任务的Future 列表。这时所有的Future.isDone==true;

       如果有任务未在超时时间内完成,由于超时返回所有任务的Future 列表,

并取消未完成的任务。Future.isDone==true;为完成任务,Future.isDone==false为未完成的任务。

 

<T> T
invokeAny(Collection<Callable<T>> tasks)
          执行给定的任务,阻塞直至某一个任务最先完成(也就是未抛出异常),则返回该任务的执行结果,并取消其它任务
<T> T

invokeAny(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)
          执行给定的任务,阻塞直至某一个任务最先完成,返回该任务的结果,并取消其它任务。

     如果给定的超时期满前没有任何任务完成,抛出TimeoutException。

 

 

 八 线程池任务抛出异常时的处理

   如果任务抛出异常即 Runnable任务对象的run方法抛出异常,会导至执行该任务的线程退出。 减少了线程数。 有新任务进行时,发现pooSize<corePoolSize ,则创那建新线程。 

   如果很多任务抛出异常,又有新任务进入,就导至线程的频繁的创建与退出。 

  如果很多任务抛出异常,没有新任务进入,则不会创建新的线程,会导至可利用的线程越来越少,对于大的队列,影响了并发,使整体任务处理速度变慢。

  当前所有线程任务 抛出异常时,这些线程会都退出,但任务队列中还有需要处理的线程,则线程池只会创建一条新的线程执行任务。

  可能通过监控getCorePoolSize()方法的返回值确定实际线程数,需要增加线程可以调用prestartCoreThread 来启动一条核心线程 或prestartAllCoreThreads

 

 源码分析:

 

   public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);//抛出异常,则该方法被调用,该方法执行完成后,就退出了run方法,当前线程不可再利用了。
            }
        }
    void workerDone(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);//线程列表中移走该线程
            if (--poolSize == 0)//实际线程数-1
                tryTerminate();//如果所有线程任务都报错,则poolSize会等于0,则进行tryTerminate方法
        } finally {
            mainLock.unlock();
        }
    } 
    
   private void tryTerminate() {
        if (poolSize == 0) {如果所有线程任务都报错,则poolSize会等于0,状态是Running
            int state = runState;
            //如果工作队列中没有任务,则不做任何处理,新入任务execute方法会启用新的线程
            if (state < STOP && !workQueue.isEmpty()) {//工作队列中有任务
                state = RUNNING; // disable termination check below
                Thread t = addThread(null);//创建一条新的线程用来处理队列中的任务
                if (t != null)
                    t.start();
            }
            if (state == STOP || state == SHUTDOWN) {
                runState = TERMINATED;
                termination.signalAll();
                terminated();
            }
        }
    }    

 九 定时执行或需要周期性重复执行的任务(ScheduledThreadPoolExecutor

 

方法说明:

<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
          创建并执行在给定延迟后启用的 ScheduledFuture。
 ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
          创建并执行在给定延迟后启用的一次性操作。
 ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
          创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
 ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
          创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。

 

内部实现主要使用延迟队列DelayQueue,延迟队列DelayQueue是Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满。此队列不允许使用 null 元素。

  DelayQueue使用装饰模式,在内部队列PriorityQueue的基础上提供的时间延迟的功能

源码分析:

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;//虽然first != null, first.getDelay(TimeUnit.NANOSECONDS) > 0 说明未到期则返回NULL
            else {
                E x = q.poll();
                assert x != null;
                if (q.size() != 0)//队列不为空,则唤醒其它线程,如TAKE方当阻塞的线程
                    available.signalAll();
                return x;
            }
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();//队列中没有对象则阻塞
                } else {
                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {//队列中有队象,但未到期,也阻塞
                        long tl = available.awaitNanos(delay);
                    } else {
                        E x = q.poll();
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll(); // wake up other takers
                        return x;

                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }
 

 

 DelayQueue 队列中的对象,只有到期才能TAKE出来,调度线程池依据此功能实现了 延迟指定时间再执行任务的功能,如:

 

<V> ScheduledFuture<V>
schedule(Callable<V> callable, long delay, TimeUnit unit)
          创建并执行在给定延迟后启用的 ScheduledFuture。
 ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
          创建并执行在给定延迟后启用的一次性操作。

 

 

 

那么重复周期性执行的任务又如何实现的呢? 

看源码:

 

        private void runPeriodic() {
            boolean ok = ScheduledFutureTask.super.runAndReset();//调用RUN方法,并将任务状态初始化,以便下次再调用
            boolean down = isShutdown();
            // Reschedule if not cancelled and not shutdown or policy allows
            if (ok && (!down ||
                       (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
                        !isTerminating()))) {
                long p = period;
                if (p > 0)
                    time += p;//计算下一次调用时间
                else
                    time = now() - p;//计算下一次调用时间
                ScheduledThreadPoolExecutor.super.getQueue().add(this);
//重新加入队列,等待下一周期TAKE出来执行。
            }
            // This might have been the final executed delayed
            // task.  Wake up threads to check.
            else if (down)
                interruptIdleWorkers();
        }

        /**
         * Overrides FutureTask version so as to reset/requeue if periodic.
         */
        public void run() {
            if (isPeriodic())//如果需要周期性执行
                runPeriodic();
            else
                ScheduledFutureTask.super.run();//只需一次执行
        }
    }
 

 

 

 

 十 提供线程池实例化功能的工厂类 

 

static ExecutorService newCachedThreadPool()
          创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
          创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们,并在需要时使用提供的 ThreadFactory 创建新线程。
static ExecutorService newFixedThreadPool(int nThreads)
          创建一个可重用固定线程集合的线程池,以共享的无界队列方式来运行这些线程。
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
          创建一个可重用固定线程集合的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程。
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
          创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
          创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
static ExecutorService newSingleThreadExecutor()
          创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
          创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程,并在需要时使用提供的 ThreadFactory 创建新线程。
static ScheduledExecutorService newSingleThreadScheduledExecutor()
          创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
          创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。

 

 

 十一  可立即获得线程任务执行结果的服务类ExecutorCompletionService

ThreadPoolExecutor  提供了执行一组线程任务的方法:

List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

该方法阻塞直至所有任务完成。

每个线程任务执行时间长短不一,即使最短线程执行完成,也不能返回结果,需要等待所有线程完成时才返回,这些线程中可能存在需要执行很长时间的线程。 

 ExecutorCompletionService 类提供了某条线程任务执行完成后,可立即获得其结果的功能,而不需要等待其它线程的完成。 

实现原理是:每个线程任务(FutureTask)执行完成后,将FutureTask存入BlockingQueue,并对外提供了BlockingQueue的poll及task方法。

API方法:

 

Future<V>

poll()
         调用  BlockingQueue.poll 获取已完成的任务(移出队列),如果队列为空则返回NULL.

         队列为空的原因:

1.此时没有任务完成

2.所有完成任务都已队列中移走 (需要根据任务数量判断是否再次poll)

 

 

 Future<V> poll(long timeout, TimeUnit unit)
      调用  BlockingQueue.poll 获取已完成的任务(移出队列),等待指定时间后队列仍为空则返回NULL.

 队列为空的原因:

1.此时没有任务完成

2.所有完成任务都已队列中移走 (需要根据任务数量判断是否再次poll)

 Future<V> submit(Callable<V> task)
          将Callable转换为 FutureTask ,提交给线程池执行,任务执行完成后,将任务 FutureTask 存入 BlockingQueue队列
 Future<V> submit(Runnable task, V result)
            将Runnable转换为 FutureTask ,提交给线程池执行,任务执行完成后,将任务 FutureTask 存入 BlockingQueue队列
 Future<V>

take()
          调用  BlockingQueue.take获取已完成的任务 (移出队列),如果队列为空则阻塞,直至有任务完成。 

   注意:需要根据任务数量判断是否再次take,如果没有任务了,再take会一至阻塞

 

 

 

实现原理:

   submit方法提交任务时将runnable或Callable类转换为 FutureTask ,  FutureTask 实现了Runnable。

线程池执行任务时是调用  FutureTask 的run方法,run方法执行结束时调用done方法, done方法是属于模版模式,供子类重写。

 

 void innerSet(V v) {
	    for (;;) {
		int s = getState();
		if (s == RAN)
		    return;
                if (s == CANCELLED) {
		    // aggressively release to set runner to null,
		    // in case we are racing with a cancel request
		    // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
		if (compareAndSetState(s, RAN)) {
                    result = v;
                    releaseShared(0);
                    done();//任务完成时调用
		    return;
                }
            }
        }

 

 protected void done() { }

 

 ExecutorCompletionService  中实现了done方法,方法中将任务存入队列:

 private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

 

 

分享到:
评论

相关推荐

    线程池线程池线程池线程池

    线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池线程池...

    java线程池概念.txt

    corePoolSize:核心池的大小,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中; ...

    阻塞线程池 阻塞线程池 阻塞线程池

    阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池...

    Python 使用threading+Queue实现线程池示例

    一、线程池 1、为什么需要使用线程池 1.1 创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率。 记创建线程消耗时间T1,执行任务消耗时间T2,销毁线程消耗时间T3,如果T1+T3&gt;T2,那...

    线程池  

    VC实现线程池

    易语言真正的线程池简易实现

    易语言简易线程池的实现。 ——V雪落有声V原创。转载请保留。前文:。为了能充分理解本篇文章的内容,需要了解的知识如下:。1.事件对象的使用:http://baike.baidu.com/view/751499.htm。2.信号量的使用:...

    Java简单线程池 线程池中文文档

    简单的线程池程序+中文文档 包结构: com.tangkai.threadpool --SimpleThread.java 工作线程 --TestThreadPool.java 程序入口 --ThreadPoolManager.java 线程池管理类

    Windows下一个比较完美的线程池实现和示例

    Windows下一个比较完美的线程池实现和示例 本线程池提供了如下功能: 1.能根据任务个数和当前线程的多少在最小/最大线程个数之间自动调整(Vista后的系统有 SetThreadpoolThreadMaximum 等函数有类似功能); 2.能方便...

    C#管理线程池的类ThreadManager

    C#管理线程池的类 /* How to use Thread Classs * * ============== * public ELMService() { InitializeComponent(); etm.ClalThreadPool("EmailThreads", (uint)ApplicationInfo.EmailParsingThreads); ...

    线程池.zip,互斥锁+条件变量+队列,实现线程池,包括线程池的创建,塞任务,和销毁线程池

    线程池

    易语言线程池操作例程(解决内存不断升高的问题)

     因为本人是个小白,多线程经常用,但是线程池并没有用过,(一听到线程池,总感觉高大上)。但是近期写彩票软件的时候发现,多线程长期操作会导致内容不断的升高直至报错,遂想起了线程池,完善后发现不是一般的叼...

    一个通用的Java线程池类

    2.然后根据提示运行java命令执行示例程序,观看线程池的运行结果 目标:Java中多线程技术是一个难点,但是也是一个核心技术。因为Java本身就是一个多线程语言。本人目前在给46班讲授Swing的网络编程--使用Swing来...

    C++实现线程池详解(基于boost源码以及封装等线程池)

    一、要实现高效的线程池,可以考虑以下几点 二、实现线程池可以按照以下步骤进行 三、简单的C++线程池代码示例 四、 基于boost编写的源码库 - 线程池 4.1 基于boost编写的源码库地址 4.2 boost线程池的先进先出、...

    VC简单的线程池使用实例

    1.线程池管理器(ThreadPoolManager):用于创建并管理线程池 2.工作线程(WorkThread): 线程池中线程 3.任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。 4.任务队列:用于存放没有处理的...

    如何使用线程池

    以下示例显示如何使用线程池。首先创建 ManualResetEvent 对象,此对象使程序能够知道线程池何时运行完所有的工作项。接着,尝试向线程池添加一个线程。如果添加成功,则添加其余的线程(本例中为 4 个)。然后...

    linux线程池创建c实现

    linux线程池创建c实现 linux线程池创建c实现 linux线程池创建c实现 linux线程池创建c实现 linux线程池创建c实现 linux线程池创建c实现

    windows线程池,使用Windows自带的线程池api功能,比你写的线程池性能好得多

    使用Windows自带的线程池功能,比你写的线程池性能好得多

    Linux下通用线程池的构建

    什么是线程池?简单点说,线程池就是有一堆已经创建好了的线程,初始它们都处于空闲等待状态,当有新的任务需要处理的时候,就从这个池子里面取一个空闲等待的线程来处理该任务,当处理完成了就再次把该线程放回池中...

    一个简单线程池的实现

    这是一个简单线程池的实现,虽然有很多bug,但是能够简单地实现线程池。

    Java 自己实现线程池

    Java开发,Android开发,自己实现线程池,明白线程池的实现机制

Global site tag (gtag.js) - Google Analytics