`

Producer Consumer Pattern

阅读更多

        生产者与消费者之间加入一个“桥梁参与者”,它用来缓冲两者之间的处理速度。

        示例:MakerThread用来不断的产生cake,把cake送到table上;而EaterThread不断的eat cake。Table用来缓冲两者的生产消费过程,put方法中如果出现table满了,那么put方法就要等待,table不满了就被notify了,然后把cake放到table上,并notify那些EaterThread;take方法发现table空了,那么一直等到有cake(被notify的时候当然有cake咯),然后吃掉一个。

import java.util.Random;

public class MakerThread extends Thread{
	private final Random random;
	private final Table table;
	private static int id=0;//蛋糕的流水线号码
	public MakerThread(String name,Table table,long seed){
		super(name);
		this.table=table;
		this.random=new Random(seed);
	}
	public void run(){
		try{
			while(true){
				Thread.sleep(random.nextInt(1000));
				String cake="[cake No."+nextId()+" by "+getName()+"]";
			    table.put(cake);
			}
		}catch(InterruptedException e){
			e.printStackTrace();
		}
	}
	private static synchronized int nextId() {
		return id++;
	}
}

 

import java.util.Random;

public class EaterThread extends Thread {
	private final Random random;
	private final Table table;
	public EaterThread(String name,Table table,long seed){
		super(name);
		this.table=table;
		this.random=new Random(seed);
	}
	public void run(){
		try{
			while(true){
				table.take();
				Thread.sleep(random.nextInt(1000));
			}
		}catch(InterruptedException e){
			e.printStackTrace();
		}
	}

}

 

public class Table {
	private final String[] buffer;
	private int tail;    //下一个put的地方
	private int head;    //下一个take的地方
	private int count;
	public Table(int count){
		this.buffer=new String[count];
		this.head=0;
		this.tail=0;
		this.count=0;
	}
	public synchronized void put(String cake) throws InterruptedException{
		System.out.println(Thread.currentThread().getName()+" put "+cake);
		while(count>=buffer.length){
			wait();
		}
		buffer[tail]=cake;
		tail=(tail+1)%buffer.length;
		count++;
		notifyAll();
	}
	public synchronized String take() throws InterruptedException{
		while(count<=0){
			wait();
		}
		String cake=buffer[head];
		head=(head+1)%buffer.length;
		count--;
		notifyAll();
		System.out.println(Thread.currentThread().getName()+" takes "+cake);
		return cake;
	}

}

 

public class Main {
	public static void main(String[] args){
		Table table=new Table(3);
		new MakerThread("Maker1",table,31415).start();
		new MakerThread("Maker2",table,92653).start();
		new MakerThread("Maker3",table,58979).start();
		new EaterThread("Eater1",table,32384).start();
		new EaterThread("Eater2",table,62643).start();
		new EaterThread("Eater3",table,38327).start();
	}
}

 运行结果:

Maker2 put [cake No.0 by Maker2]
Eater3 takes [cake No.0 by Maker2]
Maker1 put [cake No.1 by Maker1]
Eater1 takes [cake No.1 by Maker1]
Maker3 put [cake No.2 by Maker3]
Eater2 takes [cake No.2 by Maker3]
Maker2 put [cake No.3 by Maker2]
Eater3 takes [cake No.3 by Maker2]
Maker3 put [cake No.4 by Maker3]

 

 线程为了协调合作,所以必须进行共享互斥,使得共享的东西不被损坏。而线程的共享互斥,也是为了让线程合作才进行的。这个模式中的中间参与者就是协调线程运行的关键部分。

 

关于几个耗时的操作:

        1、sleep。执行sleep的线程会暂停执行参数内设置的时间。线程Alice执行Thread.sleep(604800000),然后就一直暂停到给定的时间,如果要取消,那么可以在线程Bobby中执行alice.interrupted();这里alice是线程Alice对应的Thread实例。当sleep的线程调用interrupt方法时,就会放弃暂停状态并且抛出InterruptedException异常。这样一来,线程Alice的控制权就交给捕获这个异常的catch块了。

        2、wait。调用wait方法线程会进入等待区,等待的时候线程不会活动,它会直到被notify或notifyAll。

同样可以用interrupt来取消wait,线程Bobby调用alice.interrupted,同样,也会抛出InterruptedException。但要小心锁定的问题,线程进入等待区会把锁定解除,在wait状态下调用interrupt会重新获取锁定,然后再抛出异常。

        3、join。执行join方法会等待到指定的线程结束为止。也就是会花费掉指定线程结束这段时间。和sleep方法一样,调用interrupt方法后,由于join方法不需要获得锁定,它会马上跳到catch块里面。

 

interrupt方法只是改变中断的状态而已。isInterrupted方法可以检查中断状态,如果为中断状态那么返回true,而非中断返回false。Thread.interruted方法可以检查并且清除中断状态,线程会被设置成非中断状态。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics