`
joerong666
  • 浏览: 411328 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Java线程学习笔记之Executor

 
阅读更多

并发编程的一种编程方式是把任务拆分为一些列的小任务,即Runnable,然后在提交给一个Executor执行,Executor在执行时使用内部的线程池完成操作。由此,任务提交者不需要再创建管理线程,使用更方便,也减少了开销。有两种任务:RunnableCallable,Callable是需要返回值的任务。Task Submitter把任务提交给Executor执行,他们之间需要一种通讯手段,这种手段的具体实现,通常叫做FutureFuture通常包括get ,cancel,get(timeout) 等等。Future也用于异步变同步的场景。


伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ExecutorService executor = Executors.newSingleThreadExecutor();
 
Callable<Object> task = new Callable<Object>() {
	public Object call() throws Exception {
		Object result = "...";
		return result;
	}
};
 
Future<Object> future = executor.submit(task);
// 等待到任务被执行完毕返回结果
future.get();
// 等待3秒,超时后会抛TimeoutException
future.get(3, TimeUnit.SECONDS);

Executors

包含ExecutorExecutorServiceScheduledExecutorServiceThreadFactory Callable类的工厂和实用方法。支持以下各种方法:

  • 创建并返回设置有常用配置字符串的 ExecutorService 的方法。
  • 创建并返回设置有常用配置字符串的 ScheduledExecutorService 的方法。
  • 创建并返回“包装的”ExecutorService 方法,它通过使特定于实现的方法不可访问来禁用重新配置。
  • 创建并返回 ThreadFactory 的方法,它可将新创建的线程设置为已知的状态。
  • 创建并返回非闭包形式的 Callable 的方法,这样可将其用于需要 Callable 的执行方法中。

具体的方法说明如下:

  • callable(PrivilegedAction action)
    返回 Callable 对象,调用它时可运行给定特权的操作并返回其结果。
  • callable(PrivilegedExceptionAction action)
    返回 Callable 对象,调用它时可运行给定特权的异常操作并返回其结果。
  • callable(Runnable task)
    返回 Callable 对象,调用它时可运行给定的任务并返回 null。
  • callable(Runnable task, T result)
    返回 Callable 对象,调用它时可运行给定的任务并返回给定的结果。
  • defaultThreadFactory()
    返回用于创建新线程的默认线程工厂。
  • newCachedThreadPool()
    创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
  • newCachedThreadPool(ThreadFactory threadFactory)
    创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们,并在需要时使用提供的 ThreadFactory 创建新线程
  • newFixedThreadPool(int nThreads)
    创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
  • newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
    创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程
  • newScheduledThreadPool(int corePoolSize)
    创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
  • newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
    创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
  • newSingleThreadExecutor()
    创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
  • newSingleThreadExecutor(ThreadFactory threadFactory)
    创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程,并在需要时使用提供的 ThreadFactory 创建新线程。
  • newSingleThreadScheduledExecutor()
    创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
  • newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
    创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
  • privilegedCallable(Callable callable)
    返回 Callable 对象,调用它时可在当前的访问控制上下文中执行给定的 callable 对象。
  • privilegedCallableUsingCurrentClassLoader(Callable callable)
    返回 Callable 对象,调用它时可在当前的访问控制上下文中,使用当前上下文类加载器作为上下文类加载器来执行给定的 callable 对象。
  • privilegedThreadFactory()
    返回用于创建新线程的线程工厂,这些新线程与当前线程具有相同的权限。
  • unconfigurableExecutorService(ExecutorService executor)
    返回一个将所有已定义的 ExecutorService 方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法。
  • unconfigurableScheduledExecutorService(ScheduledExecutorService executor)
    返回一个将所有已定义的 ExecutorService 方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法。

ScheduledExecutorServices

尽管ExecutorService接口非常有用,但某些任务仍需要以计划方式执行,比如以确定的时间间隔或在特定时间执行给定的任务。这就是 ScheduledExecutorService的应用范围,它扩展了ExecutorService

例如创建一个每隔 5 秒跳一次的 “心跳” 命令,使用ScheduledExecutorService可以轻松实现:

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
	ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
	Runnable pinger = new Runnable() {
		public void run() {
			System.out.println("PING!");
		}
	};
 
	ses.scheduleAtFixedRate(pinger, 5, 5, TimeUnit.SECONDS);
}

不用过于担心线程,不用过于担心用户希望取消心跳时会发生什么,也不用明确地将线程标记为前台或后台;只需将所有的计划细节留给ScheduledExecutorService。如果用户希望取消心跳,scheduleAtFixedRate调用将返回一个ScheduledFuture实例,它不仅封装了结果(如果有),还拥有一个cancel方法来关闭计划的操作。

下面是一个完整的示例,并行计算数组的和。

利用CompletionService,生产者submit()执行的任务。使用者take()已完成的任务,并按照完成这些任务的顺序处理它们的结果 。也就是调用CompletionServicetake方法是,会返回按完成顺序放回任务的结果,CompletionService内部维护了一个阻塞队列BlockingQueue,如果没有任务完成,take()方法也会阻塞。

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class ConcurrentCalculator {
	private ExecutorService exec;
	private CompletionService<Long> completionService;
	private int cpuCoreNumber;
 
	class SumCalculator implements Callable<Long> {
		private int[] numbers;
		private int start;
		private int end;
 
		public SumCalculator(final int[] numbers, int start, int end) {
			this.numbers = numbers;
			this.start = start;
			this.end = end;
		}
 
		public Long call() throws Exception {
			Long sum = 0l;
			for (int i = start; i < end; i++) {
				sum += numbers[i];
			}
			return sum;
		}
	}
 
	public ConcurrentCalculator() {
		cpuCoreNumber = Runtime.getRuntime().availableProcessors();
		exec = Executors.newFixedThreadPool(cpuCoreNumber);
		completionService = new ExecutorCompletionService<Long>(exec);
	}
 
	public Long sum(final int[] numbers) {
		for (int i = 0; i < cpuCoreNumber; i++) {
			int increment = numbers.length / cpuCoreNumber + 1;
			int start = increment * i;
			int end = increment * i + increment;
			if (end > numbers.length)
				end = numbers.length;
			SumCalculator subCalc = new SumCalculator(numbers, start, end);
			if (!exec.isShutdown()) {
				completionService.submit(subCalc);
			}
		}
 
		return getResult();
	}
 
	public Long getResult() {
		Long result = 0l;
		for (int i = 0; i < cpuCoreNumber; i++) {
			try {
				Long subSum = completionService.take().get();
				result += subSum;
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
 
		return result;
	}
 
	public void close() {
		exec.shutdown();
	}
 
	public static void main(String[] args) {
		int[] numbers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 34 };
		ConcurrentCalculator calc = new ConcurrentCalculator();
		Long sum = calc.sum(numbers);
		System.out.println(sum);
		calc.close();
	}
}
分享到:
评论

相关推荐

    java线程学习笔记

    2.3 线程本地存储(Java.lang.ThreadLocal) 15 2.4 线程阻塞 17 2.4.1 调用sleep(millisecond)使任务进入休眠状态 17 2.4.2 等待输出与输入 17 2.4.3 对象锁不可用 17 2.4.4 通过wait()使线程挂起。 17 2.5 线程...

    Java多线程之Executor框架.docx

    一、为什么要引入Executor框架? 1、如果使用new Thread(…).start()的方法处理多线程,有如下缺点: ① 开销大。对于JVM来说,每次新建线程和销毁线程都会有很大的开销。 ② 线程缺乏管理。没有一个池来限制线程的...

    Java JDK 7学习笔记(国内第一本Java 7,前期版本累计销量5万册)

     《Java JDK 7学习笔记》将IDE操作纳为教学内容之一,使读者能与实践结合,提供的视频教学能更清楚地帮助读者掌握操作步骤。 内容简介 书籍 计算机书籍  《java jdk 7学习笔记》是作者多年来教学实践经验的总结...

    并发编程实践,全面介绍基础知识、JVM同步原语、线程安全、低级并发工具、线程安全容器、高级线程协作工具、Executor部分等

    详细介绍java并发编程相关知识: 基础知识   并发与并行   Java并发演进历史   Java并发模型   线程模型   存储模型 JVM同步原语 volatile CAS 线程安全   保护“共享数据” 低级并发工具   原子变量   锁...

    多线程系列相关的技术要点

    1. Java多线程学习(一)Java多线程入门 2. Java多线程学习(二)synchronized关键字(1) 3. Java多线程学习(二)synchronized关键字(2) 4. Java多线程学习(三...9. Java多线程学习(八)线程池与Executor 框架

    针对于Executor框架,Java API,线程共享数据

    Executor框架是Java并发编程中的一个重要工具,它提供了一种管理线程池的方式,使得我们可以更方便地管理线程的生命周期和执行线程任务。 原子操作是指不可被中断的操作,要么全部执行成功,要么全部不执行。原子...

    JUC多线程学习个人笔记

    JUC(Java Util Concurrent)是Java中用于并发编程的工具包,提供了一组接口和类,用于处理多线程和并发操作。JUC提供了一些常用的并发编程模式和工具,如线程池、并发集合、原子操作等。 JUC的主要特点包括: ...

    Java-Executor并发框架.docx

    Java是天生就支持并发的语言,支持并发意味着多线程,线程的频繁创建在高并发及大数据量是非常消耗资源的,因为java提供了线程池。这篇文章主要介绍下并发包下的Executor接口,Executor接口虽然作为一个非常旧的接口...

    什么是线程?Java中如何创建和管理线程?(java面试题附答案).txt

    除了基本的线程创建和启动,Java还提供了一些管理线程的方法和工具,例如: sleep 方法:使当前线程暂停执行一段时间。 join 方法:等待其他线程执行完毕后再继续执行。 interrupt 方法:中断线程的执行。 ...

    Java基础篇:Executor框架.pdf

    该文档详细记录了Executor框架结构、使用示意图、ThreadPoolExecutor使用示例、线程池原理分析、几种常见线程池(FixedThreadPool、SingleThreadExecutor、CachedThreadPool)的详解以及线程池大小确定等内容

    java并发编程:Executor、Executors、ExecutorService.docx

    其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类,一般来说,Runnable任务开辟在新线程中...

    Java并发之线程池Executor框架的深入理解

    主要介绍了Java并发之线程池Executor框架的深入理解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

    java 并发编程 多线程

    java 并发 编程 多线程 concurrent lock condition executorserice executor java.util.curcurrent.

    Java并发Executor框架

     调用关系:Java线程一对一映射到本地操作系统的系统线程,当多线程程序分解若干任务,使用用户级的调度器(Executor框架)将任务映射为固定数量的线程,底层,操作系统吧、内核将这些线程映射到硬件处理器上。...

    个人总结的深入java多线程开发

    看完《think in java》多线程章节,自己写的多线程文档,还结合了其他的相关网络资料。 线程 一. 线程池 1)为什么要使用线程池 2 2)一个具有线程池的工作队列 3 3)使用线程池的风险: 4 4)有效使用线程池的原则 5...

    \java超强笔记(超级经典)

    全部是txt格式的,容量小,以下内容为其中之一: 5.0新特性: 泛型: 泛型的形式: 类型&gt; &lt;E extends Numner&comparator&gt; 类名&接口,表示E继承Numner类实现comparator接口 &lt;?&gt; 泛型通配符表示任意...

    JavaExecutor并发框架.pdf

    JavaExecutor并发框架.pdf

    线程超时死掉

    解决线程的死掉问题和超时问题特别好使,在Java中,如果需要设定代码执行的最长时间,即超时,可以用Java线程池ExecutorService类配合Future接口来实现。 Future接口是Java标准API的一部分,在java.util.concurrent...

    Executor,Executors,ExecutorService比较.docx

    4.newSingleThreadExecutor 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。``

Global site tag (gtag.js) - Google Analytics