java.util.concurrent包,分为了三个部分:java.util.concurrent,java.util.concurrent.atomic,以及java.util.concurrent.lock,主要包含了五个方面:线程池机制、同步集合、锁、原子操作、以及辅助类。
下面的图是网上搜到的一张J.U.C完整的API,根据这张图可以从整体上看出J.U.C包中包括了哪些部分:
最近主要学习了其中的线程池机制和锁,包括其中使用到的辅助类,本篇主要记录了关于这两方面的学习总结,其他的部分后续再来学习和添加。
1. 线程池机制
首先,我们讨论一下使用线程池的好处,线程池为线程生命周期间接成本问题和资源崩溃问题提供了解决方案。通过对多个任务重新使用线程,创建线程的间接成本将分布到多个任务中。另外,当请求到达时,线程已经存在,从而可以消除由创建线程引起的延迟,使应用程序更易响应。而且,通过正确调整线程池中的线程数,可以强制超出特定限制的任何请求等待,直到有线程可以处理它,它们等待时所消耗的资源要少于使用额外线程所消耗的资源,这样可以防止资源崩溃。
Executor:任务的执行者
ExecutorService:一个线程池的管理者
CompletionService:ExecutorService的扩展,可以获得线程执行后的结果
Future:异步计算的结果
ScheduledExecutorService:一个ExecutorService,可以安排线程在给定延迟后运行,或定时运行
ExecutorService用法示例:
package concurrenttest; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 本示例使用了Executors的静态函数生成一个固定的线程池,只能执行4个线程,当执行完一个线程后,才会又执行一个新的线程 * 这就会产生性能问题,比如如果线程池的大小为200,当全部使用完毕后,所有的线程会继续留在池中,相应的内存和线程切换(while(true)+sleep循环)都会增加 * * @author sha.tians */ public class ExecutorsTest extends Thread { private int index; public ExecutorsTest(int i){ this.index = i; } public void run() { try { System.out.println("[" + this.index + "] start...."); Thread.sleep((int) (Math.random() * 10000)); System.out.println("[" + this.index + "] end."); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String args[]) { ExecutorService service = Executors.newFixedThreadPool(4); for (int i = 0; i < 10; i++) { service.submit(new ExecutorsTest(i)); } System.out.println("submit finish"); service.shutdown(); } }
ExecutorService通过submit提交任务,通过shutdown结束任务。线程池通过辅助类Executors来创建,本例中创建了一个固定大小为4的线程池,而Executors还可以创建其他类型的线程池,查看其源码,Executors是通过new ThreadPoolExecutor来实现的,对应关系如下:
Executors newSingleThreadExecutor(): new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory) newCachedThreadPool(): new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()) newFixedThreadPool(int nThreads): new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
ThreadPoolExecutor构造方法参数含义如下:
ThreadPoolExecutor 构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 入参: corePoolSize:the number of threads to keep in the pool, even if they are idle. 保持在池中的线程数,包括闲置的。 maximumPoolSize: the maximum number of threads to allow in the pool. 池中允许的最大线程数 keepAliveTime: when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating. 当线程数大于core,闲置线程在终止前等待新任务的最大时间 unit: the time unit for the keepAliveTime argument. keepAliveTime的时间单位 workQueue: the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method. 在执行前holding任务的队列,这个队列只hold被execute方法提交的Runnable任务。 threadFactory: the factory to use when the executor creates a new thread. 当executor创建新任务时使用的工厂 handler: the handler to use when execution is blocked because the thread bounds and queue capacities are reached. 当execution因为线程闲置和队列达到容量阻塞时使用的handler
CompletionService用法示例:
package concurrenttest; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务, 并按照完成这些任务的顺序处理它们的结果。 * * @author sha.tians */ public class CompletionServiceTest implements Callable<String> { private int id; public CompletionServiceTest(int i){ this.id = i; } public static void main(String[] args) throws Exception { ExecutorService service = Executors.newCachedThreadPool(); CompletionService<String> completion = new ExecutorCompletionService<String>(service); for (int i = 0; i < 10; i++) { completion.submit(new CompletionServiceTest(i)); } for (int i = 0; i < 10; i++) { System.out.println(completion.take().get()); } service.shutdown(); } public String call() throws Exception { Integer time = (int) (Math.random() * 1000); try { System.out.println(this.id + " start"); Thread.sleep(time); System.out.println(this.id + " end"); } catch (Exception e) { e.printStackTrace(); } return this.id + ":" + time; } }
本例中,通过CompletionService扩展ExecutorService,获得了线程执行后结果。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。
ExecutorCompletionSevice构造方法如下:
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
此处初始化了一个completionQueue,这个queue是用来做什么的呢?查看一下源代码:
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; } public Future<V> take() throws InterruptedException { return completionQueue.take(); }
执行的过程中,应该是将结果加入了completionQueue中, take线程执行结果的时候就从这个queue中取得。
ScheduledExecutorService用法示例:
package concurrenttest; import static java.util.concurrent.TimeUnit.SECONDS; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; /** * schedule 方法使用各种延迟创建任务,并返回一个可用于取消或检查执行的任务对象。 * scheduleAtFixedRate 和 scheduleWithFixedDelay 方法创建并执行某些在取消前一直定期运行的任务。 * @author sha.tians * */ public class ScheduledExecutorServiceTest { public static void main(String[] args) { final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); final Runnable beeper = new Runnable() { int count = 0; public void run() { System.out.println(new Date() + " beep " + (++count)); } }; // 1秒钟后运行,并每隔2秒运行一次 final ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate(beeper, 1, 2, SECONDS); // 2秒钟后运行,并每次在上次任务运行完后等待5秒后重新运行 final ScheduledFuture beeperHandle2 = scheduler.scheduleWithFixedDelay(beeper, 2, 5, SECONDS); // 30秒后结束关闭任务,并且关闭Scheduler scheduler.schedule(new Runnable() { public void run() { beeperHandle.cancel(true); beeperHandle2.cancel(true); scheduler.shutdown(); } }, 30, SECONDS); } }
ScheduleExecutorService通过scheduleAtFixedRate方法,和scheduleWihFixedDelay方法,定期运行任务,并返回了ScheduledFuure对象,可通过cancle方法取消任务。
2. 锁机制
ReentrantLock :一个可重入的互斥锁定Lock
package concurrenttest; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantLock; /** * ReentrantLock 将由最近成功获得锁定,并且还没有释放该锁定的线程所拥有。当锁定没有被另一个线程所拥有时,调用 lock 的线程将成功获取该锁定并返回。 * * @author sha.tians */ public class ReentrantLockTest extends Thread { TestReentrantLock lock; private int id; public ReentrantLockTest(int i, TestReentrantLock test){ this.id = i; this.lock = test; } public void run() { lock.print(id); } public static void main(String args[]) { ExecutorService service = Executors.newCachedThreadPool(); TestReentrantLock lock = new TestReentrantLock(); for (int i = 0; i < 10; i++) { service.submit(new ReentrantLockTest(i, lock)); } service.shutdown(); } } class TestReentrantLock { private ReentrantLock lock = new ReentrantLock(); public void print(int str) { try { lock.lock(); System.out.println(str + "获得"); Thread.sleep((int) (Math.random() * 1000)); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println(str + "释放"); lock.unlock(); } } }
ReentrantLock由最近成功获得lock,且还没有释放该lock的线程拥有,当lock没有被另一个线程所拥有时,调用lock的线程会成功获得该锁并返回,否则就直接立即返回。可以适用isHeldBYCUrrentThread()和getHoldCount()方法来检查此情况是否发生。
public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } public boolean isLocked() { return sync.isLocked(); }此类构造方法有两种,另一种可以接受一个可选的公平参数,当设置为true时,在多个线程竞争下,该锁定会倾向于将访问授权给等待时间最长的线程,否则将无法保证任何特定的访问顺序。
相关推荐
1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
"java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError" 是一个典型的错误提示,它表明在并发执行过程中遇到了内存不足的问题。下面我们将深入探讨这个问题的原因、影响以及如何解决。 内存溢出...
Java.util.concurrent是Java 5.0引入的一个重要包,它为多线程编程提供了一组高级并发工具。这个包的设计者是Doug Lea,它的出现是JSR-166的一部分,也被称作Tiger更新。Java.util.concurrent的引入是为了解决传统...
java.util.concurrent总体概览图。 收取资源分3分。需要的同学可以下载一下。 java.util.concurrent主要包括5个部分executor,colletions,locks,atomic,tools。 该图详细的列举了并发包下面的结构,包含所有接口和...
import java.util.concurrent.ArrayBlockingQueue; public class BlockingQueueExample { public static void main(String[] args) { // 创建一个容量为 10 的阻塞队列 BlockingQueue<String> queue = new ...
### Java.util.concurrent 系列文章知识点总结 #### 一、引言 随着多核处理器的普及,多线程编程已成为现代软件开发中的一个重要组成部分。Java 5 引入了 `java.util.concurrent` 包,该包提供了丰富的 API 来简化...
java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
### Java.util.concurrent.Synchronizer框架详解 #### 一、引言与背景 随着Java技术的发展,多线程编程成为了一项重要的技术需求。为了更好地支持并发编程,Java平台在J2SE 1.5版本中引入了`java.util.concurrent`...
"JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用" JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用是Java多线程编程中的一种重要概念。随着多线程编程的普及,线程池的使用变得...
一个高性能的Java线程库,该库是 JDK 1.5 中的 java.util.concurrent 包的补充,可用于基于并发消息机制的应用。该类库不提供远程的消息功能,其设计的宗旨是实现一个内存中的消息传递机制. 主要特点有: * All ...
如何启动:以win7系统为例,最好jdk8 1.打开cmd,cd到jdk的path,本机是:cd C:\Java\jdk6\bin ...java -cp D:\javaConcurrentAnimated.jar vgrazi.concurrent.samples.launcher.ConcurrentExampleLauncher
java.util.concurrent 多线程框架 java.util.concurrent 多线程框架是 Java 语言中用于多线程编程的库。该库提供了多种线程池实现、并发集合、同步器、lock 等多种机制,以便开发者更方便地编写高效、可靠的多线程...
标题中提到了“java.util.concurrent.uml.pdf”,这表明文件是一份Java并发编程工具包java.util.concurrent的UML(统一建模语言)类结构图的PDF格式文件。UML图能够帮助开发者理解Java并发包中的类、接口及其关系,...
AQS(AbstractQueuedSynchronizer)是Java.util.concurrent包中同步器的基础框架,它的核心设计思想与实现方法在Doug Lea先生的这篇论文中有详细的介绍。论文详细阐述了AQS框架的原理、设计、实现、应用以及性能等...
文档标题“java.util.concurrent同步器框架”和描述“Doug Lea的java.util.concurrent同步器框架”表明本文将探讨由Doug Lea所撰写的关于Java并发编程中同步器框架的内容。文档中提到了AbstractQueuedSynchronizer类...
### Java.util.concurrent_您不知道的5件事 #### 1. Semaphore(信号量) - **定义与作用**:`Semaphore` 类是一种控制多个线程访问共享资源的机制,它通过内部维护一个整数计数器(许可的数量)以及一组等待线程...
在Java中,`java.util.concurrent.ExecutorService` 接口代表了一个线程池服务,而`ThreadPoolExecutor` 是它的具体实现,我们可以自定义线程池的核心参数,如核心线程数、最大线程数、线程存活时间、线程队列等。...
本资源包含两个 pdf 文档,一本根据 Jakob Jenkov 最新博客 (http://tutorials.jenkov.com/java-util-concurrent/index.html) 整理的 java_util_concurrent_user_guide_en.pdf,一个中文翻译的 java_util_concurrent...