- 浏览: 870142 次
- 性别:
- 来自: 北京
博客专栏
-
Java Concurre...
浏览量:95963
文章分类
最新评论
-
guo_guo_guo:
mark
Zookeeper 安装和配置 -
solrer:
xunux 写道windows下,dataDir目录若是用需要 ...
Zookeeper 安装和配置 -
kevlearnjava:
你好 我是按照你的伪集群进行配置的 然后启动第二个和第三个 ...
Zookeeper 安装和配置 -
筑浪小子:
博主应该把vector设定为全局变量,更加清晰一些
线程安全的集合类--Java Concurrency In Practice C05读书笔记 -
tt5753:
jps16437 QuorumPeerMain16663 Zo ...
Zookeeper 安装和配置
[本文是我对Java Concurrency In Practice 5.3的归纳和总结. 转载请注明作者和出处, 如有谬误, 欢迎在评论中指正. ]
生产者消费者模式
以缓冲区作为生产者和消费者之间沟通的桥梁: 生产者只负责生产, 将生产出来的数据存入缓冲区. 而消费者只负责消费, 不断的从缓冲区中取出数据进行处理.
生产者消费者模式是非常常用的, 因为应用该模式有效的解耦了生产者和消费者. 生产者不需要知道有没有其他生产者在生产, 也不需要知道有多少个消费者在消费, 而消费者不需要知道数据来自哪个生产者. 另外该模式支持并发操作, 如果生产者直接调用消费者的方法, 生产者就必须等到消费者处理完毕才能返回, 万一消费者处理的速度很慢, 就会白白浪费生产者的时间. 而使用模式的话, 生产者只需要将数据存入缓冲区就可以了.
缓冲区是生产者消费者模式的核心. 生产者将数据存入缓冲区的一端, 消费者则负责从缓冲区的另一端取出数据进行处理, 队列非常适用这样的场景. 由于生产者消费者大多处于不同的线程, 队列就必须是线程安全的--java的BlockingQueue可以满足要求.
BlockingQueue
BlockingQueue的put方法用于将数据放入队列, 如果队列已满, put方法所在的线程将阻塞, 直到队列不满. take方法用于从队列中取出数据, 如果队列为空, take方法所在的线程将阻塞, 直到队列不为空.
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; // 锁定 lock.lockInterruptibly(); try { try { // 如果队列已满, 就阻塞线程 while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; // 插入数据后唤醒在非空条件上阻塞的线程 notEmpty.signal(); } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 锁定 lock.lockInterruptibly(); try { try { // 如果队列为空, 就阻塞线程 while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } } private E extract() { final E[] items = this.items; E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; // 取出数据后唤醒在notFull条件上阻塞的线程 notFull.signal(); return x; }
offer(E e, long timeout, TimeUnit unit)用于将数据放入队列, 如果队列已满, 将最多等待指定的时间, offer返回true时说明数据成功入队, 否则说明没有成功. poll(long timeout, TimeUnit unit)是与offer配对的方法, 用于从队列中取出数据, 如果队列为空, 最多等待指定的时间, poll返回值为null时说明没有取到数据.
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); // 获得阻塞的最大时间 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 如果队列没有满, 则插入数据并返回true if (count != items.length) { insert(e); return true; } // 如果剩余的等待时间小于等于0说明等待时间已超过最大值, 此时返回false, 表明插入没有成功 if (nanos <= 0) return false; try { // awaitNanos方法用于阻塞队列, 并返回剩余的时间值 nanos = notFull.awaitNanos(nanos); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 如果队列不为空, 就取出数据然后返回 if (count != 0) { E x = extract(); return x; } // 如果阻塞时间已过最大时间, 就返回null, 说明没有取到数据 if (nanos <= 0) return null; try { // awaitNanos方法用于阻塞队列, 并返回剩余的时间值 nanos = notEmpty.awaitNanos(nanos); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } }
BlockingQueue的容量可以是无限的, 也可以是有限的. 无限容量的BlockingQueue永远也不会发生队列已满的事件.
BlockingQueue的常见实现类有ArrayBlockingQueue, LinkedBlockingQueue, 以及PriorityBlockingQueue等. ArrayBlockingQueue底层使用循环数组实现, LinkedBlockingQueue底层使用链表实现. PriorityBlockingQueue则是一个可排序的阻塞队列, 可以按照元素的自然顺序(元素需要实现Comparable接口)或者指定的Comparator排序.
生产者消费者模式的例子
该例子用于模拟对文件进行索引, 生产者FileCrawler类将待索引的文件放入队列, 消费者Indexer则负责从队列中取出文件进行索引标记.
/** * 生产者, 生产待索引的文件 */ public class FileCrawler implements Runnable { private final BlockingQueue<File> fileQueue; private final FileFilter fileFilter; private final File root; public FileCrawler(BlockingQueue<File> fileQueue, FileFilter fileFilter, File root) { this.fileQueue = fileQueue; this.fileFilter = fileFilter; this.root = root; } public void run() { try { crawl(root); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void crawl(File root) throws InterruptedException { File[] entries = root.listFiles(fileFilter); if (entries != null) { for (File entry : entries) if (entry.isDirectory()) { // 递归调用 crawl(entry); } else if (!alreadyIndexed(entry)) { // 向队列中添加文件, 如果队列是BOUND的, 且队列已满, 则put方法将阻塞, 直到队列不满 System.out.println(entry + ": 等待进行索引 by " + Thread.currentThread().getName()); fileQueue.put(entry); } } } private boolean alreadyIndexed(File entry) { return false; } } /** * 消费者, 从队列中取出文件进行索引标记 */ public class Indexer implements Runnable { private final BlockingQueue<File> queue; public Indexer(BlockingQueue<File> queue) { this.queue = queue; } public void run() { try { while (true) { // 从队列中取出file标记索引. 如果队列为空, take方法将阻塞, 直到队列重新不为空. File file = queue.take(); indexFile(file); System.out.println(file + ": 已进行索引 by " + Thread.currentThread().getName()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void indexFile(File file) { } } /** * 测试生产者消费者模式 */ public class FileIndexer { public static void startIndexing(File[] roots) { // 创建一个BOUNDED队列, 队列中最大的元素为10个 BlockingQueue<File> queue = new LinkedBlockingQueue<File>(10); FileFilter filter = new FileFilter() { public boolean accept(File file) { return true; } }; for (int i = 0; i < roots.length; i++) { File root = roots[i]; // 启动生产者线程 new Thread(new FileCrawler(queue, filter, root), "producer " + i).start(); } // 启动3个消费者线程 for (int i = 0; i < 3; i++) { new Thread(new Indexer(queue), "consumer " + i).start(); } } public static void main(String[] args) { File dir = new File("E:\\TDDOWNLOAD\\mina doc"); startIndexing(dir.listFiles(new FileFilter() { @Override public boolean accept(File pathname) { if (pathname.isDirectory()) { return true; } return false; } })); } }
发表评论
-
状态依赖的类--JCIP C14.1读书笔记
2012-04-11 10:24 2596[本文是我对Java Concurrency In Pract ... -
内置锁和显式锁的区别--JCIP C13读书笔记
2012-04-11 10:17 5721[本文是我对Java Concurrenc ... -
改善并发程序的可扩展性--JCIP C11读书笔记
2012-04-10 14:40 2432[本文是我对Java Concurrency In Pract ... -
如何避免死锁--JCIPC10读书笔记
2012-04-10 10:08 3006[本文是我对Java Concurrency In Pract ... -
task与execution--JCIPC08读书笔记
2012-04-09 10:34 2339[本文是我对Java Concurrency In Pract ... -
配置ThreadPoolExecutor
2012-04-09 10:34 6122[本文是我对Java Concurrency In Pract ... -
停止基于线程的Service--JCIP7.2读书笔记
2012-04-06 10:28 2290[本文是我对Java Concurrency In Pract ... -
处理不可中断的阻塞-JCIP7.1读书笔记
2012-04-06 10:23 5632[本文是我对Java Concurrenc ... -
处理InterruptedException异常--JCIP7.1读书笔记
2012-04-05 14:08 6040[本文是我对Java Concurrency In Pract ... -
中断线程--JCIP7.1读书笔记
2012-04-05 14:03 2549[本文是我对Java Concurrency In Pract ... -
改善并发性能--JCIP6.3读书笔记
2012-04-02 11:51 2510[本文是我对Java Concurrency In Pr ... -
Executor--JCIP C06读书笔记
2012-04-02 09:28 2814[本文是我对Java Concurrency In Pract ... -
设计高效的线程安全的缓存--JCIP5.6读书笔记
2012-04-01 22:49 5779[本文是我对Java Concurrency In Pract ... -
synchronizer--JCIP5.5读书笔记
2012-04-01 22:44 2281[本文是我对Java Concurrency In Pract ... -
ConcurrentHashMap和CopyOnWriteArrayList--Java Concurrency In Practice C05读书笔记
2012-03-31 11:27 3992[本文是我对Java Concurrenc ... -
线程安全的集合类--Java Concurrency In Practice C05读书笔记
2012-03-28 18:26 14152[本文是我对Java Concurrency In Pract ... -
利用对象限制和委托构建线程安全的类--Java Concurrency In Practice C04读书笔记
2012-03-27 18:23 3783[本文是我对Java Concurrency In Pract ... -
变量可见性和volatile, this逃逸, 不可变对象, 以及安全公开--Java Concurrency In Practice C03读书笔记
2012-03-26 21:55 12488[本文是我对Java Concurrency In Pract ... -
变量可见性和volatile, this逃逸, 不可变对象, 以及安全公开--Java Concurrency In Practice C02读书笔记
2012-03-26 21:53 2[本文是我对Java Concurrency In Pract ... -
Race condition--Java Concurrency In Practice C02读书笔记
2012-03-26 10:17 4927[本文是我对Java Concurrenc ...
相关推荐
主要介绍了java并发学习之BlockingQueue实现生产者消费者详解,具有一定参考价值,需要的朋友可以了解下。
java 多线程 生产者消费者模式,多个生产者对多个消费者,使用jdk 线程池及 BlockingQueue实现,解决了待生产的任务生产完成后,正常终止所有线程,避免线程(特别是消费者线程)因阻塞而无限等待的情况。源码中还简单...
主要介绍了Java多线程 BlockingQueue实现生产者消费者模型详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
主要介绍了Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析,涉及queue,BlockingQueue等有关内容,具有一定参考价值,需要的朋友可以参考。
源码:BlockingQueue实现生产者消费者模式→ 输出结果截图 1. Queue接口 – 队列 public interface Queue extends Collection Collection的子接口,表示队列FIFO(First In First Out) 常用方法: (1)抛出异常...
线程----BlockingQueue 的介绍说明
BlockingQueue java 的工具类,初次要用于消费者,生产者的同步问题。
这个demo主要讲解了BlockingQueue的使用希望可以帮户需要的同学.
BlockingQueue支持两个附加操作的Queue:1)当Queue为空时,获取元素线程被阻塞直到Queue变为非空;2)当Queue满时,添加元素线程被阻塞直到Queue不满。BlockingQueue不允许元素为null,...常用于生产者消费者模式。
Java-concurrent-collections-concurrenthashmap-blockingqueue.pdf
- 阻塞队列BlockingQueue,生产者消费者模式 - Selector - Channel - ByteBuffer - ProtoStuff 高性能序列化 - HttpClient连接池 - Spring依赖注入 - lombok简化POJO开发 - 原子变量 - 内置锁 - ...
14-阻塞队列BlockingQueue实战及其原理分析二.pdf
在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的...本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。
阻塞队列BlockingQueue,生产者消费者模式 选择器 渠道 字节缓冲区 ProtoStuff 高性能序列化 HttpClient连接池 Spring依赖注入 lombok简化POJO开发 原子指标 内置锁 竣工服务 log4j+slf4j日志 实现的功能 登录注销 ...
类似java BlockingQueue,C++写的,支持Windows与Linux。
简单实现BlockingQueue,BlockingQueue源码详解
在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文简要介绍下BlockingQueue...
我们知道 BlockingQueue 是一个 FIFO 队列,生产者(Producer)往队列里发布(publish)一项事件(或称之为“消息”也可以)时,消费者(Consumer)能获得通知;如果没有事件时,消费者被堵塞,直到生产者发布了新的事件。 ...
Java 多线程与并发(16_26)-JUC集合_ BlockingQueue详解
生产者消费者在 Java 中实现生产者 - 消费者问题。 基于此处介绍的示例: :