`
san_yun
  • 浏览: 2594964 次
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

ThreadPoolExecutor入门

 
阅读更多

一、corePoolSize和maximumPoolSize
提交任务的基本流程:
1. 当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。
2. 如果运行的线程超过corePoolSize,把任务加入到队列中。
3. 如果无法将任务加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

代码实现:
{code}
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }
{code}
这段代码不太好理解,可以改写成:
{code}
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
    boolean addtask = False
    if (poolSize < corePoolSize){
        addtask = addIfUnderCorePoolSize(command)
    }
    if(!addtask){
       if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }else if (!addIfUnderMaximumPoolSize(command)) //如果插入到queue失败,检查是否小于MaximumPoolSize,如果是则拒绝
                reject(command); // is shutdown or saturated   
    }
    }
{code}

ps:
a. 如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。
b. 如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。


二、Keep-alive times
如果池中当前有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止(参见 getKeepAliveTime(java.util.concurrent.TimeUnit))。这提供了当池处于非活动状态时减少资源消耗的方法。如果池后来变得更为活动,则可以创建新的线程。

三、Queuing
1.如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
2.如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
3.如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

排队有三种策略:
1. 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

2.无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

3.有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。



四、测试代码

    ThreadPoolExecutor boss = new ThreadPoolExecutor(1, 20, 0L, TimeUnit.MILLISECONDS,
                                                         new ArrayBlockingQueue(100000)); // test1
        // ThreadPoolExecutor boss = new ThreadPoolExecutor(1, 20, 0L, TimeUnit.MILLISECONDS, new
        // ArrayBlockingQueue(100)); // test2

        for (int i = 0; i < 10000; ++i) {

            System.out.println(i);
            try {

                boss.submit(new Runnable() {

                    @Override
                    public void run() {
                        while (true) {
                            try {
                                Thread.currentThread().sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });

            } catch (Exception e) {
                System.out.println("error: " + i);
            }
        }
        System.out.println("================");
        System.out.println(boss.getActiveCount() + "," + boss.getTaskCount());
        System.out.println(boss.getPoolSize() + "," + boss.getCorePoolSize() + "," + boss.getMaximumPoolSize());
        System.out.println(boss.getQueue().size());
        System.in.read();

 

================
1,10000
1,1,20
9999

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics