`
lanqiu17
  • 浏览: 17181 次
社区版块
存档分类
最新评论

java 消费者与生产者

阅读更多

原理:

生产者-消费者(producer-consumer)是共享一个公共的固定大小的 缓冲区。其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者,用于从缓冲区中取出消息。问题出现在当缓冲区已经满了,而此时生产者还想向其中放入 一个新的数据项的情形,其解决方法是让生产者此时进行休眠,等待消费者从缓冲区中取走了一个或者多个数据后再去唤醒它。同样地,当缓冲区已经空了,而消费 者还想去取消息,此时也可以让消费者进行休眠,等待生产者放入一个或者多个数据时再唤醒它。

 

    公共资源

package test.thread.producer;

import java.util.LinkedList;


public class ShareResource {

	private LinkedList share = new LinkedList<>();
    private int capacity = 20;
	public synchronized void addResource(Object resource) {
		if (isFull()) {
			try {
				wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		share.add(resource);
		notify();
	}

	public synchronized Object get() {
		if (isEmpty()) {
			try {
				wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		notify();
		return share.poll();
	}
	
	public boolean isFull() {
		return share.size() == capacity;
	}
	
	public boolean isEmpty() {
		return share.isEmpty();
	}
}

 

   //生产者

 

package test.thread.producer;

import org.apache.log4j.Logger;

public class Producer implements Runnable {

	private Logger log = Logger.getLogger(getClass());
	private ShareResource sr;
	public Producer(ShareResource sr) {
		this.sr = sr;
	}
	
	public void product() {
		sr.addResource("添加任务");
		log.debug("添加任务");
	}
	
	@Override
	public void run() {
		while(true) {
			product();
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}

 

  //消费者

package test.thread.producer;

import org.apache.log4j.Logger;

public class Customer  implements Runnable {

	private ShareResource sr;
	private Logger log = Logger.getLogger(getClass());
	public Customer(ShareResource sr) {
		this.sr = sr;
	}
	
	public void cust() {
		sr.get();
		log.debug("消费任务");
	}
	
	@Override
	public void run() {
		while(true) {
			try {
				cust();
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}

  

package test.thread.producer;

import org.junit.Test;

public class TestCustomer {

	
	@Test
	public void test() {
		ShareResource sr = new ShareResource();
		new Thread(new Producer(sr),"生产者线程1").start();
		new Thread(new Producer(sr),"生产者线程2").start();
		new Thread(new Customer(sr),"消费者线程1").start();
		new Thread(new Customer(sr),"消费者线程2").start();
		new Thread(new Customer(sr),"消费者线程3").start();
		try {
			Thread.sleep(1000000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics