`

简单的java生产者消费者代码示例

 
阅读更多

一、背景

在并发编程中,生产者-消费者模式是一个经典问题,也是线程之间共享数据的简单而有效的手段之一。下面是这个模式的一个简单示例。

 

二、代码简介

多个数据生产者将数据存入缓冲区,一个或者多个数据消费者将数据从缓冲区取走

package com.two;

import java.util.Date;
import java.util.LinkedList;
import java.util.List;

/**
 * Bounded store of timestamps shared between producer and consumer threads.
 *
 * <p>Both operations are {@code synchronized} on this instance and use
 * {@code wait()}/{@code notifyAll()} to block while the store is full
 * ({@link #set()}) or empty ({@link #get()}).
 */
public class EventStorage {

    /**
     * Maximum number of events held at once. It is consulted on every
     * {@link #set()}; a value of zero or less makes {@code set()} block forever.
     */
    public int maxSize;

    /** Pending events, oldest first. Only touched while holding this instance's monitor. */
    private final LinkedList<Date> storage;

    /** Creates an empty store with a capacity of 10 events. */
    public EventStorage(){
        maxSize=10;
        storage=new LinkedList<Date>();
    }

    /**
     * Appends the current time to the store, waiting while the store already
     * holds {@link #maxSize} events.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * flag is restored and the method returns without adding anything.
     */
    public synchronized void set(){
        // Compare against the configurable limit, not a literal, so changing maxSize takes effect.
        while(storage.size()>=maxSize){
            try{
                wait();
            }catch(InterruptedException ex){
                // Hand the interruption back to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                return;
            }
        }
        storage.add(new Date());
        System.out.println("set storage size :"+storage.size());
        // Wake any consumer waiting for an event.
        notifyAll();
    }

    /**
     * Removes the oldest event and prints it, waiting while the store is empty.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * flag is restored and the method returns without removing anything.
     */
    public synchronized void get(){
        while(storage.isEmpty()){
            try{
                wait();
            }catch(InterruptedException e){
                Thread.currentThread().interrupt();
                return;
            }
        }
        System.out.println("get "+storage.poll());
        // Wake any producer waiting for free space.
        notifyAll();
    }
}

 

    

package com.two;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Task that repeatedly takes events out of a shared {@link EventStorage}.
 *
 * <p>Before each take it sleeps for a random whole number of seconds in the
 * range 0-9. The task ends when its thread is interrupted.
 */
public class Consumer implements Runnable{

    private final EventStorage storage;

    /** Single generator reused for every pause instead of allocating one per iteration. */
    private final Random random=new Random();

    /**
     * @param storage the store this consumer takes events from
     */
    public Consumer(EventStorage storage){
        this.storage=storage;
    }

    /** Alternates between a random pause and {@code storage.get()} until interrupted. */
    @Override
    public void run() {
        while(!Thread.currentThread().isInterrupted()){
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(10));
            } catch (InterruptedException e) {
                // Interruption is the only way to stop this loop: restore the flag and exit.
                Thread.currentThread().interrupt();
                return;
            }
            System.out.println(" storage.get.. ");
            storage.get();
        }
    }

}

 

   

package com.two;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Task that repeatedly adds events to a shared {@link EventStorage}.
 *
 * <p>Before each add it sleeps for a random whole number of seconds in the
 * range 0-4. The task ends when its thread is interrupted.
 */
public class Producer implements Runnable {

    private final EventStorage storage;

    /** Single generator reused for every pause instead of allocating one per iteration. */
    private final Random random=new Random();

    /**
     * @param storage the store this producer adds events to
     */
    public Producer(EventStorage storage){
        this.storage=storage;
    }

    /** Alternates between a random pause and {@code storage.set()} until interrupted. */
    @Override
    public void run() {
        while(!Thread.currentThread().isInterrupted()){
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(5));
            } catch (InterruptedException e) {
                // Interruption is the only way to stop this loop: restore the flag and exit.
                Thread.currentThread().interrupt();
                return;
            }
            System.out.println(" storage.set ");
            storage.set();
        }
    }

}

   

package com.two;

/**
 * Entry point of the wait/notify producer-consumer demo.
 *
 * @author yebing.li
 * @version $Id: Main.java, v 0.1 2014-07-25 11:04:42 yebing.li Exp $
 */
public class Main {

    /**
     * Wires one {@link Producer} and one {@link Consumer} to the same
     * {@link EventStorage} and starts a thread for each, producer first.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        final EventStorage shared = new EventStorage();

        final Thread producerThread = new Thread(new Producer(shared));
        final Thread consumerThread = new Thread(new Consumer(shared));

        producerThread.start();
        consumerThread.start();
    }

}

 

 

三、另一种实现方式:使用 java.util.concurrent.locks.Condition

 

package com.two;

import java.util.Random;

/**
 * In-memory stand-in for a text file: a fixed array of randomly generated
 * lines that is read sequentially through a cursor.
 *
 * <p>No synchronization is applied to the cursor; callers that share one
 * instance between threads have to coordinate access themselves.
 */
public class FileMock {

    private final String[] content;

    /** Position of the next line handed out by {@link #getLine()}. */
    private int index;

    /**
     * Generates the simulated file contents.
     *
     * @param size   number of lines to generate
     * @param length number of random integers (each in the range 0-999)
     *               concatenated, in decimal, to form every line
     */
    public FileMock(int size,int length){
        // One generator for the whole file instead of a new instance per number.
        Random random=new Random();
        content=new String[size];
        for(int i=0;i<size;i++){
            StringBuilder line=new StringBuilder();
            for(int j=0;j<length;j++){
                line.append(random.nextInt(1000));
            }
            content[i]=line.toString();
        }
        index=0;
    }

    /**
     * @return {@code true} while at least one line has not been read yet
     */
    public boolean hasMoreLines(){
        return index<content.length;
    }

    /**
     * Returns the next unread line and advances the cursor.
     *
     * @return the next line, or {@code null} once every line has been read
     */
    public String getLine(){
        if(hasMoreLines()){
            return content[index++];
        }
        return null;
    }
}

 

   

package com.two;

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Bounded FIFO buffer of text lines guarded by a {@link ReentrantLock} and
 * two {@link Condition}s.
 *
 * <p>Producers block in {@link #insert(String)} while the buffer is full.
 * Consumers block in {@link #get()} while it is empty and more lines are
 * still expected. All state, including the "more lines expected" flag, is
 * read and written only while holding the lock.
 */
public class Buffer {

    private final LinkedList<String> buffer;
    private final int maxSize;
    private final ReentrantLock lock;
    /** Signalled when a line is added or when the producer announces it has finished. */
    private final Condition lines;
    /** Signalled when a line is removed and room becomes available. */
    private final Condition space;
    /** {@code true} while a producer may still insert lines; guarded by {@link #lock}. */
    private boolean pendingLines;

    /**
     * @param maxSize maximum number of lines held at once; with a value of
     *                zero or less {@link #insert(String)} blocks indefinitely
     */
    public Buffer(int maxSize){
        this.maxSize=maxSize;
        buffer=new LinkedList<String>();
        lock=new ReentrantLock();
        lines=lock.newCondition();
        space=lock.newCondition();
        pendingLines=true;
    }

    /**
     * Appends a line, waiting while the buffer is full.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * flag is restored and the line is not inserted.
     *
     * @param line the line to append
     */
    public void insert(String line){
        lock.lock();
        try{
            while(buffer.size()>=maxSize){
                space.await();
            }
            buffer.offer(line);
            lines.signalAll();
        }catch(InterruptedException e){
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }finally{
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest line, waiting while the buffer is empty
     * and more lines are still expected.
     *
     * @return the oldest buffered line, or {@code null} when the buffer is
     *         empty and no more lines are expected, or when the calling
     *         thread was interrupted while waiting (its interrupt flag is
     *         restored in that case)
     */
    public String get(){
        String line=null;
        lock.lock();
        try{
            while(buffer.isEmpty()&&pendingLines){
                lines.await();
            }
            if(!buffer.isEmpty()){
                line=buffer.poll();
                space.signalAll();
            }
        }catch(InterruptedException e){
            Thread.currentThread().interrupt();
        }finally{
            lock.unlock();
        }
        return line;
    }

    /**
     * Declares whether more lines are still going to be inserted.
     *
     * <p>Setting the flag to {@code false} wakes every consumer blocked in
     * {@link #get()}; without that wake-up they would wait forever on an
     * empty buffer after the producer has finished.
     *
     * @param peedingLines {@code true} if a producer may still insert lines
     */
    public void setPeedingLines(boolean peedingLines){
        lock.lock();
        try{
            this.pendingLines=peedingLines;
            if(!peedingLines){
                lines.signalAll();
            }
        }finally{
            lock.unlock();
        }
    }

    /**
     * @return {@code true} if more lines are expected or some are still buffered
     */
    public boolean hasPeedingLines(){
        // Take the lock so the flag and the list are read consistently from any thread.
        lock.lock();
        try{
            return pendingLines||!buffer.isEmpty();
        }finally{
            lock.unlock();
        }
    }
}

 

   

package com.two;

/**
 * Task that copies every line of a {@link FileMock} into a {@link Buffer},
 * marking the buffer as "lines pending" for the duration of the copy.
 */
public class Producerfile implements Runnable{

    private FileMock source;
    private Buffer sink;

    /**
     * @param mock   simulated file to read lines from
     * @param buffer buffer the lines are inserted into
     */
    public Producerfile(FileMock mock,Buffer buffer){
        this.source=mock;
        this.sink=buffer;
    }

    /** Raises the pending flag, transfers all lines, then clears the flag. */
    @Override
    public void run() {
        sink.setPeedingLines(true);
        transferAllLines();
        sink.setPeedingLines(false);
    }

    /** Echoes each remaining line to standard output and inserts it into the buffer. */
    private void transferAllLines(){
        for(String next;source.hasMoreLines();){
            next=source.getLine();
            System.out.println("Producerfile,content:"+next);
            sink.insert(next);
        }
    }

}

 

    

package com.two;

/**
 * Task that drains lines from a shared {@link Buffer} and prints them.
 *
 * <p>It runs until the buffer reports that no lines are pending, or until
 * its thread is interrupted.
 */
public class ConsumerFile implements Runnable{

    private final Buffer buffer;

    /**
     * @param buffer the buffer to take lines from
     */
    public ConsumerFile(Buffer buffer){
       this.buffer=buffer;
    }

    /** Prints every line obtained from the buffer. */
    @Override
    public void run() {
        while(!Thread.currentThread().isInterrupted()&&buffer.hasPeedingLines()){
            String line=buffer.get();
            if(line==null){
                // get() yields null when there was nothing to take (for example
                // another consumer took the last line first); do not print "null".
                continue;
            }
            System.out.println(String.format("ConsumerFile content:%s", line));
        }
    }

}

    

package com.two;

/**
 * Entry point of the Lock/Condition producer-consumer demo.
 */
public class FileMockMain {

    /**
     * Builds a 100-line {@link FileMock} (10 random numbers per line) and a
     * {@link Buffer} of capacity 20, then starts one producer thread named
     * "Producer" followed by three consumer threads named "consumer0" to
     * "consumer2".
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        final int consumerCount=3;

        FileMock simulatedFile=new FileMock(100,10);
        Buffer shared=new Buffer(20);

        Thread producerThread=new Thread(new Producerfile(simulatedFile,shared),"Producer");

        Thread[] consumerThreads=new Thread[consumerCount];
        int slot=0;
        while(slot<consumerCount){
            consumerThreads[slot]=new Thread(new ConsumerFile(shared),"consumer"+slot);
            slot++;
        }

        producerThread.start();
        for(Thread consumerThread:consumerThreads){
            consumerThread.start();
        }
    }

}

 

 

   代码来源于《Java 7 并发编程实战手册》

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics