`
assertmyself
  • 浏览: 28630 次
  • 性别: Icon_minigender_1
  • 来自: 南京
文章分类
社区版块
存档分类
最新评论

生产者消费者模式,基于阻塞队列

阅读更多
基于阻塞队列可以分容易实现生产者消费者模式

基本思路
生产者:负责生产对象,并放入阻塞队列
消费者:while true线程,阻塞的从阻塞队列中获取对象 并处理。


应用场景
服务器段分发器的处理、消息队列实现等等

核心组件
核心组件为JDK提供的阻塞队列,LinkedBlockingQueue


下面一个简单的例子
生产者
package com.gbcom.java.blockqueue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 生产者线程。
 * 
 * @author SYZ
 * @date 2016-12-8 下午02:07:34
 * @version 1.0.0
 * @see com.gbcom.java.blockqueue.Producer
 */
public class Producer implements Runnable {

	private volatile boolean isRunning = true;
	private BlockingQueue queue;
	private static AtomicInteger count = new AtomicInteger();
	private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;

	public Producer(BlockingQueue queue) {
		this.queue = queue;
	}

	public void run() {
		String data = null;
		Random r = new Random();

		System.out.println("启动生产者线程!");
		try {
			while (isRunning) {
				System.out.println("正在生产数据...");
				Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));

				data = "data:" + count.incrementAndGet();
				System.out.println("将数据:" + data + "放入队列...");
				if (queue.size() >= 5) {
					System.out
							.println("/***************** clear**********************/");
					queue.clear();
				}
				queue.put(data);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
			Thread.currentThread().interrupt();
		} finally {
			System.out.println("退出生产者线程!");
		}
	}

	public void stop() {
		isRunning = false;
	}

}





消费者
package com.gbcom.java.blockqueue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 消费线程
 * 
 * @author SYZ
 * @date 2016-12-8 下午02:07:24
 * @version 1.0.0
 * @see com.gbcom.java.blockqueue.Consumer
 */
public class Consumer implements Runnable {
	private BlockingQueue<String> queue;
	private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
	private static AtomicInteger count = new AtomicInteger();

	public Consumer(BlockingQueue<String> queue) {
		this.queue = queue;
	}

	public void run() {
		System.out.println("启动消费者线程!");
		Random r = new Random();
		boolean isRunning = true;
		try {
			while (isRunning) {
				System.out.println("正从队列获取数据...");
				String data = queue.take();
				if (null != data) {
					System.out.println("拿到数据:" + data + "  : queue size = "
							+ queue.size());
					System.out.println(Thread.currentThread().getName()
							+ " - 正在消费数据:" + data + "::::consumer times="
							+ count.incrementAndGet());
					Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
				} else {
					isRunning = false;
				}
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
			Thread.currentThread().interrupt();
		} finally {
			System.out.println("退出消费者线程!");
		}
	}

}



客户端
package com.gbcom.java.blockqueue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 生产者消费者客户端
 * 
 * @author syz
 * @date 2014-7-2
 * @version v1.0.0
 * @see com.gbcom.java.blockqueue.BlockingQueueClient
 */
public class BlockingQueueClient {

	public static void main(String[] args) throws InterruptedException {
		// 声明一个容量为10的缓存队列
		BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);

		Producer producer1 = new Producer(queue);
		Producer producer2 = new Producer(queue);
		Producer producer3 = new Producer(queue);
		Consumer consumer = new Consumer(queue);
		// Consumer consumer2 = new Consumer(queue);
		// Consumer consumer3 = new Consumer(queue);

		// 借助Executors
		ThreadPoolExecutor service = (ThreadPoolExecutor) Executors
				.newCachedThreadPool();
		// 启动线程
		service.execute(producer1);
		service.execute(producer2);
		service.execute(producer3);
		// service.execute(consumer2);
		// service.execute(consumer3);
		service.execute(consumer);

		// 执行10s
		Thread.sleep(10 * 1000);

		System.out.println("active count = " + service.getActiveCount());
		// producer1.stop();
		// producer2.stop();
		// producer3.stop();
		Thread.sleep(2000);
		// 退出Executor
		service.shutdown();
	}
}
分享到:
评论

相关推荐

    阻塞队列实现生产者消费者模式Java开发Java经验技巧共

    阻塞队列实现生产者消费者模式Java开发Java经验技巧共4页.pdf.zip

    生产者/消费者模式 阻塞队列 LinkedBlockingQueue

    NULL 博文链接:https://xiongjiajia.iteye.com/blog/2325943

    消息分发框架(基于JAVA阻塞队列实现、 生产者消费者模型)

    消息分发框架,基于java阻塞队列实现,生产者消费者模型 可用于任务分发,服务器消息消息,以及网络IO 性能优化,多线程

    生产者与消费者问题

    假设M个生产者和N个消费者共享一个具有K(K大于1)个缓冲区的循环缓冲结构BUFFER(提示:可以用一个循环队列或一个整型数组来表示),并设置两个指针IN和OUT,其中IN指向生产者线程当前可用的空缓冲区的在BUFFER中的...

    JAVA多线程之生产者消费者模型.docx

    那么在这个过程中,生产者和消费者是不直接接触的,所谓的‘货架’其实就是一个阻塞队列,生产者生产的产品不直接给消费者消费,而是仍给阻塞队列,这个阻塞队列就是来解决生产者消费者的强耦合的。就是生产者消费者...

    【Java】Queue、BlockingQueue和队列实现生产者消费者模式

    源码:BlockingQueue实现生产者消费者模式→ 输出结果截图 1. Queue接口 – 队列 public interface Queue extends Collection Collection的子接口,表示队列FIFO(First In First Out) 常用方法: (1)抛出异常...

    java模拟阻塞队列

    实现java模拟阻塞队列的例子,该代码包括,阻塞队列实现生产者,消费者。和模拟阻塞队列实现生产者及消费者模式,帮助你更好的理解java多线程

    生产者与消费者 进程的同步与互斥模拟

    实验题目: 生产者与消费者(综合性实验) 实验环境: C语言编译器 实验内容: ① 由用户指定要产生的进程及其类别,存入进入就绪队列。   ② 调度程序从就绪队列中提取一个就绪进程运行。如果申请的资源被阻塞则...

    操作系统实验 生产者消费者 PV操作

    熟练应用生产者消费者PV操作的实验, 实验内容 1. 由用户指定要产生的进程及其类别,存入进入就绪队列。  2. 调度程序从就绪队列中提取一个就绪进程运行。如果申请的资源被阻塞则进入相应的等待队列,调度程序调度...

    操作系统实验:生产者与消费者.cpp

    通过实验模拟生产者与消费者之间的关系,了解并掌握他们之间的关系及其原理。由此增加对进程同步的问题的了解。 实验要求: 每个进程有一个进程控制块(PCB)表示。进程控制块可以包含如下信息:进程

    使用VC++6.0实现的“操作系统”课程中的生产者-消费者问题

    一、原理 生产者线程: while (true) ...可以选择菜单项“开启线程-&gt;加快(减慢)生产,减慢(加快)消费”来随机调整生产和消费的时间,以观察生产者或消费者线程阻塞的状况。 程序在VC++6.0下编译通过。

    不加锁、非阻塞模式的环形队列

    环形队列,不加锁的生产者消费者模式,使用前提:1,缓冲区设置足够大,2,消费保证足够快

    生产者与消费者 进程调度模拟(c++)

    实验题目: 生产者与消费者(综合性实验) 实验环境: C语言编译器 实验内容: ① 由用户指定要产生的进程及其类别,存入进入就绪队列。   ② 调度程序从就绪队列中提取一个就绪进程运行。如果申请的资源被阻塞则...

    【每日爬虫】:生产者与消费者模式爬取王者荣耀壁纸

    生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不直接找生产者要数据,而是从阻塞队列里取,阻塞队列就相当于一

    基于Java实现生产者与消费者算法模拟【100010232】

    本次课程设计选到的题目为生产者消费者算法模拟,通过需求分析和资料搜寻,掌握到生产者/消费者的模式原理和优点,同时也了解到了几种可以实现生产者消费者的方式,如信号量方式,管程方式,阻塞队列方式等。

    操作系统生产者与消费者(综合性实验)北林

    实验内容: ...一个就绪队列(ready),两个等待队列:生产者等待队列(producer);消费者等待队列(consumer)。一个链表(over),用于收集已经运行结束的进程 本程序通过函数模拟信号量的原子操作。

    Java并发编程:阻塞队列

     使用非阻塞队列的时候有一个很大问题是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,必须额外地实现同步策略以及线程间唤醒策略,这个实现起来非常麻烦。但是有了阻塞队列不一样了,它会

    计算机操作系统课程设计报告《生产者---消费者问题》.doc

    生产者和消 费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不 用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队 列里取,阻塞队列就相当于一个缓冲区...

    concurrentqueue:C ++ 11的快速多生产者,多消费者,无锁的并发队列

    注意:如果您需要的只是一个单一生产者,单一消费者队列,那么我也可以选择。 特征 击倒你的。 单头实现。 只需将其放入您的项目中即可。 完全线程安全的无锁队列。 从任何数量的线程同时使用。 C ++ 11实现-尽...

    利用C++如何实现一个阻塞队列详解

    阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。简而言之,阻塞队列是生产者用来存放元素、消费者获取元素的容器。 考虑下,这样一个多线程模型,程序有一个...

Global site tag (gtag.js) - Google Analytics