`

jdk1.5条件阻塞Condition的应用

阅读更多
package cn.com.songjy.test.socket.thread;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * 
 * ClassName:ConditionCommunication 
 * jdk1.5条件阻塞Condition的应用(通信)
 * 子线程循环10次,接着主线程循环100次,接着又回到子线程循环10次,接着再回到主线程又循环100次,如此循环50次
 * 
 * @author songjy
 * @version 1.0
 * @since v1.0
 * @Date 2013-8-22 下午2:22:53
 */
public class ConditionCommunication {

	public static void main(String[] args) {

		final Businese businese = new Businese();

		new Thread(new Runnable() {
			public void run() {
				for (int j = 0; j < 50; j++) {
					businese.sub(j);
				}
			}
		}).start();

		for (int j = 0; j < 50; j++) {
			businese.main(j);
		}
	}

	static class Businese {

		private static Log log = LogFactory.getLog(Businese.class);
		private boolean sub = true;
		private Lock lock = new ReentrantLock();
		Condition condition = lock.newCondition();

		public /*synchronized*/ void sub(int j) {/*synchronized被Lock代替*/
			lock.lock();
			try {
				/* 这里用while不用if是因为有可能出现假唤醒的情况 */
				while (!sub) {// 还没有轮到我(sub),继续等待(睡觉)
					try {
						//wait();
						condition.await();/*代替wait(),但切记不要误写成condition.wait()*/
					} catch (InterruptedException e) {
						log.error(e.getMessage(), e);
					}
				}
				for (int i = 0; i < 10; i++) {
					log.info("sup thread sequence of" + i + "loop of " + j);
				}

				sub = false;// 执行完毕,变更状态
				//notify();// 本次执行完毕,唤醒其他线程(main)
				condition.signal();/*代替notify()*/
			} catch (Exception e) {
				log.error(e.getMessage(), e);
			} finally {
				lock.unlock();//最终必须释放锁,即使程序出错!
			}
		}

		public /*synchronized*/ void main(int j) {
			lock.lock();
			try {
				/* 这里用while不用if是因为有可能出现假唤醒的情况 */
				while (sub) {// 还没有轮到我(main),继续等待(睡觉)
					try {
						//wait();
						condition.await();/*代替wait(),但切记不要误写成condition.wait()*/
					} catch (InterruptedException e) {
						log.error(e.getMessage(), e);
					}
				}
				for (int i = 0; i < 100; i++) {
					log.info("main thread sequence of" + i + "loop of " + j);
				}
				sub = true;// 执行完毕,变更状态
				//notify();// 本次执行完毕,唤醒其他线程(sub)
				condition.signal();/*代替notify()*/
			} catch (Exception e) {
				log.error(e.getMessage(), e);
			} finally {
				lock.unlock();//即使程序出错也必须释放锁
			}
		}
	}
	
}

/*备注:wait()、notify()和notifyAll()都是Object类中的final方法,被所有的类继承、且不允许重写的方法*/ 


package cn.com.songjy.test.socket.thread;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * 
 * ClassName:ConditionCommunication 
 * jdk1.5条件阻塞Condition的应用(通信)
 * 子线程1循环5次,接着子线程2循环10次,最后主线程循环15次,接着又回到子线程1循环5次,接着子线程2循环10次,最后主线程循环15次,如此循环10次
 * 
 * @author songjy
 * @version 1.0
 * @since v1.0
 * @Date 2013-8-22 下午2:22:53
 */
public class ThreeConditionCommunication {

	public static void main(String[] args) {

		final Businese businese = new Businese();

		new Thread(new Runnable() {
			public void run() {
				for (int j = 0; j < 10; j++) {
					businese.sub1(j);
				}
			}
		}).start();
		
		new Thread(new Runnable() {
			public void run() {
				for (int j = 0; j < 10; j++) {
					businese.sub2(j);
				}
			}
		}).start();

		for (int j = 0; j < 10; j++) {
			businese.main(j);
		}
	}

	static class Businese {

		private static Log log = LogFactory.getLog(Businese.class);
		private int sub = 0;
		private Lock lock = new ReentrantLock();
		Condition condition1 = lock.newCondition();
		Condition condition2 = lock.newCondition();
		Condition condition3 = lock.newCondition();

		public void sub1(int j) {
			lock.lock();
			try {
				/* 这里用while不用if是因为有可能出现假唤醒的情况 */
				while (0 != sub) {// 还没有轮到我(sub1),继续等待(睡觉)
					try {
						condition1.await();/*切记不要误写成condition.wait()*/
					} catch (InterruptedException e) {
						log.error(e.getMessage(), e);
					}
				}
				for (int i = 0; i < 5; i++) {
					log.info("sup1 thread sequence of" + i + "loop of " + j);
				}

				sub = 1;// 执行完毕,变更状态
				condition2.signal();/*唤醒sub2线程*/
			} catch (Exception e) {
				log.error(e.getMessage(), e);
			} finally {
				lock.unlock();//最终必须释放锁,即使程序出错!
			}
		}
		
		public void sub2(int j) {
			lock.lock();
			try {
				/* 这里用while不用if是因为有可能出现假唤醒的情况 */
				while (1 != sub) {// 还没有轮到我(sub2),继续等待(睡觉)
					try {
						condition2.await();/*切记不要误写成condition.wait()*/
					} catch (InterruptedException e) {
						log.error(e.getMessage(), e);
					}
				}
				for (int i = 0; i < 10; i++) {
					log.info("sup2 thread sequence of" + i + "loop of " + j);
				}

				sub = 2;// 执行完毕,变更状态
				condition3.signal();/*唤醒main线程*/
			} catch (Exception e) {
				log.error(e.getMessage(), e);
			} finally {
				lock.unlock();//最终必须释放锁,即使程序出错!
			}
		}

		public void main(int j) {
			lock.lock();
			try {
				/* 这里用while不用if是因为有可能出现假唤醒的情况 */
				while (2 != sub) {// 还没有轮到我(main),继续等待(睡觉)
					try {
						//wait();
						condition3.await();/*切记不要误写成condition.wait()*/
					} catch (InterruptedException e) {
						log.error(e.getMessage(), e);
					}
				}
				for (int i = 0; i < 15; i++) {
					log.info("main thread sequence of" + i + "loop of " + j);
				}
				sub = 0;// 执行完毕,变更状态
				condition1.signal();/*唤醒sub1线程*/
			} catch (Exception e) {
				log.error(e.getMessage(), e);
			} finally {
				lock.unlock();//即使程序出错也必须释放锁
			}
		}
	}
	
}

/*备注:wait()、notify()和notifyAll()都是Object类中的final方法,被所有的类继承、且不允许重写的方法*/ 


package cn.com.songjy.test.socket.thread;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 
 * ClassName:BoundedBuffer
 * 缓冲模拟(不是缓存)
 *
 * @author   songjy
 * @version  1.0
 * @since    v1.0
 * @Date	 2013-8-23	上午9:40:18
 */
class BoundedBuffer {
	
	final Lock lock = new ReentrantLock();
	final Condition notFull = lock.newCondition();
	final Condition notEmpty = lock.newCondition();

	final Object[] items = new Object[100];
	int putptr, takeptr, count;

	public void put(Object x) throws InterruptedException {
		lock.lock();
		try {
			while (count == items.length)
				notFull.await();
			items[putptr] = x;
			if (++putptr == items.length)
				putptr = 0;
			++count;
			notEmpty.signal();
		} finally {
			lock.unlock();
		}
	}

	public Object take() throws InterruptedException {
		lock.lock();
		try {
			while (count == 0)
				notEmpty.await();
			
			Object x = items[takeptr];
			if (++takeptr == items.length)
				takeptr = 0;
			--count;
			notFull.signal();
			return x;
		} finally {
			lock.unlock();
		}
	}
}


来自:http://down.51cto.com/data/443438
  • 大小: 226.2 KB
  • 大小: 246.7 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics