`
yanguz123
  • 浏览: 556056 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

\(^_^)/ Java多线程Producer-Consumer模式

    博客分类:
  • Code
 
阅读更多

生产者

package producer_customer;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Producer task: repeatedly sleeps for a random interval, creates a new
 * {@link Data} item with the next sequence number and tries to hand it to the
 * shared queue.
 *
 * <p>The sequence counter is static, so numbers are unique across all
 * {@code Producer} instances in the JVM. The task ends when {@link #stop()} is
 * called or when its thread is interrupted.
 */
public class Producer implements Runnable {

	/** Cleared by {@link #stop()}; volatile so the worker thread sees the change. */
	private volatile boolean isRunning = true;
	private final BlockingQueue<Data> queue;
	/** Sequence generator shared by every producer instance. */
	private static final AtomicInteger count = new AtomicInteger();
	/** Exclusive upper bound, in milliseconds, of the random pause between items. */
	private static final int SLEEPTIME = 10000;

	/**
	 * @param queue the queue that produced items are offered to
	 */
	public Producer(BlockingQueue<Data> queue) {
		this.queue = queue;
	}

	/**
	 * Asks the producer to finish. The request is honoured the next time the
	 * worker wakes up from its pause; no further item is produced after that.
	 */
	public void stop() {
		isRunning = false;
	}

	/**
	 * Produces items until stopped or interrupted. Each item is offered with a
	 * two second timeout; an item that cannot be enqueued in time is reported
	 * and dropped.
	 */
	@Override
	public void run() {
		Random random = new Random();
		System.out.println("start producer id = " + Thread.currentThread().getId());
		try {
			while (isRunning) {
				Thread.sleep(random.nextInt(SLEEPTIME));
				// stop() may have been called during the pause; do not produce
				// another item in that case.
				if (!isRunning) {
					break;
				}
				Data data = new Data(count.incrementAndGet());
				// Report success only once the offer has actually succeeded, so a
				// timed-out offer is not logged as both "put" and "failed".
				if (queue.offer(data, 2, TimeUnit.SECONDS)) {
					System.out.println("Put " + data.getIntData() + " is put into queue");
				} else {
					System.out.println("failed to put data: " + data.getIntData());
				}
			}
		} catch (InterruptedException e) {
			// Interruption is a shutdown request: restore the flag and leave.
			Thread.currentThread().interrupt();
		}
	}

}

 

 

消费者

package producer_customer;

import java.util.Random;
import java.util.concurrent.BlockingQueue;

/**
 * Consumer task: takes {@link Data} items from the shared queue, prints the
 * square of each payload and then pauses for a random interval.
 *
 * <p>The task runs until its thread is interrupted.
 */
public class Consumer implements Runnable {

	private final BlockingQueue<Data> queue;
	/** Exclusive upper bound, in milliseconds, of the random pause after each item. */
	private static final int SLEEPTIME = 10000;

	/**
	 * @param queue the queue that items are taken from
	 */
	public Consumer(BlockingQueue<Data> queue) {
		this.queue = queue;
	}

	/**
	 * Consumes items until the thread is interrupted, either while waiting for
	 * an item or while pausing between items.
	 */
	@Override
	public void run() {
		System.out.println("start consumer id = " + Thread.currentThread().getId());
		Random random = new Random();
		try {
			while (!Thread.currentThread().isInterrupted()) {
				// take() blocks until an element is available and never returns
				// null, so no null check is needed.
				Data data = queue.take();
				// Square in long arithmetic: int * int overflows for payloads
				// above 46340.
				long value = data.getIntData();
				long result = value * value;
				System.out.println("Take " + data.getIntData() + " from queue, and result = " + result);
				Thread.sleep(random.nextInt(SLEEPTIME));
			}
		} catch (InterruptedException e) {
			// Interruption is a shutdown request: restore the flag and leave.
			Thread.currentThread().interrupt();
		}
	}

}

 

数据

package producer_customer;

/**
 * Immutable holder for a single {@code int} payload.
 */
public final class Data {
	private final int intData;

	/**
	 * @param d the payload value
	 */
	public Data(int d) {
		this.intData = d;
	}

	/**
	 * Creates an item from the decimal text form of an integer.
	 *
	 * @param s text to parse
	 * @throws NumberFormatException if {@code s} is not a parsable integer
	 */
	public Data(String s) {
		// parseInt returns a primitive directly; valueOf would box and then unbox.
		this.intData = Integer.parseInt(s);
	}

	/**
	 * @return the payload value
	 */
	public int getIntData() {
		return intData;
	}

	/**
	 * @return a readable form of this item, e.g. {@code Data[42]}
	 */
	@Override
	public String toString() {
		return "Data[" + intData + "]";
	}

}

 

测试类

package producer_customer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * Demo driver: runs four producers and four consumers against one bounded
 * queue, stops the producers after ten seconds, lets the consumers keep
 * draining for another ten seconds and then shuts everything down.
 */
public class Test {
	/**
	 * @param args unused
	 * @throws InterruptedException if the main thread is interrupted while waiting
	 */
	public static void main(String[] args) throws InterruptedException {
		BlockingQueue<Data> queue = new LinkedBlockingDeque<Data>(10);

		Producer[] producers = new Producer[4];
		for (int i = 0; i < producers.length; i++) {
			producers[i] = new Producer(queue);
		}
		Consumer[] consumers = new Consumer[4];
		for (int i = 0; i < consumers.length; i++) {
			consumers[i] = new Consumer(queue);
		}

		ExecutorService service = Executors.newCachedThreadPool();
		for (Producer producer : producers) {
			service.execute(producer);
		}
		for (Consumer consumer : consumers) {
			service.execute(consumer);
		}

		Thread.sleep(10000);
		// Stop every producer, including the fourth one.
		for (Producer producer : producers) {
			producer.stop();
		}
		// Give the consumers time to work through what is already queued.
		Thread.sleep(10000);

		// shutdown() would only reject new tasks; the consumers block in take()
		// and have no stop flag, so they must be interrupted for the JVM to exit.
		service.shutdownNow();
		// TimeUnit is written fully qualified because this file does not import it.
		if (!service.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
			System.err.println("worker threads did not terminate in time");
		}
	}
}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics