基于阻塞队列可以分容易实现生产者消费者模式
基本思路
生产者:负责生产对象,并放入阻塞队列
消费者:while true线程,阻塞的从阻塞队列中获取对象 并处理。
应用场景
服务器段分发器的处理、消息队列实现等等
核心组件
核心组件为JDK提供的阻塞队列,LinkedBlockingQueue
下面一个简单的例子
生产者
package com.gbcom.java.blockqueue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 生产者线程。
*
* @author SYZ
* @date 2016-12-8 下午02:07:34
* @version 1.0.0
* @see com.gbcom.java.blockqueue.Producer
*/
public class Producer implements Runnable {
private volatile boolean isRunning = true;
private BlockingQueue queue;
private static AtomicInteger count = new AtomicInteger();
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
String data = null;
Random r = new Random();
System.out.println("启动生产者线程!");
try {
while (isRunning) {
System.out.println("正在生产数据...");
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
data = "data:" + count.incrementAndGet();
System.out.println("将数据:" + data + "放入队列...");
if (queue.size() >= 5) {
System.out
.println("/***************** clear**********************/");
queue.clear();
}
queue.put(data);
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出生产者线程!");
}
}
public void stop() {
isRunning = false;
}
}
消费者
package com.gbcom.java.blockqueue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 消费线程
*
* @author SYZ
* @date 2016-12-8 下午02:07:24
* @version 1.0.0
* @see com.gbcom.java.blockqueue.Consumer
*/
public class Consumer implements Runnable {
private BlockingQueue<String> queue;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
private static AtomicInteger count = new AtomicInteger();
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
System.out.println("启动消费者线程!");
Random r = new Random();
boolean isRunning = true;
try {
while (isRunning) {
System.out.println("正从队列获取数据...");
String data = queue.take();
if (null != data) {
System.out.println("拿到数据:" + data + " : queue size = "
+ queue.size());
System.out.println(Thread.currentThread().getName()
+ " - 正在消费数据:" + data + "::::consumer times="
+ count.incrementAndGet());
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
} else {
isRunning = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出消费者线程!");
}
}
}
客户端
package com.gbcom.java.blockqueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 生产者消费者客户端
*
* @author syz
* @date 2014-7-2
* @version v1.0.0
* @see com.gbcom.java.blockqueue.BlockingQueueClient
*/
public class BlockingQueueClient {
public static void main(String[] args) throws InterruptedException {
// 声明一个容量为10的缓存队列
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer = new Consumer(queue);
// Consumer consumer2 = new Consumer(queue);
// Consumer consumer3 = new Consumer(queue);
// 借助Executors
ThreadPoolExecutor service = (ThreadPoolExecutor) Executors
.newCachedThreadPool();
// 启动线程
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
// service.execute(consumer2);
// service.execute(consumer3);
service.execute(consumer);
// 执行10s
Thread.sleep(10 * 1000);
System.out.println("active count = " + service.getActiveCount());
// producer1.stop();
// producer2.stop();
// producer3.stop();
Thread.sleep(2000);
// 退出Executor
service.shutdown();
}
}
分享到:
相关推荐
阻塞队列实现生产者消费者模式Java开发Java经验技巧共4页.pdf.zip
NULL 博文链接:https://xiongjiajia.iteye.com/blog/2325943
消息分发框架,基于java阻塞队列实现,生产者消费者模型 可用于任务分发,服务器消息消息,以及网络IO 性能优化,多线程
假设M个生产者和N个消费者共享一个具有K(K大于1)个缓冲区的循环缓冲结构BUFFER(提示:可以用一个循环队列或一个整型数组来表示),并设置两个指针IN和OUT,其中IN指向生产者线程当前可用的空缓冲区的在BUFFER中的...
那么在这个过程中,生产者和消费者是不直接接触的,所谓的‘货架’其实就是一个阻塞队列,生产者生产的产品不直接给消费者消费,而是仍给阻塞队列,这个阻塞队列就是来解决生产者消费者的强耦合的。就是生产者消费者...
源码:BlockingQueue实现生产者消费者模式→ 输出结果截图 1. Queue接口 – 队列 public interface Queue extends Collection Collection的子接口,表示队列FIFO(First In First Out) 常用方法: (1)抛出异常...
实现java模拟阻塞队列的例子,该代码包括,阻塞队列实现生产者,消费者。和模拟阻塞队列实现生产者及消费者模式,帮助你更好的理解java多线程
实验题目: 生产者与消费者(综合性实验) 实验环境: C语言编译器 实验内容: ① 由用户指定要产生的进程及其类别,存入进入就绪队列。 ② 调度程序从就绪队列中提取一个就绪进程运行。如果申请的资源被阻塞则...
熟练应用生产者消费者PV操作的实验, 实验内容 1. 由用户指定要产生的进程及其类别,存入进入就绪队列。 2. 调度程序从就绪队列中提取一个就绪进程运行。如果申请的资源被阻塞则进入相应的等待队列,调度程序调度...
通过实验模拟生产者与消费者之间的关系,了解并掌握他们之间的关系及其原理。由此增加对进程同步的问题的了解。 实验要求: 每个进程有一个进程控制块(PCB)表示。进程控制块可以包含如下信息:进程
一、原理 生产者线程: while (true) ...可以选择菜单项“开启线程->加快(减慢)生产,减慢(加快)消费”来随机调整生产和消费的时间,以观察生产者或消费者线程阻塞的状况。 程序在VC++6.0下编译通过。
环形队列,不加锁的生产者消费者模式,使用前提:1,缓冲区设置足够大,2,消费保证足够快
实验题目: 生产者与消费者(综合性实验) 实验环境: C语言编译器 实验内容: ① 由用户指定要产生的进程及其类别,存入进入就绪队列。 ② 调度程序从就绪队列中提取一个就绪进程运行。如果申请的资源被阻塞则...
生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不直接找生产者要数据,而是从阻塞队列里取,阻塞队列就相当于一
本次课程设计选到的题目为生产者消费者算法模拟,通过需求分析和资料搜寻,掌握到生产者/消费者的模式原理和优点,同时也了解到了几种可以实现生产者消费者的方式,如信号量方式,管程方式,阻塞队列方式等。
实验内容: ...一个就绪队列(ready),两个等待队列:生产者等待队列(producer);消费者等待队列(consumer)。一个链表(over),用于收集已经运行结束的进程 本程序通过函数模拟信号量的原子操作。
使用非阻塞队列的时候有一个很大问题是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,必须额外地实现同步策略以及线程间唤醒策略,这个实现起来非常麻烦。但是有了阻塞队列不一样了,它会
生产者和消 费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不 用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队 列里取,阻塞队列就相当于一个缓冲区...
注意:如果您需要的只是一个单一生产者,单一消费者队列,那么我也可以选择。 特征 击倒你的。 单头实现。 只需将其放入您的项目中即可。 完全线程安全的无锁队列。 从任何数量的线程同时使用。 C ++ 11实现-尽...
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。简而言之,阻塞队列是生产者用来存放元素、消费者获取元素的容器。 考虑下,这样一个多线程模型,程序有一个...