`
kobe学java
  • 浏览: 249886 次
  • 性别: Icon_minigender_1
  • 来自: 苏州
社区版块
存档分类
最新评论

多线程摘录 004

 
阅读更多

* 使用哪种模式的并发?
观察一下简单的服务器
方式一:    
    while(true) {
        Socket s = serverSocket.accept();
        handleRequest(s);
    }


方式二:
    while(true) {
        final Socket s = serverSocket.accept();
        new Thread(new Runnable(){
                handleRequest(s);
            }
        );
    }

方式一非常暴力, 最严重的问题是请求被串行处理.
方式二相对合理, 但是问题多多: 资源占用率高, 过度创建线程令服务器压力过大, 线程的生命周期没有得到控制...

JDK5带来了线程池
* JDK5提供了基于task的Executor框架(java.util.concurrent). 
    它内部封装了线程池.
public interface Executor {
    void execute(Runnable command);
}

这个接口很简单, 但是它是的任务的"提交"和"执行"这两个动作的分离成为可能. 因为一旦分离, 我们可以在其中加入许多控制, 比如调度线程来执行task, 加入监听器, 甚至把任务发送到其他机器处理

Executor是基于producer-consumer模式建立的, 任务提交线程为producer, 任务执行线程为consumer

例子: 使用了线程池来处理请求的服务器
class ThreadPooledServer {
    private static final Executor exec = Executors.newFixedThreadPool(100);

    public static void main(String[] args) {
        ServerSocket server = new ServerSocket();
        final Socket request = server.accept();
        Runnable task = new Runnable() {
            public void run() {
                handleRequest(request);
            }
        };
        exec.submit(task);
    }
}


用法很简单, 只要exec.submit(task); 就可以了, 然后由Executor(其实也就是线程池)来分配线程去处理这些任务.

* 任务的执行策略
策略指导是做什么, 什么时候做, 在什么地方做, 怎样做.
更详细的, 要考虑:
    - 这些任务被哪些线程执行?
    - 任务的执行顺序? 先到先处理? 后到先处理? 还是按照任务的优先级
    - 有多少任务能被并发执行?
    - 有多少任务能等待?
    - 如果任务队列满了, 需要拒绝任务, 要用什么策略?
    - 执行任务前后有没有什么动作? 比如分发事件

* JDK5带给我们的几种线程池
1) Executors.newFixedThreadPool
2) Executors.newCachedThreadPool
    无限大的线程池, 按需创建线程
3) Executors.newSingleThreadExecutor
    如果内部的线程意外终止, 用一个新的线程替换旧的
4) Executors.newScheduledThreadPool
    大小固定的, 支持延时和定时执行任务

* ExecutorService 
Executor 接口提供了任务执行的方法, 而线程池的生命周期呢? 由ExecutorService 管理!!! 
public interface ExecutorService extends Executor {
    void shutdown();                //正常关闭
    List<Runnable> shutdownNow();   //暴力关闭
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    // ... additional convenience methods for task submission
}

留意一下,ExecutorService是Executor的子接口, 而Executors.newXXX返回的其实就是ExecutorService.
class LifecycleWebServer {
    private final ExecutorService exec = ...;

    public void start() throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (!exec.isShutdown()) {
            try {
                final Socket conn = socket.accept();
                exec.execute(new Runnable() {
                    public void run() { handleRequest(conn); }
                });
            } catch (RejectedExecutionException e) {
                if (!exec.isShutdown())
                    log("task submission rejected", e);
            }
        }
    }

    public void stop() { exec.shutdown(); }

    void handleRequest(Socket connection) {
        Request req = readRequest(connection);
        if (isShutdownRequest(req))
            stop();
        else
            dispatchRequest(req);
    }
}


* 延时和定时任务
通过使用TimerTask, Timer类以及提供了延时和定时的功能, 但Timer的问题在于, 它在运行的时候只创建了一个线程, 这样, 当一个任务运行并且占用很长时间的时候, 其他的任务可能已经超时失效. 另外, Timer不处理RuntimeException, 这样可能导致整个程序崩溃.

ScheduledThreadPool通过允许创建多个线程来解决这个问题. JDK5还提供了一个很有趣的DelayQueue. 见JDK API 手册.

DelayQueue<E extends Delayed>
DelayQueue是Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null


* 对于线程池而言, Task的状态定义
The lifecycle of a task executed by an Executor has four phases: created, submitted, started, and completed. Since tasks can take a long time to run, we also want to be able to cancel a task. In the Executor framework, tasks that have been submitted but not yet started can always be cancelled, and tasks that have started can sometimes be cancelled if they are responsive to interruption. Cancelling a task that has already completed has no effect.

* 对于许多需要参与后续处理的任务而言, Callable接口是更好的选择, 因为Callable接口允许返回值, 并且允许抛出checked异常. 相比之下, Runnable是无返回值, 不允许抛出checked异常的.

* 通过ExecutorService.submit可以得到一个Future对象, 用于获取结果或者取消任务的执行

* 要考虑把串行的多个任务分割为N个线程并发执行任务的代价是否值得. 加入有一个任务占据了90%的时间, 那么这种情况下分割为并发的模式并没有得到多少好处, 相反代码却复杂了许多.

* CompletionService
是一个ExecutorService和BlockingQueue的混合体, 可以提交一堆任务给它, 然后可以像通过队列的take, pull方式来获得结果

// 使用CompletionService的伪代码
CompletionService cs = new CompletionService(exec);
foreach(Task task: tasks)
    cs.submit(task)

while(true)
    Object result = cs.taks().get();


* 超时处理
Future.get(long timeout, TimeUnit unit)是个很有用的方法, 可以用来控制线程的执行时间, 避免线程毫无节制地运行. 一旦超过指定的时候, 没有返回结果, 就可能抛出TimeoutException, 捕捉到这个异常后可以尝试用Future.cancel(true) 来中断线程的运行.

Page renderPageWithAd() throws InterruptedException {
    long endNanos = System.nanoTime() + TIME_BUDGET;
    Future<Ad> f = exec.submit(new FetchAdTask());
    // Render the page while waiting for the ad
    Page page = renderPageBody();
    Ad ad;
    try {
        // Only wait for the remaining time budget
        long timeLeft = endNanos - System.nanoTime();
        ad = f.get(timeLeft, NANOSECONDS);
    } catch (ExecutionException e) {
        ad = DEFAULT_AD;
    } catch (TimeoutException e) {
        ad = DEFAULT_AD;
        f.cancel(true);
    }
    page.setAd(ad);
    return page;
}

* 取消线程的运行
1) 即使取消. 如果线程是不断的检测某个标志位来运行的, 那么提供一种手段来设置这个标志位即可. 通常要求这种标志位是volatile的
2) 使用一个守护线程, 并保持对工作线程的引用. 当守护线程等待一段时间后, 执行上述修改标志位的操作
3) 然而, 如果工作线程是可能阻塞的, 比如使用了blockingQueue, 那么即使设置了标志位, 它可能还是继续阻塞等待其他的输入, 而根本没去检查标志位, 就无法实现取消线程的目的了.
A good way to think about interruption is that it does not actually interrupt a running thread; it just requests that the thread interrupt itself at the next convenient opportunity
Thread.interrupt其实只是请求某个线程去中断...

4) 检查线程是否被中断来实现取消线程
class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;
    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }
    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted())
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            /* Allow thread to exit */
        }
    }
    public void cancel() { interrupt(); }
}


5) 最可取的策略
The most sensible interruption policy is some form of thread-level or service level cancellation: exit as quickly as practicalcleaning up if necessary, and possibly notifying some owning entity that the thread is exiting

* 取消的是线程还是任务?
取消一个任务是很正常的事情, 但是如果取消一个线程, 那么可能影响到很多东西, 比如线程池的一个工作线程, 可能意味着取消任务, 并从池里删去这个线程....

* JDK的线程框架提供的中断机制仅仅是抛出了InterruptedException, 是因为任务通常不是在自己的线程运行, 而是从其他地方或者线程池"借"来的, 因为不应该操作其他线程, 所以抛出异常让工作线程去处理是最可取的做法.

* 如果工作线程正在运行的时候收到一个InterruptedException, 可以自己处理, 比如剩下的计算, 在最后再还原这个中断的状态. 使用
Thread.currentThread().interrupt()

比如下面的线程就很拽, 非要做完自己的事情
public Task getNextTask(BlockingQueue<Taskgt; queue) {
    boolean interrupted = false;
    try {
        while (true) {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                interrupted = true;

                // fall through and retry
            }
        }
    } finally {
        if (interrupted)
            Thread.currentThread().interrupt();
 //确保线程的中断状态被还原
    }
}


* 一般情况下, 如果不了解工作线程的取消机制, 不要随便对工作线程使用中断操作. 然而JDK的标准的Executor使用的线程池的线程实现了"允许使用中断", 因此通过Future.cancel(true)来取消线程是安全的.
    示例:
public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
    Future<?> task = taskExec.submit(r);
    try {
        task.get(timeout, unit);
    } catch (TimeoutException e) {
        // task will be cancelled below
    } catch (ExecutionException e) {
        // exception thrown in task; rethrow
        throw launderThrowable(e.getCause());
    } finally {
        // Harmless if task already completed
        task.cancel(true); // interrupt if running
    }
}


* 关于锁定的取消
对于使用synchronized来实现的锁定, 没有任何办法可以干预, 只能等. 但是如果使用Lock.lockInterruptibly 方法, 它允许你在等待锁定的同时也可以响应中断事件.

* 应该由线程的owner去中断线程. 通常而言, owner是指创建线程对象的类, 比如线程池. 常见的做法是"申请"让线程池去中断线程.

基于线程来工作的Service类应该提供关闭线程的方法.
* ExecutorService提供了优雅的关闭方法
public class LogService {
    private final ExecutorService exec = newSingleThreadExecutor();
    public void start() { }
    public void stop() throws InterruptedException {
        try {
            //启动一次顺序关闭,执行以前提交的任务,但不接受新任务。如果已经关闭,则调用没有其他作用。
            exec.shutdown();

            //请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。
            exec.awaitTermination(TIMEOUT, UNIT);
        } finally {
            writer.close();
        }
    }
    public void log(String msg) {
        try {
            exec.execute(new WriteTask(msg));
        } catch (RejectedExecutionException ignored) { }
    }
}

* Poison Pill(毒药?)模式

基本思想就是往queue里面设置一个标志位, 告诉工作线程停止下来
要求:
1) producer/consumer 线程的数量已知
2) 由一个主类保持 producer线程的引用, 当需要停止的时候, 通知所有一个producer线程往queue放入一个Poison Pill标志
3) 注意放入标志的数量
The approach in IndexingService can be extended tomultiple producers by having each producer place a pill on the queue and having the consumer stop only when it receives N[producers] pills. It can be extended to multiple consumers by having each producer place N[consumers] pills on the queue
4) 要求queue的容量是"无限"的, 否则, 如果队列满了, producer线程就挂住了

* 一个Executor关闭的时候, 需要知道哪些任务正在运行, 哪些还没有运行.
这个Executor可以跟踪到shutdown的时候, 哪些task仍在运行
public class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService exec;
    private final Set<Runnable> tasksCancelledAtShutdown =
        Collections.synchronizedSet(new HashSet<Runnable>());
    ...
    public List<Runnable> getCancelledTasks() {
        if (!exec.isTerminated())
            throw new IllegalStateException(...);
        return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    }

    public void execute(final Runnable runnable) {
        exec.execute(new Runnable() {
            public void run() {
                try {
                    runnable.run();
                } finally {
                /*注意这个finally段, 如果是在关闭的时候, 会有异常(可能是RuntimeException)抛出, 使程序直接运行这里的代码. 如果是正常运行的话, 那么下面的isShutdown()就是false*/
                    if (isShutdown()
                        && Thread.currentThread().isInterrupted())
                        tasksCancelledAtShutdown.add(runnable);
                }
            }
        });
    }
    // delegate other ExecutorService methods to exec
}


* 并发情况下如果一个线程出现问题(例如抛出RuntimeException), 如何处理
1) 忽略这种异常, 并且使用一个新的线程替代该线程(异常随后被抛出)
2) 如果这个时候刚好是关闭连接池, 忽略
3) 如果目前的线程池已经能满足并发要求, 忽略

大多数线程池内的设计的工作线程的模式:
public void run() {
    Throwable thrown = null;
    try {
        while (!isInterrupted())
            runTask(getTaskFromWorkQueue());
    } catch (Throwable e) {
        thrown = e;
    } finally {
        threadExited(this, thrown);
    }
}

* 另一种处理线程异常的方式: 使用UncaughtExceptionHandler 
在JDK5以及以后的情况下, 可以使用Thread.setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler)来进行异常处理. 可以每个线程设置一个Thread.UncaughtExceptionHandler. 

JDK的解释:
设置当线程由于未捕获到异常而突然终止,并且没有为该线程定义其他处理程序时所调用的默认处理程序。 
未捕获到的异常处理首先由线程控制,然后由线程的 ThreadGroup 对象控制,最后由未捕获到的默认异常处理程序控制。如果线程不设置明确的未捕获到的异常处理程序,并且该线程的线程组(包括父线程组)未特别指定其 uncaughtException 方法,则将调用默认处理程序的 uncaughtException 方法。 

public interface UncaughtExceptionHandler {
    void uncaughtException(Thread t, Throwable e);
}

当然可能大多数这样handler做的事情都是记录异常而已

* tips:
1) In long-running applications, always use uncaught exception handlers for all threads that at least log the exception.

2) To set an UncaughtExceptionHandler for pool threads, provide a THReadFactory to the ThreadPoolExecutor constructor

标准的线程池允许抛出uncaught exception来终止工作线程, 并且使用try-finally来进行通知, 使得线程池可以替换原来的工作线程.

令人奇怪的是, 如果一个任务是通过executor.submit提交的, 那么, 任务所抛出的异常被当作一种正常的返回结果, 通过Future.get就可以取得. 如果通过executor.execute来执行的任务就可以实现提交给UncaughtExceptionHandler处理.

* Runtime.addShutdownHook()
可以向JVM注册关闭事件监听线程. JVM默认的情况下不会对正在运行的线程做任何改动. 但是注意所加入的监听线程必须是线程安全的

public void start() {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            try { LogService.this.stop(); }
            catch (InterruptedException ignored) {}
        }
    });
}


* 线程分为正常线程和守护线程两种. 所有由JVM创建的线程都是守护线程, 比如gc. 但main线程不是. 由于任何新创建的线程都会集成其父线程的"守护"状态, 所以main线程创建的所有线程都是正常线程

正常线程和守护线程的差异在于退出的时候. 在线程退出的时候, 如果所有剩余的线程都是守护线程, 那么JVM进行一系列的动作, 而且所有正运行的守护线程都被放弃, finally块也不执行.

守护线程特别适用于数据清理方面的操作. 但不推荐.

分享到:
评论

相关推荐

    Delphi多线程详解_delphi_delphi多线程_多线程_

    Delphi多线程详解-编写多线程应用程序教程,是从一本Delphi专业书籍中摘录中的关于多线程编程的一个章节

    一个很不错的VC++ 多线程应用例子

    摘要:VC/C++源码,系统相关,多线程 与VC++爱好者们分享一个很不错的多线程MultiThread应用例子,你可将之中的模块摘录出来供需要者使用。本多线程模块实例以新建文件写入数据为操作目的,创建多个线程分别创建文件,...

    VB异步执行线程的实例源代码

     A:晕,这最终还是调用了老汉多线程……那和线程也没什么区别吧……你应该再试一试线程池……  B:不完全是,因为纤程要先ConvertThreadToFiber,才能CreateFiber,VB中就一个线程,你把它Convert成纤程,那纤程...

    C++经典笔试和面试摘录

    包括了c++经典笔试题,多线程编程,操作系统,数据库,网络相关知识。以及一些经典面经

    GameplayFootball:足球比赛,已停产

    另一件事,这款游戏使用了我自己的多线程游戏引擎(Blunted2),无论听起来多么酷,使用经过严格测试和未记录的引擎都会减慢开发速度。 此外,足球比赛并不需要那么多线程,它只会使事情变得不必要。 我对此不提供...

    binlog:高性能C ++日志库,可生成结构化的二进制日志

    动机请考虑以下来自应用程序的日志摘录,该摘录使用多个线程为每个请求计算浮点值: INFO [2019/12/10 12:54:47.062805012] thread1 Process request #1, result: 6.765831 (bin/app/processor.cpp:137)INFO [2019/...

    2048Solver:高级别玩令人着迷且令人上瘾的游戏2048的Java代码

    此处显示的测试结果是使用多个并行线程运行的。 这会导致每个线程的平均移动时间(线程持续时间/线程移动)变慢,但考虑所有线程时(程序持续时间/所有线程的移动),整体移动时间变短。 目前的计划表现 ![程序...

    JAVA上百实例源码以及开源项目

    百度云盘分享 简介 笔者当初为了学习JAVA,收集了很多经典源码,源码难易程度分为初级... 部分源代码摘录:  ftpClient = new FtpClient(); //实例化FtpClient对象  String serverAddr=jtfServer.getText();...

    JAVA上百实例源码以及开源项目源代码

     部分源代码摘录:  ftpClient = new FtpClient(); //实例化FtpClient对象  String serverAddr=jtfServer.getText(); //得到服务器地址  String user=jtfUser.getText(); //得到用户名  String pass=...

    java源码包---java 源码 大量 实例

     部分源代码摘录:  ftpClient = new FtpClient(); //实例化FtpClient对象  String serverAddr=jtfServer.getText(); //得到服务器地址  String user=jtfUser.getText(); //得到用户名  String pass=...

    java源码包2

    Applet钢琴模拟程序java源码 2个目标文件,提供基本的音乐编辑功能。... 部分源代码摘录:  ftpClient = new FtpClient(); //实例化FtpClient对象  String serverAddr=jtfServer.getText(); //得到服务器...

    java源码包3

    Applet钢琴模拟程序java源码 2个目标文件,提供基本的音乐编辑功能。... 部分源代码摘录:  ftpClient = new FtpClient(); //实例化FtpClient对象  String serverAddr=jtfServer.getText(); //得到服务器...

    java源码包4

    Applet钢琴模拟程序java源码 2个目标文件,提供基本的音乐编辑功能。... 部分源代码摘录:  ftpClient = new FtpClient(); //实例化FtpClient对象  String serverAddr=jtfServer.getText(); //得到服务器...

    成百上千个Java 源码DEMO 4(1-4是独立压缩包)

    部分源代码摘录: ftpClient = new FtpClient(); //实例化FtpClient对象 String serverAddr=jtfServer.getText(); //得到服务器地址 String user=jtfUser.getText(); //得到用户名 String pass=jtfPass.getPassword...

    成百上千个Java 源码DEMO 3(1-4是独立压缩包)

    部分源代码摘录: ftpClient = new FtpClient(); //实例化FtpClient对象 String serverAddr=jtfServer.getText(); //得到服务器地址 String user=jtfUser.getText(); //得到用户名 String pass=jtfPass.getPassword...

Global site tag (gtag.js) - Google Analytics