`

java 并发(2) 生产者和消费者问题的几种实现

    博客分类:
  • Java
阅读更多

有几种实现方法,一种是通过临界缓冲区的wait和notify来协调多个线程的并发,一种可以借用jdk 1.5+自带的BlockingQueue来实现,还有一种可以通过jdk1.5+的信号量机制来控制并发。

 

jdk1.5- 采用Object的wait 和notify方法来实现:

package com.xx.concurrent.commonUse;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;

import com.fangming.pub.StringUtils;

class Productor extends Thread {

	Queue<String> buffer;

	int quality;

	public Productor(Queue<String> buffer, int quality) {
		this.buffer = buffer;
		this.quality = quality;
	}

	@Override
	public void run() {
		try {
			product();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		ProductorAndCustomer.latch.countDown();
	}

	private void product() throws InterruptedException {
		synchronized (buffer) {
			while (quality > 0) {
				if (buffer.size() == ProductorAndCustomer.BUFFERSIZE) {
					buffer.wait();
				} else {
					String str = StringUtils.getRandomString(10);
					buffer.offer(str);
					quality--;
					System.out.println("####producer product " + str);
					buffer.notify();
				}
			}
		}
	}
}

class Customer extends Thread {

	Queue<String> buffer;
	int quality;

	public Customer(Queue<String> buffer, int quality) {
		this.buffer = buffer;
		this.quality = quality;
	}

	@Override
	public void run() {
		try {
			cusume();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		ProductorAndCustomer.latch.countDown();
	}

	private void cusume() throws InterruptedException {
		synchronized (buffer) {
			while (quality > 0) {
				if (buffer.size() == 0) {
					buffer.wait();
				} else {
					String str = buffer.poll();
					System.out.println("$$$$customer cocume " + str);
					quality--;
					buffer.notify();
				}
			}
		}
	}
}

public class ProductorAndCustomer {

	static final int BUFFERSIZE = 10;

	static CountDownLatch latch = new CountDownLatch(15);

	public static void main(String[] args) throws InterruptedException {

		long startTime = System.nanoTime();
		Queue<String> buffer = new LinkedList<String>();
		for (int i = 0; i < 10; i++) {
			Thread t1 = new Productor(buffer, 100);
			t1.start();
		}

		for (int i = 0; i < 5; i++) {
			Thread t2 = new Customer(buffer, 200);
			t2.start();
		}
		latch.await();
		long endTime = System.nanoTime();
		System.out.println(endTime - startTime);

	}

}

 

jdk1.5+ 采用BlockingQueue来实现

package com.xx.concurrent.commonUse;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;


public class MutiProductorAndCustomer {
	
	ArrayBlockingQueue<String> buffer = new ArrayBlockingQueue<String>(5);
	
	static CountDownLatch latch = new CountDownLatch(15); 
	
	class Productor extends Thread {
		
		int quality;
		
		Productor(int quality){
			this.quality = quality;
		}
		
		@Override
		public void run(){
			while(quality > 0){
				try {
					product();
				} catch (InterruptedException e) {
					e.printStackTrace();
					quality++;
				}
				quality--;
			}
			latch.countDown();
		}
		
		public void product() throws InterruptedException{
			
			String str = StringUtils.getRandomString(10);
			buffer.put(str);
			System.out.println(this.getName() + " product " + str);
			
		}
	}
	
	class Customer extends Thread {
		
		int quality;
		
		Customer(int quality){
			this.quality = quality;
		}
		
		@Override
		public void run(){
			while(quality > 0){
				try {
					consume();
				} catch (InterruptedException e) {
					e.printStackTrace();
					quality++; 
				}
				quality--;
			}
			latch.countDown();
		}
		
		public void consume() throws InterruptedException{
			
			String str = buffer.take();
			System.out.println(this.getName() + " cusume " + str);
			
		}
	}
	
	/**
	 * @param args
	 * @throws InterruptedException 
	 */
	public static void main(String[] args) throws InterruptedException {
		long startTime = System.nanoTime();
		MutiProductorAndCustomer demo = new MutiProductorAndCustomer();
		for(int i =0 ; i< 10; i++){
			Thread t1 = demo.new Productor(100);
			t1.start();
		}
		
		for(int i =0 ; i< 5; i++){
			Thread t2 = demo.new Customer(200);
			t2.start();
		}
		latch.await();
		long endTime = System.nanoTime();
		System.out.println(endTime - startTime);
	}

}

 字符串随机生成器StringUtils

package com.xx.pub;

import java.util.Random;

public class StringUtils {

	public static String getRandomString(int length) { //length表示生成字符串的长度
	    String base = "abcdefghijklmnopqrstuvwxyz0123456789";   
	    Random random = new Random();   
	    StringBuffer sb = new StringBuffer();   
	    for (int i = 0; i < length; i++) {   
	        int number = random.nextInt(base.length());   
	        sb.append(base.charAt(number));   
	    }   
	    return sb.toString();   
	 } 
	
}

jdk 1.5+ 采用信号量实现 

使用了3个信号量,mutex用来控制对临界缓冲区的访问,slots标识空闲的缓冲区,items标识已装入的缓冲区。如果使用ConcurrentLinkedQueue做缓冲区的话,互斥信号量mutex可以不用。

 

package com.xx.concurrent.commonUse;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;


public class ProductorAndCustomerWithSemaphore {
	//此处使用的是非线程安全LinkedList
	Queue<String> buffer = new LinkedList<String>();
        //使用ConcurrentLinkedQueue的话,互斥信号量mutex可以不用,可以提高一定的性能
	//Queue<String> buffer = new ConcurrentLinkedQueue<String>();

	static int BUFFERSIZE = 10;
	
	//线程数量
	static CountDownLatch latch = new CountDownLatch(15); 
	
	static Semaphore mutex = new Semaphore(1);
	static Semaphore slots = new Semaphore(BUFFERSIZE);
	static Semaphore items = new Semaphore(0);
	
	class Productor extends Thread {
		
		int quality;
		
		Productor(int quality){
			this.quality = quality;
		}
		
		@Override
		public void run(){
			while(quality > 0){
				try {
					slots.acquire();
					mutex.acquire();
					product();
					mutex.release();
					items.release();
				} catch (InterruptedException e) {
					e.printStackTrace();
					quality++;
				}
				quality--;
			}
			latch.countDown();
		}
		
		public void product() throws InterruptedException{
			String str = StringUtils.getRandomString(10);
			buffer.offer(str);
			System.out.println(this.getName() + " product " + str);
			
		}
	}
	
	class Customer extends Thread {
		
		int quality;
		
		Customer(int quality){
			this.quality = quality;
		}
		
		@Override
		public void run(){
			while(quality > 0){
				try {
					items.acquire();
					mutex.acquire();
					consume();
					mutex.release();
					slots.release();
				} catch (InterruptedException e) {
					e.printStackTrace();
					quality++; 
				}
				quality--;
			}
			latch.countDown();
		}
		
		public void consume() throws InterruptedException{
			
			String str = buffer.poll();
			System.out.println(this.getName() + " cusume " + str);
			
		}
	}
	
	/**
	 * @param args
	 * @throws InterruptedException 
	 */
	public static void main(String[] args) throws InterruptedException {
		long startTime = System.nanoTime();
		ProductorAndCustomerWithSemaphore demo = new ProductorAndCustomerWithSemaphore();
		for(int i =0 ; i< 10; i++){
			Thread t1 = demo.new Productor(100);
			t1.start();
		}
		
		for(int i =0 ; i< 5; i++){
			Thread t2 = demo.new Customer(200);
			t2.start();
		}
		latch.await();
		long endTime = System.nanoTime();
		System.out.println(endTime - startTime);
	}
}
 

 

性能比较:

jdk1.5-  wait() & notify()
111658991ns

jdk1.5+  ArrayBlockingQueue
98588747ns 

jdk1.5+  Semaphore 使用线程不安全的buffer  linkedList
123800982ns 

jdk1.5+  Semaphore 使用线程不安全的buffer  ConcurrentLinkedQueue
110885827ns

 

可以看出使用BlockQueue来实现生产者和消费者问题,性能最好。

 

分享到:
评论

相关推荐

    汪文君高并发编程实战视频资源下载.txt

     高并发编程第三阶段12讲 sun.misc.Unsafe介绍以及几种Counter方案性能对比.mp4  高并发编程第三阶段13讲 一个JNI程序的编写,通过Java去调用C,C++程序.mp4  高并发编程第三阶段14讲 Unsafe中的方法使用,一半是...

    高级开发并发面试题和答案.pdf

    面试高级开发的期间整理的面试题目,记录我面试遇到过的并发题目以及答案 目录 并发 常说的并发问题是哪些;资源竞争、死锁、事务、可见性 ...实现一个阻塞队列(用Condition写生产者与消费者就)?BlockingQueue

    JAVA上百实例源码以及开源项目

     基于JAVA的UDP服务器模型源代码,内含UDP服务器端模型和UDP客户端模型两个小程序,向JAVA初学者演示UDP C/S结构的原理。 简单聊天软件CS模式 2个目标文件 一个简单的CS模式的聊天软件,用socket实现,比较简单。 ...

    JAVA上百实例源码以及开源项目源代码

     基于JAVA的UDP服务器模型源代码,内含UDP服务器端模型和UDP客户端模型两个小程序,向JAVA初学者演示UDP C/S结构的原理。 简单聊天软件CS模式 2个目标文件 一个简单的CS模式的聊天软件,用socket实现,比较简单。 ...

    汪文君高并发编程实战视频资源全集

     高并发编程第三阶段12讲 sun.misc.Unsafe介绍以及几种Counter方案性能对比.mp4  高并发编程第三阶段13讲 一个JNI程序的编写,通过Java去调用C,C++程序.mp4  高并发编程第三阶段14讲 Unsafe中的方法使用,一半是...

    Java服务器端开发面试.doc

    NIO(通道,缓冲区,选择器) Java服务器端开发面试题篇2 thread, start(), run() 多线程里面的关键字,wait, notfiy, 锁(synchronized), lock接口 线程状态,上下文切换,守护线程 消费者和生产者的几种实现方式,...

    牛客的代码leetcode代码区别-offer:代码练习

    ConsumerAndProduction:生产者消费者模式的几种Java实现 NiuKe: 牛客网一些题目练习 offer: 2016年4月以来,参加在线笔试的一些公司的编程题目 pattern: 设计模式练习 Practice: 一些无目的的代码练习 base: ...

    javaSE代码实例

    16.4.5 “生产者-消费者”案例的实际运行 365 16.4.6 notify方法的使用 366 16.4.7 同步的语句块 367 16.4.8 线程的死锁 369 16.4.9 防止错误的使用wait、notify、notifyAll方法 371 16.5 获取当前正在...

    二十三种设计模式【PDF版】

    有几种画法?艺术大家通常是创造出自己的套路,比如明末清初,水墨画法开始成熟,这时画树就不用勾勒这个模式了,而是一笔 下去,浓淡几个叶子,待毛笔的水墨要干枯时,画一下树干,这样,一个活生写意的树就画出来. 我上面...

    leetcode下载-LeetCode-Nowcoder-DataStruct:记录日常刷题与数据结构

    这个包下面的东西都是与并发编程相关的东西,比如阻塞队列实现生产者消费者模型,CAS,n种单例模式,线程交替打印,线程通信,读写锁,信号量,countdLaunch,circleBarrier等等 五. LeetCode 记录力扣刷题 由于本人不经常在...

Global site tag (gtag.js) - Google Analytics