`

Java并发编程之线程池任务监控

    博客分类:
  • Java
阅读更多

Java并发编程之线程池任务监控

 

当我们提交runnable或者callable<?>到ThreadPoolExecutor时,我们是无法知道这些任务是在什么时候才真正的执行的,为了实现这个需求,我们需要扩展ThreadPoolExecutor,重写beforeExecute和afterExecute,在这两个方法里分别做一些任务执行前和任务执行后的相关监控逻辑,还有个terminated方法,是在线程池关闭后回调,,另外,我们可以通过getLargestPoolSize()和getCompletedTaskCount()来分别获取线程池数的峰值和线程池已完成的任务数。

 

下面就一个完整的例子来说明如何进行:

自定义MonitorHandler接口,把before和after抽象出来:

 

package cc.lixiaohui.demo.concurrent;


/**
 * 监控处理器, 目的是把before和after抽象出来, 以便在{@link MonitorableThreadPoolExecutor}中形成一条监控处理器链
 * 
 * @author lixiaohui
 * @date 2016年10月11日 下午7:18:38
 * 
 */
public interface MonitorHandler {
	
	/**
	 * 改监控任务是否可用
	 * 
	 * @return
	 */
	boolean usable(); 
	
	/**
	 * 任务执行前回调
	 * 
	 * @param thread 即将执行该任务的线程
	 * @param runnable 即将执行的任务
	 */
	void before(Thread thread, Runnable runnable);  
	
	/**
	 * <pre>
	 * 任务执行后回调
	 * 注意:
	 *     1.当你往线程池提交的是{@link Runnable} 对象时, 参数runnable就是一个{@link Runnable}对象
	 *     2.当你往线程池提交的是{@link java.util.concurrent.Callable<?>} 对象时, 参数runnable实际上就是一个{@link java.util.concurrent.FutureTask<?>}对象
	 *       这时你可以通过把参数runnable downcast为FutureTask<?>或者Future来获取任务执行结果
	 *       
	 * @param runnable 执行完后的任务
	 * @param throwable 异常信息
	 */
	void after(Runnable runnable, Throwable throwable);
	
	/**
	 * 线程池关闭后回调
	 * 
	 * @param largestPoolSize
	 * @param completedTaskCount
	 */
	void terminated(int largestPoolSize, long completedTaskCount);
}

 

 

扩展ThreadPoolExecutor,增加监控的逻辑,如果监控比较耗时的话,为了不影响业务线程池的执行效率,我们应该将before,after和terminated方法的调用封装为统一的Runnable交给非业务线程池内的Thread来跑(新建个Thread或者线程池):

 

package cc.lixiaohui.demo.concurrent;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 可监控的线程池, 可有多个监控处理器,如果监控的逻辑是比较耗时的话, 最好另起个线程或者线程池专门用来跑MonitorHandler的方法.
 * 
 * @author lixiaohui
 * @date 2016年10月11日 下午7:15:16
 * 
 */
public class MonitorableThreadPoolExecutor extends ThreadPoolExecutor {
	
	/**
	 * 可有多个监控处理器
	 */
	private Map<String, MonitorHandler> handlerMap = new HashMap<String, MonitorHandler>();
	
	private final Object lock = new Object();
	
	public MonitorableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
	}

	public MonitorableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
	}

	public MonitorableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
	}

	public MonitorableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
	}
	
	@Override
	protected void beforeExecute(Thread t, Runnable r) {
		super.beforeExecute(t, r);
		// 依次调用处理器
		for (MonitorHandler handler : handlerMap.values()) {
			if (handler.usable()) {
				handler.before(t, r);
			}
		}
	}
	
	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		super.afterExecute(r, t);
		// 依次调用处理器
		for (MonitorHandler handler : handlerMap.values()) {
			if (handler.usable()) {
				handler.after(r, t);
			}
		}
	}
	
	/* 
	 * @see java.util.concurrent.ThreadPoolExecutor#terminated()
	 */
	@Override
	protected void terminated() {
		super.terminated();
		for (MonitorHandler handler : handlerMap.values()) {
			if (handler.usable()) {
				handler.terminated(getLargestPoolSize(), getCompletedTaskCount());
			}
		}
		
	}
	
	public MonitorHandler addMonitorTask(String key, MonitorHandler task, boolean overrideIfExist) {
		if (overrideIfExist) {
			synchronized (lock) {
				return handlerMap.put(key, task);
			}
		} else {
			synchronized (lock) {
				return handlerMap.putIfAbsent(key, task);
			}
		}
	}
	
	public MonitorHandler addMonitorTask(String key, MonitorHandler task) {
		return addMonitorTask(key, task, true);
	}
	
	public MonitorHandler removeMonitorTask(String key) {
		synchronized (lock) {
			return handlerMap.remove(key);
		}
	}
	
}

 

 

 测试程序:

 

package cc.lixiaohui.demo.concurrent;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import cc.lixiaohui.util.RandomUtils;

/**
 * @author lixiaohui
 * @date 2016年10月11日 下午8:11:39
 * 
 */
public class Tester {
	
	static volatile boolean stop = false;

	public static void main(String[] args) throws InterruptedException, IOException {
		// fixed size 5
		final MonitorableThreadPoolExecutor pool = new MonitorableThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

		pool.addMonitorTask("TimeMonitorTask", newTimeMonitorHandler());
		// 起一个线程不断地往线程池丢任务
		Thread t = new Thread(new Runnable() {
			public void run() {
				startAddTask(pool);
			}
		});
		t.start();
		
		// 丢任务丢20 ms
		Thread.sleep(50);
		stop = true;
		t.join();
		pool.shutdown();
		// 等线程池任务跑完
		pool.awaitTermination(100, TimeUnit.SECONDS);
	}

	private static MonitorHandler newTimeMonitorHandler() {

		return new MonitorHandler() {
			// 任务开始时间记录map, 多线程增删, 需用ConcurrentHashMap
			Map<Runnable, Long> timeRecords = new ConcurrentHashMap<Runnable, Long>();

			public boolean usable() {
				return true;
			}
			
			public void terminated(int largestPoolSize, long completedTaskCount) {
				System.out.println(String.format("%s:largestPoolSize=%d, completedTaskCount=%s", time(), largestPoolSize, completedTaskCount));
			}

			public void before(Thread thread, Runnable runnable) {
				System.out.println(String.format("%s: before[%s -> %s]", time(), thread, runnable));
				timeRecords.put(runnable, System.currentTimeMillis());
			}

			public void after(Runnable runnable, Throwable throwable) {
				long end = System.currentTimeMillis();
				Long start = timeRecords.remove(runnable);
				
				Object result = null;
				if (throwable == null && runnable instanceof FutureTask<?>) { // 有返回值的异步任务,不一定是Callable<?>,也有可能是Runnable
					try {
						result = ((Future<?>) runnable).get();
					} catch (InterruptedException e) {
						Thread.currentThread().interrupt(); // reset
					} catch (ExecutionException e) {
						throwable = e;
					} catch (CancellationException e) {
						throwable = e;
					}
				}

				if (throwable == null) { // 任务正常结束
					if (result != null) { // 有返回值的异步任务
						System.out.println(String.format("%s: after[%s -> %s], costs %d millisecond, result: %s", time(), Thread.currentThread(), runnable, end - start, result));
					} else {
						System.out.println(String.format("%s: after[%s -> %s], costs %d millisecond", time(), Thread.currentThread(), runnable, end - start));
					}
				} else {
					System.err.println(String.format("%s: after[%s -> %s], costs %d millisecond, exception: %s", time(), Thread.currentThread(), runnable, end - start, throwable));
				}
			}

		};
	}

	// 随机runnable或者callable<?>, 任务随机抛异常
	private static void startAddTask(MonitorableThreadPoolExecutor pool) {
		int count = 0;
		while (!stop) {
			if (RandomUtils.randomBoolean()) {// 丢Callable<?>任务
				pool.submit(new Callable<Boolean>() {

					public Boolean call() throws Exception {
						// 随机抛异常
						boolean bool = RandomUtils.randomBoolean();
						// 随机耗时 0~100 ms
						Thread.sleep(RandomUtils.randomInt(100));
						if (bool) {
							throw new RuntimeException("thrown randomly");
						}
						return bool;
					}

				});
			} else { // 丢Runnable
				pool.submit(new Runnable() {

					public void run() {
						// 随机耗时 0~100 ms
						try {
							Thread.sleep(RandomUtils.randomInt(100));
						} catch (InterruptedException e) {}
						// 随机抛异常
						if (RandomUtils.randomBoolean()) {
							throw new RuntimeException("thrown randomly");
						}
					};

				});
			}
			System.out.println(String.format("%s:submitted %d task", time(), ++count));
		}
	}

	private static String time() {
		return String.valueOf(System.currentTimeMillis());
	}
}

 

 

一个较短的结果:

 

1476253228222: before[Thread[pool-1-thread-1,5,main] -> java.util.concurrent.FutureTask@548bb979]
1476253228222:Thread[Thread-0,5,main], submitted 1 task
1476253228253:Thread[Thread-0,5,main], submitted 2 task
1476253228264: before[Thread[pool-1-thread-2,5,main] -> java.util.concurrent.FutureTask@97e041d]
1476253228264:Thread[Thread-0,5,main], submitted 3 task
1476253228265: before[Thread[pool-1-thread-3,5,main] -> java.util.concurrent.FutureTask@7d6d5cc]
1476253228271: after[Thread[pool-1-thread-2,5,main] -> java.util.concurrent.FutureTask@97e041d], costs 7 millisecond, exception: java.util.concurrent.ExecutionException: java.lang.RuntimeException: thrown randomly
1476253228295: after[Thread[pool-1-thread-1,5,main] -> java.util.concurrent.FutureTask@548bb979], costs 42 millisecond
1476253228347: after[Thread[pool-1-thread-3,5,main] -> java.util.concurrent.FutureTask@7d6d5cc], costs 82 millisecond, exception: java.util.concurrent.ExecutionException: java.lang.RuntimeException: thrown randomly
1476253228347:Thread[pool-1-thread-3,5,main], largestPoolSize=3, completedTaskCount=3

 

 

 

0
1
分享到:
评论
3 楼 sll1097892736 2017-03-21  
import cc.lixiaohui.util.RandomUtils 这个类哪里有?
我邮箱:1097892736.qq.com
2 楼 莫名的拉风 2017-01-14  
漂泊一剑客 写道
public MonitorHandler removeMonitorTask(MonitorHandler task)

博主,发现一个问题,这个方法的参数,是有问题吧,应该是String的key


感谢提出,已修正
1 楼 漂泊一剑客 2017-01-14  
public MonitorHandler removeMonitorTask(MonitorHandler task)

博主,发现一个问题,这个方法的参数,是有问题吧,应该是String的key

相关推荐

    java并发编程从入门到精通

    《Java并发编程从入门到精通》内容包括并发编程概念,线程,线程安全,线程集合类,线程阀,线程池,Fork/Join,线程、线程池在互联网项目开发的应用,线程监控及线程分析,Android中线程应用。 本书适合Java开发...

    Java 7并发编程实战手册

    如果你是一名Java开发人员,并且想进一步掌握并发编程和多线程技术,并挖掘Java 7并发的新特性,那么本书是你的合适之选。 《Java 7并发编程实战手册》 第1章 线程管理 1 1.1 简介 1 1.2 线程的创建和运行...

    并发编程库,&amp;&amp;,线程池

    Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建...

    concurrent 多线程 教材

    00 IBM developerWorks 中国 : Java 多线程与并发编程专题 02 Java 程序中的多线程 03 编写多线程的 Java 应用程序 04 如果我是国王:关于解决 Java编程语言线程问题的建议 (2) 05 构建Java并发模型框架 (2) 06...

    java8集合源码分析-The-Road-To-Bald-Man:Java技能加点,秃顶之路

    并发编程基础 线程池 锁 并发容器 原子类 JUC并发工具类 网络编程 新特性 JVM 类加载机制 字节码执行机制 JVM内存模型 GC垃圾回收 JVM性能监控与故障定位 JVM调优 二、数据结构和算法 数据结构 字符串 数组 链表 ...

    leetcode手册JAVA-JavaLearn:学习java的路线和项目实战,javalearner快来看一看吧!

    并发编程基础 线程池 锁 并发容器 原子类 juc并发工具类 1.2 数据结构与算法 1.2.1 数据结构 字符串 数组 链表 二叉树 堆,栈,队列 哈希 1.2.2 算法 查找 排序 贪心 分治 动态规划 回溯 1.3 计算机网络 计算机网络...

    java8集合源码分析-AboutJava:java相关知识(理论,代码)相关知识均是看书,博客等地方获取再由自己整理,如存在侵权,请告诉我

    (很多笔记来自java并发艺术一书) 多线程基础 synchronized volatile 线程间的通信 锁(重入锁,读写锁) 并发工具 增强的Future CompletableFuture 线程池技术 Java线程池Executors ForkJoin框架 原子操作类 JVM ...

    java8集合源码分析-toBeTopJavaArchitect:Java架构师--成神之路

    Java架构师--成神之路 修改记录 版本 编写时间 作者 描述 v1.0.0 2019-10-29 Rock.Sang 梳理大纲 v1.0.1 2019-11-15 Rock.Sang 完善所有目录结构 v1.0.2 2020-01-07 Rock.Sang 添加英语模块 v1.0.3 2020-01-17 Rock....

    Linux多线程服务端编程:使用muduo C++网络库

    《Linux多线程服务端编程:使用muduo C++网络库》主要讲述采用现代C++在x86-64 Linux上编写多线程TCP网络服务程序的主流常规技术,重点讲解一种适应性较强的多线程服务器的编程模型,即one loop per thread。...

    javaSE代码实例

    第16章 多线程——Java中的并发协作 343 16.1 线程的基本知识 343 16.1.1 多线程编程的意义 343 16.1.2 定义自己的线程 344 16.1.3 创建线程对象 345 16.1.4 启动线程 347 16.1.5 同时使用多个线程 ...

Global site tag (gtag.js) - Google Analytics