`
qq466862016
  • 浏览: 125842 次
  • 来自: 杭州
社区版块
存档分类
最新评论

1、Java5多线程---线程池

阅读更多
        JAVA语言中引入了线程(Thread)的概念,这让程序员极大方便的操作线程,但是也带来了很多的弊端,如数据共享、同步、死锁等问题。java虚拟机支持多个线程在应用程序上并发运行,这样大大的增加了处理器的吞吐量,不过线程的创建、调度、销毁等非常耗时而又耗内存的缺点,由此引入了线程池的概念。那什么是线程池呢?比如就是打个比方,每年九月份最忙最热闹的是大学校园,新生报道,对就是新生报道。我们把新生报道看成一个任务,每个任务在没有引入线程池的时候,会为每个任务创建一个线程,当任务完成的时候会把当前执行任务的线程销毁,你想想在现实的生活中,登记报道的工作人员也就是那几个人,那样工作人员不累死才怪,有了线程池的概念,会把对应的任务交个线程池,然后线程池会把任务分发到指定的空闲的线程进行执行任务,当任务执行完成后,不会销毁线程,而是要归还给线程池;如果当把任务提交给线程池的时候,这时恰当没有空闲的线程来执行此提交的任务,线程池会把这个提交的任务放在任务队列中,直到有空闲的线程来分发。线程池的引入大大的减少了线程的创建与销毁的开销。
下面我们来看下是如果在JAVA5中如何创建线程池
创建线程池我们可以用java.util.concurrent.Executors 类的静态方法进行创建线程池也可以用ThreadPoolExecutor 创建
 
1、创建单一线程的线程池
 
package mail;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * 
 * @author dongtian
 * @date   2015年6月12日 下午2:52:32
 */
public class ThreadPool {
	public static void main(String[] args) throws Exception {
		//创建单一线程的线程池
		ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
		//将任务提交给线程(线程池会将任务分发给指定的空闲线程执行具体任务)
		newSingleThreadExecutor.execute(new Runnable() {
			public void run() {
				try {
					Thread.sleep(5000);
					System.err.println("我是单个线程的线程池中的一个线程" + Thread.currentThread().getName());
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		});
		/**
		 * 注销线程池 此方法会等待线程正在执行任务完成
		 * 而对应的@shutdownNow()则不会等待此时线程执行的任务结束
		**/
		newSingleThreadExecutor.shutdown();
	}
}
 
2、创建线程数量固定大小的线程池
 
package mail;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * 
 * @author dongtian
 * @date   2015年6月12日 下午3:01:59
 */
public class FixedThreadPoolTest {

	
	public static void main(String[] args) {
		
		ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);
		newFixedThreadPool.execute(new Runnable() {
			
			@Override
			public void run() {
				
				try {
					Thread.sleep(5000);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				System.err.println("我是fixedpool中线程  "+ Thread.currentThread().getName());
			}
		});
		
		newFixedThreadPool.shutdown();
	}
}
 
 3、创建可缓存的线程池
         这个线程池比较有意思  初始化的时候是没有一个线程
         没到将任务放在此线程,它就查看当前线程池中是否有空闲的线程,如果没有会创建一个线程放入此线程池
          然后执行任务,任务执行完成之后会 将刚才创建的线程放入线程池中。但是这个线程池不是常在的,  它会检查线程池中线程如果在60秒钟没有使用的话,会将此线程从缓存中删除
具体代码如下:
package mail;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/***
 * 可缓存的线程池
 * 
 * @author dongtian
 * @date   2015年6月12日 下午3:04:57
 */
public class CachedThreadPoolTest {

	public static void main(String[] args) {
		/**
		 * 创建一个可缓存的线程池
		 * 这个线程池比较有意思  初始化的时候是没有一个线程 
		 * 没到将任务放在此线程,它就查看当前线程池中是否有空闲的线程,如果没有会创建一个线程放入此线程池
		 * 然后执行任务,任务执行完成之后会 将刚才创建的线程放入线程池中。但是这个线程池不是常在的,它会检查
		 * 线程池中线程如果在60秒钟没有使用的话,会将此线程从缓存中删除
		 * 
		 */
		ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
		newCachedThreadPool.execute(new Runnable() {
			
			@Override
			public void run() {
				
			}
		});
		
		newCachedThreadPool.shutdown();
	}
}
 4、可任务时间调度的线程池
package mail;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * 可定时调度任务的线程池
 * @author dongtian
 * @date   2015年6月12日 下午3:13:02
 */
public class ScheduledThreadPoolTest {

	public static void main(String[] args) {
		
		
		/***
		 * 创建可任务调度的线程池
		 */
		ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(5);
		/***
		 * 参数说明
		 * 1:Runnable 具体工作任务
		 * 2:delay    延时事件
		 * 3:timeUnit  就是 前面的那个delay数字代表是年 月日 等时间单位
		 * 
		 */
		newScheduledThreadPool.schedule(new Runnable() {
			
			@Override
			public void run() {
				
				System.err.println("我在调用5秒后执行....");
			}
		}, 5, TimeUnit.SECONDS);
		
		newScheduledThreadPool.shutdown();
	}
}
 二、像线程池中提交任务
把具体要执行的任务提交给线程池,让线程池自己来分发任务有两种方式提交任务,
1、无返回值的提交任务
 service.execute(Runnable able);
通过调用 service的 execute方法来提交任务到线程池,是没有返回值的,
如下通过execute方法提交任务到线程池 将没有返回值
newFixedThreadPool.execute(new Runnable() {
			
			@Override
			public void run() {
				
				try {
					Thread.sleep(5000);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				System.err.println("我是fixedpool中线程  "+ Thread.currentThread().getName());
			}
		});
 
 
2、有返回值的提交任务到线程池(获取反馈信息)
在实际的任务中我们想从任务执行完毕会有个指定的返回值,我们拿来结果来进行分析(如: 批量返送邮件,如果一次性发送100000封 我们要知道有多少发送成功,有多少发送失败)这就用到了带返回值。提交任务 方式为submit 任务接口必须是 Callable<V>具体代码如下:
package mail;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/***
 * 带有返回值的提交任务到线程池
 * @author dongtian
 * @date   2015年6月12日 下午3:32:08
 */
public class CallableTest {

	
	public static void main(String[] args) throws Exception {
		
		ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);
		//带有返回值的提交任务到线程池
		Future<String> future = newFixedThreadPool.submit(new Callable<String>() {

			public String call() throws Exception {
				Thread.sleep(4000);
				return "冬天";
			}
		});
		
		//打印结果
		System.err.println(future.get());
		newFixedThreadPool.shutdown();
	}
}
 
 三、线程池的等待
在线程池中有个方法awaitTermination(delay,timeunit)等待多长时间,比如我们可以每秒查询当前线程池是否注销完成所有任务具体代码如下:
 
package mail;

import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/***
 * waitfor
 * @author dongtian
 * @date   2015年6月12日 下午5:12:39
 */
public class WaitTask {

	public static void main(String[] args) {
		
		
		ThreadPoolExecutor executor = new ThreadPoolExecutor(30, 30, 1, TimeUnit.SECONDS, new LinkedBlockingQueue());
		
		executor.submit(new Callable<String>() {

			@Override
			public String call() throws Exception {
				Thread.sleep(5000);
				return null;
			}
			
			
		});
		
		executor.submit(new Callable<String>() {
			
			@Override
			public String call() throws Exception {
				Thread.sleep(2000);
				return null;
			}
			
			
		});
		
		executor.submit(new Callable<String>() {
			
			@Override
			public String call() throws Exception {
				Thread.sleep(100);
				return null;
			}
			
			
		});
		
		executor.submit(new Callable<String>() {
			
			@Override
			public String call() throws Exception {
				Thread.sleep(100);
				return null;
			}
			
			
		});
		
		executor.submit(new Callable<String>() {
			
			@Override
			public String call() throws Exception {
				Thread.sleep(100);
				return null;
			}
			
			
		});
		
		executor.submit(new Callable<String>() {
			
			@Override
			public String call() throws Exception {
				Thread.sleep(100);
				return null;
			}
			
			
		});
		
		executor.submit(new Callable<String>() {
			
			@Override
			public String call() throws Exception {
				Thread.sleep(200000);
				return null;
			}
			
			
		});
		
		
		executor.shutdown();
		while (!executor.isTerminated()) {
			
			try {
				executor.awaitTermination(1, TimeUnit.SECONDS);
				System.err.println("阴湿傻吊赶快喊自己是傻吊,阴湿喊了" + executor.getCompletedTaskCount() +"次");
				if(executor.getCompletedTaskCount() >=6) {
					System.err.println("喊够了..阴湿脱衣服了......");
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			
			
		}
		
		
	}
}
  四、实战
我们将模拟一个批量发送邮件的案例,来统计并行任务反馈信息,来统计发送成功多少封邮箱和发送失败多少封邮件,我们这里不用Executors的静态方法,我们采用 ThreadPoolExecutor类来创建一个线程池,构造方法初始化线程数 最大线程数,超出初始化的线程数的线程 执行完成任务存留时长,任务队列
1、模拟发送邮件
SendEmail 实现Callable接口 来模拟发送邮件的功能
package mail;

import java.util.concurrent.Callable;

public class SendEmailTask implements Callable<Boolean> {

	@Override
	public Boolean call() throws Exception {
		
		//模拟发送邮件
		Thread.sleep(200);
		if(Math.random() *0.5 > 0.2) {
			return false;
		}
		
		return true;
	}

}
 2、测试类
测试类用来模拟发送10000封邮件
package mail;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/***
 * 模拟发送邮件测试类
 * @author dongtian
 * @date   2015年6月12日 下午5:24:06
 */
public class SendEmailTest  {

	
	public static void main(String[] args) throws Exception {
		
		ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(30, 30, 1, TimeUnit.SECONDS, new LinkedBlockingQueue());
		
		List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(1000);
		for (int i = 0; i < 1000; i++) {
			futures.add(poolExecutor.submit(new SendEmailTask()));
		}
		System.err.println("shutdown .....");
		poolExecutor.shutdown();
		while (!poolExecutor.isTerminated()) {
			poolExecutor.awaitTermination(1, TimeUnit.SECONDS);
			System.err.println("已经完成    " + poolExecutor.getCompletedTaskCount());
			
			
		}
		
		int error = 0;
		int success = 0;
		for (Future<Boolean> future : futures) {
			
			if (future.get() == true) {
				
				success ++;
			} else {
				error ++;
			}
		}
		
		System.err.println("已经成功发送了 " + success +"封邮件 发送失败  " +error +"封");
	}
}
 
 
 
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics