`
lzkyo
  • 浏览: 456462 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

java 线程池

    博客分类:
  • Java
阅读更多

(1)threadpool.xml

Java代码 复制代码
  1. <?xml version="1.0" encoding="UTF-8"?>   
  2. <config>   
  3.   <threadPool>   
  4.   <minPools>10</minPools> <!--线程池最小线程-->   
  5.   <maxPools>100</maxPools> <!--线程池最大线程-->   
  6.   <checkThreadPeriod>5</checkThreadPeriod> <!--检查线程池中线程的周期5分钟-->   
  7.   </threadPool>   
  8. </config>  
<?xml version="1.0" encoding="UTF-8"?>
<config>
  <threadPool>
  <minPools>10</minPools> <!-- minimum number of threads in the pool -->
  <maxPools>100</maxPools> <!-- maximum number of threads in the pool -->
  <checkThreadPeriod>5</checkThreadPeriod> <!-- interval for checking the threads in the pool: 5 minutes -->
  </threadPool>
</config>



(2)解析XML文件

Java代码 复制代码
  1. import java.io.InputStream;   
  2. import java.util.Iterator;   
  3.   
  4. import org.dom4j.Document;   
  5. import org.dom4j.Element;   
  6. import org.dom4j.io.SAXReader;   
  7. import org.springframework.core.io.ClassPathResource;   
  8.   
  9. public class XMLReader {   
  10.   
  11.     private Document document;   
  12.   
  13.     private static final XMLReader instance =    
  14.         new XMLReader();    
  15.   
  16.         /**   
  17.         * 私有的默认构造子   
  18.         */    
  19.         private XMLReader() {   
  20.             loadXML();   
  21.         }    
  22.            
  23.         /**   
  24.         * 静态工厂方法   
  25.         */    
  26.         public static XMLReader getInstance()    
  27.         {   
  28.         return instance;    
  29.         }   
  30.   
  31.         private void loadXML(){   
  32.             InputStream is = null;   
  33.             SAXReader reader =null;   
  34.             try {   
  35.                 is = (new ClassPathResource("threadpool.xml")).getInputStream();   
  36.                 reader = new SAXReader();    
  37.                 document = reader.read(is);   
  38.                 is.close();   
  39.             } catch (Exception e) {   
  40.                 e.printStackTrace();   
  41.             }   
  42.         }   
  43.            
  44.         /**  
  45.          * 读取指定值  
  46.          * @param name  
  47.          * @return  
  48.          */  
  49.         public  String getThreadPoolPara(String name){   
  50.             String str = "";   
  51.             try {   
  52.                 Element root = document.getRootElement(); // 获得根元素   
  53.                 Iterator lv = root.elementIterator("threadPool");   
  54.                 Element el = null;   
  55.                 while (lv.hasNext()) {   
  56.                     el = (Element) lv.next();   
  57.                     str = el.element(name).getText();   
  58.                 }   
  59.             } catch (Exception e) {   
  60.                 System.out.println(e.toString());   
  61.             }   
  62.             return str;   
  63.         }   
  64.            
  65. }  
import java.io.InputStream;
import java.util.Iterator;

import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.springframework.core.io.ClassPathResource;

/**
 * Singleton that loads {@code threadpool.xml} from the classpath once and
 * exposes the text of the child elements of its {@code <threadPool>} element.
 *
 * <p>Loading failures are reported but do not prevent the singleton from being
 * created; in that case every lookup returns the empty string.
 */
public class XMLReader {

	/** Classpath location of the configuration file. */
	private static final String CONFIG_RESOURCE = "threadpool.xml";

	/** Parsed configuration, or {@code null} when loading failed. */
	private Document document;

	private static final XMLReader instance = new XMLReader();

	/**
	 * Private constructor: the only instance is created eagerly above.
	 */
	private XMLReader() {
		loadXML();
	}

	/**
	 * Static factory method.
	 *
	 * @return the shared reader instance
	 */
	public static XMLReader getInstance() {
		return instance;
	}

	/**
	 * Parses the configuration resource into {@link #document}.
	 *
	 * <p>The stream is closed in a {@code finally} block so it is released even
	 * when opening or parsing fails part-way.
	 */
	private void loadXML() {
		InputStream is = null;
		try {
			is = new ClassPathResource(CONFIG_RESOURCE).getInputStream();
			document = new SAXReader().read(is);
		} catch (Exception e) {
			// Keep the singleton usable; lookups will simply return "".
			e.printStackTrace();
		} finally {
			if (is != null) {
				try {
					is.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
	}

	/**
	 * Reads the text of the named child of {@code <threadPool>}.
	 *
	 * <p>If several {@code <threadPool>} elements exist, the value from the last
	 * one that contains the child wins.
	 *
	 * @param name element name to look up, for example {@code "maxPools"}
	 * @return the element text, or the empty string when the configuration was
	 *         not loaded or no such element exists
	 */
	public String getThreadPoolPara(String name) {
		String str = "";
		if (document == null) {
			System.out.println(CONFIG_RESOURCE + " was not loaded; no value for " + name);
			return str;
		}
		boolean found = false;
		try {
			Element root = document.getRootElement(); // root element (<config>)
			Iterator<?> pools = root.elementIterator("threadPool");
			while (pools.hasNext()) {
				Element pool = (Element) pools.next();
				Element value = pool.element(name);
				if (value != null) {
					str = value.getText();
					found = true;
				}
			}
		} catch (Exception e) {
			// Safety net: lookups never throw to the caller.
			System.out.println(e.toString());
			return str;
		}
		if (!found) {
			System.out.println(CONFIG_RESOURCE + ": parameter '" + name + "' not found");
		}
		return str;
	}

}



(3)工作线程

Java代码 复制代码
  1. class WorkThread extends Thread {   
  2.     private boolean runningFlag;   
  3.   
  4.     private String argument;   
  5.   
  6.     public boolean isRunning() {   
  7.         return runningFlag;   
  8.     }   
  9.   
  10.     public synchronized void setRunning(boolean flag) {   
  11.         runningFlag = flag;   
  12.         if (flag)   
  13.             this.notify();   
  14.     }   
  15.   
  16.     public String getArgument() {   
  17.         return this.argument;   
  18.     }   
  19.   
  20.     public void setArgument(String string) {   
  21.         argument = string;   
  22.     }   
  23.   
  24.     public WorkThread(int threadNumber) {   
  25.         runningFlag = false;   
  26.         System.out.println("thread " + threadNumber + "started.");   
  27.     }   
  28.   
  29.     public synchronized void run() {   
  30.         try {   
  31.             while (true) {   
  32.                 if (!runningFlag) {   
  33.                     this.wait();   
  34.                 } else {   
  35.                     System.out.println("processing " + getArgument()   
  36.                             + "... done.");   
  37.                     sleep(5000);   
  38.                     System.out.println("Thread is sleeping...");   
  39.                     setRunning(false);   
  40.                 }   
  41.             }   
  42.         } catch (InterruptedException e) {   
  43.             System.out.println("Interrupt");   
  44.         }   
  45.     }   
  46. }   
/**
 * Pool worker that idles until it is handed an argument and marked as running,
 * then "processes" the argument (prints it and sleeps five seconds) and goes
 * back to idle.
 *
 * <p>The worker waits on its own monitor. The two state fields are
 * {@code volatile} because {@link #isRunning()}, {@link #getArgument()} and
 * {@link #setArgument(String)} are called from other threads without holding
 * that monitor.
 */
class WorkThread extends Thread {
	/** {@code true} while the worker has been asked to process (or is processing) an argument. */
	private volatile boolean runningFlag;

	/** The value the worker prints on its next processing round. */
	private volatile String argument;

	/**
	 * @return whether the worker is currently marked as busy
	 */
	public boolean isRunning() {
		return runningFlag;
	}

	/**
	 * Marks the worker busy or idle; marking it busy wakes it up.
	 *
	 * @param flag {@code true} to start a processing round, {@code false} to mark idle
	 */
	public synchronized void setRunning(boolean flag) {
		runningFlag = flag;
		if (flag) {
			// notifyAll rather than notify: other threads (e.g. one blocked in
			// join()) may also be waiting on this Thread object, and a single
			// notify could wake one of them instead of the worker.
			this.notifyAll();
		}
	}

	/**
	 * @return the argument most recently assigned to this worker
	 */
	public String getArgument() {
		return this.argument;
	}

	/**
	 * @param string the argument to process on the next round
	 */
	public void setArgument(String string) {
		argument = string;
	}

	/**
	 * Creates an idle worker and announces it on standard output.
	 *
	 * @param threadNumber number used only in the start-up message
	 */
	public WorkThread(int threadNumber) {
		runningFlag = false;
		System.out.println("thread " + threadNumber + "started.");
	}

	/**
	 * Worker loop: wait until marked running, process, mark idle, repeat.
	 * The loop ends when the thread is interrupted.
	 */
	@Override
	public synchronized void run() {
		try {
			while (true) {
				// Guarded wait: re-check the flag after every wake-up.
				while (!runningFlag) {
					this.wait();
				}
				System.out.println("processing " + getArgument()
						+ "... done.");
				sleep(5000);
				System.out.println("Thread is sleeping...");
				setRunning(false);
			}
		} catch (InterruptedException e) {
			// Interruption terminates the worker.
			System.out.println("Interrupt");
		}
	}
}



(4)管理线程池

Java代码 复制代码
  1. import java.util.*;   
  2.   
  3. class ThreadPoolManager {   
  4.        
  5.     private int maxPools;   
  6.     private int minPools;   
  7.     private int checkThreadPeriod;   
  8. //  private java.util.Timer timer = null;   
  9.     public Vector vector;   
  10.   
  11.     @SuppressWarnings("unchecked")   
  12.     public ThreadPoolManager() {   
  13.         setMaxPools(Integer.parseInt(XMLReader.getInstance().getThreadPoolPara("maxPools")));   
  14.         setMinPools(Integer.parseInt(XMLReader.getInstance().getThreadPoolPara("minPools")));   
  15.         setCheckThreadPeriod(Integer.parseInt(XMLReader.getInstance().getThreadPoolPara("checkThreadPeriod")));   
  16.         System.out.println("Starting thread pool...");   
  17.         vector = new Vector();   
  18.         for (int i = 1; i <= minPools; i++) {   
  19.             WorkThread thread = new WorkThread(i);   
  20.             vector.addElement(thread);   
  21.             thread.start();   
  22.         }   
  23. //      timer = new Timer(true);   
  24. //      timer.schedule(new CheckThreadTask(this),0,checkThreadPeriod);   
  25.     }   
  26.   
  27.     @SuppressWarnings("unchecked")   
  28.     public void process(String argument) {   
  29.         int i;   
  30.         for (i = 0; i < vector.size(); i++) {   
  31.                
  32.             WorkThread currentThread = (WorkThread) vector.elementAt(i);   
  33.             if (!currentThread.isRunning()) {   
  34.                 System.out.println("Thread " + (i + 1) + " is processing:"  
  35.                         + argument);   
  36.                 currentThread.setArgument(argument);   
  37.                 currentThread.setRunning(true);   
  38.                 return;   
  39.             }   
  40.                
  41.             if(i == vector.size()-1){//没的空闲线程了,就新建一个   
  42.                 if(vector.size() < maxPools){   
  43.                     WorkThread thread = new WorkThread(i);   
  44.                     vector.addElement(thread);   
  45.                     thread.setArgument(argument);   
  46.                     thread.setRunning(true);   
  47.                     thread.start();   
  48.                 }   
  49.             }   
  50.         }   
  51.            
  52.         if (i == maxPools) {   
  53.             System.out.println("pool is full, try in another time.");   
  54.         }   
  55.     }   
  56.   
  57.     public int getCheckThreadPeriod() {   
  58.         return checkThreadPeriod;   
  59.     }   
  60.   
  61.     public void setCheckThreadPeriod(int checkThreadPeriod) {   
  62.         this.checkThreadPeriod = checkThreadPeriod;   
  63.     }   
  64.   
  65.     public int getMaxPools() {   
  66.         return maxPools;   
  67.     }   
  68.   
  69.     public void setMaxPools(int maxPools) {   
  70.         this.maxPools = maxPools;   
  71.     }   
  72.   
  73.     public int getMinPools() {   
  74.         return minPools;   
  75.     }   
  76.   
  77.     public void setMinPools(int minPools) {   
  78.         this.minPools = minPools;   
  79.     }   
  80. }// end of class ThreadPoolManager  
import java.util.*;

/**
 * Keeps a list of {@link WorkThread} workers and hands each incoming argument
 * to an idle one, growing the list up to {@code maxPools} when all are busy.
 *
 * <p>The limits are read from {@link XMLReader}. {@link #process(String)}
 * performs no locking of its own, so concurrent callers must serialize their
 * calls.
 */
class ThreadPoolManager {

	private int maxPools;
	private int minPools;
	// Read from the configuration but not used by this class; a periodic
	// check task (Timer / CheckThreadTask) was sketched here and never enabled.
	private int checkThreadPeriod;
	/** The workers, in creation order. Elements are {@link WorkThread} instances. */
	public Vector vector;

	/**
	 * Reads the pool limits from the configuration and starts
	 * {@code minPools} idle workers, numbered from 1.
	 *
	 * @throws NumberFormatException if a configured value is missing or not an integer
	 */
	@SuppressWarnings("unchecked")
	public ThreadPoolManager() {
		XMLReader config = XMLReader.getInstance();
		setMaxPools(Integer.parseInt(config.getThreadPoolPara("maxPools")));
		setMinPools(Integer.parseInt(config.getThreadPoolPara("minPools")));
		setCheckThreadPeriod(Integer.parseInt(config.getThreadPoolPara("checkThreadPeriod")));
		System.out.println("Starting thread pool...");
		vector = new Vector();
		for (int i = 1; i <= minPools; i++) {
			WorkThread thread = new WorkThread(i);
			vector.addElement(thread);
			thread.start();
		}
	}

	/**
	 * Dispatches {@code argument} to exactly one worker.
	 *
	 * <p>The first idle worker is used. If every worker is busy and the pool
	 * holds fewer than {@code maxPools} workers, one new worker is created for
	 * the argument. Otherwise the argument is dropped and a "pool is full"
	 * message is printed.
	 *
	 * @param argument the value to process
	 */
	@SuppressWarnings("unchecked")
	public void process(String argument) {
		int size = vector.size();
		for (int i = 0; i < size; i++) {
			WorkThread candidate = (WorkThread) vector.elementAt(i);
			if (!candidate.isRunning()) {
				System.out.println("Thread " + (i + 1) + " is processing:"
						+ argument);
				candidate.setArgument(argument);
				candidate.setRunning(true);
				return;
			}
		}

		// No idle worker: create a single new one if there is room.
		if (size < maxPools) {
			// Number continues the 1-based sequence used by the constructor.
			WorkThread thread = new WorkThread(size + 1);
			vector.addElement(thread);
			thread.setArgument(argument);
			// The flag is set before start(), so the worker sees it on its
			// first check and processes immediately.
			thread.setRunning(true);
			thread.start();
			return; // without this the argument would be dispatched again
		}

		System.out.println("pool is full, try in another time.");
	}

	public int getCheckThreadPeriod() {
		return checkThreadPeriod;
	}

	public void setCheckThreadPeriod(int checkThreadPeriod) {
		this.checkThreadPeriod = checkThreadPeriod;
	}

	public int getMaxPools() {
		return maxPools;
	}

	public void setMaxPools(int maxPools) {
		this.maxPools = maxPools;
	}

	public int getMinPools() {
		return minPools;
	}

	public void setMinPools(int minPools) {
		this.minPools = minPools;
	}
}// end of class ThreadPoolManager



(5)调用

Java代码 复制代码
  1. public static void main(String[] args) {   
  2.     try {   
  3.         BufferedReader br = new BufferedReader(new InputStreamReader(   
  4.                 System.in));   
  5.         String s;   
  6.         ThreadPoolManager manager = new ThreadPoolManager();   
  7.         while ((s = br.readLine()) != null) {   
  8.             manager.process(s);   
  9.         }   
  10.     } catch (IOException e) {   
  11.     }   
  12. }  
	/**
	 * Reads lines from standard input until end of input and hands each line
	 * to a {@link ThreadPoolManager} for processing.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		try {
			BufferedReader br = new BufferedReader(new InputStreamReader(
					System.in));
			String s;
			ThreadPoolManager manager = new ThreadPoolManager();
			while ((s = br.readLine()) != null) {
				manager.process(s);
			}
		} catch (IOException e) {
			// Report the failure instead of ending the input loop silently.
			System.err.println("Failed to read from standard input: " + e);
		}
	}



(6)具有线程池的工作队列

    我们通常想要的是同一组固定的工作线程相结合的工作队列,它使用 wait() 和 notify() 来通知等待线程新的工作已经到达了。该工作队列通常被实现成具有相关监视器对象的某种链表。

Java代码 复制代码
  1. public class WorkQueue   
  2. {   
  3.     private final int nThreads;   
  4.     private final PoolWorker[] threads;   
  5.     private final LinkedList queue;   
  6.     public WorkQueue(int nThreads)   
  7.     {   
  8.         this.nThreads = nThreads;   
  9.         queue = new LinkedList();   
  10.         threads = new PoolWorker[nThreads];   
  11.         for (int i=0; i<nThreads; i++) {   
  12.             threads[i] = new PoolWorker();   
  13.             threads[i].start();   
  14.         }   
  15.     }   
  16.     public void execute(Runnable r) {   
  17.         synchronized(queue) {   
  18.             queue.addLast(r);   
  19.             queue.notify();   
  20.         }   
  21.     }   
  22.     private class PoolWorker extends Thread {   
  23.         public void run() {   
  24.             Runnable r;   
  25.             while (true) {   
  26.                 synchronized(queue) {   
  27.                     while (queue.isEmpty()) {   
  28.                         try  
  29.                         {   
  30.                             queue.wait();   
  31.                         }   
  32.                         catch (InterruptedException ignored)   
  33.                         {   
  34.                         }   
  35.                     }   
  36.                     r = (Runnable) queue.removeFirst();   
  37.                 }   
  38.                 // If we don't catch RuntimeException,    
  39.                 // the pool could leak threads   
  40.                 try {   
  41.                     r.run();   
  42.                 }   
  43.                 catch (RuntimeException e) {   
  44.                     // You might want to log something here   
  45.                 }   
  46.             }   
  47.         }   
  48.     }   
  49. }  
/**
 * A fixed set of worker threads fed from a shared FIFO queue.
 *
 * <p>The queue object doubles as the monitor: {@link #execute(Runnable)} adds a
 * task and notifies one waiting worker, and idle workers wait on the queue.
 * The workers loop forever; this class offers no way to stop them.
 */
public class WorkQueue
{
    private final int nThreads;
    private final PoolWorker[] threads;
    /** Pending tasks; every access is made while holding this object's monitor. */
    private final LinkedList<Runnable> queue;

    /**
     * Creates the queue and starts the workers immediately.
     *
     * @param nThreads number of worker threads to start
     */
    public WorkQueue(int nThreads)
    {
        this.nThreads = nThreads;
        queue = new LinkedList<Runnable>();
        threads = new PoolWorker[nThreads];
        for (int i = 0; i < nThreads; i++) {
            threads[i] = new PoolWorker();
            threads[i].start();
        }
    }

    /**
     * Appends a task to the queue and wakes one waiting worker.
     *
     * @param r the task to run on a worker thread
     */
    public void execute(Runnable r) {
        synchronized (queue) {
            queue.addLast(r);
            // One task was added, so waking a single worker is enough.
            queue.notify();
        }
    }

    /** Worker that repeatedly takes the oldest queued task and runs it. */
    private class PoolWorker extends Thread {
        @Override
        public void run() {
            while (true) {
                Runnable task;
                synchronized (queue) {
                    while (queue.isEmpty()) {
                        try {
                            queue.wait();
                        } catch (InterruptedException ignored) {
                            // Interrupts are ignored: the worker keeps waiting for work.
                        }
                    }
                    task = queue.removeFirst();
                }
                // Run outside the monitor so other workers can dequeue meanwhile.
                // A RuntimeException must not escape, or the pool would lose
                // this thread; report it and carry on with the next task.
                try {
                    task.run();
                } catch (RuntimeException e) {
                    System.err.println("WorkQueue task failed: " + e);
                }
            }
        }
    }
}



大多数专家建议使用 notifyAll() 而不是 notify() ,而且理由很充分:使用 notify() 具有难以捉摸的风险,只有在某些特定条件下使用该方法才是合适的。另一方面,如果使用得当, notify() 具有比 notifyAll() 更可取的性能特征;特别是, notify() 引起的环境切换要少得多,这一点在服务器应用程序中是很重要的。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics