- 浏览: 130459 次
- 性别:
- 来自: ...
文章分类
最新评论
之前介绍时LinkedBlockingQueue提到了Queue主要应用于“生产消费模型”的实现。在尝试分析ConcurrentLinkedQueue之前,写了个简陋的“生产消费模型”的实现。分享的同时加深对LinkedBlockingQueue的印象,顺便再说说其特性:
ModelSample是主控,负责生产消费的调度,Worker和Seller分别作为生产者和消费者,并均实现Runnable接口。ModelSample构造器内的参数clear主要是用于:当主控调用线程池的shutdownNow()方法时,会给池内的所有线程发送中断信号,使得线程的中断标志置位。这时候对应的Runnable的run()方法使用响应中断的LinkedBlockingQueue的方法(入队,出队)时就会抛出InterruptedException异常,生产者线程对这个异常的处理是记录信息后终止任务。而消费者线程是记录信息后终止任务,还是消费完队列内的产品再终止任务,则取决于这个选项值。
多线程的一个难点在于适当得销毁线程,这里得益于LinkedBlockingQueue的入队和出队的操作均提供响应中断的API,使得控制起来相对的简单一点。在Worker和Seller中共享LinkedBlockingQueue的实例queue时,我没有使用put或者take在queue满和空状态时无限制的阻塞线程,而是使用offer(E e, long timeout, TimeUnit unit)和poll(long timeout, TimeUnit unit)在指定的timeout时间内满足条件时阻塞线程。主要因为在于:先中断生产线程的情况下,如果所有的消费线程之前均被扔到等待集,那么无法它们将被唤醒。而后两者在超时后将自行恢复可运行状态。
再者看看queue的size()方法,这也是选择LinkedBlockingQueue而不选ArrayBlockingQueue作为阻塞队列的原因。因为前者使用的AtomicInteger的count.get()返回最新值,完全无锁;而后者则需要获取唯一的锁,在此期间无法进行任何出队,入队操作。而这个例子中clear==true时,主线程和所有的消费线程均需要使用size()方法检查queue的元素个数。这类的非业务操作本就不该影响别的操作,所以这里LinkedBlockingQueue使用AtomicInteger计数无疑是个优秀的设计。
另外编写这个例子时有点玩票的用了CountDownLatch,它的作用很简单。countDown()方法内部计数不为0时,执行了其await()方法的线程将会阻塞等待;一旦计数为0,这些线程将恢复可运行状态继续执行。这里用它就像一个发令枪,线程池submit任务的新线程在run内被阻塞,主线程一声令下countDown!这些生产消费线程均恢复执行状态。最后就是命令的实现过于简陋了,如果要响应其他的命令的话可以改造成响应事件处理的观察者模式,不过它不是演示的重点就从简了。后面就是试着写写ConcurrentLinkedQueue的分析了。
import java.util.Scanner; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * @author: yanxuxin * @date: 2010-1-25 */ public class ModelSample { /** 线程池提交的任务数*/ private final int taskNum = Runtime.getRuntime().availableProcessors() + 1; /** 用于多线程间存取产品的队列*/ private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(16); /** 记录产量*/ private final AtomicLong output = new AtomicLong(0); /** 记录销量*/ private final AtomicLong sales = new AtomicLong(0); /** 简单的线程起步开关*/ private final CountDownLatch latch = new CountDownLatch(1); /** 停产后是否售完队列内的产品的选项*/ private final boolean clear; /** 用于提交任务的线程池*/ private final ExecutorService pool; /** 简陋的命令发送器*/ private Scanner scanner; public ModelSample(boolean clear) { this.pool = Executors.newCachedThreadPool(); this.clear = clear; } /** * 提交生产和消费任务给线程池,并在准备完毕后等待终止命令 */ public void service() { doService(); waitCommand(); } /** * 提交生产和消费任务给线程池,并在准备完毕后同时执行 */ private void doService() { for (int i = 0; i < taskNum; i++) { if (i == 0) { pool.submit(new Worker(queue, output, latch)); } else { pool.submit(new Seller(queue, sales, latch, clear)); } } latch.countDown();//开闸放狗,线程池内的线程正式开始工作 } /** * 接收来自终端输入的终止命令 */ private void waitCommand() { scanner = new Scanner(System.in); while (!scanner.nextLine().equals("q")) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } scanner.close(); destory(); } /** * 停止一切生产和销售的线程 */ private void destory() { pool.shutdownNow(); //不再接受新任务,同时试图中断池内正在执行的任务 while (clear && queue.size() > 0) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("Products:" + output.get() + "; Sales:" + sales.get()); } public static void main(String[] args) { ModelSample model = new ModelSample(false); model.service(); } } /** * 生产者 * @author: yanxuxin * @date: 2010-1-25 */ class Worker implements Runnable { /** 假想的产品*/ private static final String PRODUCT = "Thinkpad"; private final LinkedBlockingQueue<String> queue; private final CountDownLatch latch; private final AtomicLong output; public Worker(LinkedBlockingQueue<String> queue, AtomicLong output,CountDownLatch latch) { this.output = output; this.queue = queue; this.latch = latch; } public void run() { try { latch.await(); // 放闸之前老实的等待着 for (;;) { doWork(); Thread.sleep(100); } } catch (InterruptedException e) { System.out.println("Worker thread will be interrupted..."); } } private void doWork() throws InterruptedException { boolean success = queue.offer(PRODUCT, 100, TimeUnit.MILLISECONDS); if (success) { output.incrementAndGet(); // 可以声明long型的参数获得返回值,作为日志的参数 // 可以在此处生成记录日志 } } } /** * 消费者 * @author: yanxuxin * @date: 2010-1-25 */ class Seller implements Runnable { private final LinkedBlockingQueue<String> queue; private final AtomicLong sales; private final CountDownLatch latch; private final boolean clear; public Seller(LinkedBlockingQueue<String> queue, AtomicLong sales, CountDownLatch latch, boolean clear) { this.queue = queue; this.sales = sales; this.latch = latch; this.clear = clear; } public void run() { try { latch.await(); // 放闸之前老实的等待着 for (;;) { sale(); Thread.sleep(500); } } catch (InterruptedException e) { if(clear) { // 响应中断请求后,如果有要求则销售完队列的产品后再终止线程 cleanWarehouse(); } else { System.out.println("Seller Thread will be interrupted..."); } } } private void sale() throws InterruptedException { String item = queue.poll(50, TimeUnit.MILLISECONDS); if (item != null) { sales.incrementAndGet(); // 可以声明long型的参数获得返回值,作为日志的参数 // 可以在此处生成记录日志 } } /** * 销售完队列剩余的产品 */ private void cleanWarehouse() { try { while (queue.size() > 0) { sale(); } } catch (InterruptedException ex) { System.out.println("Seller Thread will be interrupted..."); } } }
ModelSample是主控,负责生产消费的调度,Worker和Seller分别作为生产者和消费者,并均实现Runnable接口。ModelSample构造器内的参数clear主要是用于:当主控调用线程池的shutdownNow()方法时,会给池内的所有线程发送中断信号,使得线程的中断标志置位。这时候对应的Runnable的run()方法使用响应中断的LinkedBlockingQueue的方法(入队,出队)时就会抛出InterruptedException异常,生产者线程对这个异常的处理是记录信息后终止任务。而消费者线程是记录信息后终止任务,还是消费完队列内的产品再终止任务,则取决于这个选项值。
多线程的一个难点在于适当得销毁线程,这里得益于LinkedBlockingQueue的入队和出队的操作均提供响应中断的API,使得控制起来相对的简单一点。在Worker和Seller中共享LinkedBlockingQueue的实例queue时,我没有使用put或者take在queue满和空状态时无限制的阻塞线程,而是使用offer(E e, long timeout, TimeUnit unit)和poll(long timeout, TimeUnit unit)在指定的timeout时间内满足条件时阻塞线程。主要因为在于:先中断生产线程的情况下,如果所有的消费线程之前均被扔到等待集,那么无法它们将被唤醒。而后两者在超时后将自行恢复可运行状态。
再者看看queue的size()方法,这也是选择LinkedBlockingQueue而不选ArrayBlockingQueue作为阻塞队列的原因。因为前者使用的AtomicInteger的count.get()返回最新值,完全无锁;而后者则需要获取唯一的锁,在此期间无法进行任何出队,入队操作。而这个例子中clear==true时,主线程和所有的消费线程均需要使用size()方法检查queue的元素个数。这类的非业务操作本就不该影响别的操作,所以这里LinkedBlockingQueue使用AtomicInteger计数无疑是个优秀的设计。
另外编写这个例子时有点玩票的用了CountDownLatch,它的作用很简单。countDown()方法内部计数不为0时,执行了其await()方法的线程将会阻塞等待;一旦计数为0,这些线程将恢复可运行状态继续执行。这里用它就像一个发令枪,线程池submit任务的新线程在run内被阻塞,主线程一声令下countDown!这些生产消费线程均恢复执行状态。最后就是命令的实现过于简陋了,如果要响应其他的命令的话可以改造成响应事件处理的观察者模式,不过它不是演示的重点就从简了。后面就是试着写写ConcurrentLinkedQueue的分析了。
发表评论
文章已被作者锁定,不允许评论。
-
一道位操作的趣味编程题
2010-03-14 10:50 2080看到一道很有意思的编程题:大厅里有64盏灯,每盏灯都编 ... -
一道字符串截取的编程题
2010-03-11 10:52 2272最近接触到一道字符串截取的编程题:编写一个截取字符串的 ... -
一道多线程趣味热身题
2010-02-28 18:01 1912保持对知识点或者技术的熟悉度对于程序员至关重要,要学会 ... -
疑似Google多线程面试题的Java实现
2010-02-24 17:39 4903来到一个完全陌生的地方,即将一切从新开始,内心兴奋又忐 ... -
Mina的线程池实现分析(2)
2010-02-10 17:31 4516分析了I/O事件的存储,下面看看多个Worker同时工 ... -
Mina的线程池实现分析(1)
2010-02-10 17:28 11571线程池是并发应用中,为了减少每个任务调用的开销增强性能 ... -
多线程基础总结十一--ConcurrentLinkedQueue
2010-02-03 17:52 12835ConcurrentLinkedQueue充分使用了a ... -
多线程基础总结十--LinkedBlockingQueue
2010-01-28 14:33 15368随着多线程基础总结的增多,却明显的感觉知道的越来越少, ... -
号称放倒一片的一道J2SE基础题的个人理解
2010-01-23 14:07 2788近日无意中看到一道Java基础题,号称在接受测试的10 ... -
多线程基础总结九--Mina窥探(1)
2010-01-21 23:46 5393一直以来的多线程的基础总结都是脱离应用的,但是要说多线 ... -
多线程基础总结八--ReentrantReadWriteLock
2010-01-15 23:22 7506说到ReentrantReadWriteLock,首先 ... -
多线程基础总结七--ReentrantLock
2010-01-09 23:17 7672之前总结了部分无锁机制的多线程基础,理想的状态当然是利 ... -
关于atomic问题的一点理解
2009-12-30 16:42 2434之前看到一个帖子是关于atomic使用的,当时没有仔细 ... -
多线程基础总结六--synchronized(2)
2009-12-18 18:45 1865早在总结一时,我就尽量的把synchronized的重点 ... -
多线程基础总结五--atomic
2009-12-17 19:46 3545在简单介绍java.util.c ... -
多线程基础总结四--ThreadLocal
2009-12-16 19:48 2715说到ThreadLocal,首先 ... -
多线程基础总结三--volatile
2009-12-15 20:09 2521前面的两篇总结简 ... -
多线程基础总结二--Thread
2009-12-12 23:27 2663对于Thread来说 ... -
多线程基础总结一--synchronized(1)
2009-12-12 23:23 3066最近写关于并发的小应 ... -
由destory-method引发的IOC容器设计的思考
2009-12-07 16:51 1678第一次读Spring的源 ...
相关推荐
NULL 博文链接:https://xiongjiajia.iteye.com/blog/2325943
linkedblockingqueue测试程序
并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解
首先 LinkedBlockingQueue 是一个 “可选且有界” 的阻塞队列实现,你可以根据需要指定队列的大小。 接下来,我将创建一个 LinkedBlockingQueue ,它最多可以包含100个元素: BlockingQueue<Integer> ...
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法
LinkedBlockingQueuejava.pdf
单向链表源码 博文链接:https://uule.iteye.com/blog/1561753
源码:BlockingQueue实现生产者消费者模式→ 输出结果截图 1. Queue接口 – 队列 public interface Queue extends Collection Collection的子接口,表示队列FIFO(First In First Out) 常用方法: (1)抛出异常...
JDK容器学习之Queue:LinkedBlockingQueue
主要介绍了详细分析Java并发集合LinkedBlockingQueue的用法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
主要介绍了java中LinkedBlockingQueue与ArrayBlockingQueue的异同,需要的朋友可以参考下
【2018最新最详细】并发多线程教程,课程结构如下 1.并发编程的优缺点 2.线程的状态转换以及基本操作 3.java内存模型以及happens-before规则 4.彻底理解synchronized ...27.一篇文章,让你彻底弄懂生产者--消费者问题
NULL 博文链接:https://kanpiaoxue.iteye.com/blog/2101309
主要重点是在不牺牲可读性的前提下实现简单性和高性能。 实际上,我试图提供有关代码的良好文档以及一些用法示例。 提供的队列 ArrayBlockingQueue :由切片支持的有界阻塞队列 LinkedBlockingQueue :由容器/列表...
Java线程池的几种实现方法和区别介绍 使用:LinkedBlockingQueue实现线程池讲解
-> Collection(集合接口)-> List(列表,线性表接口) :ArrayList、LinkedList-> Set(元素不重复的集合接口):HashSet、TreeSet-> Queue(队列): ArrayBlockingQueue、LinkedBlockingQueue-> Deque(双端队列):...
hashmap如何解决hash冲突,为什么hashmap中的链表需要转成红黑树? hashmap什么时候会触发扩容? jdk1.8之前并发操作...LinkedBlockingQueue、DelayQueue是如何实现的? CopyOnWriteArrayList是如何保证线程安全的?
│ 高并发编程第一阶段26讲、多线程下的生产者消费者模型,以及详细介绍notifyAll方法.mp4 │ 高并发编程第一阶段27讲、wait和sleep的本质区别是什么,深入分析(面试常见问题).mp4 │ 高并发编程第一阶段28讲、...