nudtgk2000

Rewriting CountDownLatch to Monitor Thread State


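Requirement: the manager thread needs to know promptly when all worker threads are in the waiting state, and then satisfy the wait condition so that they resume running.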

 

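Approach: one option is a "heartbeat", where each worker thread periodically reports to the manager thread. Here I want to monitor state with a concurrent state counter instead. The JDK provides two such counters: java.util.concurrent.CountDownLatch and java.util.concurrent.CyclicBarrier. Once a CountDownLatch is initialized, worker threads call countDown, and any other thread that calls await blocks until the count reaches 0. It suits the case where one or more threads wait for a group of threads to all pass a certain point, but it has one limitation: it is one-shot and cannot be reset. CyclicBarrier can be reset, but it only fits the case where a group of threads each reach a certain point and then wait for one another to stay in step. (See the JDK 1.6 documentation.)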
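To make the difference concrete, here is a minimal sketch using only the standard JDK classes. The class name LatchVsBarrierSketch and its two helper methods are made up for illustration. The first helper shows that a CountDownLatch stays open once its count reaches zero; the second shows two threads reusing one CyclicBarrier over several rounds.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original post.
class LatchVsBarrierSketch {

    // A CountDownLatch is one-shot: once the count hits zero it stays there.
    static boolean latchStaysOpen() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        latch.countDown();
        latch.countDown();
        latch.countDown(); // no effect, the count is already zero
        // await returns true immediately because the count is zero
        return latch.getCount() == 0 && latch.await(0, TimeUnit.MILLISECONDS);
    }

    // A CyclicBarrier is reusable: the same two parties meet once per round.
    static int barrierRounds(int rounds) throws Exception {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time all parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(2, trips::incrementAndGet);
        Thread other = new Thread(() -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    barrier.await();
                }
            } catch (Exception e) {
                Thread.currentThread().interrupt();
            }
        });
        other.start();
        for (int i = 0; i < rounds; i++) {
            barrier.await();
        }
        other.join();
        return trips.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("latch stays open: " + latchStaysOpen());
        System.out.println("barrier trips: " + barrierRounds(3));
    }
}
```

Note that with the barrier every participant blocks at the meeting point, so there is no separate observer role, which is why it does not fit the manager/worker requirement here.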

    So I decided to modify the CountDownLatch class so that it supports resetting and can handle continuous monitoring.

 

The code is as follows:

The modified counter class: ResetableCountDownLatch

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.*;

public class ResetableCountDownLatch {
    /**
     * Synchronization control For ResetableCountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setCount(count);
        }
        
        void setCount(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        public int tryAcquireShared(int acquires) {
            return getState() == 0? 1 : -1;
        }

        public boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    /**
     * Constructs a {@code ResetableCountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public ResetableCountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

    // Added method: resets the count so the latch can be reused
    public void setCount(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        sync.setCount(count);
    }

    @Override
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}
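As a quick single-threaded check of the reset behavior, here is a compact sketch. MiniLatch is a hypothetical, cut-down stand-in for the class above (same AQS technique, fewer methods) so that the example is self-contained; ResetSketch and demo are likewise made-up names.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical demo class; MiniLatch mirrors the resettable latch idea above.
class ResetSketch {

    static final class MiniLatch {
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1L;

            Sync(int count) { setState(count); }

            void reset(int count) { setState(count); }

            @Override
            protected int tryAcquireShared(int ignored) {
                return getState() == 0 ? 1 : -1; // open only when the count is zero
            }

            @Override
            protected boolean tryReleaseShared(int ignored) {
                for (;;) {
                    int c = getState();
                    if (c == 0) return false;
                    if (compareAndSetState(c, c - 1)) return c == 1; // signal on reaching zero
                }
            }
        }

        private final Sync sync;

        MiniLatch(int count) { sync = new Sync(count); }

        void countDown() { sync.releaseShared(1); }

        boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }

        void setCount(int count) { sync.reset(count); }
    }

    // Returns {open after first countdown, closed after reset, open after second countdown}.
    static boolean[] demo() throws InterruptedException {
        MiniLatch latch = new MiniLatch(2);
        latch.countDown();
        latch.countDown();
        boolean openFirst = latch.await(10, TimeUnit.MILLISECONDS);

        latch.setCount(1); // reset: the latch is closed again
        boolean closedAfterReset = !latch.await(10, TimeUnit.MILLISECONDS);

        latch.countDown();
        boolean openAgain = latch.await(10, TimeUnit.MILLISECONDS);
        return new boolean[] { openFirst, closedAfterReset, openAgain };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

One design point worth keeping in mind: the reset writes the state directly, so setting the count to 0 would not by itself wake threads already blocked in await. Only countDown signals waiters.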

 

Test case:

 

    public static void main(String[] args)
    {
        // Initialize the counter; this example has 3 worker threads
        ResetableCountDownLatch latch = new ResetableCountDownLatch(3);
        ThreadGroup tg = new ThreadGroup("");

        class RunBody implements Runnable {
            ResetableCountDownLatch latch;
            
            RunBody(ResetableCountDownLatch latch) {
                this.latch = latch;
            }

            @Override
            public void run()
            {
                System.out.println(Thread.currentThread().getName() + " start.");
                
                for(int i=0; i<Thread.currentThread().getId(); ++i) {
                    try
                    {
                        Thread.sleep(1000);

                        // Decrement the counter before calling wait
                        synchronized(this) {
                            System.out.println(Thread.currentThread().getName() + " wait " + (i+1) + "time(s)");
                            latch.countDown();
                            this.wait();
                        }
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    
                    System.out.println(Thread.currentThread().getName() + " continue.");
                }

                // Also count down before the thread ends, to "notify" the monitor thread
                System.out.println(Thread.currentThread().getName() + " finish.");
                latch.countDown();
            }
            
        }
        
        RunBody threadBody = new RunBody(latch);
        
        for(int i=0; i<3; ++i) {
            new Thread(tg, threadBody).start();
        }

        while(true) {
            try
            {
                latch.await();
                // Check whether all worker threads have finished
                if(0==tg.activeCount()) {
                    break;
                }

                System.out.println("Main: there are " + tg.activeCount() + " live threads all waiting");
                synchronized(threadBody) {
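                    // Reset the counter. Note: use the number of live worker threads
                    // (ThreadGroup.activeCount() returns only an estimate)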
                    latch.setCount(tg.activeCount());
                    System.out.println("Main: wake them up.");
                    threadBody.notifyAll();
                }
            }
            catch (InterruptedException e)
            {
                // Interrupted while waiting: report it and keep monitoring
                e.printStackTrace();
            }
        }
        
        System.out.println("Main: All threads finished.");
    }
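The test relies on a handshake: a worker counts down while holding the monitor and then calls wait, so the manager cannot enter its own synchronized block and call notifyAll until the worker is actually waiting. The sketch below isolates that handshake with one worker and a standard CountDownLatch. The class HandshakeSketch and the released flag are additions for illustration, not part of the original test; the flag lets wait sit in a loop, which is the usual guard against spurious wakeups.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class isolating the countDown-then-wait handshake.
class HandshakeSketch {
    private final Object lock = new Object();
    private boolean released = false;                 // guarded by lock
    private final CountDownLatch parked = new CountDownLatch(1);
    private volatile String result = "not resumed";

    String run() throws InterruptedException {
        Thread worker = new Thread(() -> {
            synchronized (lock) {
                parked.countDown();                   // report "about to wait" while holding the monitor
                while (!released) {                   // loop guards against spurious wakeups
                    try {
                        lock.wait();                  // releases the monitor while waiting
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
            result = "resumed";
        });
        worker.start();

        parked.await();                               // returns once the worker has reported
        synchronized (lock) {                         // entered only after the worker is inside wait()
            released = true;
            lock.notifyAll();
        }
        worker.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(new HandshakeSketch().run());
    }
}
```

Because the flag is checked under the same monitor, the worker would still proceed correctly even if the notification somehow arrived before it began waiting.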

 

Test output:

Thread-1 start.
Thread-2 start.
Thread-0 start.
Thread-2 wait 1time(s)
Thread-0 wait 1time(s)
Thread-1 wait 1time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-1 continue.
Thread-2 continue.
Thread-0 continue.
Thread-2 wait 2time(s)
Thread-0 wait 2time(s)
Thread-1 wait 2time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-1 continue.
Thread-0 continue.
Thread-2 continue.
Thread-2 wait 3time(s)
Thread-1 wait 3time(s)
Thread-0 wait 3time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-0 continue.
Thread-1 continue.
Thread-2 continue.
Thread-2 wait 4time(s)
Thread-0 wait 4time(s)
Thread-1 wait 4time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-1 continue.
Thread-2 continue.
Thread-0 continue.
Thread-1 wait 5time(s)
Thread-2 wait 5time(s)
Thread-0 wait 5time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-0 continue.
Thread-2 continue.
Thread-1 continue.
Thread-0 wait 6time(s)
Thread-1 wait 6time(s)
Thread-2 wait 6time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-2 continue.
Thread-1 continue.
Thread-0 continue.
Thread-2 wait 7time(s)
Thread-1 wait 7time(s)
Thread-0 wait 7time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-0 continue.
Thread-1 continue.
Thread-2 continue.
Thread-0 wait 8time(s)
Thread-1 wait 8time(s)
Thread-2 wait 8time(s)
Main: there are 3 live threads all waiting
Main: wake them up.
Thread-1 continue.
Thread-2 continue.
Thread-0 continue.
Thread-0 finish.
Thread-2 wait 9time(s)
Thread-1 wait 9time(s)
Main: there are 2 live threads all waiting
Main: wake them up.
Thread-2 continue.
Thread-1 continue.
Thread-1 finish.
Thread-2 wait 10time(s)
Main: there are 1 live threads all waiting
Main: wake them up.
Thread-2 continue.
Thread-2 finish.
Main: All threads finished.

 

#2 nudtgk2000 2012-11-05
flysnail wrote:
Nice idea,

Thanks for the encouragement. I'm still a beginner and there is a lot I don't understand. :)
#1 flysnail 2012-11-05
Nice idea, :)
