并发编程的一种编程方式是把任务拆分为一些列的小任务,即Runnable
,然后在提交给一个Executor
执行,Executor
在执行时使用内部的线程池完成操作。由此,任务提交者不需要再创建管理线程,使用更方便,也减少了开销。有两种任务:Runnable
和Callable
,Callable是需要返回值的任务。Task Submitter把任务提交给Executor执行,他们之间需要一种通讯手段,这种手段的具体实现,通常叫做Future
。Future
通常包括get ,cancel,get(timeout) 等等。Future
也用于异步变同步的场景。
伪代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
ExecutorService executor = Executors.newSingleThreadExecutor(); Callable<Object> task = new Callable<Object>() { public Object call() throws Exception { Object result = "..."; return result; } }; Future<Object> future = executor.submit(task); // 等待到任务被执行完毕返回结果 future.get(); // 等待3秒,超时后会抛TimeoutException future.get(3, TimeUnit.SECONDS); |
Executors
包含Executor
、ExecutorService
、ScheduledExecutorService
、ThreadFactory
和 Callable
类的工厂和实用方法。支持以下各种方法:
- 创建并返回设置有常用配置字符串的 ExecutorService 的方法。
- 创建并返回设置有常用配置字符串的 ScheduledExecutorService 的方法。
- 创建并返回“包装的”ExecutorService 方法,它通过使特定于实现的方法不可访问来禁用重新配置。
- 创建并返回 ThreadFactory 的方法,它可将新创建的线程设置为已知的状态。
- 创建并返回非闭包形式的 Callable 的方法,这样可将其用于需要 Callable 的执行方法中。
具体的方法说明如下:
-
callable(PrivilegedAction action)
返回 Callable 对象,调用它时可运行给定特权的操作并返回其结果。 -
callable(PrivilegedExceptionAction action)
返回 Callable 对象,调用它时可运行给定特权的异常操作并返回其结果。 -
callable(Runnable task)
返回 Callable 对象,调用它时可运行给定的任务并返回 null。 -
callable(Runnable task, T result)
返回 Callable 对象,调用它时可运行给定的任务并返回给定的结果。 -
defaultThreadFactory()
返回用于创建新线程的默认线程工厂。 -
newCachedThreadPool()
创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。 -
newCachedThreadPool(ThreadFactory threadFactory)
创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们,并在需要时使用提供的 ThreadFactory 创建新线程 -
newFixedThreadPool(int nThreads)
创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。 -
newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程 -
newScheduledThreadPool(int corePoolSize)
创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。 -
newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。 -
newSingleThreadExecutor()
创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。 -
newSingleThreadExecutor(ThreadFactory threadFactory)
创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程,并在需要时使用提供的 ThreadFactory 创建新线程。 -
newSingleThreadScheduledExecutor()
创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。 -
newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。 -
privilegedCallable(Callable callable)
返回 Callable 对象,调用它时可在当前的访问控制上下文中执行给定的 callable 对象。 -
privilegedCallableUsingCurrentClassLoader(Callable callable)
返回 Callable 对象,调用它时可在当前的访问控制上下文中,使用当前上下文类加载器作为上下文类加载器来执行给定的 callable 对象。 -
privilegedThreadFactory()
返回用于创建新线程的线程工厂,这些新线程与当前线程具有相同的权限。 -
unconfigurableExecutorService(ExecutorService executor)
返回一个将所有已定义的 ExecutorService 方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法。 -
unconfigurableScheduledExecutorService(ScheduledExecutorService executor)
返回一个将所有已定义的 ExecutorService 方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法。
ScheduledExecutorServices
尽管ExecutorService
接口非常有用,但某些任务仍需要以计划方式执行,比如以确定的时间间隔或在特定时间执行给定的任务。这就是 ScheduledExecutorService
的应用范围,它扩展了ExecutorService
。
例如创建一个每隔 5 秒跳一次的 “心跳” 命令,使用ScheduledExecutorService
可以轻松实现:
1 2 3 4 5 6 7 8 9 10 |
public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(1); Runnable pinger = new Runnable() { public void run() { System.out.println("PING!"); } }; ses.scheduleAtFixedRate(pinger, 5, 5, TimeUnit.SECONDS); } |
不用过于担心线程,不用过于担心用户希望取消心跳时会发生什么,也不用明确地将线程标记为前台或后台;只需将所有的计划细节留给ScheduledExecutorService
。如果用户希望取消心跳,scheduleAtFixedRate
调用将返回一个ScheduledFuture
实例,它不仅封装了结果(如果有),还拥有一个cancel
方法来关闭计划的操作。
下面是一个完整的示例,并行计算数组的和。
利用CompletionService
,生产者submit()
执行的任务。使用者take()
已完成的任务,并按照完成这些任务的顺序处理它们的结果 。也就是调用CompletionService
的take
方法是,会返回按完成顺序放回任务的结果,CompletionService
内部维护了一个阻塞队列BlockingQueue
,如果没有任务完成,take()
方法也会阻塞。
完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ConcurrentCalculator { private ExecutorService exec; private CompletionService<Long> completionService; private int cpuCoreNumber; class SumCalculator implements Callable<Long> { private int[] numbers; private int start; private int end; public SumCalculator(final int[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } public Long call() throws Exception { Long sum = 0l; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } } public ConcurrentCalculator() { cpuCoreNumber = Runtime.getRuntime().availableProcessors(); exec = Executors.newFixedThreadPool(cpuCoreNumber); completionService = new ExecutorCompletionService<Long>(exec); } public Long sum(final int[] numbers) { for (int i = 0; i < cpuCoreNumber; i++) { int increment = numbers.length / cpuCoreNumber + 1; int start = increment * i; int end = increment * i + increment; if (end > numbers.length) end = numbers.length; SumCalculator subCalc = new SumCalculator(numbers, start, end); if (!exec.isShutdown()) { completionService.submit(subCalc); } } return getResult(); } public Long getResult() { Long result = 0l; for (int i = 0; i < cpuCoreNumber; i++) { try { Long subSum = completionService.take().get(); result += subSum; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return result; } public void close() { exec.shutdown(); } public static void main(String[] args) { int[] numbers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 34 }; ConcurrentCalculator calc = new ConcurrentCalculator(); Long sum = calc.sum(numbers); System.out.println(sum); calc.close(); } } |
相关推荐
2.3 线程本地存储(Java.lang.ThreadLocal) 15 2.4 线程阻塞 17 2.4.1 调用sleep(millisecond)使任务进入休眠状态 17 2.4.2 等待输出与输入 17 2.4.3 对象锁不可用 17 2.4.4 通过wait()使线程挂起。 17 2.5 线程...
一、为什么要引入Executor框架? 1、如果使用new Thread(…).start()的方法处理多线程,有如下缺点: ① 开销大。对于JVM来说,每次新建线程和销毁线程都会有很大的开销。 ② 线程缺乏管理。没有一个池来限制线程的...
《Java JDK 7学习笔记》将IDE操作纳为教学内容之一,使读者能与实践结合,提供的视频教学能更清楚地帮助读者掌握操作步骤。 内容简介 书籍 计算机书籍 《java jdk 7学习笔记》是作者多年来教学实践经验的总结...
详细介绍java并发编程相关知识: 基础知识 并发与并行 Java并发演进历史 Java并发模型 线程模型 存储模型 JVM同步原语 volatile CAS 线程安全 保护“共享数据” 低级并发工具 原子变量 锁...
1. Java多线程学习(一)Java多线程入门 2. Java多线程学习(二)synchronized关键字(1) 3. Java多线程学习(二)synchronized关键字(2) 4. Java多线程学习(三...9. Java多线程学习(八)线程池与Executor 框架
Executor框架是Java并发编程中的一个重要工具,它提供了一种管理线程池的方式,使得我们可以更方便地管理线程的生命周期和执行线程任务。 原子操作是指不可被中断的操作,要么全部执行成功,要么全部不执行。原子...
JUC(Java Util Concurrent)是Java中用于并发编程的工具包,提供了一组接口和类,用于处理多线程和并发操作。JUC提供了一些常用的并发编程模式和工具,如线程池、并发集合、原子操作等。 JUC的主要特点包括: ...
Java是天生就支持并发的语言,支持并发意味着多线程,线程的频繁创建在高并发及大数据量是非常消耗资源的,因为java提供了线程池。这篇文章主要介绍下并发包下的Executor接口,Executor接口虽然作为一个非常旧的接口...
除了基本的线程创建和启动,Java还提供了一些管理线程的方法和工具,例如: sleep 方法:使当前线程暂停执行一段时间。 join 方法:等待其他线程执行完毕后再继续执行。 interrupt 方法:中断线程的执行。 ...
该文档详细记录了Executor框架结构、使用示意图、ThreadPoolExecutor使用示例、线程池原理分析、几种常见线程池(FixedThreadPool、SingleThreadExecutor、CachedThreadPool)的详解以及线程池大小确定等内容
其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类,一般来说,Runnable任务开辟在新线程中...
主要介绍了Java并发之线程池Executor框架的深入理解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
java 并发 编程 多线程 concurrent lock condition executorserice executor java.util.curcurrent.
调用关系:Java线程一对一映射到本地操作系统的系统线程,当多线程程序分解若干任务,使用用户级的调度器(Executor框架)将任务映射为固定数量的线程,底层,操作系统吧、内核将这些线程映射到硬件处理器上。...
看完《think in java》多线程章节,自己写的多线程文档,还结合了其他的相关网络资料。 线程 一. 线程池 1)为什么要使用线程池 2 2)一个具有线程池的工作队列 3 3)使用线程池的风险: 4 4)有效使用线程池的原则 5...
全部是txt格式的,容量小,以下内容为其中之一: 5.0新特性: 泛型: 泛型的形式: 类型> <E extends Numner&comparator> 类名&接口,表示E继承Numner类实现comparator接口 <?> 泛型通配符表示任意...
JavaExecutor并发框架.pdf
解决线程的死掉问题和超时问题特别好使,在Java中,如果需要设定代码执行的最长时间,即超时,可以用Java线程池ExecutorService类配合Future接口来实现。 Future接口是Java标准API的一部分,在java.util.concurrent...
4.newSingleThreadExecutor 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。``