`

ThreadPool的实现

阅读更多

最近看hadoop的时候,无意中看到了concurrent包中的类,于是打算好好研究一下线程安全方面的东西。先自己实现一个线程池。然后和sun自带的threadpool比较一下,看自己的实现有什么问题。

 1. ThreadPool 类,用于存储工作线程,在构造函数中创建线程,并启动线程

package com.fnk.threadpool;

import java.util.Vector;

public class ThreadPool {
	private Vector<Thread> threads;
	private static final int DEFAULT_THREAD_SIZE = 5;
	
	ThreadPool(ThreadWorkQueue workQueue) throws InvalidThreadParamException{
		this(workQueue,DEFAULT_THREAD_SIZE);
	}
	
	ThreadPool(ThreadWorkQueue workQueue,int threadSize) throws InvalidThreadParamException{
		if(workQueue == null || threadSize <= 0){
			throw new InvalidThreadParamException();
		}
		this.threads = new Vector<Thread>();
		for(int i = 0 ; i < threadSize; i++){
			threads.add(new WorkThread(workQueue));
			threads.get(i).start();
		}
	}
}

 

 2.任务队列类

package com.fnk.threadpool;

import java.util.Vector;
/*
 * 任务队列类,用于存储任务。先到的任务,先执行
 */
public class ThreadWorkQueue {
	private Vector<WorkIntf> works;
	public  Object mutex = new Object();   
	
	ThreadWorkQueue(){
		works = new Vector<WorkIntf>();
	}
	/*
	 * 任务进队列,如果原来的队列中的任务数为0,唤醒任务线程
	 */
	public void enQueue(WorkIntf work){
		if(works.size() == 0){
			works.add(work);
			synchronized (mutex) {
				mutex.notifyAll();
			}
		}else{
			works.add(work);
		}
		
	}
	/*
	 * 任务出队列,如果队列中的任务数为0,让线程等待
	 */
	public WorkIntf deQueue(){
		if(works.size() == 0){
			synchronized (mutex) {
				try {
					mutex.wait();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			return null;
		}else{
			return works.remove(0);
		}
	}
}

 

 3.任务接口类

package com.fnk.threadpool;

public interface WorkIntf {
	public boolean doWork();
}

 4. 工作线程类

package com.fnk.threadpool;
//任务线程类 
public class WorkThread extends Thread {
	ThreadWorkQueue workQueue;

	WorkThread(ThreadWorkQueue workQueue) {
		this.workQueue = workQueue;
	}
	
	public void run() {
		while (true) {
			WorkIntf work = null;
			//获取任务,如果任务队列中,
			work = workQueue.deQueue();
			//如果 有任务 ,那么就工作 
			if (work != null) {
				work.doWork();
			}
		}
	}
}

   5. 异常类

package com.fnk.threadpool;

public class InvalidThreadParamException extends Exception {
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	public InvalidThreadParamException() {
		super();
	}

	public InvalidThreadParamException(String message) {
		super(message);
	}

	public InvalidThreadParamException(String message, Throwable cause) {
		super(message, cause);
	}

	public InvalidThreadParamException(Throwable cause) {
		super(cause);
	}
}

  6. 测试

package com.fnk.threadpool;

public class TestThreadPool {
	public static void main(String[] args) {
		
		try {
			ThreadWorkQueue workQueue = new ThreadWorkQueue();
			ThreadPool tp = new ThreadPool(workQueue);
			for(int i = 0; i < 10 ; i++){
				workQueue.enQueue(new WorkImpl(i));
			}
			
		} catch (InvalidThreadParamException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

 

 

 总结:缺点是不能动态的控制线程的个数,在线程池开启的时候就必须创建所以线程。

 

 

 

下面这段代码可能会导致dequeue线程永远被阻塞,这种情况出现在dequeue变成wait状态,而works队列一直有数据,建议换成blockqueue,参照JDK的ThreadPool

	/*
	 * 任务进队列,如果原来的队列中的任务数为0,唤醒任务线程
	 */
	public void enQueue(WorkIntf work){
		if(works.size() == 0){
			works.add(work);
			synchronized (mutex) {
				mutex.notifyAll();
			}
		}else{
			works.add(work);
		}
		
	}
	/*
	 * 任务出队列,如果队列中的任务数为0,让线程等待
	 */
	public WorkIntf deQueue(){
		if(works.size() == 0){
			synchronized (mutex) {
				try {
					mutex.wait();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			return null;
		}else{
			return works.remove(0);
		}
	}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics