`
jsx112
  • 浏览: 308796 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

JDK1.5新特性 - 线程池详解三

阅读更多
简单的线程池实现
我们通常想要的是同一组固定的工作线程相结合的工作队列,它使用 wait() 和 notify() 来通知等待线程新的工作已经到达了。该工作队列通常被实现成具有相关监视器对象的某种链表。以下代码实现了具有线程池的工作队列。
public class WorkQueue
{
    private final int nThreads;
    private final PoolWorker[] threads;
    private final LinkedList queue;
    public WorkQueue(int nThreads)
    {
        this.nThreads = nThreads;
        queue = new LinkedList();
        threads = new PoolWorker[nThreads];
        for (int i=0; i<nThreads; i++) {
            threads[i] = new PoolWorker();
            threads[i].start();
        }
    }
    public void execute(Runnable r) {
        synchronized(queue) {
            queue.addLast(r);
            queue.notify();
        }
    }
    private class PoolWorker extends Thread {
        public void run() {
            Runnable r;
            while (true) {
                synchronized(queue) {
                    while (queue.isEmpty()) {
                        try
                        {
                            queue.wait();
                        }
                        catch (InterruptedException ignored)
                        {
                        }
                    }
                    r = (Runnable) queue.removeFirst();
                }
                // If we don’t catch RuntimeException,
                // the pool could leak threads
                try {
                    r.run();
                }
                catch (RuntimeException e) {
                    // You might want to log something here
                }
            }
        }
    }
}
虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。用线程池构建的应用程序容易遭受任何其它多线程应用程序容易遭受的所有并发风险,诸如同步错误和死锁,它还容易遭受特定于线程池的少数其它风险,诸如与池有关的死锁、资源不足和线程泄漏。
用线程池执行任务
如果你开发项目的时候用到很多的short-lived任务,这里推荐使用“线程池”这项技术。你可以创建一个线程池来执行池中的的任务,来取代每次执行任务是都要为新的任务来new和discard。如果一个线程在池中是可用状态,那么任务将立即执行。执行完成之后线程返回池中,否则,任务将一直等待直到有线程处在可用状态。

J2SE 5.0为大家提供了一个新的java.util.concurrent package,并且在这个报中提供了一个pre-built 的线程池架构。在java.util.concurrent中提供了一个Executor 接口,里面有一个execute的方法,参数是Runnable 类型
   public interface Executor {
     public void execute(Runnable command);
   }
使用线程池架构,你就必须创建一个Executor实例,然后你给他分配一些runnable任务,例如:
Java代码 
Executor executor = ...;  
executor.execute(aRunnable1);  
executor.execute(aRunnable2); 
   Executor executor = ...;
   executor.execute(aRunnable1);
   executor.execute(aRunnable2);
然后你创建或者找到Executor的实现类,实现类可以立即(或者连续)执行分配的任务,例如:
Java代码 
class MyExecutor implements Executor {  
    public void execute(Runnable r) {  
        new Thread(r).start();  
    }  

   class MyExecutor implements Executor {
       public void execute(Runnable r) {
           new Thread(r).start();
       }
   }
concurrency utilities也包括了一个ThreadPoolExecutor类,它提供了很多对线程的一般性操作,提供了四个构造函数,每个都可以指定如:线程池大小,持续时间,一个线程factory,和拒绝线程的handler。
Java代码 
public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue)  
public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue,  
                          ThreadFactory threadFactory)  
public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue,  
                          RejectedExecutionHandler handler)  
public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue,  
                          ThreadFactory threadFactory,  
                          RejectedExecutionHandler handler) 
   public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue)
   public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory)
   public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             RejectedExecutionHandler handler)
   public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory,
                             RejectedExecutionHandler handler)
但是你不必声明构造函数,Executors类会为你创建一个线程池。在一种最简单的情况下,你在Executors类中声明了newFixedThreadPool方法,并且在池中分配了许多线程。你可以使用ExecutorService(继承Executor的一个接口),去execute和submit 那些Runnable任务,使用ExecutorService中的submit方法可以得到一个返回结果,当然submit也可以返回一个Future对象用来检查任务是否执行。
让我们来先做一个Runnable类,名字为NamePrinter,它通知你运行、暂停、和耗费的时间。
Java代码 
public class NamePrinter implements Runnable {  
  private final String name;  
  private final int delay;  
  public NamePrinter(String name, int delay) {  
    this.name = name;  
    this.delay = delay;  
  }  
  public void run() {  
    System.out.println("Starting: " + name);  
    try {  
      Thread.sleep(delay);  
    } catch (InterruptedException ignored) {  
    }  
    System.out.println("Done with: " + name);  
  }  

   public class NamePrinter implements Runnable {
     private final String name;
     private final int delay;
     public NamePrinter(String name, int delay) {
       this.name = name;
       this.delay = delay;
     }
     public void run() {
       System.out.println("Starting: " + name);
       try {
         Thread.sleep(delay);
       } catch (InterruptedException ignored) {
       }
       System.out.println("Done with: " + name);
     }
   }
然后下面是我们测试的项目UsePool,它创建一个有三个线程的线程池,分配了10个任务给它(运行10次NamePrinter),UsePool在被shutdown 和 awaitTermination之前将等待并执行分配的任务。一个ExecutorService必须要在terminated之前执行shutdown,shutdownNow方法是立即尝试shutdown操作。shutdownNow 方法将返回没有被执行的任务。
Java代码 
import java.util.concurrent.*;  
import java.util.Random;  
public class UsePool {  
  public static void main(String args[]) {  
    Random random = new Random();  
    ExecutorService executor =   
            Executors.newFixedThreadPool(3);  
    // Sum up wait times to know when to shutdown  
    int waitTime = 500;  
    for (int i=0; i<10; i++) {  
      String name = "NamePrinter " + i;  
      int time = random.nextInt(1000);  
      waitTime += time;  
      Runnable runner = new NamePrinter(name, time);  
      System.out.println("Adding: " + name + " / " + time);  
      executor.execute(runner);  
    }  
    try {  
      Thread.sleep(waitTime);  
      executor.shutdown();  
      executor.awaitTermination  
              (waitTime, TimeUnit.MILLISECONDS);  
    } catch (InterruptedException ignored) {  
    }  
    System.exit(0);  
  }  

   import java.util.concurrent.*;
   import java.util.Random;
   public class UsePool {
     public static void main(String args[]) {
       Random random = new Random();
       ExecutorService executor =
               Executors.newFixedThreadPool(3);
       // Sum up wait times to know when to shutdown
       int waitTime = 500;
       for (int i=0; i<10; i++) {
         String name = "NamePrinter " + i;
         int time = random.nextInt(1000);
         waitTime += time;
         Runnable runner = new NamePrinter(name, time);
         System.out.println("Adding: " + name + " / " + time);
         executor.execute(runner);
       }
       try {
         Thread.sleep(waitTime);
         executor.shutdown();
         executor.awaitTermination
                 (waitTime, TimeUnit.MILLISECONDS);
       } catch (InterruptedException ignored) {
       }
       System.exit(0);
     }
    }
输出的结果是:
Adding: NamePrinter 0 / 30
Adding: NamePrinter 1 / 727
Adding: NamePrinter 2 / 980
Starting: NamePrinter 0
Starting: NamePrinter 1
Starting: NamePrinter 2
Adding: NamePrinter 3 / 409
Adding: NamePrinter 4 / 49
Adding: NamePrinter 5 / 802
Adding: NamePrinter 6 / 211
Adding: NamePrinter 7 / 459
Adding: NamePrinter 8 / 994
Adding: NamePrinter 9 / 459
Done with: NamePrinter 0
Starting: NamePrinter 3
Done with: NamePrinter 3
Starting: NamePrinter 4
Done with: NamePrinter 4
Starting: NamePrinter 5
Done with: NamePrinter 1
Starting: NamePrinter 6
Done with: NamePrinter 6
Starting: NamePrinter 7
Done with: NamePrinter 2
Starting: NamePrinter 8
Done with: NamePrinter 5
Starting: NamePrinter 9
Done with: NamePrinter 7
Done with: NamePrinter 9
Done with: NamePrinter 8
注意前三个NamePrinter对象启动的非查的快,之后的NamePrinter对象每次启动都要等待前面的执行完成。
在J2SE 5.0有非常多的pooling framework可以用,例如,你可以创建一个scheduled线程池……
更多信息还是看官方的concurrency utilities,地址:http://java.sun.com/j2se/1.5.0/docs/guide/concurrency/
public class PoolAsynService extends BaseService implements Runnable {
private Thread thread = new Thread(this);
private List waitToList = (List) Collections.synchronizedList(new LinkedList());
// ////////////线程池参数/////////////////
private int corePoolSize = 5;// : 线程池维护线程的最少数量
private int maximumPoolSize = 10;// :线程池维护线程的最大数量
private long keepAliveTime = 60;// : 线程池维护线程所允许的空闲时间
private TimeUnit unit = TimeUnit.SECONDS;// : 线程池维护线程所允许的空闲时间的单位
private BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10);// :
// 线程池所使用的缓冲队列
private RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();// :
// 线程池对拒绝任务的处理策略
// //////////线程池参数/////////////////
private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize, keepAliveTime, unit, workQueue, handler);
public void run() {
while (!thread.isInterrupted()) {
if (!waitToList.isEmpty()) {
try {
threadPool.execute(new Executor());
} catch (Exception e) {
logger.error("pool  execute error!!!", e);
}
}

Tools.block(25);
}
}
public void doAsync(Object executor, Object... objects) {
Throwable t = new Throwable();
StackTraceElement[] elements = t.getStackTrace();
StackTraceElement element = elements[1];
String method = element.getMethodName();
AsyncContext ctx = new AsyncContext();
ctx.args = objects;
ctx.executor = executor;
ctx.method = method;
if (method.endsWith("PA")) {
waitToList.add(ctx);
} else {
logger.warn("async method name is not good!");
}
}
private class AsyncContext {
String method;
Object executor;
Object[] args;
}
private class Executor implements Runnable {
public void run() {
if (!waitToList.isEmpty()) {
try {
Object task = waitToList.remove(0);
AsyncContext ctx = (AsyncContext) task;
doTaskByCtx(ctx);
} catch (Exception e) {
logger.error("async error!!!", e);
}
}
}
private void doTaskByCtx(AsyncContext ctx) {
String targetMethodName = ctx.method.substring(0, ctx.method
.length() - 2);
Method targetMethod = null;
Class clazz = null;
try {
clazz = ctx.executor.getClass();
Method[] methods = clazz.getDeclaredMethods();
if (methods != null) {
for (int i = 0; i < methods.length; i++) {
String name = methods[i].getName();
if (name.equals(targetMethodName)) {
targetMethod = methods[i];
break;
}
}
if (targetMethod != null) {
targetMethod.invoke(ctx.executor, ctx.args);
}
}
} catch (Exception e) {
logger.error(
"do async fail! " + clazz + ":" + targetMethodName, e);
}
}
}
@Override
public void destroy() {
thread.interrupt();
threadPool.shutdown();
logger.info("thread pool asynService shut down");
}
@Override
public void init() {
thread.start();
logger.info("thread pool asynService start");
}
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics