`

用java实现的生产者与消费者多线程同步互斥模型

    博客分类:
  • java
阅读更多

生产者与消费者模型中,要保证以下几点:
 (1)同一时间内只能有一个生产者生产。
 (2)同一时间内只能有一个消费者消费。
 (3)生产者生产的同时消费者不能消费。
 (4)消息队列满时生产者不能继续生产,消息队列空时消费者不能继续消费,此时必须等待。

代码:

1、Message.java

view plaincopy to clipboardprint?
package com.test;  
 
/** 
 * 消息对象 
 * 
 */ 
public class Message {  
 
    private int id;  
    private String message;  
    /**
     * Returns the numeric identifier stored in this message.
     *
     * @return the current value of the {@code id} field
     */
    public int getId() {  
        return id;  
    }  
    /**
     * Stores a new numeric identifier in this message.
     *
     * @param id the value to assign to the {@code id} field
     */
    public void setId(int id) {  
        this.id = id;  
    }  
    /**
     * Returns the text payload stored in this message.
     *
     * @return the current value of the {@code message} field; {@code null} if it was never set
     */
    public String getMessage() {  
        return message;  
    }  
    /**
     * Stores a new text payload in this message.
     *
     * @param message the value to assign to the {@code message} field
     */
    public void setMessage(String message) {  
        this.message = message;  
    }  
      
      

package com.test;

/**
 * 消息对象
 *
 */
/**
 * Message object: a mutable holder for a numeric id and a text payload.
 */
public class Message {

    /** Numeric identifier of the message; defaults to 0 until set. */
    private int id;

    /** Text payload of the message; {@code null} until set. */
    private String message;

    /**
     * Stores a new numeric identifier.
     *
     * @param id the identifier to keep
     */
    public void setId(int id) {
        this.id = id;
    }

    /**
     * Returns the numeric identifier.
     *
     * @return the identifier last stored, or 0 if none was stored
     */
    public int getId() {
        return this.id;
    }

    /**
     * Stores a new text payload.
     *
     * @param message the payload to keep
     */
    public void setMessage(String message) {
        this.message = message;
    }

    /**
     * Returns the text payload.
     *
     * @return the payload last stored, or {@code null} if none was stored
     */
    public String getMessage() {
        return this.message;
    }
}
 

2、Queue.java

 view plaincopy to clipboardprint?
package com.test;  
 
import java.util.ArrayList;  
import java.util.List;  
/** 
 * 存放消息的缓存队列,共享资源 
 * 
 */ 
public class Queue {  
 
    private List<Message> list = new ArrayList<Message>();  
      
    private int maxCount = 5;  
      
    /** 
     * 生产 
     * @param message 
     */ 
    public synchronized void product(Message message){  
          
        this.notifyAll();  
        while (maxCount == getList().size()) {  
            System.out.println(Thread.currentThread().getName()+",队列已满,等待中。。。。");  
            try {  
                this.wait();  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
        getList().add(message);  
        System.out.println(Thread.currentThread().getName()+",正在生产。。。队列当前个数:"+getList().size());  
    }  
      
    /** 
     * 消费 
     */ 
    public synchronized void consume(){  
        this.notifyAll();  
        while(getList().size()==0){  
            System.out.println(Thread.currentThread().getName()+",队列为空,等待中。。。");  
            try {  
                this.wait();  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
        Message message = getList().get(0);  
        getList().remove(message);  
        System.out.println(Thread.currentThread().getName()+",正在消费。。。队列当前个数:"+getList().size());  
          
    }  
 
    /**
     * Returns the list that backs this queue.
     *
     * <p>The internal list itself is returned, not a copy, so changes made
     * through the returned reference affect the queue directly.
     *
     * @return the backing list of messages
     */
    public List<Message> getList() {  
        return list;  
    }  
 
    /**
     * Replaces the list that backs this queue.
     *
     * <p>NOTE(review): unlike {@code product}/{@code consume} this method is not
     * synchronized and does not notify waiting threads.
     *
     * @param list the new backing list
     */
    public void setList(List<Message> list) {  
        this.list = list;  
    }  
 
    /**
     * Returns the size limit that {@code product} compares the list size against.
     *
     * @return the current value of {@code maxCount} (initially 5)
     */
    public int getMaxCount() {  
        return maxCount;  
    }  
 
    /**
     * Changes the size limit that {@code product} compares the list size against.
     *
     * <p>NOTE(review): unlike {@code product}/{@code consume} this method is not
     * synchronized and does not notify waiting threads.
     *
     * @param maxCount the new limit
     */
    public void setMaxCount(int maxCount) {  
        this.maxCount = maxCount;  
    }  

package com.test;

import java.util.ArrayList;
import java.util.List;
/**
 * 存放消息的缓存队列,共享资源
 *
 */
public class Queue {

 private List<Message> list = new ArrayList<Message>();
 
 private int maxCount = 5;
 
 /**
  * 生产
  * @param message
  */
 public synchronized void product(Message message){
  
  this.notifyAll();
  while (maxCount == getList().size()) {
   System.out.println(Thread.currentThread().getName()+",队列已满,等待中。。。。");
   try {
    this.wait();
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }
  getList().add(message);
  System.out.println(Thread.currentThread().getName()+",正在生产。。。队列当前个数:"+getList().size());
 }
 
 /**
  * 消费
  */
 public synchronized void consume(){
  this.notifyAll();
  while(getList().size()==0){
   System.out.println(Thread.currentThread().getName()+",队列为空,等待中。。。");
   try {
    this.wait();
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }
  Message message = getList().get(0);
  getList().remove(message);
  System.out.println(Thread.currentThread().getName()+",正在消费。。。队列当前个数:"+getList().size());
  
 }

 public List<Message> getList() {
  return list;
 }

 public void setList(List<Message> list) {
  this.list = list;
 }

 public int getMaxCount() {
  return maxCount;
 }

 public void setMaxCount(int maxCount) {
  this.maxCount = maxCount;
 }
}
 

3、Producer.java

view plaincopy to clipboardprint?
package com.test;  
 
import java.util.Random;  
 
/** 
 * 生产者生产 
 * 
 */ 
public class Producer implements Runnable {  
 
    private Queue queue = null;  
 
    /**
     * Creates a producer that puts its messages on the given queue.
     *
     * @param queue the queue that {@code run()} calls {@code product} on
     */
    public Producer(Queue queue) {  
        this.queue = queue;  
    }  
 
    public void run() {  
        while (true) {  
            Message message = new Message();  
            message.setId(new Random().nextInt());  
            message.setMessage("message," + new Random().nextInt());  
            queue.product(message);  
            try {  
                Thread.sleep(100);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
 
    }  
 

package com.test;

import java.util.Random;

/**
 * 生产者生产
 *
 */
public class Producer implements Runnable {

 private Queue queue = null;

 public Producer(Queue queue) {
  this.queue = queue;
 }

 public void run() {
  while (true) {
   Message message = new Message();
   message.setId(new Random().nextInt());
   message.setMessage("message," + new Random().nextInt());
   queue.product(message);
   try {
    Thread.sleep(100);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }

 }

}
 

4、Consumer.java

view plaincopy to clipboardprint?
package com.test;  
 
/** 
 * 消费者消费 
 * 
 */ 
public class Consumer implements Runnable {  
 
    private Queue queue;  
 
    /**
     * Creates a consumer that takes its messages from the given queue.
     *
     * @param queue the queue that {@code run()} calls {@code consume} on
     */
    public Consumer(Queue queue) {  
        this.queue = queue;  
    }  
 
    public void run() {  
 
        while (true) {  
            queue.consume();  
            try {  
                Thread.sleep(100);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
 
    }  

package com.test;

/**
 * 消费者消费
 *
 */
public class Consumer implements Runnable {

 private Queue queue;

 public Consumer(Queue queue) {
  this.queue = queue;
 }

 public void run() {

  while (true) {
   queue.consume();
   try {
    Thread.sleep(100);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }

 }
}
 

5、Test.java

view plaincopy to clipboardprint?
package com.test;  
 
public class Test {  
 
    public static void main(String[] args) {  
        Queue queue = new Queue();  
//      生产者两个  
        Producer producer1 = new Producer(queue);  
        Producer producer2 = new Producer(queue);  
          
//    消费者三个  
        Consumer consumer1 = new Consumer(queue);  
        Consumer consumer2 = new Consumer(queue);  
        Consumer consumer3 = new Consumer(queue);  
          
        Thread thread1 = new Thread(producer1,"producer1");  
        Thread thread2 = new Thread(producer2,"producer2");  
        Thread thread3 = new Thread(consumer1,"consumer1");  
        Thread thread4 = new Thread(consumer2,"consumer2");  
        Thread thread5 = new Thread(consumer3,"consumer3");  
        thread1.start();  
        thread2.start();  
        thread3.start();  
        thread4.start();  
        thread5.start();  
          
          
    }  

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics