一、背景
在并发编程中生产者-消费者模式是一个典型的问题。是数据共享简单而有效的手段之一。下面是这个模式的一个简单示例
二、代码简介
多个数据生产者将数据存入缓冲区,一个或者多个数据消费者将数据从缓冲区取走
package com.two; import java.util.Date; import java.util.LinkedList; import java.util.List; public class EventStorage { public int maxSize; private List<Date> storage; public EventStorage(){ maxSize=10; storage=new LinkedList<Date>(); } public synchronized void set(){ while(storage.size()>=10){ try{ wait(); }catch(InterruptedException ex){ ex.printStackTrace(); } } storage.add(new Date()); System.out.println("set storage size :"+storage.size()); notifyAll(); } public synchronized void get(){ while(storage.size()<=0){ try{ wait(); }catch(InterruptedException e){ e.printStackTrace(); } } System.out.println("get "+((LinkedList<Date>)storage).poll()); notifyAll(); } }
package com.two; import java.util.Random; import java.util.concurrent.TimeUnit; public class Consumer implements Runnable{ private EventStorage storage; public Consumer(EventStorage storage){ this.storage=storage; } @Override public void run() { while(true){ try { TimeUnit.SECONDS.sleep(new Random().nextInt(10)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(" storage.get.. "); storage.get(); } } }
package com.two; import java.util.Random; import java.util.concurrent.TimeUnit; public class Producer implements Runnable { private EventStorage storage; public Producer(EventStorage storage){ this.storage=storage; } @Override public void run() { while(true){ try { TimeUnit.SECONDS.sleep(new Random().nextInt(5)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(" storage.set "); storage.set(); } } }
package com.two; /** * 生产者消费者主程序 * * @author yebing.li * @version $Id: Main.java, v 0.1 2014年7月25日 上午11:04:42 yebing.li Exp $ */ public class Main { public static void main(String[] args) { EventStorage storage=new EventStorage(); Producer producer=new Producer(storage); Thread thread1=new Thread(producer); Consumer consumer=new Consumer(storage); Thread thread2=new Thread(consumer); thread1.start(); thread2.start(); } }
三、另一种实现方式java.util.concurrent.locks.Condition
package com.two; import java.util.Random; public class FileMock { private String[] content; private int index; public FileMock(int size,int length){ content=new String[size]; for(int i=0;i<size;i++){ StringBuilder buffer=new StringBuilder(); for(int j=0;j<length;j++){ int indice=new Random().nextInt(1000); buffer.append(indice); } content[i]=buffer.toString(); } index=0; } public boolean hasMoreLines(){ return index<content.length; } public String getLine(){ if(hasMoreLines()){ return content[index++]; } return null; } }
package com.two; import java.util.LinkedList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class Buffer { private LinkedList<String> buffer; private int maxSize; private ReentrantLock lock; private Condition lines; private Condition space; private boolean peedingLines; public Buffer(int maxSize){ this.maxSize=maxSize; buffer=new LinkedList<String>(); lock=new ReentrantLock(); lines=lock.newCondition(); space=lock.newCondition(); peedingLines=true; } public void insert(String line){ lock.lock(); try{ while(buffer.size()>=maxSize){ space.await(); } buffer.offer(line); lines.signalAll(); }catch(InterruptedException e){ e.printStackTrace(); }finally{ lock.unlock(); } } public String get(){ String line=null; lock.lock(); try{ while(buffer.size()==0&&hasPeedingLines()){ lines.await(); } if(hasPeedingLines()){ line=buffer.poll(); space.signalAll(); } }catch(InterruptedException e){ e.printStackTrace(); }finally{ lock.unlock(); } return line; } public void setPeedingLines(boolean peedingLines){ this.peedingLines=peedingLines; } public boolean hasPeedingLines(){ return peedingLines||buffer.size()>0; } }
package com.two; public class Producerfile implements Runnable{ private FileMock mock; private Buffer buffer; public Producerfile(FileMock mock,Buffer buffer){ this.mock=mock; this.buffer=buffer; } @Override public void run() { buffer.setPeedingLines(true); while(mock.hasMoreLines()){ String line=mock.getLine(); System.out.println("Producerfile,content:"+line); buffer.insert(line); } buffer.setPeedingLines(false); } }
package com.two; public class ConsumerFile implements Runnable{ private Buffer buffer; public ConsumerFile(Buffer buffer){ this.buffer=buffer; } @Override public void run() { while(buffer.hasPeedingLines()){ String line=buffer.get(); System.out.println(String.format("ConsumerFile content:%s", line)); } } }
package com.two; public class FileMockMain { public static void main(String[] args) { FileMock mock=new FileMock(100,10); Buffer buffer=new Buffer(20); Producerfile producer=new Producerfile(mock,buffer); Thread threadProducer=new Thread(producer,"Producer"); ConsumerFile consumers[]=new ConsumerFile[3]; Thread threadConsumer[]=new Thread[3]; for(int i=0;i<3;i++){ consumers[i]=new ConsumerFile(buffer); threadConsumer[i]=new Thread(consumers[i],"consumer"+i); } threadProducer.start(); for(int i=0;i<3;i++){ threadConsumer[i].start(); } } }
代码来源于《java7并发编程手册》
相关推荐
RocketMQ生产者和消费者Java代码示例
kafka-java-demo 基于java的kafka生产消费者示例。 mvn
主要介绍了Java实现Kafka生产者消费者代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
主要介绍了Java实现简易生产者消费者模型过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
主要介绍了Java多线程生产者消费者模式实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
主要介绍了JAVA线程同步的代码学习示例,大家参考使用吧
producer_consumer.java 演示生产者-消费者线程 consumer.java 消费者线程 producer.java 生产者线程 common.java 公有类 第9章 示例描述:本章学习运行时类型识别。 Candy.java 一个用来测试的简单类 ...
主要介绍了kafka生产者和消费者的javaAPI的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
整理的Kafka示例代码,包括集群上的生产者/消费者的Java示例代码,以及Scala编写的示例
主要介绍了Java多种方式实现生产者消费者模式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
主要介绍了Java多线程 生产者消费者模型实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
Java 中 Akka 的简单生产者消费者示例 此存储库包含 3 个简单网络爬虫的示例: 一个连续的例子 将逻辑拆分为 3 个 Actor 的示例 页面的检索由多个 Actor 并行处理的示例。 检索失败且应用程序挂起的示例 重新发送...
spring cloud微服务框架demo完整可用2版 比第一版多集成了mybatis...(注册中心+生产者+消费者+feign负载均衡+hystrix断路器+仪表盘+gate路由网关+config配置中心+mybatis+oracle+mybatisPlus generator代码自动生成)
主要介绍了Java多线程 BlockingQueue实现生产者消费者模型详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
面向对象思想 单子模式 多线程示例银行取款问题 多线程示例生产者和消费者问题 编程实现序列化
主要介绍了Java多线程并发生产者消费者设计模式实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
实现java模拟阻塞队列的例子,该代码包括,阻塞队列实现生产者,消费者。和模拟阻塞队列实现生产者及消费者模式,帮助你更好的理解java多线程
主要介绍了Java多线程中不同条件下编写生产消费者模型方法介绍,介绍了生产消费者模型,然后分享了相关代码示例,具有一定参考价值,需要的朋友可以了解下。
代码里面包含一个并发4个线程同时运行 全部开始 全部停止 单个停止还有点问题。 还有生产者消费者 里面的里面能帮助你理解多线程的运用!
此存储库包括演示如何使用 Java Kafka 生产者和消费者的项目。 您可以在 Confluent Platform 文档的找到代码的详细说明。 构建生产者项目 $ cd producer $ mvn clean package 构建消费者项目 $ cd consumer $ mvn...