本文章的主题主要是在应用程序中寻找可以并行的任务,也就是把问题分成几个子问题。
一、讲解Executor和Executors以及ThreadPoolExecutor
public interface Executor{
void execute(Runnable command) ;
}
执行已提交的 Runnable
任务的对象。
Executors是一个工厂类,它创建不同类型的线程池,主要是根据不同的参数配置。
下面还是主要说明ThreadPoolExecutor类中的构造方法,如:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
具体流程如下:
1)当池子大小小于corePoolSize就新建线程,并处理请求
2)当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理
3)当workQueue放不下新入的任务时,新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize就用RejectedExecutionHandler来做拒绝处理
4)另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁
ThreadPoolExecutor就是依靠BlockingQueue的阻塞机制来维持线程池,当池子里的线程无事可干的时候就通过workQueue.take()阻塞住。
下面分析一下创建几个经典的线程池是怎么做到的:
1、一个固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2、
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。
二、讲解ExecutorService
由于创建了线程池说明是一个非常复杂的程序,所以就需要对它进行管理,也就是说他们的生命周期。
ExecutorService是Executor的父类,它定义了一个灵活的执行策略以及对任务的批量处理,讲解这个接口之前,需要讲解一下Future接口和Callable接口。
public interface Future<V>
boolean |
cancel(boolean mayInterruptIfRunning) 试图取消对此任务的执行。 |
V |
get() 如有必要,等待计算完成,然后获取其结果。 |
V |
get(long timeout, TimeUnit unit) 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。 |
boolean |
isCancelled() 如果在任务正常完成前将其取消,则返回 true。 |
boolean |
isDone() 如果任务已完成,则返回 true。 |
由于是异步获得结果,重点说明get()方法。FutureTask实现了Future接口并且实现了Runnable接口,所以在执行任务的时候可以调用run方法,如:
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
public V call() throws InterruptedException {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = ft;
cache.put(arg, ft);
ft.run(); // call to c.compute happens here
}
try {
return f.get();
执行任务还有另外一个方法就是委托给线程:new Thread(ft).start()
下面讲解ExecutorService接口:
由于Executor接口一次只能执行一个任务对多个任务不支持,而且对任务执行中参数的异常不可以管理以及取消提交的任务都不支持,对于以上的需求,就扩展了ExecutorService接口。
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
自从Executor引进了Service后,该应用就能提供一种优雅的方式关闭应用程序,并且提供一些影响关闭状态的消息。
ExecutorService生命周期有三个状态:运行状态、正在关闭状态、终止状态,他是刚刚创建的时候就是运行状态。
shutdow方法允许已经被提交的任务被执行但是不接收新的任务;shutdownNow方法是立即关闭:试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
invokeAll方法时批量的提交任务返回一个future集合,这个方法在下面的文章能应用。。。
分享到:
相关推荐
azkaban-executor-server-2.5.0-tar.gz azkaban-web-server-2.5.0-tar.gz azkaban-sql-script-2.5.0-tar.gz
高效率 快捷操作
该文档详细记录了Executor框架结构、使用示意图、ThreadPoolExecutor使用示例、线程池原理分析、几种常见线程池(FixedThreadPool、SingleThreadExecutor、CachedThreadPool)的详解以及线程池大小确定等内容
xxl-job-executor-sample-springboot-2.2.0.jar 与xxl-job配套的执行器包,用于 Docker-compose搭建xxl-job(并配置Python3环境xxl-job执行器) 中相应的文件
python库,解压后可用。 资源全名:qcg_pilotjob_executor_api-0.12.3-py3-none-any.whl
资源来自pypi官网。 资源全名:qcg_pilotjob_executor_api-0.12.3-py3-none-any.whl
普罗米修斯执行者 -am-executor是一个HTTP服务器,它从接收警报,并执行将警报详细信息设置为环境变量的给定命令。 建造 要求 1.克隆此存储库 git clone https://github.com/imgix/prometheus-am-executor.git 2....
NULL 博文链接:https://jsx112.iteye.com/blog/865884
xxl-job-executor-go-master
官方版本,亲测可用
官方版本,亲测可用
官方版本,亲测可用
async-global-executor-在async-executor和async-io之上构建的全局执行器async-global-executor在async-executor和async-io之上构建的全局执行器功能async-io:如果启用,则async-global -executor将在内部使用async...
python库,解压后可用。 资源全名:Flask_Executor-0.3.1-py3-none-any.whl
包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。 异步计算的结果 。...
001 - Spark框架 - 简介.avi 002 - Spark框架 - Vs Hadoop.avi 003 - Spark框架 - 核心模块 - 介绍.avi 005 - Spark框架 - ...020 - Spark框架 - 核心概念 - Executor & Core & 并行度.avi 023 - SparkCore - 分布式
xxl-job-executor的gin中间件背景xxl-job-executor-go是xxl-job的golang执行器,可以独立运行,有时候我们要与项目或者框架(如:gin框架)集成起来合并为一个服务,本项目因此而生。执行器项目地址与gin集成示例...
executor 查找工具,方便查找应用软件,可以自定义文件快捷键。
执行程序同步sls错误日志 执行者-被动同步-存储在sls的服务的错误日志 在适当的内容的基础上添加服务责任人并@
Executor框架是Java并发编程中的一个重要工具,它提供了一种管理线程池的方式,使得我们可以更方便地管理线程的生命周期和执行线程任务。 原子操作是指不可被中断的操作,要么全部执行成功,要么全部不执行。原子...