生产者
package producer_customer; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class Producer implements Runnable { private volatile boolean isRunning = true; private BlockingQueue<Data> queue; private static AtomicInteger count = new AtomicInteger(); private static final int SLEEPTIME = 10000; public Producer(BlockingQueue<Data> queue) { this.queue = queue; } public void stop() { isRunning = false; } @Override public void run() { Data data = null; Random random = new Random(); System.out.println("start producer id = " + Thread.currentThread().getId()); try { while (isRunning) { Thread.sleep(random.nextInt(SLEEPTIME)); data = new Data(count.incrementAndGet()); System.out.println("Put " + data.getIntData() + " is put into queue"); if (!queue.offer(data, 2, TimeUnit.SECONDS)) { System.out.println("failed to put data: " + data); } } } catch (Exception e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } }
消费者
package producer_customer; import java.util.Random; import java.util.concurrent.BlockingQueue; public class Consumer implements Runnable { private BlockingQueue<Data> queue; private static final int SLEEPTIME = 10000; public Consumer(BlockingQueue<Data> queue) { this.queue = queue; } @Override public void run() { System.out.println("start consumer id = " + Thread.currentThread().getId()); Random random = new Random(); try { while (true) { Data data = queue.take(); if (null != data) { int result = data.getIntData() * data.getIntData(); System.out.println("Take " + data.getIntData() + " from queue, and result = " + result); Thread.sleep(random.nextInt(SLEEPTIME)); } } } catch (Exception e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } }
数据
package producer_customer; public final class Data { private final int intData; public Data(int d) { this.intData = d; } public Data(String s) { this.intData = Integer.valueOf(s); } public int getIntData() { return intData; } }
测试类
package producer_customer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; public class Test { public static void main(String[] args) throws InterruptedException { BlockingQueue<Data> queue = new LinkedBlockingDeque<Data>(10); Producer p1 = new Producer(queue); Producer p2 = new Producer(queue); Producer p3 = new Producer(queue); Producer p4 = new Producer(queue); Consumer c1 = new Consumer(queue); Consumer c2 = new Consumer(queue); Consumer c3 = new Consumer(queue); Consumer c4 = new Consumer(queue); ExecutorService service = Executors.newCachedThreadPool(); service.execute(p1); service.execute(p2); service.execute(p3); service.execute(p4); service.execute(c1); service.execute(c2); service.execute(c3); service.execute(c4); Thread.sleep(10000); p1.stop(); p2.stop(); p3.stop(); Thread.sleep(10000); service.shutdown(); } }
相关推荐
非常好的程序资源,一秒钟一千多条!
(注意,本资源附带书中源代码可供参考) 多线程与并发处理是程序设计好坏优劣的重要课题,本书通过浅显易懂的文字与实例来介绍Java线程相关的设计模式概念,并且通过实际的Java程序范例和 UML图示来一一解说,书中...
java的多线程示例(生产者消费者问题)
目录: 漫谈UML Introduction 1 Java语言的线程 Introduction 2 多线程...总结 多线程程序设计的模式语言 附录A 练习问题的解答 附录B Java的内存模型 附录C Java线程的优先级 附录D 线程相关的主要API 附录E 参考文献
Java多线程模式详解 目录: 一、漫谈UML Java语言的线程 多线程的评量标准 二、 1、Single Threaded Execution ———— 能通过这座桥的,只有一个人 2、Immutable ———— 想破坏它也没办法 3、Guarded ...
producer_consumer_using_multithreading_in_java 用Java实现的经典生产者消费者问题的多线程解决方案
采用2个producer和一个Consumer把整数放入到(或者从中取出)一个环形缓冲Circle Buffer中,这块连续的存储会被反复使用。一个写指针指向数据写入的位置,如果写到Buffer的末端,则从Buffer的头端开始写。类似的,一...
ThreadImRunnable.java 继承Runnable接口实现多线程 mulThread.java 创建多个线程对象的类 demoJoin.java 演示使用join()以确保主线程最后结束 clicker.java 一个计数用的线程类 demoPri.java 调用上面这个类...
SynchronizedQueue,一个可以被多个线程使用的队列。 主线程 DiskSearcher 启动其余线程并配置搜索的后缀、根和结果路径。 Scoter,侦察所有子目录的根目录。 搜索器,扫描某个目录的文件以查找具有某个后缀的...
消费者生产者产品管理系统 这是消费者生产者产品管理系统的简单模型。 它的工作方式类似于Amazon,Flipkart,Myntra等在线购物网站。 请按照以下步骤运行项目:通过执行命令:“ ... 关于项目:此作业全部与多线程有关
基于队列实现的生产消费者模式java 源码,并且采用多线程进行消费
│ 高并发编程第二阶段31讲、多线程Producer and Consumer设计模式.mp4 │ 高并发编程第二阶段32讲、多线程Count Down设计模式.mp4 │ 高并发编程第二阶段33讲、多线程Thread-Per-Message设计模式.mp4 │ 高并发...
│ 高并发编程第二阶段31讲、多线程Producer and Consumer设计模式.mp4 │ 高并发编程第二阶段32讲、多线程Count Down设计模式.mp4 │ 高并发编程第二阶段33讲、多线程Thread-Per-Message设计模式.mp4 │ 高并发...
生产者-消费者模式下的多线程读/写方案,对应PRODUCERCONSUMER执行模式。 基于Disruptor的生产者-消费者模式下的多线程读/写方案,对于DISRUPTOR执行模式。 程序目录结构 com.daoqidlv.filespilt —— 公共类,及...
//唤醒Consumer线程. } } <br>class Producer implements Runnable{ Q q=null; public Producer(Q q){ this.q=q; } public void run(){ int i=0; while(true){ if...
流量控制分布式系统项目-针对多生产者和多消费者的多线程服务器T中的缓冲区的流控制开始吧git clone [this_repository_url] 打开Eclipse IDE并添加存储库开始编码用法转到src位置运行服务器java fr / dauphine / sar...
java中的涉及到生产者 消费者的多线程操作问题
多线程练习 Java MultiThread练习代码,包括ReaderWriter,BlockingQueue,ProducerConsumer等的实现。
解决方案是使生产者进入睡眠状态,当消费者移除一件或多件物品时将其唤醒。 类似地,当消费者想要从缓冲区中删除一个项目并看到缓冲区为空时,它将进入睡眠状态,直到生产者将某些东西放入缓冲区中并唤醒消费者为止...
消费者对于KafkaConsumer而言,它不是线程安全的,所以实现多线程时通常由两种实现方法:每个线程维护一个KafkaConsumer维护一个或多个KafkaConsumer,并用一个任务线程池处理任务优缺点分析:方法一:实现简单(即...