`
jacally
  • 浏览: 762481 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

java 5.0 中的 同步(Concurrent) [转]

阅读更多

9.   同步(Concurrent)

    
1.      Executor接口

     Executor接口提供了一个类似于线程池的管理工具。用于只需要往Executor中提交Runnable对象,剩下的启动线程等工作,都会有对应的实现类来完成。ScheduledExecutorService比ExecutorService增加了,时间上的控制,即用户可以在提交的时候额外的定义该任务的启动时机,以及随后的执行间隔和延迟等。

     例子:

     任务:

     public class ETask implements Runnable{

          private int id = 0;

          public ETask(int id){

               this.id = id;

          }

          public void run(){

               try{

                   System.out.println(id+" Start");

                   Thread.sleep(1000);

                   System.out.println(id+" Do");

                   Thread.sleep(1000);

                   System.out.println(id+" Exit");

              }catch(Exception e){

                   e.printStackTrace();

              }

          }

     }

    

     测试类:

     public class ETest{

          public static void main(String[] args){       

              ExecutorService executor = Executors.newFixedThreadPool(2);

              for(int i=0;i<5;i++){

                   Runnable r = new ETask(i);

                   executor.execute(r);

              }

              executor.shutdown();

          }

     }

 

     输出:

     0 Start

     1 Start

     0 Do

     1 Do

     0 Exit

     2 Start

     1 Exit

     3 Start

     2 Do

     3 Do

     2 Exit

     3 Exit

     4 Start

     4 Do

     4 Exit

 

2.      Future和Callable

     Callable是一个类似于Runnable的接口,他与Runnable的区别是,她在执行完毕之后能够返回结果。Future用于获取线程的执行结果,或者取消已向Executor的任务。当我们通过Future提供的get()方法获取任务的执行结果时,如果任务没有完成,则调用get()方法的线程将会被阻塞,知道任务完成为止。一般我们都会使用Future的实现类FutureTask。

     例子:

     Callable对象:

     public class ETask implements Callable{

          private String id = null;

          public ETask(String id){

               this.id = id;

          }

    

          public String call(){

              try{

                   System.out.println(id+" Start");

                   Thread.sleep(1000);

                   System.out.println(id+" Do");

                   Thread.sleep(1000);

                   System.out.println(id+" Exit");          

              }catch(Exception e){

                   e.printStackTrace();

              }

              return id;

          }

     }

 

     测试类:

     public class ETest{

          public static void main(String[] args){       

              ExecutorService executor = Executors.newFixedThreadPool(2);

              for(int i=0;i<5;i++){           

                   try{

                       Callable c = new ETask(String.valueOf(i));

                        FutureTask ft = new FutureTask(c);

                        executor.execute(ft);

                        System.out.println("Finish:" + ft.get());         

                   }catch(Exception e){

                       e.printStackTrace();

                   }

              }

               executor.shutdown();

          }

     }

 

     输出:

     0 Start

     0 Do

     0 Exit

     Finish:0

     1 Start

     1 Do

     1 Exit

     Finish:1

     2 Start

     …

3.      CompletionService和ExecutorCompletionService

     CompletionService类似于一个Executor和Queue的混合。我们可以通过submit()向CompletionService提交任务,然后通过poll()来获取第一个完成的任务,也可以通过take()来阻塞等待下一个完成的任务。ExecutorCompletionService是CompletionService的实现类,他需要提供一个Executor作为构造函数的参数。

     例子:

     Executor executor = …;

     CompletionService cs = new ExecutorCompletionService(executor);

     Future fs = cs.submit(…);

     Future ft = cs.take();

 

4.      Semaphore

     信号量是用于同步和互斥的低级原语。信号量提供的acquire()和release()操作,与操作系统上的p,v操作同。

     例子:

     缓冲区:

     public class Buffer{

          private Semaphore s = null;

          private Semaphore p = null;

          Vector<Integer> v = new Vector<Integer>();

         

          public Buffer(int capacity){

               s = new Semaphore(capacity);

              p = new Semaphore(0);

          }

    

          public void put(int i){

              try{

                   s.acquire();

                   v.add(new Integer(i));

                   p.release();

               }catch(Exception e){

                   e.printStackTrace();

              }

          }

    

          public int get(){ 

               int i = 0;

              try{

                   p.acquire();

                   i = ((Integer)v.remove(0)).intValue();

                   s.release();

              }catch(Exception e){

                   e.printStackTrace();

              }

               return i;

          }   

     }

 

     生产者:

     public class Producer extends Thread{

          private Buffer b;

          private int count;

          private int step;

          private int id;

 

          public Producer(Buffer b,int step,int id){

               this.b =  b;

              this.step = step;

              this.id = id;

               count = 0;

          }

    

          public void run(){

              try{

                   while(true){

                       System.out.println("In put");

                        b.put(count);

                        System.out.println("Producer "+id+":"+count);

                        count++;

                       Thread.sleep(step);

                        System.out.println("Out put");

                   }

               }catch(Exception e){

                   e.printStackTrace();

              }

          }

     }

 

     消费者:

     public class Consumer extends Thread{

          private Buffer b;

          private int step;

          private int id;

    

          public Consumer(Buffer b,int step,int id){

              this.b = b;

               this.step = step;

              this.id = id;

          }

         

          public void run(){

              try{

                   while(true){

                        System.out.println("In get");

                       System.out.println("\t\tConsume "+id+":"+b.get());

                        System.out.println("Out get");

                        Thread.sleep(step);

                   }

               }catch(Exception e){

                   e.printStackTrace();

              }   

          }

     }

 

     测试程序:

     public class CPTest{

          public static void main(String[] args){

               Buffer b = new Buffer(3);

              Consumer c1 = new Consumer(b,1000,1);

              Consumer c2 = new Consumer(b,1000,2);

               Producer p1 = new Producer(b,100,1);

              Producer p2 = new Producer(b,100,2);

        

              c1.start();

               c2.start();

              p1.start();

              p2.start();

          }

     }

 

5.      CyclicBarrier

     CyclicBarrier可以让一组线程在某一个时间点上进行等待,当所有进程都到达该等待点后,再继续往下执行。CyclicBarrier使用完以后,通过调用reset()方法,可以重用该CyclicBarrier。线程通过调用await()来减少计数。

 

CyclicBarrier
 
 

 

 

 

 

 

 

     例子:

     任务:

     public class Task extends Thread{

          private String id;

          private CyclicBarrier c;

          private int time;

    

          public Task(CyclicBarrier c,String id,int time){

               this.c = c;

              this.id = id;

               this.time = time;

          }

    

          public void run(){

               try{

                   System.out.println(id+" Start");

                  Thread.sleep(time);

                   System.out.println(id+" Finish");

                   c.await();

                   System.out.println(id+" Exit");         

               }catch(Exception e){

                   e.printStackTrace();

              }

          }   

     }

 

     测试类:

     public class Test{

          public static void main(String[] args){

              CyclicBarrier c = new CyclicBarrier(3,new Runnable(){

                   public void run(){

                        System.out.println("All Work Done");

                   }

              });

               Task t1 = new Task(c,"1",1000);

              Task t2 = new Task(c,"2",3000);

              Task t3 = new Task(c,"3",5000);

               t1.start();

              t2.start();

              t3.start();       

          }

     }

 

     输出结果:

     1 Start

     2 Start

     3 Start

     1 Finish

     2 Finish

     3 Finish

     All Work Done

     3 Exit

     1 Exit

     2 Exit

 

6.      CountdownLatch

     CountdownLatch具有与CyclicBarrier相似的功能,也能让一组线程在某个点上进行同步。但是与CyclicBarrier不同的是:1.CountdownLatch不能重用,2.线程在CountdownLatch上调用await()操作一定会被阻塞,直到计数值为0时才会被唤醒,而且计数值只能通过conutDown()方法进行减少。

特别的,当CountdownLatch的值为1时,该Latch被称为“启动大门”,所有任务线程都在该Latch上await(),直到某个非任务线程调用countDown()触发,所有任务线程开始同时工作。

 

7.      Exchanger

     Exchanger是一个类似于计数值为2的CyclicBarrier。她允许两个线程在某个点上进行数据交换。

       例子:

     public class FillAndEmpty {

         Exchanger<DataBuffer> exchanger = new Exchanger();

         DataBuffer initialEmptyBuffer = ... a made-up type

         DataBuffer initialFullBuffer = ...

 

         public class FillingLoop implements Runnable {

              public void run() {

                   DataBuffer currentBuffer = initialEmptyBuffer;

                   try {

                       while (currentBuffer != null) {

                            addToBuffer(currentBuffer);

                            if (currentBuffer.full())

                                 currentBuffer = exchanger.exchange(currentBuffer);

                       }

                   }catch(InterruptedException ex) { ... handle ... }

              }

         }

 

         public class EmptyingLoop implements Runnable {

              public void run() {

                   DataBuffer currentBuffer = initialFullBuffer;

                   try {

                       while (currentBuffer != null) {

                            takeFromBuffer(currentBuffer);

                            if (currentBuffer.empty())

                                 currentBuffer = exchanger.exchange(currentBuffer);

                       }

                   } catch (InterruptedException ex) { ... handle ...}

              }

         }

 

         public void start() {

              new Thread(new FillingLoop()).start();

              new Thread(new EmptyingLoop()).start();

         }

     }

Exchange
 
 

 

    

 

 

 

 

 

 

 

 

8.      Lock,Condition

     锁是最基本的同步原语。通过在锁上面调用lock()和unlock()操作,可以达到与synchronized关键字相似的效果,但是有一点要注意的是,锁必须显式释放,如果由于抛出异常,而没有释放锁,将导致死锁出现。Condition提供的await(),signal(),signal()操作,与原来的wai(),notify(),notifyAll()操作具有相似的含义。Lock的两个主要子类是ReentrantLock和ReadWriteLock。其中ReadWriteLock的作用是允许多人读,而一人写。

     例子:

     使用Lock和Condition的生产者,消费者问题

     public class BoundedBuffer {

         final Lock lock = new ReentrantLock();

         final Condition notFull  = lock.newCondition();

         final Condition notEmpty = lock.newCondition();

         final Object[] items = new Object[100];

         int putptr, takeptr, count;

        

         public void put(Object x) throws InterruptedException {

              lock.lock();

              try {

                   while (count == items.length)

                       notFull.await();

                   items[putptr] = x;

                   if (++putptr == items.length)

                        putptr = 0;

                   ++count;

                   notEmpty.signal();

              } finally {

                   lock.unlock();

               }

          }

    

          public Object take() throws InterruptedException {

               lock.lock();

              try {

                   while (count == 0)

                       notEmpty.await();

                   Object x = items[takeptr];

                   if (++takeptr == items.length)

                        takeptr = 0;

                   --count;

                   notFull.signal();

                   return x;

               } finally {

                   lock.unlock();

              }

          }

     }   

 

9.      小结:新的concurrent包提供了一个从低到高的同步操作。

 



Lib 2007-01-22 17:33 发表评论
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics