- 浏览: 190944 次
- 性别:
- 来自: 北京
-
文章分类
最新评论
-
only_java:
博主,你好。感谢这篇你的这篇文章,我的问题是跟你一样,也是在跑 ...
JVM Crash分析 -
shuofenglxy:
1 确保程序运行时没有更新程序需要的相关jar包。2 确保程序 ...
JVM Crash分析 -
renduly:
# A fatal error has been detect ...
JVM Crash分析 -
shuofenglxy:
renduly 写道博主好。这两天我在公司程序也出现了类似的问 ...
JVM Crash分析 -
renduly:
博主好。这两天我在公司程序也出现了类似的问题。博主能否说的详细 ...
JVM Crash分析
多线程应用执行时JDK提供了线程的载体,线程池,通过线程池管理线程,优化线程的执行,有效合理利用资源。而JDK提供的线程池有四大类:FixedThreadPool,SingleThreadExecutor,CachedThreadPool,ScheduledThreadPool。这四种池各有特点。现在一一来看。
第一,FixedThreadPool,来自jdk的解释是这样的:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。而通过源码
参数说明:第一个nThreads,线程池的线程数。第二个nThreads是线程池中允许的最大线程数。
0L表示多余最大线程数-线程池的线程数的这些线程在闲置情况下允许的存活时间。TimeUnit时间单位。 LinkedBlockingQueue<Runnable> 一个基于已链接节点的、范围任意的 blocking queue。备注:还有可能有一个参数threadFactory用来创建线程。
可以看到线程池有一个LinkedBlockingQueue<Runnable>队列来存放过量的任务(也就是JDK中所说的附加任务)。所以,这个固定大小的线程池的好处在于,无论是流量高峰还是没有访问装态,都会最多有nThreads在线程池(如果某一瞬间,某些线程down掉后,没来的急新建,那就少于这个值了),这样的话能够尽量保证消耗的内存空间较少,能够避免一些线程数猛增带来的OOM问题。
第二:SingleThreadExecutor。来自jdk的解释:创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
源码:
说明:只是池大小为1了。
分析:此种池的特点在于永远只有一个有效线程在运行,也就成了多个任务串行执行了,对于前后任务有顺序的多任务可能有所帮助。
第三:CachedThreadPool。来自jdk的解释:创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。
参见创建源码:
说明:这个池建立参数上比较特殊,它允许最大 Integer.MAX_VALUE个线程共存,并且设置了60秒线程的闲置限制。并采用了奇怪的SynchronousQueue。这个队列是这样的,看jdk api文档:
一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。此队列不允许 null 元素。
分析:这种池的特点在于能根据任务的数量设置池中线程多少,并能在一段时间后清除闲置线程,但是风险在于允许太多的线程存在,这就会导致线程创建的资源消耗过多。
第四:ScheduledThreadPool。来自jdk的解释:创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
创建源码:
该池最大的特点在于允许你定时delay后按照一定的速率隔时执行一些任务。
简要的理解了这几种池之后通过几个demo来进一步认识下哈。 源码是王道。恩直接看吧:
这个例子主要是理解下四个池的特点,针对runnable和callable任务的一些测试。
另外看一个自己修正的简要池,主要是用来测RejectedExecutionHandler策略以及選取的BlockingQueue<Runnable>。
源码:
再看一下最奇怪的同步队列
说明:
线程池其实也是比较简单的 ,只要记住将任务按照需要组装成runnale 或者callable形式(两者的区别在于有无返回值),然后选择合适的池类型,将任务提交就可以了。只是要注意多个任务的时候 控制任务的到达等,然后要注意的就是合适的时机关闭线程池了。
第一,FixedThreadPool,来自jdk的解释是这样的:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。而通过源码
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
参数说明:第一个nThreads,线程池的线程数。第二个nThreads是线程池中允许的最大线程数。
0L表示多余最大线程数-线程池的线程数的这些线程在闲置情况下允许的存活时间。TimeUnit时间单位。 LinkedBlockingQueue<Runnable> 一个基于已链接节点的、范围任意的 blocking queue。备注:还有可能有一个参数threadFactory用来创建线程。
可以看到线程池有一个LinkedBlockingQueue<Runnable>队列来存放过量的任务(也就是JDK中所说的附加任务)。所以,这个固定大小的线程池的好处在于,无论是流量高峰还是没有访问装态,都会最多有nThreads在线程池(如果某一瞬间,某些线程down掉后,没来的急新建,那就少于这个值了),这样的话能够尽量保证消耗的内存空间较少,能够避免一些线程数猛增带来的OOM问题。
第二:SingleThreadExecutor。来自jdk的解释:创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
源码:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
说明:只是池大小为1了。
分析:此种池的特点在于永远只有一个有效线程在运行,也就成了多个任务串行执行了,对于前后任务有顺序的多任务可能有所帮助。
第三:CachedThreadPool。来自jdk的解释:创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。
参见创建源码:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
说明:这个池建立参数上比较特殊,它允许最大 Integer.MAX_VALUE个线程共存,并且设置了60秒线程的闲置限制。并采用了奇怪的SynchronousQueue。这个队列是这样的,看jdk api文档:
一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。此队列不允许 null 元素。
分析:这种池的特点在于能根据任务的数量设置池中线程多少,并能在一段时间后清除闲置线程,但是风险在于允许太多的线程存在,这就会导致线程创建的资源消耗过多。
第四:ScheduledThreadPool。来自jdk的解释:创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
创建源码:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
该池最大的特点在于允许你定时delay后按照一定的速率隔时执行一些任务。
简要的理解了这几种池之后通过几个demo来进一步认识下哈。 源码是王道。恩直接看吧:
这个例子主要是理解下四个池的特点,针对runnable和callable任务的一些测试。
package ThreadSPools; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ThreadPoolDemo { public static void main(String[] args){ Thread main = Thread.currentThread(); final HashMap<Integer, ArrayList<Integer>> map = new HashMap<Integer,ArrayList<Integer>>(); HashMap<Integer, ArrayList<Integer>> resultMap = new HashMap<Integer,ArrayList<Integer>>(); for(int i=0;i<300;i++){ ArrayList<Integer> arrayList= new ArrayList<Integer>(); for(int i1=0;i1<10000;i1++){ int p = (int) (Math.random()*10000); arrayList.add(p); } map.put(i, arrayList); } //ExecutorService executor = Executors.newFixedThreadPool(2); //ExecutorService executor = Executors.newSingleThreadExecutor(); ExecutorService executor = Executors.newCachedThreadPool(); //ExecutorService executor = Executors.newScheduledThreadPool(4); long start = System.currentTimeMillis(); for(final int key : map.keySet()){ Callable<ArrayList<Integer>> task = new Callable<ArrayList<Integer>>(){ @Override public ArrayList<Integer>call() throws Exception { Collections.sort(map.get(key)); return map.get(key); } }; Future<ArrayList<Integer>> future=executor.submit(task); try { resultMap.put(key, future.get()); System.out.println(key+" "+resultMap.get(key).subList(0, 10)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } long end = System.currentTimeMillis(); try { main.sleep(6000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("Excuting this totally costs "+(end - start)); executor.shutdown(); } }
另外看一个自己修正的简要池,主要是用来测RejectedExecutionHandler策略以及選取的BlockingQueue<Runnable>。
源码:
package ThreadSPools; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; //一個類似于cachedPool的ThreadPool 主要用來測試任務的RejectedExecutionHandler策略以及選取的BlockingQueue<Runnable> public class ThreadPoolOwnedByShuofengDemo extends ThreadPoolExecutor{ public ThreadPoolOwnedByShuofengDemo(int nThreads,int maxnThreads, long keepaliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadfactory,RejectedExecutionHandler handler){ super(nThreads,maxnThreads,keepaliveTime, TimeUnit.SECONDS, workQueue, threadfactory,handler); } public static void main(String[]args){ ThreadPoolOwnedByShuofengDemo pool = new ThreadPoolOwnedByShuofengDemo(5,10,30,TimeUnit.SECONDS, //使用這個隊列時,大於這個隊列的容量后,基本上直接拋出RejectedExecutionException //new ArrayBlockingQueue<Runnable>(10) //new LinkedBlockingQueue<Runnable>()//使用這個隊列時,基本上RejectedExecutionHandler沒用了 //沒有空間冗餘,也就是說當任務數大於當前可執行的數量時,一般都直接拋出RejectedExecutionException //偶爾比較幸運當期執行的任務完成,那就會執行后提交的一個,不過這個概率貌似很小一樣,除非提交任務的時間點靠後 new SynchronousQueue<Runnable>() , Executors.defaultThreadFactory(), //当多于 maxnThread +ArrayBlockingQueue大小时抛出异常RejectedExecutionException new AbortPolicy() //当任务数瞬间多于 maxnThread +ArrayBlockingQueue大小时,将任务退回给调用线程执行 //new CallerRunsPolicy() //任务数瞬间多于 maxnThread +ArrayBlockingQueue大小时,后来任务被丢掉 //new DiscardPolicy() //任务数瞬间多于 maxnThread +ArrayBlockingQueue大小时,最早提交还未被执行的任务被丢掉 //new DiscardOldestPolicy() ); final CopyOnWriteArrayList<String> arraylist = new CopyOnWriteArrayList<String>(); //测试runnable task // for(int i=0;i<50;i++){ // Runnable task = new Runnable(){ // // @Override // public void run() { // // System.out.println(Thread.currentThread().getName()+" hello,this is shuofengTask."); // try { // Thread.currentThread().sleep(1000); // } catch (InterruptedException e) { // // TODO Auto-generated catch block // e.printStackTrace(); // } // } // // }; // pool.submit(task); // } //测试Callable task for(int i=0;i<10;i++){ final int id= i; Callable task = new Callable(){ final String taskname= id+"号task"; @Override public Object call() throws Exception { String s =taskname+" hello,this is shuofengTask."; arraylist.add(s); try { Thread.currentThread().sleep(1); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return arraylist; } }; pool.submit(task); } try { Thread.currentThread().sleep(1000);//sleep时间要合理 保证在池关闭前任务已经都执行完了 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(arraylist.size()); System.out.println(arraylist.subList(0, arraylist.size())); pool.shutdown();//优雅的关闭池资源 } }
再看一下最奇怪的同步队列
package SychQueue; import java.util.Arrays; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; //特点在于每一个消费者都要等生产者放入字符串之后才能读取。而且同步队列没有容量,如果消费者不取的话,则会被阻塞。 public class SynchronousQueueDemo { public static void main(String[]args){ final List<String> msg = Arrays.asList("start","one","two","three"); final BlockingQueue<String> queue = new SynchronousQueue<String>(); ExecutorService executor = Executors.newCachedThreadPool(); Runnable producerTask = new Runnable(){ final List<String> waitingRecievingMsg = msg; @Override public void run() { try { for(String s:waitingRecievingMsg){ queue.put(s); Thread.currentThread().sleep(1000); } queue.put("end"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; executor.submit(producerTask); Runnable consumerTask = new Runnable(){ String recievedMsg; @Override public void run() { try { ; while((recievedMsg = queue.take())!=null&& !recievedMsg.equals("end")){ System.out.println(recievedMsg); Thread.currentThread().sleep(3000); } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; executor.submit(consumerTask); executor.shutdown(); } }
说明:
线程池其实也是比较简单的 ,只要记住将任务按照需要组装成runnale 或者callable形式(两者的区别在于有无返回值),然后选择合适的池类型,将任务提交就可以了。只是要注意多个任务的时候 控制任务的到达等,然后要注意的就是合适的时机关闭线程池了。
发表评论
-
java.lang.OutOfMemoryError: unable to create new native thread
2012-05-09 21:55 1695今天压系统,出现一个java.lang.OutOfMem ... -
ThreadLocal
2010-12-27 10:57 0ThreadLocal 是Thread的一个局部变量。 Th ... -
多线程之并发流程控制方式
2010-12-21 11:41 1594多线程间经常需要协调 ... -
zz Java 理论与实践: 正确使用 Volatile 变量
2010-12-18 15:47 1080原文来自 http://www.ibm.com/devel ... -
多线程之数据同步方式
2010-12-18 14:56 1624多线程出现后,最大的 ... -
并发程序入门
2010-12-18 13:53 1435多线程是改进顺序执行 ...
相关推荐
C#的多线程机制探索 doc 一.多线程的概念 二.操纵一个线程 三.线程的同步和通讯——生产者和消费者 四、线程池和定时器——多线程的自动管理
在本文中,我们将深入浅出Java多线程编程的世界,探索多线程编程的基本概念、多线程编程的优点、多线程编程的缺点、多线程编程的应用场景、多线程编程的实现方法等内容。 一、多线程编程的基本概念 多线程编程是指...
设想一下,如果我们使用有任务就开启一个子线程处理,处理完成后,销毁子线程或等得子线程自然死亡,那么如果我们的任务所需时间比较短,但是任务数量比较多,那么更多的时间是花在线程的创建和结束上面,效率肯定...
线程、多线程和线程池面试专题.docx 网络编程面试专题.docx 腾讯Android社招面试源码相关11题+原理详解.docx 设计模式面试专题.docx 设计模式面试专题及答案.pdf 面试必备之乐观锁与悲观锁.pdf 高级面试题.docx
MySQL数据库编程、单例模式、queue队列容器、C++11多线程编程、线程互斥、线程同步通信和 unique_lock、基于CAS的原子整形、智能指针shared_ptr、lambda表达式、生产者-消费者线程模型 ## CPP线程池 ### 关键技术...
由于此代码多用C++11的模板以及新特性,因此对于一般开发人员略有难度,且由于是在线程池基础上进一步探索定时任务的实现方式,难度进一步加大。关键点是先明白线程池的运作方式。然后再针对定时任务的处理实际上也...
《Android开发艺术探索》是一本Android.../ 496 15.1.5 List View和Bitmap优化 / 501 15.1.6 线程优化 / 501 15.1.7 一些性能优化建议 / 501 15.2 内存泄露分析之MAT工具 / 502 15.3 提高程序的可维护性 / 506
多线程 源码 basic-of-java java 基础:算法、多线程、锁、AQS、JUC、NIO、一些 JDK 源码、一些问题总结 记录自己对于 java 的一些探索和思路: 多线程、锁、map、list 的一些实现 网络编程中的一些 tips,如 nio ...
第11章 Android的线程和线程池 / 391 11.1 主线程和子线程 / 392 11.2 Android中的线程形态 / 392 11.2.1 Async Task / 392 11.2.2 Async Task的工作原理 / 395 11.2.3 Handler Thread / 402 11.2.4 ...
第11章 Android的线程和线程池 391 11.1 主线程和子线程 392 11.2 Android中的线程形态 392 11.2.1 AsyncTask 392 11.2.2 AsyncTask的工作原理 395 11.2.3 HandlerThread 402 11.2.4 IntentService 403 11.3 ...
3、线程池高效操作,是支持线程优先级的哦; 4、高级表格自动调整列宽; 5、封装了各种常见命令,1个模块足矣··· 说了这么多,无图无真相! 1、框架基于HPSOCKET,伤神小怪兽大神的作品,完美!高效的网络通信...
网上有很多热心易友分享了他们的例子,非常精彩,能够学到各位大佬的设计思路,自己也在模仿中慢慢探索···但是,总是带上了N多个模块···额···每次内心都呐喊为什么有那么多的模块··· 那么,这个帖子有...
│ │ 9.JAVA并发编程之多线程并发同步业务场景与解决方案.wmv │ │ │ ├─10.微服务架构之Spring Cloud Eureka 场景分析与实战 │ │ 10.微服务架构之Spring Cloud Eureka 场景分析与实战.wmv │ │ │ ├─11....
《CLR via C#(第3版) 》针对.NET Framework 4.0和多核编程进行了全面更新和修订,是帮助读者深入探索和掌握公共语言运行时、C#和.NET开发的重要参考,同时也是帮助开发人员构建任何一种应用程序(如Microsoft ...
26.9 线程池如何管理线程 26.9.1 设置线程池限制 26.9.2 如何管理工作者线程 26.10 缓存线和伪共享 第27章 I/O限制的异步操作 27.1 Windows如何执行I/O操作 27.2 CLR的异步编程模型(APM) 27.3 ...
《CLR via C#(第3版) 》针对.NET Framework 4.0和多核编程进行了全面更新和修订,是帮助读者深入探索和掌握公共语言运行时、C#和.NET开发的重要参考,同时也是帮助开发人员构建任何一种应用程序(如Microsoft ...
《CLR via C#(第3版) 》针对.NET Framework 4.0和多核编程进行了全面更新和修订,是帮助读者深入探索和掌握公共语言运行时、C#和.NET开发的重要参考,同时也是帮助开发人员构建任何一种应用程序(如Microsoft ...
11.1.5 解决多线程问题 11.1.6 有趣的超时 11.1.7 使用HttpURLConnection 11.1.8 使用AndroidHttpClient 11.1.9 使用后台线程(AsyncTask) 11.1.10 使用AsyncTask处理配置更改 11.1.11 使用...