有几种实现方法,一种是通过临界缓冲区的wait和notify来协调多个线程的并发,一种可以借用jdk 1.5+自带的BlockingQueue来实现,还有一种可以通过jdk1.5+的信号量机制来控制并发。
jdk1.5- 采用Object的wait 和notify方法来实现:
package com.xx.concurrent.commonUse; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.CountDownLatch; import com.fangming.pub.StringUtils; class Productor extends Thread { Queue<String> buffer; int quality; public Productor(Queue<String> buffer, int quality) { this.buffer = buffer; this.quality = quality; } @Override public void run() { try { product(); } catch (InterruptedException e) { e.printStackTrace(); } ProductorAndCustomer.latch.countDown(); } private void product() throws InterruptedException { synchronized (buffer) { while (quality > 0) { if (buffer.size() == ProductorAndCustomer.BUFFERSIZE) { buffer.wait(); } else { String str = StringUtils.getRandomString(10); buffer.offer(str); quality--; System.out.println("####producer product " + str); buffer.notify(); } } } } } class Customer extends Thread { Queue<String> buffer; int quality; public Customer(Queue<String> buffer, int quality) { this.buffer = buffer; this.quality = quality; } @Override public void run() { try { cusume(); } catch (InterruptedException e) { e.printStackTrace(); } ProductorAndCustomer.latch.countDown(); } private void cusume() throws InterruptedException { synchronized (buffer) { while (quality > 0) { if (buffer.size() == 0) { buffer.wait(); } else { String str = buffer.poll(); System.out.println("$$$$customer cocume " + str); quality--; buffer.notify(); } } } } } public class ProductorAndCustomer { static final int BUFFERSIZE = 10; static CountDownLatch latch = new CountDownLatch(15); public static void main(String[] args) throws InterruptedException { long startTime = System.nanoTime(); Queue<String> buffer = new LinkedList<String>(); for (int i = 0; i < 10; i++) { Thread t1 = new Productor(buffer, 100); t1.start(); } for (int i = 0; i < 5; i++) { Thread t2 = new Customer(buffer, 200); t2.start(); } latch.await(); long endTime = System.nanoTime(); System.out.println(endTime - startTime); } }
jdk1.5+ 采用BlockingQueue来实现
package com.xx.concurrent.commonUse; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CountDownLatch; public class MutiProductorAndCustomer { ArrayBlockingQueue<String> buffer = new ArrayBlockingQueue<String>(5); static CountDownLatch latch = new CountDownLatch(15); class Productor extends Thread { int quality; Productor(int quality){ this.quality = quality; } @Override public void run(){ while(quality > 0){ try { product(); } catch (InterruptedException e) { e.printStackTrace(); quality++; } quality--; } latch.countDown(); } public void product() throws InterruptedException{ String str = StringUtils.getRandomString(10); buffer.put(str); System.out.println(this.getName() + " product " + str); } } class Customer extends Thread { int quality; Customer(int quality){ this.quality = quality; } @Override public void run(){ while(quality > 0){ try { consume(); } catch (InterruptedException e) { e.printStackTrace(); quality++; } quality--; } latch.countDown(); } public void consume() throws InterruptedException{ String str = buffer.take(); System.out.println(this.getName() + " cusume " + str); } } /** * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { long startTime = System.nanoTime(); MutiProductorAndCustomer demo = new MutiProductorAndCustomer(); for(int i =0 ; i< 10; i++){ Thread t1 = demo.new Productor(100); t1.start(); } for(int i =0 ; i< 5; i++){ Thread t2 = demo.new Customer(200); t2.start(); } latch.await(); long endTime = System.nanoTime(); System.out.println(endTime - startTime); } }
字符串随机生成器StringUtils
package com.xx.pub; import java.util.Random; public class StringUtils { public static String getRandomString(int length) { //length表示生成字符串的长度 String base = "abcdefghijklmnopqrstuvwxyz0123456789"; Random random = new Random(); StringBuffer sb = new StringBuffer(); for (int i = 0; i < length; i++) { int number = random.nextInt(base.length()); sb.append(base.charAt(number)); } return sb.toString(); } }
jdk 1.5+ 采用信号量实现
使用了3个信号量,mutex用来控制对临界缓冲区的访问,slots标识空闲的缓冲区,items标识已装入的缓冲区。如果使用ConcurrentLinkedQueue做缓冲区的话,互斥信号量mutex可以不用。
package com.xx.concurrent.commonUse;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
public class ProductorAndCustomerWithSemaphore {
//此处使用的是非线程安全LinkedList
Queue<String> buffer = new LinkedList<String>();
//使用ConcurrentLinkedQueue的话,互斥信号量mutex可以不用,可以提高一定的性能
//Queue<String> buffer = new ConcurrentLinkedQueue<String>();
static int BUFFERSIZE = 10;
//线程数量
static CountDownLatch latch = new CountDownLatch(15);
static Semaphore mutex = new Semaphore(1);
static Semaphore slots = new Semaphore(BUFFERSIZE);
static Semaphore items = new Semaphore(0);
class Productor extends Thread {
int quality;
Productor(int quality){
this.quality = quality;
}
@Override
public void run(){
while(quality > 0){
try {
slots.acquire();
mutex.acquire();
product();
mutex.release();
items.release();
} catch (InterruptedException e) {
e.printStackTrace();
quality++;
}
quality--;
}
latch.countDown();
}
public void product() throws InterruptedException{
String str = StringUtils.getRandomString(10);
buffer.offer(str);
System.out.println(this.getName() + " product " + str);
}
}
class Customer extends Thread {
int quality;
Customer(int quality){
this.quality = quality;
}
@Override
public void run(){
while(quality > 0){
try {
items.acquire();
mutex.acquire();
consume();
mutex.release();
slots.release();
} catch (InterruptedException e) {
e.printStackTrace();
quality++;
}
quality--;
}
latch.countDown();
}
public void consume() throws InterruptedException{
String str = buffer.poll();
System.out.println(this.getName() + " cusume " + str);
}
}
/**
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
long startTime = System.nanoTime();
ProductorAndCustomerWithSemaphore demo = new ProductorAndCustomerWithSemaphore();
for(int i =0 ; i< 10; i++){
Thread t1 = demo.new Productor(100);
t1.start();
}
for(int i =0 ; i< 5; i++){
Thread t2 = demo.new Customer(200);
t2.start();
}
latch.await();
long endTime = System.nanoTime();
System.out.println(endTime - startTime);
}
}
性能比较:
jdk1.5- wait() & notify()
111658991ns
jdk1.5+ ArrayBlockingQueue
98588747ns
jdk1.5+ Semaphore 使用线程不安全的buffer linkedList
123800982ns
jdk1.5+ Semaphore 使用线程不安全的buffer ConcurrentLinkedQueue
110885827ns
可以看出使用BlockQueue来实现生产者和消费者问题,性能最好。
相关推荐
高并发编程第三阶段12讲 sun.misc.Unsafe介绍以及几种Counter方案性能对比.mp4 高并发编程第三阶段13讲 一个JNI程序的编写,通过Java去调用C,C++程序.mp4 高并发编程第三阶段14讲 Unsafe中的方法使用,一半是...
面试高级开发的期间整理的面试题目,记录我面试遇到过的并发题目以及答案 目录 并发 常说的并发问题是哪些;资源竞争、死锁、事务、可见性 ...实现一个阻塞队列(用Condition写生产者与消费者就)?BlockingQueue
基于JAVA的UDP服务器模型源代码,内含UDP服务器端模型和UDP客户端模型两个小程序,向JAVA初学者演示UDP C/S结构的原理。 简单聊天软件CS模式 2个目标文件 一个简单的CS模式的聊天软件,用socket实现,比较简单。 ...
基于JAVA的UDP服务器模型源代码,内含UDP服务器端模型和UDP客户端模型两个小程序,向JAVA初学者演示UDP C/S结构的原理。 简单聊天软件CS模式 2个目标文件 一个简单的CS模式的聊天软件,用socket实现,比较简单。 ...
高并发编程第三阶段12讲 sun.misc.Unsafe介绍以及几种Counter方案性能对比.mp4 高并发编程第三阶段13讲 一个JNI程序的编写,通过Java去调用C,C++程序.mp4 高并发编程第三阶段14讲 Unsafe中的方法使用,一半是...
NIO(通道,缓冲区,选择器) Java服务器端开发面试题篇2 thread, start(), run() 多线程里面的关键字,wait, notfiy, 锁(synchronized), lock接口 线程状态,上下文切换,守护线程 消费者和生产者的几种实现方式,...
ConsumerAndProduction:生产者消费者模式的几种Java实现 NiuKe: 牛客网一些题目练习 offer: 2016年4月以来,参加在线笔试的一些公司的编程题目 pattern: 设计模式练习 Practice: 一些无目的的代码练习 base: ...
16.4.5 “生产者-消费者”案例的实际运行 365 16.4.6 notify方法的使用 366 16.4.7 同步的语句块 367 16.4.8 线程的死锁 369 16.4.9 防止错误的使用wait、notify、notifyAll方法 371 16.5 获取当前正在...
有几种画法?艺术大家通常是创造出自己的套路,比如明末清初,水墨画法开始成熟,这时画树就不用勾勒这个模式了,而是一笔 下去,浓淡几个叶子,待毛笔的水墨要干枯时,画一下树干,这样,一个活生写意的树就画出来. 我上面...
这个包下面的东西都是与并发编程相关的东西,比如阻塞队列实现生产者消费者模型,CAS,n种单例模式,线程交替打印,线程通信,读写锁,信号量,countdLaunch,circleBarrier等等 五. LeetCode 记录力扣刷题 由于本人不经常在...