`
beyond429
  • 浏览: 93161 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

BlockingQueue队列学习

    博客分类:
  • java
阅读更多

package test;

 

import java.util.Random;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;

 

/**

 * 生产者线程

 */

/**
 * Producer task.
 *
 * <p>Until {@link #stop()} is called, the task repeatedly pauses for a random
 * interval, builds a string of the form {@code "data:<n>"} and offers it to the
 * queue supplied at construction time, waiting at most two seconds for free
 * space on each attempt.
 */
public class Producer implements Runnable {

    /** Exclusive upper bound, in milliseconds, of the random pause taken before each item. */
    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;

    /** Counter shared by all Producer instances; supplies the numeric suffix of each item. */
    private static AtomicInteger count = new AtomicInteger();

    /** Queue that receives the produced items. */
    private BlockingQueue<String> queue;

    /** Loop flag; volatile so that a stop() issued by another thread is seen by run(). */
    private volatile boolean isRunning = true;

    /**
     * Creates a producer that writes to the given queue.
     *
     * @param queue the queue produced items are offered to
     */
    public Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    /**
     * Produces items until {@link #stop()} is called or the executing thread is
     * interrupted. On interruption the stack trace is printed and the thread's
     * interrupt status is restored before the method returns.
     */
    @Override
    public void run() {
        final Random pauseSource = new Random();

        System.out.println("启动生产者线程!");
        try {
            while (isRunning) {
                produceOne(pauseSource);
            }
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
            // Restore the flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("退出生产者线程!");
        }
    }

    /**
     * Requests termination. Only the loop flag is cleared: a sleep or offer that
     * is already in progress is not interrupted, so the loop ends at its next check.
     */
    public void stop() {
        isRunning = false;
    }

    /** Performs one produce cycle: random pause, build the item, timed offer to the queue. */
    private void produceOne(Random pauseSource) throws InterruptedException {
        System.out.println("正在生产数据...");
        Thread.sleep(pauseSource.nextInt(DEFAULT_RANGE_FOR_SLEEP));

        final String item = "data:" + count.incrementAndGet();
        System.out.println("将数据:" + item + "放入队列...");

        final boolean accepted = queue.offer(item, 2, TimeUnit.SECONDS);
        if (!accepted) {
            // The queue stayed full for the whole two seconds; the item is dropped.
            System.out.println("========放入数据失败:" + item);
        }
    }
}

 

==================================================================================

 

package test;

 

import java.util.ArrayList;

import java.util.Collection;

import java.util.concurrent.BlockingQueue;

 

/**

 * 消费者线程

 */

/**
 * Consumer task.
 *
 * <p>The task waits up to an idle timeout for an element to arrive, then takes
 * that element together with everything else currently queued as one batch,
 * prints each element, pauses, and repeats. If no element arrives within the
 * idle timeout, the task assumes every producer has exited and terminates.
 *
 * <p>Earlier versions used a bare {@code drainTo}, which never blocks: a
 * consumer that happened to see an empty queue once (for example because a
 * sibling consumer had just drained it) quit immediately, and only after a
 * pointless final pause.
 */
public class Consumer implements Runnable {

    /** Default idle timeout in milliseconds: no data for this long means the producers are done. */
    private static final long DEFAULT_IDLE_TIMEOUT_MILLIS = 2000L;

    /** Base unit, in milliseconds, from which the default pause between batches is derived. */
    private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;

    /** Queue the elements are taken from. */
    private final BlockingQueue<String> queue;

    /** Maximum time, in milliseconds, to wait for the first element of a batch. */
    private final long idleTimeoutMillis;

    /** Pause, in milliseconds, taken after each consumed batch. */
    private final long pauseMillis;

    /**
     * Creates a consumer with the default timings: a 2 second idle timeout and
     * a 3 second pause after each batch.
     *
     * @param queue the queue to consume from; must not be {@code null}
     * @throws NullPointerException if {@code queue} is {@code null}
     */
    public Consumer(BlockingQueue<String> queue) {
        this(queue, DEFAULT_IDLE_TIMEOUT_MILLIS, DEFAULT_RANGE_FOR_SLEEP * 3L);
    }

    /**
     * Creates a consumer with explicit timings.
     *
     * @param queue             the queue to consume from; must not be {@code null}
     * @param idleTimeoutMillis how long to wait for data before terminating; must not be negative
     * @param pauseMillis       pause after each consumed batch; must not be negative
     * @throws NullPointerException     if {@code queue} is {@code null}
     * @throws IllegalArgumentException if either duration is negative
     */
    public Consumer(BlockingQueue<String> queue, long idleTimeoutMillis, long pauseMillis) {
        if (queue == null) {
            throw new NullPointerException("queue");
        }
        if (idleTimeoutMillis < 0) {
            throw new IllegalArgumentException("idleTimeoutMillis must be >= 0: " + idleTimeoutMillis);
        }
        if (pauseMillis < 0) {
            throw new IllegalArgumentException("pauseMillis must be >= 0: " + pauseMillis);
        }
        this.queue = queue;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.pauseMillis = pauseMillis;
    }

    /**
     * Consumes batches until the queue stays empty for the idle timeout or the
     * executing thread is interrupted. On interruption the stack trace is
     * printed and the thread's interrupt status is restored before returning.
     */
    @Override
    public void run() {
        System.out.println("启动消费者线程!");
        try {
            while (true) {
                System.out.println("正从队列获取数据...");

                // Block (bounded) for the first element so that a momentarily
                // empty queue does not end the consumer.
                String first = queue.poll(idleTimeoutMillis, java.util.concurrent.TimeUnit.MILLISECONDS);
                if (first == null) {
                    // Nothing arrived within the idle timeout: treat the producers as finished.
                    break;
                }

                // Take whatever else is already queued in the same batch.
                Collection<String> batch = new ArrayList<String>();
                batch.add(first);
                queue.drainTo(batch);

                for (String s : batch) {
                    System.out.println("拿到数据:" + s);
                    System.out.println("正在消费数据:" + s);
                }

                if (pauseMillis > 0) {
                    Thread.sleep(pauseMillis);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("退出消费者线程!");
        }
    }
}

 

========================================================

 

package test;

 

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

public class BlockingQueueTest {

 

public static void main(String[] args) throws InterruptedException {

// 声明一个容量为10的缓存队列

BlockingQueue<String> queue = new ArrayBlockingQueue<String>(50);

 

Producer producer1 = new Producer(queue);

Producer producer2 = new Producer(queue);

Producer producer3 = new Producer(queue);

Producer producer4 = new Producer(queue);

Consumer consumer1 = new Consumer(queue);

Consumer consumer2 = new Consumer(queue);

Consumer consumer3 = new Consumer(queue);

 

// 借助Executors

ExecutorService service = Executors.newCachedThreadPool();

// 启动线程

service.execute(producer1);

service.execute(producer2);

service.execute(producer3);

service.execute(producer4);

 

Thread.sleep(2 * 1000);

service.execute(consumer1);

service.execute(consumer2);

service.execute(consumer3);

 

// 执行10s

Thread.sleep(10 * 1000);

producer1.stop();

producer2.stop();

producer3.stop();

producer4.stop();

 

Thread.sleep(5000);

 

// 退出Executor

service.shutdown();

}

}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics