`
iluoxuan
  • 浏览: 571218 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

java生产者和消费者模型三种实现

 
阅读更多

1: 生产者和消费者的问题,生产者生产产品到缓冲区,消费者从缓冲区中取,缓冲区如果满了,生产者就不能再生产,如果缓冲区为空,消费者则不能消费

 

1: 多线程并发执行,在生产产品和消费的时候可能是多个线程并发,所以必须加上锁,不然缓冲区的产生并发问题

2:当缓冲区满或者缓冲区空洞时候要阻塞,直到符合条件 唤醒

 

1:第一种实现, wait(), notifyAll();

public class ProductConsumer2 {

    private final int MAX_SIZE=100;

    private final LinkedList<Object> list=new LinkedList<Object>();

    public void product(int num) throws InterruptedException {
        synchronized(list) {
            if(list.size() + num > MAX_SIZE) { // 剩余容量不足 线程阻塞
                list.wait();
            }
            for(int i=0; i < num; i++) {
                System.out.println("thread name " + Thread.currentThread().getName() + "  product");
                list.add(new Object());
            }
            list.notifyAll();
        }
    }

    public void cosume(int num) throws InterruptedException {
        synchronized(list) {
            if(list.size() < num) {
                list.wait();
            }
            for(int i=0; i < num; i++) {
                System.out.println("thread name " + Thread.currentThread().getName() + "  remove");
                list.remove();
            }
            list.notifyAll();
        }
    }

    class ProductThread implements Runnable {

        public void run() {
            try {
                product(10);
            } catch(InterruptedException e) {
                System.out.println("中断");
            }
        }
    }

    class CosumeThread implements Runnable {

        public void run() {
            try {
                cosume(10);
            } catch(InterruptedException e) {
                System.out.println("中断");
            }
        }
    }

    public static void main(String... args) {
        ProductConsumer2 pc=new ProductConsumer2();
        for(int i=0; i < 10; i++) {
            new Thread(pc.new ProductThread()).start();
        }
        for(int i=0; i < 10; i++) {
            new Thread(pc.new CosumeThread()).start();
        }
    }
}

 

第2种,ReentrantLock  和Condition实现阻塞队列

/**
 * 生产者和消费者 Condition实现
 * @author ljq
 */
public class ProductQueue<T> {

    private final T[] items;

    private final Lock lock=new ();

    private Condition notFull=lock.newCondition();

    private Condition notEmpty=lock.newCondition();

    private int head, tail, count;

    @SuppressWarnings("unchecked")
    public ProductQueue(int maxSize) {
        this.items=(T[])new Object[maxSize];
    }

    public void put(T t) throws InterruptedException {
        lock.lock();
        try {
            while(count == getCapacity()) { // 数组已经满了
                notFull.await();
            }
            items[tail]=t;
            if(++tail == getCapacity()) {
                tail=0;
            }
            count++;
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while(count == 0) {
                notEmpty.await();
            }
            T t=items[head];
            items[head]=null;
            if(++head == getCapacity()) {
                head=0;
            }
            count--;
            notFull.signalAll();
            return t;
        } finally {
            lock.unlock();
        }
    }

    private int getCapacity() {
        return items.length;
    }

    public int size() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

}

 

第三种 juc中 LinkedBlockingQueue

 

public class ProductConsumer {

    private final int MAX_SIZE=100;

    private LinkedBlockingQueue<Object> queue=new LinkedBlockingQueue<Object>(MAX_SIZE);

    public void product(int num) throws InterruptedException {
        if(queue.size() == MAX_SIZE) {
            System.out.println("已经满了");
            return;
        }
        for(int i=0; i < num; i++) {
            queue.put(new Object());
        }
    }

    public void Consume(int num) throws InterruptedException {
        if(queue.size() == 0) {
            System.out.println("队列为空");
            return;
        }
        for(int i=0; i < num; i++) {
            queue.take();
        }
    }

}

 

 4: 还有一种用Semaphore信号量实现

  

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics