`
bo_hai
  • 浏览: 554192 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

ThreadPoolExecutor 中饱和策略分析

阅读更多
import java.util.concurrent.TimeUnit;

public class ThreadPoolTask implements Runnable {

	private final Object threadPoolTaskData;
	private static long consumerTaskSleepTime = 2L;
	
	public ThreadPoolTask(Object tasks) {
		this.threadPoolTaskData = tasks;
	}
	
	@Override
	public void run() {
		System.out.println("start :" + threadPoolTaskData);
		try {
			TimeUnit.SECONDS.sleep(consumerTaskSleepTime);
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.out.println("finish " + threadPoolTaskData);   
	}
}

 Abort(中止)策略:该策略会抛出未检查的RejectedExecutionException。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。代码如下:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {

	private static int executePrograms = 0;
	private static int produceTaskMaxNumber = 10;
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, 
				TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),new ThreadPoolExecutor.AbortPolicy());
		for(int i = 0 ; i < produceTaskMaxNumber; i ++) {
			try {
				String task = "task@ " + i;
				System.out.println("put " + task);
				threadPoolExecutor.execute(new ThreadPoolTask(task));
				TimeUnit.SECONDS.sleep(executePrograms);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

 程序的执行结果如下:

put task@ 0
put task@ 1
start :task@ 0
put task@ 2
put task@ 3
put task@ 4
put task@ 5
start :task@ 1
put task@ 6
start :task@ 5
put task@ 7
start :task@ 6
put task@ 8
java.util.concurrent.RejectedExecutionException
put task@ 9
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656)
	at com.bohai.thread.pool.ThreadPool.main(ThreadPool.java:22)
java.util.concurrent.RejectedExecutionException
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656)
	at com.bohai.thread.pool.ThreadPool.main(ThreadPool.java:22)
java.util.concurrent.RejectedExecutionException
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656)
	at com.bohai.thread.pool.ThreadPool.main(ThreadPool.java:22)
finish task@ 0
start :task@ 2
finish task@ 1
start :task@ 3
finish task@ 5
start :task@ 4
finish task@ 6
finish task@ 2
finish task@ 3
finish task@ 4

 分析运行结果:

corepoolsize = 2 ,maxpoolsize = 4,queue size = 3。运行中的线程数+队列中等待的线程数 = 7,及提交的线程数大于7时,会抛出异常。

 

DiscardPolicy 策略会悄悄抛弃新提交的任务:代码如下:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {

	private static int executePrograms = 0;
	private static int produceTaskMaxNumber = 10;
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, 
				TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),new ThreadPoolExecutor.DiscardPolicy());
		for(int i = 0 ; i < produceTaskMaxNumber; i ++) {
			try {
				String task = "task@ " + i;
				System.out.println("put " + task);
				threadPoolExecutor.execute(new ThreadPoolTask(task));
				TimeUnit.SECONDS.sleep(executePrograms);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

 执行结果如下:

put task@ 0
put task@ 1
start :task@ 0
put task@ 2
put task@ 3
put task@ 4
put task@ 5
start :task@ 1
put task@ 6
put task@ 7
put task@ 8
put task@ 9
start :task@ 6
start :task@ 5
finish task@ 0
finish task@ 6
start :task@ 2
finish task@ 1
start :task@ 3
start :task@ 4
finish task@ 5
finish task@ 2
finish task@ 3
finish task@ 4

 分析结果:

 提交的任务数有10个,开始和结束的任务数7个。抛弃了新提交的任务7、8、9。

 

DiscardOldestPolicy 抛弃旧的线程。代码如下:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {

	private static int executePrograms = 0;
	private static int produceTaskMaxNumber = 10;
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, 
				TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),new ThreadPoolExecutor.DiscardOldestPolicy());
		for(int i = 0 ; i < produceTaskMaxNumber; i ++) {
			try {
				String task = "task@ " + i;
				System.out.println("put " + task);
				threadPoolExecutor.execute(new ThreadPoolTask(task));
				TimeUnit.SECONDS.sleep(executePrograms);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

 执行结果如下:

put task@ 0
put task@ 1
put task@ 2
put task@ 3
put task@ 4
put task@ 5
put task@ 6
put task@ 7
put task@ 8
put task@ 9
start :task@ 0
start :task@ 5
start :task@ 1
start :task@ 6
finish task@ 0
start :task@ 7
finish task@ 5
start :task@ 8
finish task@ 1
start :task@ 9
finish task@ 6
finish task@ 7
finish task@ 8
finish task@ 9

 对结果进行分析:

线程池抛弃了2、3、4,线程0、1没有进入队列,进行开始执行。所以不会被抛出。

 

CallerRunsPolicy 策略实现一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。代码如下:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool {

	private static int executePrograms = 0;
	private static int produceTaskMaxNumber = 10;
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, 
				TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),new ThreadPoolExecutor.CallerRunsPolicy());
		for(int i = 0 ; i < produceTaskMaxNumber; i ++) {
			try {
				String task = "task@ " + i;
				System.out.println("put " + task);
				threadPoolExecutor.execute(new ThreadPoolTask(task));
				TimeUnit.SECONDS.sleep(executePrograms);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

 运行结果如下:

put task@ 0
put task@ 1
put task@ 2
put task@ 3
put task@ 4
put task@ 5
start :task@ 1
put task@ 6
put task@ 7
start :task@ 7
start :task@ 6
start :task@ 0
start :task@ 5
finish task@ 7
put task@ 8
finish task@ 1
start :task@ 2
finish task@ 6
finish task@ 0
finish task@ 5
put task@ 9
start :task@ 3
start :task@ 8
start :task@ 4
finish task@ 2
start :task@ 9
finish task@ 3
finish task@ 8
finish task@ 4
finish task@ 9

 对结果进行分析:

所有的线程都执行了。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics