`
log_cd
  • 浏览: 1089930 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

java 线程池学习

阅读更多
     说明:Servlet也是多线程结构。servlet类中定义的成员变量,被所有的客户线程共享。当容器同时收到对同一Servlet的多个请求,那这个Servlet的service方法将在多线程中并发的执行。
一、线程池示例
(1)threadpool.xml

<?xml version="1.0" encoding="UTF-8"?>
<config>
  <threadPool>
  <minPools>10</minPools> <!--线程池最小线程-->
  <maxPools>100</maxPools> <!--线程池最大线程-->
  <checkThreadPeriod>5</checkThreadPeriod> <!--检查线程池中线程的周期5分钟-->
  </threadPool>
</config>


(2)解析XML文件

import java.io.InputStream;
import java.util.Iterator;

import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.springframework.core.io.ClassPathResource;

public class XMLReader {

	private Document document;

	private static final XMLReader instance = 
		new XMLReader(); 

		/** 
		* 私有的默认构造子 
		*/ 
		private XMLReader() {
			loadXML();
		} 
		
		/** 
		* 静态工厂方法 
		*/ 
		public static XMLReader getInstance() 
		{
		return instance; 
		}

		private void loadXML(){
			InputStream is = null;
			SAXReader reader =null;
			try {
				is = (new ClassPathResource("threadpool.xml")).getInputStream();
				reader = new SAXReader(); 
				document = reader.read(is);
				is.close();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		
		/**
		 * 读取指定值
		 * @param name
		 * @return
		 */
		public  String getThreadPoolPara(String name){
			String str = "";
			try {
				Element root = document.getRootElement(); // 获得根元素
				Iterator lv = root.elementIterator("threadPool");
				Element el = null;
				while (lv.hasNext()) {
					el = (Element) lv.next();
					str = el.element(name).getText();
				}
			} catch (Exception e) {
				System.out.println(e.toString());
			}
			return str;
		}
		
}


(3)工作线程

class WorkThread extends Thread {
	private boolean runningFlag;

	private String argument;

	public boolean isRunning() {
		return runningFlag;
	}

	public synchronized void setRunning(boolean flag) {
		runningFlag = flag;
		if (flag)
			this.notify();
	}

	public String getArgument() {
		return this.argument;
	}

	public void setArgument(String string) {
		argument = string;
	}

	public WorkThread(int threadNumber) {
		runningFlag = false;
		System.out.println("thread " + threadNumber + "started.");
	}

	public synchronized void run() {
		try {
			while (true) {
				if (!runningFlag) {
					this.wait();
				} else {
					System.out.println("processing " + getArgument()
							+ "... done.");
					sleep(5000);
					System.out.println("Thread is sleeping...");
					setRunning(false);
				}
			}
		} catch (InterruptedException e) {
			System.out.println("Interrupt");
		}
	}
} 


(4)管理线程池

import java.util.*;

class ThreadPoolManager {
	
	private int maxPools;
	private int minPools;
	private int checkThreadPeriod;
//	private java.util.Timer timer = null;
	public Vector vector;

	@SuppressWarnings("unchecked")
	public ThreadPoolManager() {
		setMaxPools(Integer.parseInt(XMLReader.getInstance().getThreadPoolPara("maxPools")));
		setMinPools(Integer.parseInt(XMLReader.getInstance().getThreadPoolPara("minPools")));
		setCheckThreadPeriod(Integer.parseInt(XMLReader.getInstance().getThreadPoolPara("checkThreadPeriod")));
		System.out.println("Starting thread pool...");
		vector = new Vector();
		for (int i = 1; i <= minPools; i++) {
			WorkThread thread = new WorkThread(i);
			vector.addElement(thread);
			thread.start();
		}
//		timer = new Timer(true);
//		timer.schedule(new CheckThreadTask(this),0,checkThreadPeriod);
	}

	@SuppressWarnings("unchecked")
	public void process(String argument) {
		int i;
		for (i = 0; i < vector.size(); i++) {
			
			WorkThread currentThread = (WorkThread) vector.elementAt(i);
			if (!currentThread.isRunning()) {
				System.out.println("Thread " + (i + 1) + " is processing:"
						+ argument);
				currentThread.setArgument(argument);
				currentThread.setRunning(true);
				return;
			}
			
			if(i == vector.size()-1){//没的空闲线程了,就新建一个
				if(vector.size() < maxPools){
					WorkThread thread = new WorkThread(i);
					vector.addElement(thread);
					thread.setArgument(argument);
					thread.setRunning(true);
					thread.start();
				}
			}
		}
		
		if (i == maxPools) {
			System.out.println("pool is full, try in another time.");
		}
	}

	public int getCheckThreadPeriod() {
		return checkThreadPeriod;
	}

	public void setCheckThreadPeriod(int checkThreadPeriod) {
		this.checkThreadPeriod = checkThreadPeriod;
	}

	public int getMaxPools() {
		return maxPools;
	}

	public void setMaxPools(int maxPools) {
		this.maxPools = maxPools;
	}

	public int getMinPools() {
		return minPools;
	}

	public void setMinPools(int minPools) {
		this.minPools = minPools;
	}
}// end of class ThreadPoolManager


(5)调用

	public static void main(String[] args) {
		try {
			BufferedReader br = new BufferedReader(new InputStreamReader(
					System.in));
			String s;
			ThreadPoolManager manager = new ThreadPoolManager();
			while ((s = br.readLine()) != null) {
				manager.process(s);
			}
		} catch (IOException e) {
		}
	}


(6)具有线程池的工作队列

    我们通常想要的是同一组固定的工作线程相结合的工作队列,它使用 wait() 和 notify() 来通知等待线程新的工作已经到达了。该工作队列通常被实现成具有相关监视器对象的某种链表。

public class WorkQueue
{
    private final int nThreads;
    private final PoolWorker[] threads;
    private final LinkedList queue;
    public WorkQueue(int nThreads)
    {
        this.nThreads = nThreads;
        queue = new LinkedList();
        threads = new PoolWorker[nThreads];
        for (int i=0; i<nThreads; i++) {
            threads[i] = new PoolWorker();
            threads[i].start();
        }
    }
    public void execute(Runnable r) {
        synchronized(queue) {
            queue.addLast(r);
            queue.notify();
        }
    }
    private class PoolWorker extends Thread {
        public void run() {
            Runnable r;
            while (true) {
                synchronized(queue) {
                    while (queue.isEmpty()) {
                        try
                        {
                            queue.wait();
                        }
                        catch (InterruptedException ignored)
                        {
                        }
                    }
                    r = (Runnable) queue.removeFirst();
                }
                // If we don't catch RuntimeException, 
                // the pool could leak threads
                try {
                    r.run();
                }
                catch (RuntimeException e) {
                    // You might want to log something here
                }
            }
        }
    }
}


大多数专家建议使用 notifyAll() 而不是 notify() ,而且理由很充分:使用 notify() 具有难以捉摸的风险,只有在某些特定条件下使用该方法才是合适的。另一方面,如果使用得当, notify() 具有比 notifyAll() 更可取的性能特征;特别是, notify() 引起的环境切换要少得多,这一点在服务器应用程序中是很重要的。

二、如何停止线程
(1)运行状态下,通过置一个volatile变量值来结束一个循环线程。
(2)非运行状态下(调用sleep,wait,或者被I/O阻塞,可能是文件或者网络等等),使用interrupt(),让线程在run方法中停止。
    当interrupt()被调用的时候,InterruptedException将被抛出,所以你可以再run方法中捕获这个异常,让线程安全退出:
try {
   ....
   wait();
} catch (InterruptedException iex) {
   throw new RuntimeException("Interrupted",iex);
}

    当线程被I/O阻塞的时候,调用interrupt()的情况是依赖与实际运行的平台的。在Solaris和Linux平台上将会抛出InterruptedIOException的异常,但是Windows上面不会有这种异常。所以,我们处理这种问题不能依靠于平台的实现。
    也可以使用InterruptibleChannel接口。 实现了InterruptibleChannel接口的类可以在阻塞的时候抛出ClosedByInterruptException。
    需要注意一点,当线程处于写文件的状态时,调用interrupt()不会中断线程。
(3)一个示例
import java.io.BufferedReader;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.channels.Channels;

public class InterruptInput {   
    static BufferedReader in = new BufferedReader(
            new InputStreamReader(
            Channels.newInputStream(
            (new FileInputStream(FileDescriptor.in)).getChannel())));
    
    public static void main(String args[]) {
        try {
            System.out.println("Enter lines of input (user ctrl+Z Enter to terminate):");
            System.out.println("(Input thread will be interrupted in 10 sec.)");
            // interrupt input in 10 sec
            (new TimeOut()).start();
            String line = null;
            while ((line = in.readLine()) != null) {
                System.out.println("Read line:'"+line+"'");
            }
        } catch (Exception ex) {
            System.out.println(ex.toString()); // java.nio.channels.ClosedChannelException
        }
    }
    
    public static class TimeOut extends Thread {
        int sleepTime = 10000;
        Thread threadToInterrupt = null;    
        public TimeOut() {
            // interrupt thread that creates this TimeOut.
            threadToInterrupt = Thread.currentThread();
            setDaemon(true);
        }
        
        public void run() {
            try {
                sleep(10000); // wait 10 sec
            } catch(InterruptedException ex) {/*ignore*/}
            threadToInterrupt.interrupt();
        }
    }
}

三、关于volatile
    volatile是一个类型修饰符(type specifier)。它是被设计用来修饰被不同线程访问和修改的变量。如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
    一般说来,volatile用在如下的几个地方:
1、中断服务程序中修改的供其它程序检测的变量需要加volatile;
2、多任务环境下各任务间共享的标志应该加volatile;
3、存储器映射的硬件寄存器通常也要加volatile说明,因为每次对它的读写都可能由不同意义;
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics