`
阅读更多

首先来看一下,任务的定义:

 所谓的任务,就是抽象,离散的工作单位。你可以简单理解为代码级别的 (Runnable接口)

 大多数并发应用程序都是围绕着任务进行管理的.

 我们来看一小段代码:

Java代码  收藏代码
  1. package  com.ivan.concurrent.charpter6;  
  2.   
  3. import  java.net.ServerSocket;  
  4. import  java.net.Socket;  
  5.   
  6. /**  
  7.  * 顺序化的Web Server.  
  8.  * @author root  
  9.  * OS:Ubuntu 9.04  
  10.  * Date:2010-6-19  
  11.  */   
  12. public   class  SingleThreadWebServer {  
  13.     public   static   void  main(String[] args)  throws  Exception {  
  14.         ServerSocket server=new  ServerSocket( 8080 );  
  15.         while ( true ){  
  16.             Socket socket=server.accept();  
  17.             handleRequest(socket);  
  18.         }  
  19.     }  
  20.   
  21.     private   static   void  handleRequest(Socket socket) {  
  22.         /**  
  23.          * 做相关的处理……, 比如请求运算与I/O  
  24.          *   这将会导致出现阻塞,  会延迟当前请求的处理,  
  25.          *   而且会产生非常严重的后果,比如: 假死。  
  26.          *    那样会极度考验用户的耐心,知道他忍无可忍的关闭浏览器。  
  27.          *   同时,单线程在等待IO操作时,CPU处于闲置状态,这样也降低了资源的利用率   
  28.          *     
  29.          *  这样的服务器,缺乏良好的吞吐量和快速的响应性。  
  30.          */   
  31.     }  
  32. }  

 




上面的代码是顺序地执行任务,主线程在不断接受连接与处理请求之间交替运行。
一个Web请求会做相关的处理……, 比如请求运算与I/O
 这将会导致出现阻塞,  会延迟当前请求的处理,
 而且会产生非常严重的后果,比如: 假死。
 那样会极度考验用户的耐心,知道他忍无可忍的关闭浏览器。
 同时,单线程在等待IO操作时,CPU处于闲置状态,这样也降低了资源的利用率
 这样的服务器,缺乏良好的吞吐量和快速的响应性。

所以,基于上面代码的基础上,我们需要给他作些小许的改进:

Java代码  收藏代码
  1. package  com.ivan.concurrent.charpter6;  
  2.   
  3. import  java.net.ServerSocket;  
  4. import  java.net.Socket;  
  5.   
  6. public   class  ThreadPerTaskWebServer {  
  7.     public   static   void  main(String[] args)  throws  Exception {  
  8.         ServerSocket server=new  ServerSocket( 80 );  
  9.         while ( true ){  
  10.             final  Socket socket=server.accept();  
  11.             new  Thread( new  Runnable(){  
  12.                 public   void  run() {  
  13.                     handleRequest(socket);  
  14.                 }  
  15.             }).start();  
  16.         }  
  17.     }  
  18.   
  19.     protected   static   void  handleRequest(Socket socket) {  
  20.         /**  
  21.          *相比较而言,这样的处理方式有良好的改进:  
  22.          * 1.执行人物的负载已经脱离主线程,让主循环能更加迅速的重新开始等待下一个连接。提高了响应性  
  23.          * 2.并发处理任务,多个请求可以同时得到处理,提高了吞吐性  
  24.          * 3.任务处理代码必须要是线程安全的。防止出现并发性数据共享问题。   
  25.          *   
  26.          * 这个程序可能在开发阶段运行良好,一旦部署,就可能出现致命的错误,  
  27.          * 我们接着来分析:  
  28.          */   
  29.     }  
  30. }  

 


    相比较而言,这样的处理方式有良好的改进:
    1.执行人物的负载已经脱离主线程,让主循环能更加迅速的重新开始等待下一个连接。提高了响应性
     2.并发处理任务,多个请求可以同时得到处理,提高了吞吐性
     3.任务处理代码必须要是线程安全的。防止出现并发性数据共享问题。
    
 这个程序可能在开发阶段运行良好,一旦部署,就可能出现致命的错误,
 我们接着来分析:

 我们看到,上面的代码中,是为每个请求的到来,创建一个新的线程来处理, 那么这样就会有以下的问题出现:


无限创建线程的缺点:
1.线程生命周期的开销
1.1.线程的创建与关闭并非是免费的,实际的开销根据不同的OS有不同的处理.但是线程的创建的确需要时间,带来处理请求的延迟.一般的Web Server的请求是很频繁的,为每个请求创建一个线程,无非要耗费大量的资源.
2.资源消耗量
2.1. 活动的线程会消耗资源,尤其是内存.如果可运行的线程数多于可用的处理器数,线程将会空闲。大量的空闲线程占用更多的内存,给垃圾回收器带来压力,而且, 线程在竞争CPU的同时,也会带来许多其他的性能开销。所以,建议在有足够多的线程让CPU忙碌时,不要再创建多余的线程.
3.应用的稳定性
3.1. 应该限制创建线程的数量,限制的数目根据不同的平台而定,同时也受到JVM的启动参数,Thread的构造函数中栈大小等因素的影响. 如果打破了这个限制,你很可能会得到一个OutOfMemoryError. 在一定范围内增加线程可以提高系统的吞吐量,但是一旦超过这个范围,再创建线程只会拖垮你的系统。甚至可能会导致应用程序的崩溃.
  
我们的解决办法:
    使用线程池,当然,你完全没有必要自己写一个线程池的实现(好吧,或许你跟我一样,也希望能从重复创造轮子中,找到自己想要了解的东西),你可以利用 Executor框架来帮你处理,java.util.concurrent提供了一个灵活的线程池实现。 在新的java类库当中,任务执行的首要抽象不是Thread,而是Executor.
    Executor仅仅是一个简单的接口,但是它很强大,包括用于异步任务的执行,支持不同类型的任务执行策略,为任务提交和任务执行之间的解藕,提供了标准的方式等等, 我们后续再重点讨论。
    Executor基于 生产者-消费者模式。提交任务的是生产者,执行任务的是消费者。 也就是说, 采用Executor框架实现 生产者-消费者模式,十分简单。

Java代码  收藏代码
  1. package  com.ivan.concurrent.charpter6;  
  2.   
  3. import  java.net.ServerSocket;  
  4. import  java.net.Socket;  
  5. import  java.util.concurrent.Executor;  
  6. import  java.util.concurrent.Executors;  
  7.   
  8. public   class  TaskExecutionWebServer{  
  9.     private   static   final   int  NTHREADS= 100 ;  
  10.     //使用线程池来避免 为每个请求创建一个线程。   
  11.     private   static   final  Executor threadPool=Executors.newFixedThreadPool(NTHREADS);  
  12.       
  13.     public   static   void  main(String[] args)  throws  Exception {  
  14.         ServerSocket server=new  ServerSocket( 8011 );  
  15.         while ( true ){  
  16.             final  Socket socket=server.accept();  
  17.             threadPool.execute(new  Runnable(){  
  18.                 public   void  run() {  
  19.                     handleRequest(socket);  
  20.                 }  
  21.             });  
  22.         }  
  23.     }  
  24.   
  25.     protected   static   void  handleRequest(Socket socket) {  
  26.         /**  
  27.          *  
  28.          */   
  29.         System.out.println(Thread.currentThread().getId());  
  30.         try  {  
  31.             Thread.sleep(5000 );  
  32.         } catch  (InterruptedException e) {  
  33.             e.printStackTrace();  
  34.         }  
  35.     }  
  36.       
  37.       
  38. }  

 

线程池:
    线程池管理着一个工作者线程的同构池,线程池是与工作队列紧密绑定的。工作队列的作用就是持有所有等待执行的任务, 工作者队列只需要从工作队列中获取到下一个任务,执行,然后回来等待下一个线程。
    Java类库中提供了以下几种线程池:
1.newFixedThreadPool :创建定长的线程池,每当提交一个任务就创建一个线程,直到达到池的最大长度。
2.newCachedThreadPool:创建一个可缓存的线程池,如果当前线程池的长度超过了处理的需要,它可以灵活的收回空闲线程,当需求增加时,它可以灵活添加新的线程,而并不对池的长度做任何限制
3.newSingleThreadExecutor:创建单线程化的executor,它只创建唯一的工作者线程来执行任务,如果这个线程异常结束,会有另外一个线程来取代它.它会保证任务按照任务队列规定的顺序来执行。
4.NewScheduledThreadPool:创建一个定长的线程池,而且支持定时的,以及周期性的任务执行,类似Timer.

Executor的生命周期:
    它的创建已经说了,我们来看看它如何关闭, Executor 是为了执行任务而创建线程,而JVM通常会在所有非后台线程退出后才退出,如果它无法正确的关闭,则会影响到JVM的结束。
    这里需要提一下,在我们了解如何关闭Executor的一些疑惑,  由于Executor是异步执行任务,那么这些任务的状态不是立即可见的, 换句话说,在任务时间里,这些执行的任务中,有的可能已经完成,有的还可能在运行,其他的还可能在队列里面等待。 为了解决这些问题, Java引入了另外一个接口,它扩展了Executor,并增加一些生命周期的管理方法: ExecutorService.


ExecutorService表示生命周期有三种状态:  运行,关闭,终止。
    关闭和终止? 怎么看上去是一个意思, 这里我们先搁置着,留着后续来讨论。
    
ExecutorService最初创建后的初始状态就是运行状态;
    shutdown与shutdownNow方法,都是ExecutorService的关闭方法,区别在于:
    shutdown:
        会启动一个平稳的关闭过程, 停止接受新任务,同时等待已经提交的任务完成(包括尚未开始执行的任务)
    shutdownNow:
        会启动一个强制关闭的过程:尝试取消所有运行中的任务和排在队列中尚未开始的任务。

    一旦所有任务全完成后,ExecutorService会转到终止状态, awaitTermination可以用来等待ExecutorService到达终止状态,也可以轮询isTerminated判断ExecutorService是否已经终止。

Java代码  收藏代码
  1. package  com.ivan.concurrent.charpter6;  
  2.   
  3. import  java.io.IOException;  
  4. import  java.net.ServerSocket;  
  5. import  java.net.Socket;  
  6. import  java.util.concurrent.ExecutorService;  
  7. import  java.util.concurrent.Executors;  
  8. import  java.util.concurrent.RejectedExecutionException;  
  9.   
  10. /**  
  11.  * 线程池的生命周期是如何管理的?  
  12.  * @author root  
  13.  * OS:Ubuntu 9.04  
  14.  * Date:2010-6-19  
  15.  */   
  16. public   class  LifeCycleWebServer {  
  17.     private   static   final   int  NTHREADS= 100 ;  
  18.     private   static   final  ExecutorService exec=Executors.newFixedThreadPool(NTHREADS);  
  19.       
  20.     public   void  start()  throws  IOException{  
  21.         ServerSocket server=new  ServerSocket( 8011 );  
  22.         while (exec.isShutdown()){  
  23.             try  {  
  24.                 final  Socket socket=server.accept();  
  25.                 exec.execute(new  Runnable(){  
  26.                     public   void  run() {  
  27.                         handleRequest(socket);  
  28.                     }  
  29.                 });  
  30.             } catch  (RejectedExecutionException e) {  
  31.                 if (!exec.isShutdown()){  
  32.                     //log.error(...)   
  33.                 }  
  34.             }  
  35.         }  
  36.     }  
  37.       
  38.       
  39.     protected   void  handleRequest(Socket socket) {  
  40.         Request req=readRequest(socket);  
  41.         if (isShutDown(req)){  
  42.             stop();  
  43.         }else {  
  44.             dispatchRequest(req);  
  45.         }  
  46.     }  
  47.   
  48.     public   void  stop(){  
  49.         exec.shutdown();  
  50.     }  
  51.       
  52.       
  53.     //~ Mock Object And Function..   
  54.     private   static   class  Request{  
  55.           
  56.     }  
  57.       
  58.     private  Request readRequest(Socket socket) {  
  59.         // TODO Auto-generated method stub   
  60.         return   null ;  
  61.     }  
  62.   
  63.   
  64.     private   boolean  isShutDown(Request req) {  
  65.         // TODO Auto-generated method stub   
  66.         return   false ;  
  67.     }  
  68.   
  69.   
  70.     private   void  dispatchRequest(Request req) {  
  71.         // TODO Auto-generated method stub   
  72.           
  73.     }  
  74.       
  75. }  

 


OK,了解了线程池的使用,这里有必要介绍介绍执行策略,

执行策略:
    简单来说,就是任务执行的”What,When,Where,How”,包括:
1.任务在什么线程中执行(what)
2.任务以什么顺序执行(fifo,lifo,优先级)?
3.可以有多少个任务并发执行?(how many)
4.可以有多少个任务进入等待执行队列
5.系统过载时,需要放弃一个任务,该挑选哪一个? 如何通知应用程序知道?

 


另外,java类库中还提供有一种特别的任务,----可携带结果的任务:
    Callable 和 Future
    Runnable 作为任务的基本表达形式只是个相当有限的抽象; 它的局限在于,不能返回一个值或者抛出受检查的异常。
    通常,很多任务都会引起严重的计算延迟,比如执行数据库查询,从网络下载资源,进行复杂的计算。对于这样的任务,Callable是更佳的抽象: 它在主进入点,等待返回值,并为可能抛出的异常预先作准备。
    Runnable与Callable描述的都是首相的计算型任务,这些任务通常都是有限的。,任务的所生命周期分为4个阶段: 创建、提交、开始和完成。
    Future描述了任务的生命周期,并提供了相关的方法来获取任务的结果、取消任务以及检验任务是否已经完成或者被取消。
    Future的get方法取决于任务的状态, 如果任务已经完成,get会立即返回或者抛出异常,如果任务没有完成,get会阻塞直到它的完成。
    
    创建Future的方法有很多, ExecutorService的submit会返回一个Future,你可以将一个Callable或者Runnable提交给executor,然后得到一个Future,用它来重新获得任务执行的结果,或者取消任务。
    你也可以显示的为给定的Callable和Runnable实例化一个FutureTask.

    
OK, 前面介绍了很多关于并发的理论知识,下面我们来看看,如果寻找可强化的并发性。

首先,我们从一个例子开始, 开始之前,简单介绍一下这个例子所要表达的事情:
    它的来源是浏览器程序中渲染页面的那部分功能, 首先获取HTML,并将它渲染到图像缓存里。为了简单起见,我们假设HTML只有文本标签。 OK, 开始吧。

    首先,如果按照一般的处理方式,我们会这样做:
1.遇到文本标签,将它渲染到图像缓存中
2.当遇到的是一个图片标签,我们通过网络获取它,再将它放到缓存里面。
    
    很明显,这是最简单的方式, 它很容易实现,但是,问题在于,你这样做,是在考验用户的耐心,结果就是他会对着屏幕丢一句 ****.然后毫不犹豫的关掉浏览器.

    另外一种方法:
     它先渲染文本,并为图像预留出占位符;在完成第一趟文本处理后,程序返回开始,并下载图像,将它们绘制到占位符上去。 但是这样的问题也很明显, 需要最少2次的文档处理, 其性能与效率稍有提升,但是还不足解决用户希望快速浏览页面的需求。

    为了使我们的渲染器具有更高的并发性,我们需要做的第一步就是, 将渲染过程分为两部分: 一个用来渲染文本,一个用来下载所有图像。(一个受限于CPU,另外一个受限于IO, 即使在单CPU系统上,效率的提升也很明显。)
    Callable与Future可以用来表达所有协同工作的任务之间的交互。我们来看代码:

Java代码  收藏代码
  1. package  com.ivan.concurrent.charpter6;  
  2.   
  3. import  java.util.ArrayList;  
  4. import  java.util.List;  
  5. import  java.util.concurrent.Callable;  
  6. import  java.util.concurrent.ExecutionException;  
  7. import  java.util.concurrent.ExecutorService;  
  8. import  java.util.concurrent.Executors;  
  9. import  java.util.concurrent.Future;  
  10.   
  11. public   class  FutureRenderer {  
  12.     private   static   final   int  NTHREADS= 100 ;  
  13.     private   static   final  ExecutorService exec=Executors.newFixedThreadPool(NTHREADS);  
  14.       
  15.     void  renderPage(CharSequence source){  
  16.         final  List<ImageInfo> imageinfos=scanForImageInfo(source);  
  17.         Callable<List<ImageData>> task=  
  18.                 new  Callable<List<ImageData>>(){  
  19.                     public  List<ImageData> call()  throws  Exception {  
  20.                         List<ImageData> result=new  ArrayList<ImageData>();  
  21.                         for (ImageInfo imageinfo:imageinfos){  
  22.                             result.add(imageinfo.downloadImage());  
  23.                         }  
  24.                         return  result;  
  25.                     }  
  26.               
  27.         };  
  28.           
  29.           
  30.         Future<List<ImageData>> future=exec.submit(task);  
  31.         //保证渲染文本与下载图像数据并发执行。   
  32.         renderText(source);  
  33.         try  {  
  34.             /**  
  35.              * 到达需要所有图像的时间点时,主任务会等待future.get调用的结果,  
  36.              *  幸运的话,我们请求的同时,下载已经完成,即使没有,下载也已经预先开始了。  
  37.              *    
  38.              *  这里还有一定的局限性, 用户可能不希望等待所有图片下载完成后才可以看见,  
  39.              *   他希望下载完成一张图片后,就可以立即看到。 …… 这里还待优化。  
  40.              */   
  41.             List<ImageData> imageData=future.get();  
  42.               
  43.             for (ImageData data:imageData){  
  44.                 reanderImage(data);  
  45.             }  
  46.         } catch  (InterruptedException e) {  
  47.             Thread.currentThread().interrupt();  
  48.             future.cancel(true ); //取消任务   
  49.         }catch (ExecutionException e){  
  50.             e.printStackTrace();  
  51.               
  52.         }  
  53.     }  
  54.   
  55.     private   void  renderText(CharSequence source) {  
  56.         // TODO Auto-generated method stub   
  57.           
  58.     }  
  59.   
  60.     private   void  reanderImage(ImageData data) {  
  61.         // TODO Auto-generated method stub   
  62.           
  63.     }  
  64.   
  65.     private  List<ImageInfo> scanForImageInfo(CharSequence source) {  
  66.         // TODO Auto-generated method stub   
  67.         return   null ;  
  68.     }  
  69. }  

 



CompletionService: 当executorService遇到BlockingQueue
    CompletionService整合了Executor和BlockingQueue的功能,你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完成可用时,获得这个结果,像一个打包的Future.
  我们利用它来为我们的渲染器需要优化的地方做些处理,代码如下:

Java代码  收藏代码
  1. package  com.ivan.concurrent.charpter6;  
  2.   
  3. import  java.util.List;  
  4. import  java.util.concurrent.Callable;  
  5. import  java.util.concurrent.CompletionService;  
  6. import  java.util.concurrent.ExecutionException;  
  7. import  java.util.concurrent.ExecutorCompletionService;  
  8. import  java.util.concurrent.ExecutorService;  
  9. import  java.util.concurrent.Executors;  
  10. import  java.util.concurrent.Future;  
  11.   
  12. public   class  FutureRenderer2 {  
  13.     private   static   final   int  NTHREADS= 100 ;  
  14.     private   static   final  ExecutorService exec=Executors.newFixedThreadPool(NTHREADS);  
  15.       
  16.     void  renderPage(CharSequence source){  
  17.         final  List<ImageInfo> imageinfos=scanForImageInfo(source);  
  18.           
  19.         CompletionService<ImageData> completionService=new  ExecutorCompletionService<ImageData>(exec);  
  20.           
  21.         for ( final  ImageInfo imageinfo:imageinfos){  
  22.             completionService.submit(new  Callable<ImageData>(){  
  23.                 public  ImageData call()  throws  Exception {  
  24.                     //提高性能点一: 将顺序的下载,变成并发的下载,缩短下载时间   
  25.                     return  imageinfo.downloadImage();  
  26.                 }  
  27.             });  
  28.         }  
  29.         renderText(source);  
  30.         try  {  
  31.             for ( int  i= 0 ;i<imageinfos.size();i++){  
  32.                 Future<ImageData> f=completionService.take();  
  33.                 //提高性能点二: 下载完成一张图片后,立刻渲染到页面。   
  34.                 ImageData imagedata=f.get();  
  35.                 reanderImage(imagedata);  
  36.             }  
  37.         } catch  (InterruptedException e) {  
  38.             Thread.currentThread().interrupt();  
  39.         }catch (ExecutionException e){  
  40.             e.printStackTrace();  
  41.               
  42.         }  
  43.     }  
  44.   
  45.     private   void  renderText(CharSequence source) {  
  46.         // TODO Auto-generated method stub   
  47.           
  48.     }  
  49.   
  50.     private   void  reanderImage(ImageData data) {  
  51.         // TODO Auto-generated method stub   
  52.           
  53.     }  
  54.   
  55.     private  List<ImageInfo> scanForImageInfo(CharSequence source) {  
  56.         // TODO Auto-generated method stub   
  57.         return   null ;  
  58.     }  

分享到:
评论

相关推荐

    Java多线程设计模式上传文件

    Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式...

    java多线程读取文件

    Java多线程读大文件 java多线程写文件:多线程往队列中写入数据

    java多线程ppt

    java多线程PPT 多线程基本概念 创建线程的方式 线程的挂起与唤醒 多线程问题

    java 多线程操作数据库

    一个java 多线程操作数据库应用程序!!!

    java多线程经典案例

    java多线程经典案例,线程同步、线程通信、线程阻塞等经典案例

    Java多线程编程技术

    《Java多线程编程核心技术》建议猿友们读两遍,因为其写得没有那么抽象,第一遍有些概念不是很理解,可以先跳过并记录起来,第一遍阅读的目的主要是了解整个架构。第二遍再慢慢品味,并贯穿全部是指点来思考,并将...

    Java多线程编程实战指南(核心篇)

    Java多线程编程实战指南(核心篇) 高清pdf带目录 随着现代处理器的生产工艺从提升处理器主频频率转向多核化,即在一块芯片上集成多个处理器内核(Core),多核处理器(Multicore Processor)离我们越来越近了――如今...

    Java多线程知识点总结

    该文档总结了Java多线程相关的知识点,分享给大家,简单易懂!

    java多线程的讲解和实战

    详细的讲解了java多线程的原理,并配有代码进行实战,适合java初学者和想对多线程有进一步了解的人。

    java多线程通信图解

    一张图方便理解和掌握java 多线程之间通信的实质 java 多线程 其实就是每个线程都拥有自己的内存空间,多线程之间的通信,比例A线程修改了主内存(main方法的线程)变量,需要把A线程修改的结果同步到主线程中,...

    java多线程处理数据库数据

    java多线程处理数据库数据,使用并发包,无框架,可批量处数据库数据,进行增删改。。等等操作。

    java多线程,对多线程,线程池进行封装,方便使用

    java多线程,对多线程,线程池进行封装,方便使用

    Java多线程编程经验

    现在的操作系统是多任务操作系统。多线程是实现多任务的一种方式。 线程是指进程中的一个执行流程,一个进程中可以运行多个线程。...本文档提供Java多线程编程经验,方便广大Java爱好者研究学习Java多线程

    java多线程处理大数据

    java多线程处理大数据,可根据配置的线程数,任务去调度处理

    java多线程并发

    java多线程并发的在新窗口

    Java多线程机制(讲述java里面与多线程有关的函数)

    Java多线程机制 9.1 Java中的线程 9.2 Thread的子类创建线程 9.3 使用Runable接口 9.4 线程的常用方法 9.5 GUI线程 9.6 线程同步 9.7 在同步方法中使用wait()、notify 和notifyAll()方法 9.8 挂起、恢复和终止线程 ...

    java多线程核心技术

    资深Java专家10年经验总结,全程案例式讲解,首本全面介绍Java多线程编程技术的专著 结合大量实例,全面讲解Java多线程编程中的并发访问、线程间通信、锁等最难突破的核心技术与应用实践 Java多线程无处不在,如...

    java多线程实现大批量数据导入源码

    java多线程实现大批量数据切分成指定份数的数据,然后多线程处理入库或者导出,线程的个数和每份数据的数量都可以控制

    java多线程查询数据库

    java多线程并发查询数据库,使用线程池控制分页,并发查询。

    java多线程模拟队列实现排队叫号

    java多线程模拟队列实现排队叫号,多线程模拟排队叫号取号 java多线程模拟队列实现排队叫号,多线程模拟排队叫号取号

Global site tag (gtag.js) - Google Analytics