package test;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 生产者线程
*/
public class Producer implements Runnable {
public Producer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
String data = null;
Random r = new Random();
System.out.println("启动生产者线程!");
try {
while (isRunning) {
System.out.println("正在生产数据...");
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
data = "data:" + count.incrementAndGet();
System.out.println("将数据:" + data + "放入队列...");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
System.out.println("========放入数据失败:" + data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出生产者线程!");
}
}
public void stop() {
isRunning = false;
}
private volatile boolean isRunning = true;
private BlockingQueue<String> queue;
private static AtomicInteger count = new AtomicInteger();
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
}
==================================================================================
package test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
/**
* 消费者线程
*/
public class Consumer implements Runnable {
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
System.out.println("启动消费者线程!");
//Random r = new Random();
boolean isRunning = true;
try {
while (isRunning) {
System.out.println("正从队列获取数据...");
//String data = queue.poll(2, TimeUnit.SECONDS);
//if (null != data) {
//System.out.println("拿到数据:" + data);
//System.out.println("正在消费数据:" + data);
//Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
//} else {
//// 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
//isRunning = false;
//}
Collection<String> c = new ArrayList<String>();
queue.drainTo(c);
if(c.isEmpty()){
isRunning = false;
}else{
for(String s : c){
System.out.println("拿到数据:" + s);
System.out.println("正在消费数据:" + s);
}
}
Thread.sleep(DEFAULT_RANGE_FOR_SLEEP*3);
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出消费者线程!");
}
}
private BlockingQueue<String> queue;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
}
========================================================
package test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
// 声明一个容量为10的缓存队列
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(50);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Producer producer4 = new Producer(queue);
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
Consumer consumer3 = new Consumer(queue);
// 借助Executors
ExecutorService service = Executors.newCachedThreadPool();
// 启动线程
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(producer4);
Thread.sleep(2 * 1000);
service.execute(consumer1);
service.execute(consumer2);
service.execute(consumer3);
// 执行10s
Thread.sleep(10 * 1000);
producer1.stop();
producer2.stop();
producer3.stop();
producer4.stop();
Thread.sleep(5000);
// 退出Executor
service.shutdown();
}
}
相关推荐
定义全局线程池,将用户的请求放入自定义队列中,排队等候线程调用,等待超时则自动取消该任务,实现超时可取消的异步任务
在新增的Concurrent包中,BlockingQueue...通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。
14-阻塞队列BlockingQueue实战及其原理分析二.pdf
java中线程队列BlockingQueue的用法
弹簧阻挡队列 用Spring Boot阻止队列
10、阻塞队列BlockingQueue实战及其原理分析_
6.6 阻塞队列BlockingQueue 实战及其原 理分析一副本.mp4
6.7 阻塞队列BlockingQueue 实战及其原 理分析二副本.mp4
6.6 阻塞队列BlockingQueue 实战及其原 理分析一副本副本.mp4
在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文简要介绍下BlockingQueue...
6.7 阻塞队列BlockingQueue 实战及其原 理分析二副本副本.mp4
6.6 阻塞队列BlockingQueue 实战及其原 理分析一副本副本副本.mp4
主要介绍了java 中 阻塞队列BlockingQueue详解及实例的相关资料,需要的朋友可以参考下
6.JUC并发工具类在大厂的应用场景详解 (1).pdf 7、深入理解 AQS 独占锁之 Reentrantlock 源码分析 (1).pdf 8、读写锁ReentrantReadWriteLock&StampLock详解.pdf ...10、阻塞队列BlockingQueue 实战及其原理分析.pdf
线程----BlockingQueue 的介绍说明
BlockingQueue接口 – 阻塞队列2.1 ArrayBlockingQueue类(有界阻塞队列)2.2 LinkedBlockingQueue类(无界阻塞队列)3. 源码:BlockingQueue实现生产者消费者模式→ 输出结果截图 1. Queue接口 – 队列 public ...
BlockingQueue java 的工具类,初次要用于消费者,生产者的同步问题。
类似java BlockingQueue,C++写的,支持Windows与Linux。
简单实现BlockingQueue,BlockingQueue源码详解
这个demo主要讲解了BlockingQueue的使用希望可以帮户需要的同学.