看了下JDK的源代码实现,模拟了一下ArrayBlockQueue,代码如下:
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BlockQueue {
private List<Integer> list;
private int size;
private final ReentrantLock lock;
private final Condition isEmpty;
private final Condition isFull;
public BlockQueue(int size) {
this.size = size;
list = new LinkedList<Integer>();
lock = new ReentrantLock();
isEmpty = lock.newCondition();
isFull = lock.newCondition();
}
public void put(int data) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (list.size() == size) {
try {
isFull.await();
} catch (InterruptedException e) {
isFull.signal();
throw e;
}
}
list.add(data);
System.out.println("size : " + list.size() + ", " + Thread.currentThread().getName() + " put " + data);
isEmpty.signal();
} finally {
lock.unlock();
}
}
public int take() throws InterruptedException {
lock.lockInterruptibly();
try {
while (list.size() == 0) {
try {
isEmpty.await();
} catch (InterruptedException e) {
isEmpty.signal();
throw e;
}
}
int data = list.remove(0);
System.out.println("size : " + list.size() + ", " + Thread.currentThread().getName() + " get " + data);
isFull.signal();
return data;
} finally {
lock.unlock();
}
}
}
import java.util.Random;
public class Producer implements Runnable {
private BlockQueue bq;
private String name;
public Producer(String name, BlockQueue bq) {
this.name = name;
this.bq = bq;
}
@Override
public void run() {
Random ran = new Random();
for(int i = 0; i < 10; i++){
int data = ran.nextInt(20);
try {
bq.put(data);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//System.out.println(name + " put " + data);
}
}
}
public class Consumer implements Runnable {
private BlockQueue bq;
private String name;
public Consumer(String name, BlockQueue bq) {
this.bq = bq;
this.name = name;
}
@Override
public void run() {
for(int i = 0; i < 10; i++){
int data;
try {
data = bq.take();
//System.out.println(name + " get " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
import org.junit.Test;
public class BlockQueueTest {
@Test
public void test() throws InterruptedException {
BlockQueue bq = new BlockQueue(5);
Producer p1 = new Producer("producer 1", bq);
Producer p2 = new Producer("producer 2", bq);
Producer p3 = new Producer("producer 3", bq);
Consumer c1 = new Consumer("consumer 1", bq);
Consumer c2 = new Consumer("consumer 2", bq);
Consumer c3 = new Consumer("consumer 3", bq);
new Thread(p1).start();
new Thread(p2).start();
new Thread(p3).start();
new Thread(c1).start();
new Thread(c2).start();
new Thread(c3).start();
Thread.sleep(10000);
}
}
运行结果如下:
size : 1, Thread-0 put 15
size : 2, Thread-0 put 0
size : 3, Thread-0 put 18
size : 4, Thread-0 put 9
size : 5, Thread-0 put 6
size : 4, Thread-3 get 15
size : 3, Thread-5 get 0
size : 2, Thread-5 get 18
size : 1, Thread-5 get 9
size : 0, Thread-5 get 6
size : 1, Thread-1 put 12
size : 0, Thread-4 get 12
size : 1, Thread-0 put 3
size : 2, Thread-2 put 2
size : 3, Thread-2 put 13
size : 4, Thread-2 put 12
size : 5, Thread-2 put 14
size : 4, Thread-5 get 3
size : 3, Thread-5 get 2
size : 2, Thread-5 get 13
size : 1, Thread-5 get 12
size : 0, Thread-5 get 14
size : 1, Thread-1 put 0
size : 2, Thread-1 put 3
size : 3, Thread-1 put 19
size : 4, Thread-1 put 13
size : 5, Thread-1 put 2
size : 4, Thread-4 get 0
size : 3, Thread-4 get 3
size : 2, Thread-4 get 19
size : 1, Thread-4 get 13
size : 0, Thread-4 get 2
size : 1, Thread-2 put 1
size : 2, Thread-2 put 9
size : 3, Thread-2 put 10
size : 4, Thread-2 put 14
size : 5, Thread-2 put 3
size : 4, Thread-5 get 1
size : 3, Thread-3 get 9
size : 2, Thread-3 get 10
size : 1, Thread-3 get 14
size : 0, Thread-3 get 3
size : 1, Thread-1 put 17
size : 2, Thread-1 put 14
size : 3, Thread-1 put 1
size : 4, Thread-1 put 10
size : 3, Thread-4 get 17
size : 2, Thread-4 get 14
size : 1, Thread-4 get 1
size : 0, Thread-4 get 10
size : 1, Thread-2 put 6
size : 2, Thread-0 put 1
size : 3, Thread-0 put 14
size : 4, Thread-0 put 9
size : 5, Thread-0 put 4
size : 4, Thread-3 get 6
size : 3, Thread-3 get 1
size : 2, Thread-3 get 14
size : 1, Thread-3 get 9
size : 0, Thread-3 get 4
说明:在Producer和Consumer线程里面打印语句的显示结果很令人迷惑,改成在BlockQueue里面打印就好了。
分享到:
相关推荐
简单实现BlockingQueue,BlockingQueue源码详解
目标是在接近零开销的情况下实现极低的延迟。 如何使用我们的低延迟有界队列之一的示例。 // writer thread Executors.newSingleThreadExecutor().execute(new Runnable() { @Override public void run() { ...
定义全局线程池,将用户的请求放入自定义队列中,排队等候线程调用,等待超时则自动取消该任务,实现超时可取消的异步任务
主要介绍了Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析,涉及queue,BlockingQueue等有关内容,具有一定参考价值,需要的朋友可以参考。
BlockingQueue是一种特殊的Queue,若BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态直到BlocingkQueue进了新货才会被唤醒,下面是用BlockingQueue来实现Producer和Consumer的例子
主要介绍了java并发学习之BlockingQueue实现生产者消费者详解,具有一定参考价值,需要的朋友可以了解下。
源码:BlockingQueue实现生产者消费者模式→ 输出结果截图 1. Queue接口 – 队列 public interface Queue extends Collection Collection的子接口,表示队列FIFO(First In First Out) 常用方法: (1)抛出异常...
主要介绍了Java多线程 BlockingQueue实现生产者消费者模型详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
主要介绍了详解Java阻塞队列(BlockingQueue)的实现原理,阻塞队列是Java util.concurrent包下重要的数据结构,有兴趣的可以了解一下
Conversant ConcurrentQueue、Disruptor BlockingQueue 和 ConcurrentStack Disruptor是Java中性能最高的线程内传输机制。 Conversant Disruptor 是这种环形缓冲区中性能最高的实现,因为它几乎没有开销,并且采用了...
多线程练习 Java MultiThread练习代码,包括ReaderWriter,BlockingQueue,ProducerConsumer等的实现。
主要介绍了Java并发之BlockingQueue的使用,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
它们都继承自AbstractQueue类,并实现了BlockingQueue接口,提供了线程安全的队列操作。 ArrayBlockingQueue实现原理: ArrayBlockingQueue内部使用数组存储元素,使用ReentrantLock来保证线程安全,使用Condition...
生产者消费者在 Java 中实现生产者 - 消费者问题。 基于此处介绍的示例: :
阻塞队列BlockingQueue,生产者消费者模式 Selector Channel ByteBuffer ProtoStuff 高性能序列化 HttpClient连接池 Spring依赖注入 lombok简化POJO开发 原子变量 内置锁 CompletionService log4j+slf4j日志 实现的...
阻塞队列BlockingQueue,生产者消费者模式 Selector Channel ByteBuffer ProtoStuff 高性能序列化 HttpClient连接池 Spring依赖注入 lombok简化POJO开发 原子变量 内置锁 CompletionService log4j+slf4j日志 实现的...
java集合类 Java中有哪些容器(集合类)? 线程安全和线程不安全的分别有哪些? Map接口有哪些实现类?...BlockingQueue是怎么实现的 Stream(不是IOStream)有哪些方法 BlockingQueue中有哪些方法,为什么这样设计?
阻塞队列BlockingQueue,生产者消费者模式 选择器 渠道 字节缓冲区 ProtoStuff 高性能序列化 HttpClient连接池 Spring依赖注入 lombok简化POJO开发 原子指标 内置锁 竣工服务 log4j+slf4j日志 实现的功能 登录注销 ...
并发包 同步容器类 Vector与ArrayList区别 1.ArrayList是最常用的List实现类,内部是通过数组实现的,它允许对元素进行快速随机访问。...列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue。
首先 LinkedBlockingQueue 是一个 “可选且有界” 的阻塞队列实现,你可以根据需要指定队列的大小。 接下来,我将创建一个 LinkedBlockingQueue ,它最多可以包含100个元素: BlockingQueue<Integer> ...