`

《Java并发编程》之六:线程池的使用

    博客分类:
  • Java
阅读更多

8.1  在任务与执行策略之间的隐形耦合

有些类型的任务需要明确指定执行策略,包括:

依赖性任务、使用线程封闭机制的任务、对响应时间敏感的任务、使用ThreadLocal的任务。

只有当任务都是同类型的并且相互独立时,线程池的性能才能达到最佳。

 

8.1.1  线程饥饿死锁

public class ThreadDeadlock {
    ExecutorService exec = Executors.newSingleThreadExecutor();

    public class LoadFileTask implements Callable<String> {
        private final String fileName;

        public LoadFileTask(String fileName) {
            this.fileName = fileName;
        }

        public String call() throws Exception {
            // Here's where we would actually read the file
            return "";
        }
    }

    public class RenderPageTask implements Callable<String> {
        public String call() throws Exception {
            Future<String> header, footer;
            header = exec.submit(new LoadFileTask("header.html"));
            footer = exec.submit(new LoadFileTask("footer.html"));
            String page = renderBody();
            // 将发生死锁 -- task waiting for result of subtask
            return header.get() + page + footer.get();
        }

        private String renderBody() {
            // Here's where we would actually render the page
            return "";
        }
    }
}

 

每当提交了一个有依赖性的Executor任务的时候,也就是说这个任务依赖其他的任务的计算结果。要清除地知道可能会出现线程饥饿死锁,因此需要在代码或配置Executor的配置文件中记录线程池的大小限制或配置限制。

 

8.2 设置线程池的大小

根据Runtime.getRuntime().availableProcessors()动态计算处理器的个数

对于计算密集型的任务,在拥有N(cpu)个处理器的系统上,当线程池的大小为N + 1的时候能实现最优利用率。

对于Executor,newCachedThreadPool工厂方法是一种很好的默认的选择,它能提供比固定大小的线程池更好的排队性能,一般来讲,只要任务数量不会爆炸型增长,就选择这个。

而当需要限制当前任务的数量以满足资源管理需求的时候,那么可以选择固定大小的线程池,就像在接受网络客户请求服务器应用程序中,不然不限制,很容易产生过载的问题。服务器挂点

只有当任务都独立的时候,为线程池或工作队列设置界限才是合理的,如果任务之间存在依赖性,那么有界线程池或队列可能导致线程饥饿死锁问题,此时应该使用无界线程池例如newCachedThreadPool。

 

8.3.3  饱和策略

当有界队列被填满后,饱和策略开始发挥作用。ThreadPoolExecutor的饱和策略可通过setRejectedExecutionHandler来修改。

JDK提供了几个饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。

AbortPolicy是默认的饱和策略,该策略抛出uncheckedException  RejectedExecutionException。

DiscardPolicy抛弃策略会悄悄抛弃这个任务

DiscardOldestPolicy会抛弃下一个即将要执行的任务(因为这个任务肯定是在队列里面放最久的,排在队列最前头)。所以这个千万别跟PriorityBlockingQueue一起用。

 

使用Semaphore来控制任务的提交速率:

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
        }
    }
}

 

8.3.4  线程工厂

自己定制线程池创建的线程,比如为线程指定名字,设置自定义的UncaughtExceptionHandler,向Logger写日志,维护统计信息包括多少线程被创建和销毁,以及在线程被创建和终止的时候把调试信息写入日志。

public class MyAppThread extends Thread {
    public static final String DEFAULT_NAME = "MyAppThread";
    private static volatile boolean debugLifecycle = false;
    private static final AtomicInteger created = new AtomicInteger();
    private static final AtomicInteger alive = new AtomicInteger();
    private static final Logger log = Logger.getAnonymousLogger();

    public MyAppThread(Runnable r) {
        this(r, DEFAULT_NAME);
    }

    public MyAppThread(Runnable runnable, String name) {
        super(runnable, name + "-" + created.incrementAndGet());
        setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
            public void uncaughtException(Thread t,
                                          Throwable e) {
                log.log(Level.SEVERE,
                        "UNCAUGHT in thread " + t.getName(), e);
            }
        });
    }

    public void run() {
        // Copy debug flag to ensure consistent value throughout.
        boolean debug = debugLifecycle;
        if (debug) log.log(Level.FINE, "Created " + getName());
        try {
            alive.incrementAndGet();
            super.run();
        } finally {
            alive.decrementAndGet();
            if (debug) log.log(Level.FINE, "Exiting " + getName());
        }
    }

    public static int getThreadsCreated() {
        return created.get();
    }

    public static int getThreadsAlive() {
        return alive.get();
    }

    public static boolean getDebug() {
        return debugLifecycle;
    }

    public static void setDebug(boolean b) {
        debugLifecycle = b;
    }
}

 

public class MyThreadFactory implements ThreadFactory {
    private final String poolName;

    public MyThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    public Thread newThread(Runnable runnable) {
        return new MyAppThread(runnable, poolName);
    }
}

 

8.4  扩展ThreadPoolExecutor

ThreadPoolExecutor是可扩展的,它提供了几个可以在子类中改写的方法:beforeExecute,afterExecute和terminated。

beforeExecute和afterExecute是在每个执行任务的线程调用任务的run方法之前和之后会执行的

terminated方法是在整个Executor关闭的时候,也就是所有任务都完成并且所有工作者线程关闭后执行,这个方法可以用来释放Executor在其生命周期分配的各种资源,此外还可以执行发送通知,记录日志或收集finalize统计信息等。

 

示例:给线程池添加统计信息:

public class TimingThreadPool extends ThreadPoolExecutor {

    public TimingThreadPool() {
        super(1, 1, 0L, TimeUnit.SECONDS, null);
    }

    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
    private final Logger log = Logger.getLogger("TimingThreadPool");
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();

    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        log.fine(String.format("Thread %s: start %s", t, r));
        startTime.set(System.nanoTime());
    }

    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            log.fine(String.format("Thread %s: end %s, time=%dns",
                    t, r, taskTime));
        } finally {
            super.afterExecute(r, t);
        }
    }

    protected void terminated() {
        try {
            log.info(String.format("Terminated: avg time=%dns",
                    totalTime.get() / numTasks.get()));
        } finally {
            super.terminated();
        }
    }
}

 

8.5  递归算法的并行化

当串行循环中各个迭代操作之间彼此独立,并且每个迭代操作执行的工作量比管理一个新任务带来的开销更多,那么这个串行循环就适合并行化。

public abstract class TransformingSequential {

    void processSequentially(List<Element> elements) {
        for (Element e : elements)
            process(e);
    }

    void processInParallel(Executor exec, List<Element> elements) {
        for (final Element e : elements)
            exec.execute(new Runnable() {
                public void run() {
                    process(e);
                }
            });
    }

    public abstract void process(Element e);


    public <T> void sequentialRecursive(List<Node<T>> nodes,
                                        Collection<T> results) {
        for (Node<T> n : nodes) {
            results.add(n.compute());
            sequentialRecursive(n.getChildren(), results);
        }
    }

    public <T> void parallelRecursive(final Executor exec,
                                      List<Node<T>> nodes,
                                      final Collection<T> results) {
        for (final Node<T> n : nodes) {
            exec.execute(new Runnable() {
                public void run() {
                    results.add(n.compute());
                }
            });
            parallelRecursive(exec, n.getChildren(), results);
        }
    }

    public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
            throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
        parallelRecursive(exec, nodes, resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        return resultQueue;
    }

    interface Element {
    }

    interface Node<T> {
        T compute();

        List<Node<T>> getChildren();
    }
}

 

示例:谜题框架

这项技术的一种强大应用就是解决一些谜题,这些谜题都需要找出一系列的操作从初始状态转换到目标状态,例如类似于 搬箱子 、 Hi-Q 、 四色方柱Instant Insanity和其他的棋牌谜题

 

具体算法描述和解法,请参考我的另一篇算法文章:   利用递归算法并行化解决谜题框架

 

本人博客已搬家,新地址为:http://yidao620c.github.io/

分享到:
评论

相关推荐

    Java并发编程:线程池的使用 - 平凡希 - 博客园1

    Java并发编程:线程池的使用 - 平凡希 - 博客园平凡希博客园首页联系管理随笔 - 127 文章 - 1 评论 - 94Java并发编程:线程池的使用在前面

    java并发编程:juc线程池

    而了解 Java 并发编程以及其中的 JUC(java.util.concurrent)线程池,对于构建高性能、高可伸缩性的应用程序具有重要意义。 多核处理器的出现使得并发执行成为一种重要的优化手段。了解并发编程和线程池的工作原理...

    阿里专家级并发编程架构师课程 彻底解决JAVA并发编程疑难杂症 JAVA并发编程高级教程

    阿里专家级并发编程架构师级课程,完成课程的学习可以帮助同学们解决非常多的JAVA并发编程疑难杂症,极大的提高JAVA并发编程的效率。课程内容包括了JAVA手写线程池,UC线程池API详解,线程安全根因详解,锁与原子类...

    【并发编程】自定义简单线程池.pdf

    书籍:如《Java并发编程实战》、《Concurrency in C++》等。 官方文档:不同编程语言的官方文档通常会提供关于并发编程的指南和最佳实践。 社区和论坛:如Stack Overflow、Reddit等,可以提供实际问题的帮助和讨论。

    《Java并发编程的艺术》

    《Java并发编程的艺术》内容涵盖Java并发编程机制的底层实现原理、Java内存模型、Java并发编程基础、Java中的锁、并发容器和框架、原子类、并发工具类、线程池、Executor框架等主题,每个主题都做了深入的讲解,同时...

    java并发编程从入门到精通

    《Java并发编程从入门到精通》内容包括并发编程概念,线程,线程安全,线程集合类,线程阀,线程池,Fork/Join,线程、线程池在互联网项目开发的应用,线程监控及线程分析,Android中线程应用。 本书适合Java开发...

    Java并发编程实战

    第8章 线程池的使用 第9章 图形用户界面应用程序 第三部分 活跃性、性能与测试 第10章 避免活跃性危险 第11章 性能与可伸缩性 第12章 并发程序的测试 第四部分 高级主题 第13章 显式锁 第14章 构建自定义的...

    《Java并发编程的艺术》源代码

    Java并发编程的艺术 作者:方腾飞 魏鹏 程晓明 著 丛书名:Java核心技术系列 出版日期 :2015-07-25 ISBN:978-7-111-50824-3 第1章介绍Java并发编程的挑战,向读者说明进入并发编程的世界可能会遇到哪些问题,以及如何...

    23 高并发编程和线程池

    23 高并发编程和线程池,教程视频:java中高并发编程和线程池

    Java并发编程的艺术

    , 《Java并发编程的艺术》内容涵盖Java并发编程机制的底层实现原理、Java内存模型、Java并发编程基础、Java中的锁、并发容器和框架、原子类、并发工具类、线程池、Executor框架等主题,每个主题都做了深入的讲解,...

    Java并发编程与高并发解决方案

    Java并发编程与高并发解决方案:线程池,消息队列,服务拆分,限流,降级,熔断思路,数据库分表

    Java并发编程相关源码集 包括多任务线程,线程池等.rar

    Java并发编程常见知识点源码集锦,涉及到对象锁,Executors多任务线程框架,线程池等示例,列出一些源码包中包括的内容:  volatile关键字的非原子性、volatile关键字的使用、AtomicInteger原子性操作、线程安全小...

    【并发编程】如何优雅使用线程池.pdf

    书籍:如《Java并发编程实战》、《Concurrency in C++》等。 官方文档:不同编程语言的官方文档通常会提供关于并发编程的指南和最佳实践。 社区和论坛:如Stack Overflow、Reddit等,可以提供实际问题的帮助和讨论。

    java并发编程脑图

    java并发编程脑图总结,线程池、主要使用场景分析,进程:是CPU分配资源的最小单元,是程序的一次动态执行,它对应着从代码加载,执行至完成的一个完整的过程,它有自己的生命周期。它是应用程序的执行实例,每个...

    Java并发编程技术总结

    Java并发编程技术总结,所含内容有并发特性、并发锁、线程池、并发场景解决方案等,对于性能思考和内容参考资料有一定说明

    java并发编程 线程池 异步编排

    java并发编程相关总结

    龙果 java并发编程原理实战

    龙果 java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四...

    并发思维导图Java并发编程的相关概念和知识

    在思维导图中,我们可以包括Java并发编程的基本概念、线程管理、锁、并发集合、线程池、并发控制等方面的内容。同时,我们还可以将相关的类和接口进行关联和整合,形成完整的知识网络。 总之,Java并发编程是一个...

Global site tag (gtag.js) - Google Analytics