`
weina
  • 浏览: 142352 次
  • 性别: Icon_minigender_2
  • 来自: 北京
社区版块
存档分类
最新评论

concurent

    博客分类:
  • java
阅读更多

编写多线程的程序一直都是一件比较麻烦的事情,要考虑很多事情,处理不好还会出很多意想不到的麻烦。加上现在很多开发者接触到的项目都是打着企业级 旗号的B/S项目,大多数人都很少涉及多线程,这又为本文的主角增加了一份神秘感。

 

讲到Java多线程,大多数人脑海中跳出来的是Thread、Runnable、synchronized……这些是最基本的东西,虽然已经足够强 大,但想要用好还真不容易。从JDK 1.5开始,增加了java.util.concurrent包,它的引入大大简化了多线程程序的开发(要感谢一下大牛Doug Lee)。

 

java.util.concurrent包分成了三个部分,分别是java.util.concurrent、 java.util.concurrent.atomic和java.util.concurrent.lock。内容涵盖了并发集合类、线程池机制、同 步互斥机制、线程安全的变量更新工具类、锁等等常用工具。

 

为了便于理解,本文使用一个例子来做说明,交代一下它的场景:

假设要对一套10个节点组成的环境进行检查,这个环境有两个入口点,通过节点间的依赖关系可以遍历到整个环境。依赖关系可以构成一张有向图,可能存 在环。为了提高检查的效率,考虑使用多线程。

 

1、Executors

通过这个类能够获得多种线程池的实例,例如可以调用newSingleThreadExecutor()获得单线程的 ExecutorService,调用newFixedThreadPool()获得固定大小线程池的ExecutorService。拿到 ExecutorService可以做的事情就比较多了,最简单的是用它来执行Runnable对象,也可以执行一些实现了 Callable<T>的对象。用Thread的start()方法没有返回值,如果该线程执行的方法有返回值那用 ExecutorService就再好不过了,可以选择submit()、invokeAll()或者invokeAny(),根据具体情况选择合适的方 法即可。

Java代码
  1. package  service;  
  2.   
  3. import  java.util.ArrayList;  
  4. import  java.util.List;  
  5. import  java.util.concurrent.ExecutionException;  
  6. import  java.util.concurrent.ExecutorService;  
  7. import  java.util.concurrent.Executors;  
  8. import  java.util.concurrent.Future;  
  9. import  java.util.concurrent.TimeUnit;  
  10.   
  11. /**  
  12.  * 线 程池服务类  
  13.  *   
  14.  * @author DigitalSonic  
  15.  */   
  16. public   class  ThreadPoolService {  
  17.     /**  
  18.      * 默 认线程池大小  
  19.      */   
  20.     public   static   final   int   DEFAULT_POOL_SIZE    =  5 ;  
  21.   
  22.     /**  
  23.      * 默 认一个任务的超时时间,单位为毫秒  
  24.      */   
  25.     public   static   final   long  DEFAULT_TASK_TIMEOUT =  1000 ;  
  26.   
  27.     private   int               poolSize             = DEFAULT_POOL_SIZE;  
  28.     private  ExecutorService  executorService;  
  29.   
  30.     /**  
  31.      * 根 据给定大小创建线程池  
  32.      */   
  33.     public  ThreadPoolService( int  poolSize) {  
  34.         setPoolSize(poolSize);  
  35.     }  
  36.   
  37.     /**  
  38.      * 使 用线程池中的线程来执行任务  
  39.      */   
  40.     public   void  execute(Runnable task) {  
  41.         executorService.execute(task);  
  42.     }  
  43.   
  44.     /**  
  45.      * 在 线程池中执行所有给定的任务并取回运行结果,使用默认超时时间  
  46.      *   
  47.      * @see #invokeAll(List, long)  
  48.      */   
  49.     public  List<Node> invokeAll(List<ValidationTask> tasks) {  
  50.         return  invokeAll(tasks, DEFAULT_TASK_TIMEOUT * tasks.size());  
  51.     }  
  52.   
  53.     /**  
  54.      * 在 线程池中执行所有给定的任务并取回运行结果  
  55.      *   
  56.      * @param timeout 以毫秒为单位的超时时间,小于0表示不设定超时  
  57.      * @see java.util.concurrent.ExecutorService#invokeAll(java.util.Collection)  
  58.      */   
  59.     public  List<Node> invokeAll(List<ValidationTask> tasks,  long  timeout) {  
  60.         List<Node> nodes = new  ArrayList<Node>(tasks.size());  
  61.         try  {  
  62.             List<Future<Node>> futures = null ;  
  63.             if  (timeout <  0 ) {  
  64.                 futures = executorService.invokeAll(tasks);  
  65.             } else  {  
  66.                 futures = executorService.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS);  
  67.             }  
  68.             for  (Future<Node> future : futures) {  
  69.                 try  {  
  70.                     nodes.add(future.get());  
  71.                 } catch  (ExecutionException e) {  
  72.                     e.printStackTrace();  
  73.                 }  
  74.             }  
  75.         } catch  (InterruptedException e) {  
  76.             e.printStackTrace();  
  77.         }  
  78.         return  nodes;  
  79.     }  
  80.   
  81.     /**  
  82.      * 关 闭当前ExecutorService  
  83.      *   
  84.      * @param timeout 以毫秒为单位的超时时间  
  85.      */   
  86.     public   void  destoryExecutorService( long  timeout) {  
  87.         if  (executorService !=  null  && !executorService.isShutdown()) {  
  88.             try  {  
  89.                 executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);  
  90.             } catch  (InterruptedException e) {  
  91.                 e.printStackTrace();  
  92.             }  
  93.             executorService.shutdown();  
  94.         }  
  95.     }  
  96.   
  97.     /**  
  98.      * 关 闭当前ExecutorService,随后根据poolSize创建新的ExecutorService  
  99.      */   
  100.     public   void  createExecutorService() {  
  101.         destoryExecutorService(1000 );  
  102.         executorService = Executors.newFixedThreadPool(poolSize);  
  103.     }  
  104.   
  105.     /**  
  106.      * 调 整线程池大小  
  107.      * @see #createExecutorService()  
  108.      */   
  109.     public   void  setPoolSize( int  poolSize) {  
  110.         this .poolSize = poolSize;  
  111.         createExecutorService();  
  112.     }  
  113. }  
package service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * 线程池服务类
 * 
 * @author DigitalSonic
 */
public class ThreadPoolService {
    /**
     * 默认线程池大小
     */
    public static final int  DEFAULT_POOL_SIZE    = 5;

    /**
     * 默认一个任务的超时时间,单位为毫秒
     */
    public static final long DEFAULT_TASK_TIMEOUT = 1000;

    private int              poolSize             = DEFAULT_POOL_SIZE;
    private ExecutorService  executorService;

    /**
     * 根据给定大小创建线程池
     */
    public ThreadPoolService(int poolSize) {
        setPoolSize(poolSize);
    }

    /**
     * 使用线程池中的线程来执行任务
     */
    public void execute(Runnable task) {
        executorService.execute(task);
    }

    /**
     * 在线程池中执行所有给定的任务并取回运行结果,使用默认超时时间
     * 
     * @see #invokeAll(List, long)
     */
    public List<Node> invokeAll(List<ValidationTask> tasks) {
        return invokeAll(tasks, DEFAULT_TASK_TIMEOUT * tasks.size());
    }

    /**
     * 在线程池中执行所有给定的任务并取回运行结果
     * 
     * @param timeout 以毫秒为单位的超时时间,小于0表示不设定超时
     * @see java.util.concurrent.ExecutorService#invokeAll(java.util.Collection)
     */
    public List<Node> invokeAll(List<ValidationTask> tasks, long timeout) {
        List<Node> nodes = new ArrayList<Node>(tasks.size());
        try {
            List<Future<Node>> futures = null;
            if (timeout < 0) {
                futures = executorService.invokeAll(tasks);
            } else {
                futures = executorService.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS);
            }
            for (Future<Node> future : futures) {
                try {
                    nodes.add(future.get());
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return nodes;
    }

    /**
     * 关闭当前ExecutorService
     * 
     * @param timeout 以毫秒为单位的超时时间
     */
    public void destoryExecutorService(long timeout) {
        if (executorService != null && !executorService.isShutdown()) {
            try {
                executorService.awaitTermination(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            executorService.shutdown();
        }
    }

    /**
     * 关闭当前ExecutorService,随后根据poolSize创建新的ExecutorService
     */
    public void createExecutorService() {
        destoryExecutorService(1000);
        executorService = Executors.newFixedThreadPool(poolSize);
    }

    /**
     * 调整线程池大小
     * @see #createExecutorService()
     */
    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
        createExecutorService();
    }
}
 

这里要额外说明一下invokeAll()和invokeAny()方法。前者会执行给定的所有Callable<T>对象,等所有任 务完成后返回一个包含了执行结果的List<Future<T>>,每个Future.isDone()都是true,可以用 Future.get()拿到结果;后者只要完成了列表中的任意一个任务就立刻返回,返回值就是执行结果。

还有一个比较诡异的地方
本代码是在JDK 1.6下编译测试的,如果在JDK 1.5下测试,很可能在invokeAll和invokeAny的地方出错。明明ValidationTask实现了 Callable<Node>,可是它死活不认,类型不匹配,这时可以将参数声明由List<ValidationTask>改 为 List<Callable<Node>>。
造成这个问题的主要原因是两个版本中invokeAll和invokeAny的方法签名不同,1.6里是 invokeAll(Collection<? extends Callable<T>> tasks),而1.5里是invokeAll(Collection<Callable<T>> tasks)。网上也有人遇到类似的问题(invokeAll() is not willing to acept a Collection<Callable<T>> )。

 

和其他资源一样,线程池在使用完毕后也需要释放,用shutdown()方法可以关闭线程池,如果当时池里还有没有被执行的任务,它会等待任务执行 完毕,在等待期间试图进入线程池的任务将被拒绝。也可以用shutdownNow()来关闭线程池,它会立刻关闭线程池,没有执行的任务作为返回值返回。

 

2、Lock

多线程编程中常常要锁定某个对象,之前会用synchronized来实现,现在又多了另一种选择,那就是 java.util.concurrent.locks。通过Lock能够实现更灵活的锁定机制,它还提供了很多synchronized所没有的功能, 例如尝试获得锁(tryLock())。

 

使用Lock时需要自己获得锁并在使用后手动释放,这一点与synchronized有所不同,所以通常Lock的使用方式是这样的:

Java代码
  1. Lock l = ...;   
  2. l.lock();  
  3. try  {  
  4.     // 执行操作   
  5. finally  {  
  6.     l.unlock();  
  7. }  
Lock l = ...; 
l.lock();
try {
	// 执行操作
} finally {
	l.unlock();
}

 

java.util.concurrent.locks中提供了几个Lock接口的实现类,比较常用的应该是ReentrantLock。以下范例 中使用了ReentrantLock进行节点锁定:

Java代码
  1. package  service;  
  2.   
  3. import  java.util.concurrent.locks.Lock;  
  4. import  java.util.concurrent.locks.ReentrantLock;  
  5.   
  6. /**  
  7.  * 节 点类  
  8.  *   
  9.  * @author DigitalSonic  
  10.  */   
  11. public   class  Node {  
  12.     private  String name;  
  13.     private  String wsdl;  
  14.     private  String result =  "PASS" ;  
  15.     private  String[] dependencies =  new  String[] {};  
  16.     private  Lock lock =  new  ReentrantLock();  
  17.     /**  
  18.      * 默 认构造方法  
  19.      */   
  20.     public  Node() {  
  21.     }  
  22.       
  23.     /**  
  24.      * 构 造节点对象,设置名称及WSDL  
  25.      */   
  26.     public  Node(String name, String wsdl) {  
  27.         this .name = name;  
  28.         this .wsdl = wsdl;  
  29.     }  
  30.   
  31.     /**  
  32.      * 返 回包含节点名称、WSDL以及验证结果的字符串  
  33.      */   
  34.     @Override   
  35.     public  String toString() {  
  36.         String toString = "Node: "  + name +  " WSDL: "  + wsdl +  " Result: "  + result;  
  37.         return  toString;  
  38.     }  
  39.       
  40.     // Getter & Setter   
  41.     public  String getName() {  
  42.         return  name;  
  43.     }  
  44.   
  45.     public   void  setName(String name) {  
  46.         this .name = name;  
  47.     }  
  48.   
  49.     public  String getWsdl() {  
  50.         return  wsdl;  
  51.     }  
  52.   
  53.     public   void  setWsdl(String wsdl) {  
  54.         this .wsdl = wsdl;  
  55.     }  
  56.   
  57.     public  String getResult() {  
  58.         return  result;  
  59.     }  
  60.   
  61.     public   void  setResult(String result) {  
  62.         this .result = result;  
  63.     }  
  64.   
  65.     public  String[] getDependencies() {  
  66.         return  dependencies;  
  67.     }  
  68.   
  69.     public   void  setDependencies(String[] dependencies) {  
  70.         this .dependencies = dependencies;  
  71.     }  
  72.   
  73.     public  Lock getLock() {  
  74.         return  lock;  
  75.     }  
  76.   
  77. }  
package service;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 节点类
 * 
 * @author DigitalSonic
 */
public class Node {
	private String name;
	private String wsdl;
	private String result = "PASS";
	private String[] dependencies = new String[] {};
	private Lock lock = new ReentrantLock();
	/**
	 * 默认构造方法
	 */
	public Node() {
	}
	
	/**
	 * 构造节点对象,设置名称及WSDL
	 */
	public Node(String name, String wsdl) {
		this.name = name;
		this.wsdl = wsdl;
	}

	/**
	 * 返回包含节点名称、WSDL以及验证结果的字符串
	 */
	@Override
	public String toString() {
		String toString = "Node: " + name + " WSDL: " + wsdl + " Result: " + result;
		return toString;
	}
	
	// Getter & Setter
	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getWsdl() {
		return wsdl;
	}

	public void setWsdl(String wsdl) {
		this.wsdl = wsdl;
	}

	public String getResult() {
		return result;
	}

	public void setResult(String result) {
		this.result = result;
	}

	public String[] getDependencies() {
		return dependencies;
	}

	public void setDependencies(String[] dependencies) {
		this.dependencies = dependencies;
	}

	public Lock getLock() {
		return lock;
	}

}
Java代码
  1. package  service;  
  2.   
  3. import  java.util.concurrent.Callable;  
  4. import  java.util.concurrent.locks.Lock;  
  5. import  java.util.logging.Logger;  
  6.   
  7. import  service.mock.MockNodeValidator;  
  8.   
  9. /**  
  10.  * 执 行验证的任务类  
  11.  *   
  12.  * @author DigitalSonic  
  13.  */   
  14. public   class  ValidationTask  implements  Callable<Node> {  
  15.     private   static  Logger logger = Logger.getLogger( "ValidationTask" );  
  16.   
  17.     private  String        wsdl;  
  18.   
  19.     /**  
  20.      * 构 造方法,传入节点的WSDL  
  21.      */   
  22.     public  ValidationTask(String wsdl) {  
  23.         this .wsdl = wsdl;  
  24.     }  
  25.   
  26.     /**  
  27.      * 执 行针对某个节点的验证<br/>  
  28.      * 如 果正有别的线程在执行同一节点的验证则等待其结果,不重复执行验证  
  29.      */   
  30.     @Override   
  31.     public  Node call()  throws  Exception {  
  32.         Node node = ValidationService.NODE_MAP.get(wsdl);  
  33.         Lock lock = null ;  
  34.         logger.info("开始验证节点:"  + wsdl);  
  35.         if  (node !=  null ) {  
  36.             lock = node.getLock();  
  37.             if  (lock.tryLock()) {  
  38.                 // 当前没有其他线程验证该节点   
  39.                 logger.info("当前没有其他线程验证节点"  + node.getName() +  "["  + wsdl +  "]" );  
  40.                 try  {  
  41.                     Node result = MockNodeValidator.validateNode(wsdl);  
  42.                     mergeNode(result, node);  
  43.                 } finally  {  
  44.                     lock.unlock();  
  45.                 }  
  46.             } else  {  
  47.                 // 当前有别的线程正在验证该节点,等待结果   
  48.                 logger.info("当前有别的线程正在验证节点"  + node.getName() +  "["  + wsdl +  "], 等待结果" );  
  49.                 lock.lock();  
  50.                 lock.unlock();  
  51.             }  
  52.         } else  {  
  53.             // 从未进行过验证,这种情况应该只出现在系统启动初期   
  54.             // 这时是在做初始化,不应该有冲突发生   
  55.             logger.info("首次验证节点:"  + wsdl);  
  56.             node = MockNodeValidator.validateNode(wsdl);  
  57.             ValidationService.NODE_MAP.put(wsdl, node);  
  58.         }  
  59.         logger.info("节点"  + node.getName() +  "["  + wsdl +  "]验 证结束,验证结果:"  + node.getResult());  
  60.         return  node;  
  61.     }  
  62.   
  63.     /**  
  64.      * 将 src的内容合并进dest节点中,不进行深度拷贝  
  65.      */   
  66.     private  Node mergeNode(Node src, Node dest) {  
  67.         dest.setName(src.getName());  
  68.         dest.setWsdl(src.getWsdl());  
  69.         dest.setDependencies(src.getDependencies());  
  70.         dest.setResult(src.getResult());  
  71.         return  dest;  
  72.     }  
  73. }  
package service;

import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.logging.Logger;

import service.mock.MockNodeValidator;

/**
 * 执行验证的任务类
 * 
 * @author DigitalSonic
 */
public class ValidationTask implements Callable<Node> {
    private static Logger logger = Logger.getLogger("ValidationTask");

    private String        wsdl;

    /**
     * 构造方法,传入节点的WSDL
     */
    public ValidationTask(String wsdl) {
        this.wsdl = wsdl;
    }

    /**
     * 执行针对某个节点的验证<br/>
     * 如果正有别的线程在执行同一节点的验证则等待其结果,不重复执行验证
     */
    @Override
    public Node call() throws Exception {
        Node node = ValidationService.NODE_MAP.get(wsdl);
        Lock lock = null;
        logger.info("开始验证节点:" + wsdl);
        if (node != null) {
            lock = node.getLock();
            if (lock.tryLock()) {
                // 当前没有其他线程验证该节点
                logger.info("当前没有其他线程验证节点" + node.getName() + "[" + wsdl + "]");
                try {
                    Node result = MockNodeValidator.validateNode(wsdl);
                    mergeNode(result, node);
                } finally {
                    lock.unlock();
                }
            } else {
                // 当前有别的线程正在验证该节点,等待结果
                logger.info("当前有别的线程正在验证节点" + node.getName() + "[" + wsdl + "],等待结果");
                lock.lock();
                lock.unlock();
            }
        } else {
            // 从未进行过验证,这种情况应该只出现在系统启动初期
            // 这时是在做初始化,不应该有冲突发生
            logger.info("首次验证节点:" + wsdl);
            node = MockNodeValidator.validateNode(wsdl);
            ValidationService.NODE_MAP.put(wsdl, node);
        }
        logger.info("节点" + node.getName() + "[" + wsdl + "]验证结束,验证结果:" + node.getResult());
        return node;
    }

    /**
     * 将src的内容合并进dest节点中,不进行深度拷贝
     */
    private Node mergeNode(Node src, Node dest) {
        dest.setName(src.getName());
        dest.setWsdl(src.getWsdl());
        dest.setDependencies(src.getDependencies());
        dest.setResult(src.getResult());
        return dest;
    }
}
   

请注意ValidationTask的call()方法,这里会先检查节点是否被锁定,如果被锁定则表示当前有另一个线程正在验证该节点,那就不用 重复进行验证。第50行和第51行,那到锁后立即释放,这里只是为了等待验证结束。

 

讲到Lock,就不能不讲Conditon,前者代替了synchronized,而后者则代替了Object对象上的wait()、 notify()和notifyAll()方法(Condition中提供了await()、signal()和signalAll()方法),当满足运 行条件前挂起线程。Condition是与Lock结合使用的,通过Lock.newCondition()方法能够创建与Lock绑定的 Condition实例。JDK的JavaDoc中有一个例子能够很好地说明Condition的用途及用法:

Java代码
  1. class  BoundedBuffer {  
  2.   final  Lock lock =  new  ReentrantLock();  
  3.   final  Condition notFull  = lock.newCondition();   
  4.   final  Condition notEmpty = lock.newCondition();   
  5.   
  6.   final  Object[] items =  new  Object[ 100 ];  
  7.   int  putptr, takeptr, count;  
  8.   
  9.   public   void  put(Object x)  throws  InterruptedException {  
  10.     lock.lock();  
  11.     try  {  
  12.       while  (count == items.length)   
  13.         notFull.await();  
  14.       items[putptr] = x;   
  15.       if  (++putptr == items.length) putptr =  0 ;  
  16.       ++count;  
  17.       notEmpty.signal();  
  18.     } finally  {  
  19.       lock.unlock();  
  20.     }  
  21.   }  
  22.   
  23.   public  Object take()  throws  InterruptedException {  
  24.     lock.lock();  
  25.     try  {  
  26.       while  (count ==  0 )   
  27.         notEmpty.await();  
  28.       Object x = items[takeptr];   
  29.       if  (++takeptr == items.length) takeptr =  0 ;  
  30.       --count;  
  31.       notFull.signal();  
  32.       return  x;  
  33.     } finally  {  
  34.       lock.unlock();  
  35.     }  
  36.   }   
  37. }  
 class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length) 
         notFull.await();
       items[putptr] = x; 
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0) 
         notEmpty.await();
       Object x = items[takeptr]; 
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   } 
 }

说到这里,让我解释一下之前的例子里为什么没有选择Condition来等待验证结束。await()方法在调用时当前线程先要获得对应的锁,既然 我都拿到锁了,那也就是说验证已经结束了。。。

 

3、并发集合类

集合类是大家编程时经常要使用的东西,ArrayList、HashMap什么的,java.util包中的集合类有的是线程安全的,有的则不是, 在编写多线程的程序时使用线程安全的类能省去很多麻烦,但这些类的性能如何呢?java.util.concurrent包中提供了几个并发结合类,例如 ConcurrentHashMap、ConcurrentLinkedQueue和CopyOnWriteArrayList等等,根据不同的使用场 景,开发者可以用它们替换java.util包中的相应集合类。

 

CopyOnWriteArrayList是ArrayList的一个变体,比较适合用在读取比较频繁、修改较少的情况下,因为每次修改都要复制整 个底层数组。ConcurrentHashMap中为Map接口增加了一些方法(例如putIfAbsenct()),同时做了些优化,总之灰常之好用, 下面的代码中使用ConcurrentHashMap来作为全局节点表,完全无需考虑并发问题。ValidationService中只是声明(第17 行),具体的使用是在上面的ValidationTask中。

Java代码
  1. package  service;  
  2.   
  3. import  java.util.ArrayList;  
  4. import  java.util.List;  
  5. import  java.util.Map;  
  6. import  java.util.concurrent.ConcurrentHashMap;  
  7.   
  8. /**  
  9.  * 执 行验证的服务类  
  10.  *   
  11.  * @author DigitalSonic  
  12.  */   
  13. public   class  ValidationService {  
  14.     /**  
  15.      * 全 局节点表  
  16.      */   
  17.     public   static   final  Map<String, Node> NODE_MAP =  new  ConcurrentHashMap<String, Node>();  
  18.   
  19.     private  ThreadPoolService threadPoolService;  
  20.       
  21.     public  ValidationService(ThreadPoolService threadPoolService) {  
  22.         this .threadPoolService = threadPoolService;  
  23.     }  
  24.   
  25.     /**  
  26.      * 给 出一个入口节点的WSDL,通过广度遍历的方式验证与其相关的各个节点  
  27.      *   
  28.      * @param wsdl 入口节点WSDL  
  29.      */   
  30.     public   void  validate(List<String> wsdl) {  
  31.         List<String> visitedNodes = new  ArrayList<String>();  
  32.         List<String> nextRoundNodes = new  ArrayList<String>();  
  33.   
  34.         nextRoundNodes.addAll(wsdl);  
  35.         while  (nextRoundNodes.size() >  0 ) {  
  36.             List<ValidationTask> tasks = getTasks(nextRoundNodes);  
  37.             List<Node> nodes = threadPoolService.invokeAll(tasks);  
  38.   
  39.             visitedNodes.addAll(nextRoundNodes);  
  40.             nextRoundNodes.clear();  
  41.             getNextRoundNodes(nodes, visitedNodes, nextRoundNodes);  
  42.         }  
  43.     }  
  44.   
  45.     private  List<String> getNextRoundNodes(List<Node> nodes,  
  46.             List<String> visitedNodes, List<String> nextRoundNodes) {  
  47.         for  (Node node : nodes) {  
  48.             for  (String wsdl : node.getDependencies()) {  
  49.                 if  (!visitedNodes.contains(wsdl)) {  
  50.                     nextRoundNodes.add(wsdl);  
  51.                 }  
  52.             }  
  53.         }  
  54.         return  nextRoundNodes;  
  55.     }  
  56.   
  57.     private  List<ValidationTask> getTasks(List<String> nodes) {  
  58.         List<ValidationTask> tasks = new  ArrayList<ValidationTask>(nodes.size());  
  59.         for  (String wsdl : nodes) {  
  60.             tasks.add(new  ValidationTask(wsdl));  
  61.         }  
  62.         return  tasks;  
  63.     }  
  64. }  
package service;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 执行验证的服务类
 * 
 * @author DigitalSonic
 */
public class ValidationService {
	/**
	 * 全局节点表
	 */
	public static final Map<String, Node> NODE_MAP = new ConcurrentHashMap<String, Node>();

	private ThreadPoolService threadPoolService;
	
	public ValidationService(ThreadPoolService threadPoolService) {
		this.threadPoolService = threadPoolService;
	}

	/**
	 * 给出一个入口节点的WSDL,通过广度遍历的方式验证与其相关的各个节点
	 * 
	 * @param wsdl 入口节点WSDL
	 */
	public void validate(List<String> wsdl) {
		List<String> visitedNodes = new ArrayList<String>();
		List<String> nextRoundNodes = new ArrayList<String>();

		nextRoundNodes.addAll(wsdl);
		while (nextRoundNodes.size() > 0) {
			List<ValidationTask> tasks = getTasks(nextRoundNodes);
			List<Node> nodes = threadPoolService.invokeAll(tasks);

			visitedNodes.addAll(nextRoundNodes);
			nextRoundNodes.clear();
			getNextRoundNodes(nodes, visitedNodes, nextRoundNodes);
		}
	}

	private List<String> getNextRoundNodes(List<Node> nodes,
			List<String> visitedNodes, List<String> nextRoundNodes) {
		for (Node node : nodes) {
			for (String wsdl : node.getDependencies()) {
				if (!visitedNodes.contains(wsdl)) {
					nextRoundNodes.add(wsdl);
				}
			}
		}
		return nextRoundNodes;
	}

	private List<ValidationTask> getTasks(List<String> nodes) {
		List<ValidationTask> tasks = new ArrayList<ValidationTask>(nodes.size());
		for (String wsdl : nodes) {
			tasks.add(new ValidationTask(wsdl));
		}
		return tasks;
	}
}

 

4、AtomicInteger

对变量的读写操作都是原子操作(除了long或者double的变量),但像数值类型的++ --操作不是原子操作,像i++中包含了获得i的原始值、加1、写回i、返回原始值,在进行类似i++这样的操作时如果不进行同步问题就大了。好在 java.util.concurrent.atomic为我们提供了很多工具类,可以以原子方式更新变量。

 

以AtomicInteger为例,提供了代替++ --的getAndIncrement()、incrementAndGet()、getAndDecrement()和 decrementAndGet()方法,还有加减给定值的方法、当前值等于预期值时更新的compareAndSet()方法。

 

下面的例子中用AtomicInteger保存全局验证次数(第69行做了自增的操作),因为validateNode()方法会同时被多个线程调 用,所以直接用int不同步是不行的,但用AtomicInteger在这种场合下就很合适。

Java代码
  1. package  service.mock;  
  2.   
  3. import  java.util.ArrayList;  
  4. import  java.util.HashMap;  
  5. import  java.util.List;  
  6. import  java.util.Map;  
  7. import  java.util.concurrent.atomic.AtomicInteger;  
  8. import  java.util.logging.Logger;  
  9.   
  10. import  service.Node;  
  11.   
  12. /**  
  13.  * 模 拟执行节点验证的Mock类  
  14.  *   
  15.  * @author DigitalSonic  
  16.  */   
  17. public   class  MockNodeValidator {  
  18.     public   static   final  List<Node>         ENTRIES  =  new  ArrayList<Node>();  
  19.     private   static   final  Map<String, Node> NODE_MAP =  new  HashMap<String, Node>();  
  20.   
  21.     private   static  AtomicInteger           count    =  new  AtomicInteger( 0 );  
  22.     private   static  Logger                  logger   = Logger.getLogger( "MockNodeValidator" );  
  23.   
  24.     /*  
  25.      * 构 造模拟数据  
  26.      */   
  27.     static  {  
  28.         Node node0 = new  Node( "NODE0" "http://node0/check?wsdl" ); //入口 0   
  29.         Node node1 = new  Node( "NODE1" "http://node1/check?wsdl" );  
  30.         Node node2 = new  Node( "NODE2" "http://node2/check?wsdl" );  
  31.         Node node3 = new  Node( "NODE3" "http://node3/check?wsdl" );  
  32.         Node node4 = new  Node( "NODE4" "http://node4/check?wsdl" );  
  33.         Node node5 = new  Node( "NODE5" "http://node5/check?wsdl" );  
  34.         Node node6 = new  Node( "NODE6" "http://node6/check?wsdl" ); //入口 1   
  35.         Node node7 = new  Node( "NODE7" "http://node7/check?wsdl" );  
  36.         Node node8 = new  Node( "NODE8" "http://node8/check?wsdl" );  
  37.         Node node9 = new  Node( "NODE9" "http://node9/check?wsdl" );  
  38.   
  39.         node0.setDependencies(new  String[] { node1.getWsdl(), node2.getWsdl() });  
  40.         node1.setDependencies(new  String[] { node3.getWsdl(), node4.getWsdl() });  
  41.         node2.setDependencies(new  String[] { node5.getWsdl() });  
  42.         node6.setDependencies(new  String[] { node7.getWsdl(), node8.getWsdl() });  
  43.         node7.setDependencies(new  String[] { node5.getWsdl(), node9.getWsdl() });  
  44.         node8.setDependencies(new  String[] { node3.getWsdl(), node4.getWsdl() });  
  45.   
  46.         node2.setResult("FAILED" );  
  47.   
  48.         NODE_MAP.put(node0.getWsdl(), node0);  
  49.         NODE_MAP.put(node1.getWsdl(), node1);  
  50.         NODE_MAP.put(node2.getWsdl(), node2);  
  51.         NODE_MAP.put(node3.getWsdl(), node3);  
  52.         NODE_MAP.put(node4.getWsdl(), node4);  
  53.         NODE_MAP.put(node5.getWsdl(), node5);  
  54.         NODE_MAP.put(node6.getWsdl(), node6);  
  55.         NODE_MAP.put(node7.getWsdl(), node7);  
  56.         NODE_MAP.put(node8.getWsdl(), node8);  
  57.         NODE_MAP.put(node9.getWsdl(), node9);  
  58.   
  59.         ENTRIES.add(node0);  
  60.         ENTRIES.add(node6);  
  61.     }  
  62.   
  63.     /**  
  64.      * 模 拟执行远程验证返回节点,每次调用等待500ms  
  65.      */   
  66.     public   static  Node validateNode(String wsdl) {  
  67.         Node node = cloneNode(NODE_MAP.get(wsdl));  
  68.         logger.info(stri
    分享到:
    评论

相关推荐

    Design patterns for concurent objects

    Design patterns Synchronization patterns Concurrency patterns Event-driven programming

    Subversion+TortoiseSVN+教程

    SVN服务端V1.7.4 + SVN客户端V1.7.6 + SVN教程 三合一,适用于win32系统。...我用过CVS(Concurent Version System,SVN的前身)和微软的VSS(Visual SourceSafe),相比之下,SVN具有更强大的功能和更简单的操作特性。

    JDK1.8:JDK1.8源码解析-源码解析

    JDK1.8源码分析 容器类分析 java并发工具类java.util.concurent(JUC) 其他

    sztejkat.utils.ipc-开源

    这是一个类似于java.util.concurent的JAVA程序包,但具有更紧凑的结构,良好的文档记录和一些有趣的功能,例如区域锁定,事件,具有带宽限制的委派工作人员等。

    pexec:并行执行任务

    执行 并行执行任务。 用法: Usage: pexec.py [options] [jobfile | -] Execute all the command lines found in jobfile in parallel....instead of jobfile to read all the ... Number of concurent processed

Global site tag (gtag.js) - Google Analytics