`
DavyJones2010
  • 浏览: 148973 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

JavaSE: Producers&Consumers By Using BlockingQueue

阅读更多

1) BlockingQueue Intro

     A queue is a data structure with two fundamental operations: to add an element to the tail of the queue and remove an element from the head.

     That is, the queue follows FIFO(First In First Out) discipline.

     A blocking queue is a queue that blocks when you try to dequeue from it and the queue is empty, or when you try to enqueue to it and the queue is full. 

 

2) Simple ArrayBlockQueue Demo

package edu.xmu.thread;

import java.util.ArrayList;
import java.util.List;

public class MyArrayBlockingQueue<E> {
	private List<E> queue;
	private int capacity;

	public MyArrayBlockingQueue(int capacity) {
		queue = new ArrayList<E>(capacity);
		this.capacity = capacity;
	}

	public synchronized void enqueue(E item) throws InterruptedException {
		System.out.println(String.format(
				"Thread: [%s] attempts to enqueue. Current queue size: [%d]",
				Thread.currentThread(), queue.size()));
		while (queue.size() == capacity) {
			System.out.println(String.format(
					"Thread [%s] is waiting for enqueue.",
					Thread.currentThread()));
			wait(); //When current thread is notified, and jumped out of while loop, it still have to attempt to acquire the lock again 
		}
		if (queue.isEmpty()) {
			notifyAll();
		}
		queue.add(item);
		System.out.println(String.format("Thread [%s] executed enqueue.",
				Thread.currentThread()));
	}

	public synchronized E dequeue() throws InterruptedException {
		System.out.println(String.format(
				"Thread: [%s] attempts to dequeue. Current queue size: [%d]",
				Thread.currentThread(), queue.size()));

		while (queue.size() == 0) {
			System.out.println(String.format(
					"Thread [%s] is waiting for dequeue.",
					Thread.currentThread()));
			wait();
		}

		if (queue.size() == capacity) {
			notifyAll();
		}

		E item = queue.remove(0);
		System.out.println(String.format("Thread [%s] executed dequeue.",
				Thread.currentThread()));
		return item;
	}
}

 

3) Producer & Consumer Using BlockingQueue

package edu.xmu.thread;

public class BlockingQueueTest {
	private static final int INITIAL_CAPACITY = 10;

	public static void main(String[] args) {
		MyArrayBlockingQueue<Food> foodQueue = new MyArrayBlockingQueue<>(
				INITIAL_CAPACITY);
		Thread producer1 = new Thread(new FoodProducer(foodQueue));
		Thread producer2 = new Thread(new FoodProducer(foodQueue));
		Thread producer3 = new Thread(new FoodProducer(foodQueue));

		Thread consumer1 = new Thread(new FoodConsumer(foodQueue));
		Thread consumer2 = new Thread(new FoodConsumer(foodQueue));
		Thread consumer3 = new Thread(new FoodConsumer(foodQueue));
		Thread consumer4 = new Thread(new FoodConsumer(foodQueue));
		Thread consumer5 = new Thread(new FoodConsumer(foodQueue));
		Thread consumer6 = new Thread(new FoodConsumer(foodQueue));

		producer1.start();
		producer2.start();
		producer3.start();

		consumer1.start();
		consumer2.start();
		consumer3.start();
		consumer4.start();
		consumer5.start();
		consumer6.start();
	}
}

class Food {
}

class FoodProducer implements Runnable {
	MyArrayBlockingQueue<Food> foodQueue;

	public FoodProducer(MyArrayBlockingQueue<Food> foodQueue) {
		super();
		this.foodQueue = foodQueue;
	}

	@Override
	public void run() {
		while (true) {
			try {
				Thread.sleep((long) (Math.random() * 1000));
				Food food = new Food();
				foodQueue.enqueue(food);
				System.out.println(String.format(
						"Thread: [%s] produces food: [%s]",
						Thread.currentThread(), food));
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

class FoodConsumer implements Runnable {
	MyArrayBlockingQueue<Food> foodQueue;

	public FoodConsumer(MyArrayBlockingQueue<Food> foodQueue) {
		super();
		this.foodQueue = foodQueue;
	}

	@Override
	public void run() {
		while (true) {
			try {
				Thread.sleep(1000L);
				Food food = foodQueue.dequeue();
				Thread.sleep(1000L);
				System.out.println(String.format(
						"Thread: [%s] consumes food: [%s]",
						Thread.currentThread(), food));
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

 

4) BlockingQueue provided by Java:

    1> ArrayBlockingQueue

    2> LinkedBlockingQueue

    3> PriorityBlockingQueue

    Operations: Will dig into the realization of these BlockingQueues.



 

Reference Links:

1) http://stackoverflow.com/questions/16760513/how-does-wait-get-the-lock-back-in-java

2) http://tutorials.jenkov.com/java-concurrency/blocking-queues.html

3) Core Java Volume II

  • 大小: 55.4 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics