`
zengshaotao
  • 浏览: 752662 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

生产者消费者

    博客分类:
  • java
 
阅读更多

package function.thread;

 

import java.util.ArrayList;

import java.util.Arrays;

import java.util.List;

import java.util.Random;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

/**

功能:三个生产者,往容量最大为5的容器里put数据

三个消费者从容器中取数据。只要容器不为空,就可以取数据,

只要容器不满,就可以存放数据

容器里不能拥有相同的数据

*/

public class ConditionTest {

 

public static void main(String args[]) {

 

final String goonFlag[] = {"true"};

final BoundedBuffer bf = new BoundedBuffer();

System.out.println("**************************************** main thread begin********************************");

//刚开始是抢占式的,take和put线程都可能先执行

for (int i = 0; i < 3; i++) {

new Thread(" put thread "+i) {

public void run() {

try {

//这里是匿名的内部类,所以要使用final变量

while(goonFlag[0].equals("true")){

bf.put(new Random().nextInt(100));

}

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}.start();

 

new Thread(" take thread "+i) {

public void run() {

try {

while(goonFlag[0].equals("true")){

try {

bf.take();

} catch (Exception e) {

e.printStackTrace();

}

}

} catch (Exception e) {

e.printStackTrace();

}

}

}.start();

}//for block,new thread

 

try {

System.out.println("**************************************** main thread sleep********************************");

Thread.sleep(15000);

goonFlag[0]= "false";

} catch (InterruptedException e) {

e.printStackTrace();

}

}//main method

}

 

class BoundedBuffer {

final Lock lock = new ReentrantLock();// 锁对象

final Condition putCond = lock.newCondition();// 队列未满,写线程可以满足执行条件

final Condition takeCond = lock.newCondition();// 读线程条件

 

final List items = new ArrayList();// 缓存队列

 

public void put(Object x) throws InterruptedException {

String threadName = Thread.currentThread().getName();

System.out.println("############### 【"+threadName+"】 come in put method...... ");

lock.lock();

 

System.out.println("############### 【"+threadName+"】 get lock and sleep 500 ms ");

Thread.sleep(500);

System.out.println("############### 【"+threadName+"】wake up。。。。。。 ");

try {

if (items.size()>=5){

System.out.println("############### 【"+threadName+"】 stack is full and await ");

takeCond.signalAll();//队列已经满了,读线程可以进行读取了

// 阻塞写线程

//一个线程被阻塞,从业务角度理解,它的使命已经完成

//如果要重新参与作业,就要重新参与线程的竞争

putCond.await();

}else{

if(!items.contains(x)){

items.add(x);// 赋值

System.out.println("############### 【"+threadName+"】 put: "+x);

System.out.println("############### 【"+threadName+"】 list length "+items.size()+", list content:"+items.toString());

takeCond.signalAll();//队列不为空,读线程可以进行读取了

}else{

System.out.println("############### 【"+threadName+"】 put same: "+x);

System.out.println("############### 【"+threadName+"】 list content:"+items.toString());

}

 

}

 

} finally {

System.out.println("############### 【"+threadName+"】 release the lock ");

lock.unlock();

}

}

 

public Object take() throws Exception {

String threadName = Thread.currentThread().getName();

System.out.println("############### 【"+threadName+"】 come in take method...... ");

lock.lock();

//int listSize = items.size();不能使用局部变量,因为如果使用,得到的可能是await之前的值

//await期间,该值可能已经被修改

System.out.println("**************** 【"+threadName+"】 get a lock and sleep 500ms ");

Thread.sleep(500);

System.out.println("**************** 【"+threadName+"】 wake up。。。。。。 ");

try {

if (items.size() == 0){

System.out.println("**************** 【"+threadName+"】 stack is empty and await ");

putCond.signalAll();// 队列为空,就可以继续写入,也就是可以唤醒写线程

//当前线程调用了condition,被打上了相应的标签

//阻塞读线程。这里需要注意,被贴上了参与共享资源标签的线程在被唤醒后,

//可能会和还未打上标签的线程竞争。还有一种就是被打上了标签之后的多个线程同时竞争

takeCond.await();

return null;

}else{

Object x = items.get(items.size()-1);// 取值

System.out.println("**************** 【"+threadName+"】 get the value: "+x);

items.remove(x);

System.out.println("**************** 【"+threadName+"】 remain list : "+items.toString());

putCond.signalAll();// 队列未满,就可以继续写入,也就是可以唤醒写线程

return x;

}

 

 

}catch(Exception e){

e.printStackTrace();

return null;

}finally {

System.out.println("**************** 【"+threadName+"】 release the lock ");

lock.unlock();

}

}

}

分享到:
评论

相关推荐

    Linux下生产者消费者问题的实现

    利用互斥锁和计数信号完成生产者消费者问题 一组生产者进程和一组消费者进程共享一个初始为空、大小为n的缓冲区,只有缓冲区没满时,生产者才把消息放入到缓冲区,否则必须等待;只有缓冲区不空时,消费者才能从中...

    生产者消费者问题

    生产者消费者问题,C++。生产者-消费者(producer-consumer)问题,也称作有界缓冲区(bounded-buffer)问题,两个进程共享一个公共的固定大小的缓冲区。其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者...

    python实现生产者消费者并发模型

    多线程实现生产者消费者模型:锁(Lock)、信号量(Semaphore、BoundedSemaphore)、条件(Condition)、队列(Queue)、事件(Event) 多进程程实现生产者消费者模型:信号量(Semaphore)、条件(Condition)、...

    C语言实现生产者消费者问题

    C语言实现生产者消费者问题,分配具有n个缓冲区的缓冲池,作为共享资源。 定义两个资源型信号量empty 和full,empty信号量表示当前空的缓冲区数量,full表示当前满的缓冲区数量。 定义互斥信号量mutex,当某个进程...

    操作系统课程设计——生产者消费者问题Java图形界面动态演示

    设计目的:通过研究Linux 的进程机制和信号量实现生产者消费者问题的并发控制。说明:有界缓冲区内设有20 个存储单元,放入/取出的数据项设定为1‐20 这20 个整型数。设计要求:1)每个生产者和消费者对有界缓冲区...

    生产者消费者问题(信号量)(linux)实现代码

    参考教材中的生产者消费者算法,创建5个进程,其中两个进程为生产者进程,3个进程为消费者进程。一个生产者进程试图不断地在一个缓冲中写入大写字母,另一个生产者进程试图不断地在缓冲中写入小写字母。3个消费者...

    生产者消费者问题总结

    生产者消费者问题总结 信号量概念总结 经典生产者-消费者问题 较为复杂的生产者-消费者问题 华南理工大学生产者和消费者问题 个人总结

    操作系统实验(生产者消费者问题)

    实验四、生产者消费者问题(15分) • 一个大小为3的缓冲区,初始为空 • 2个生产者 – 随机等待一段时间,往缓冲区添加数据, – 若缓冲区已满,等待消费者取走数据后再添加 – 重复6次 • 3个消费者 – ...

    生产者消费者程序的实现

    生产者消费者的实现。可以自主地改变生产者,消费者的数目,和缓冲区。

    华南理工大学操作系统实验:生产者消费者问题

    参考教材中的生产者消费者算法,创建5个进程,其中两个进程为生产者进程,3个进程为消费者进程。一个生产者进程试图不断地在一个缓冲中写入大写字母,另一个生产者进程试图不断地在缓冲中写入小写字母。3个消费者...

    操作系统 课程设计 实现生产者消费者(Bounded – Buffer Problem)问题

    通过研究Linux的线程机制和信号量实现生产者消费者(Bounded Buffer)问题的并发控制。 实验条件要求:每人一台与Linux主机联网的Windows主机,普通用户权限。 (1) 每个生产者和消费者对有界缓冲区进行操作后,即时...

    线程问题生产者消费者流程图

    生产者消费者流程图; 生产者消费者流程图。

    生产者消费者问题 MFC 实现

    多线程同步互斥 生产者消费者问题 MFC 实现

    生产者 消费者 模式 c++

    生产者 消费者 模式 c++ 算是老外写的一个使用demo 可以参考一下

    多线程代码 经典线程同步互斥问题 生产者消费者问题

    a: 创建一个线程 ...h: problem1 生产者消费者问题 (1生产者 1消费者 1缓冲区) problem1 more 生产者消费者问题 (1生产者 2消费者 4缓冲区) problem2 读者与写着问题 I: 信号量 semaphore 解决线程同步问题

    生产者消费者图形化演示

    操作系统实验课实现的生产者消费者模型图形化演示。通过“企鹅吃苹果”这个小故事吧,苹果是生产者生产出来的,而企鹅是消费者。可以调节生产和消费的速度,也可以暂停程序方便演示。用C++实现的,使用的是纯粹的API...

    进程线程之间的同步生产者消费者信号量读者写者写者优先

    生产者消费者问题(信号量+mutex) 参考教材中的生产者消费者算法,创建5个进程,其中两个进程为生产者进程,3个进程为消费者进程。一个生产者进程试图不断地在一个缓冲中写入大写字母,另一个生产者进程试图不断地...

Global site tag (gtag.js) - Google Analytics