`

jdk1.5——可阻塞队列(自定义和jdk提供类的两种实现写法)

 
阅读更多
 

 1 查看api 搜索关键词: BlockingQueue

  可以看到具体实现类:

所有已知实现类:
ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue

2 概念理解: 一个队列(理解成数组),一个人不停的放面包,一个人不停的取面包,当放满时,放队列阻塞,放面包的人等待;  当取面包到没有时,取动作阻塞,取面包的人等待;

 

 

3 使用jdk提供实现类,模拟阻塞效果:

 

 

package cn.itcast.heima2;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

/**
 *   其内部代码 和自定义写法一样  也是弄出两个队列来
 *    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
 * @author zm
 * 
 * 测试结果:
 * Thread-0准备放数据!
Thread-0已经放了数据,队列目前有1个数据
Thread-1准备放数据!
Thread-1已经放了数据,队列目前有2个数据
Thread-2准备取数据!
Thread-2已经取走数据,数据为: 1 队列目前有1个数据
Thread-1准备放数据!
Thread-1已经放了数据,队列目前有2个数据
Thread-0准备放数据!
Thread-0已经放了数据,队列目前有3个数据
Thread-1准备放数据!    ------------------------> 此时篮子满了, 放队列进入阻塞
Thread-0准备放数据!
Thread-2准备取数据!
Thread-2已经取走数据,数据为: 1 队列目前有2个数据 -----------------------> 此时取走了一个面包 篮子不满
Thread-1已经放了数据,队列目前有3个数据  -----------------------> 此时才能继续放面包
 *
 */
public class BlockingQueueTest {
	public static void main(String[] args) {
		final BlockingQueue queue = new ArrayBlockingQueue(3);   // 定义篮子为3个大小
		for(int i=0;i<2;i++){ // 两个人,不停向篮子 放面包
			new Thread(){
				public void run(){
					while(true){
						try {
							Thread.sleep((long)(Math.random()*1000));
							System.out.println(Thread.currentThread().getName() + "准备放数据!");							
							queue.put(1);
							System.out.println(Thread.currentThread().getName() + "已经放了数据," + 							
										"队列目前有" + queue.size() + "个数据");
						} catch (InterruptedException e) {
							e.printStackTrace();
						}

					}
				}
				
			}.start();
		}
		
		new Thread(){
			public void run(){ // 一个人 不停的从篮子中 取面包
				while(true){ 
					try {
						//将此处的睡眠时间分别改为100和1000,观察运行结果
						Thread.sleep(1000); // 
						System.out.println(Thread.currentThread().getName() + "准备取数据!");
						Object object = queue.take();
						
						System.out.println(Thread.currentThread().getName() + "已经取走数据,数据为: " + object +							
								" 队列目前有" + queue.size() + "个数据");					
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
			
		}.start();			
	}
}

 

 
4 使用自定义阻塞队列方式,实现 存取篮子面包效果:
 
package thread;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 
 * @author zm
 
 *  结果:
向第0个篮子生产了一个面包,面包重量为 25g
从第0个篮子拿走了一个面包,面包重量为 25g
向第1个篮子生产了一个面包,面包重量为 73g
向第2个篮子生产了一个面包,面包重量为 40g
从第1个篮子拿走了一个面包,面包重量为 73g
向第3个篮子生产了一个面包,面包重量为 50g
向第4个篮子生产了一个面包,面包重量为 90g
从第2个篮子拿走了一个面包,面包重量为 40g
从第3个篮子拿走了一个面包,面包重量为 50g
向第5个篮子生产了一个面包,面包重量为 50g
向第6个篮子生产了一个面包,面包重量为 77g
从第4个篮子拿走了一个面包,面包重量为 90g
 *  
 *  
 *  案例2: 两队人,一对不停像100个篮子生产面包,一对不停从篮子拿走面包(阻塞队列)
 *  
 *  如果是我不停的生产面包放在篮子里,我不管消费者拿面包, notfull队列(篮子还没存放满面包队列 有自己独自的wait nodify)
 *  而消费者不停的去从篮子里拿面包,而不管生产者生产了多少,  notempty队列(篮子面包还没空队列 有自己独自的wait nodify)
 *  那么这就是两个队列,
 *  notfull队列, 只管像篮子里放面包,当100个篮子满的时,放面包动作处于等待
 *  notempty队列,只管从篮子里拿面包,当100个篮子空的时,存面包动作处于等待
 *  此时需要使用同一个lock的两个condition来实现第二个案例
 *  
 *  
 *  注意: count表示当前面包真实个数(生产一个 同时下个线程拿走一个 那么count = 0)
 *  items: 存放面包篮子
 *  putptr, 生产面包后 存放在篮子的角标,
 *  takeptr,取走面包时,面包所在篮子的角标
 * 
 */
public class CommunicateWithConditionThread {

	public static void main(String[] args) {
		
		final BoundedBuffer boundedBuffer = new BoundedBuffer();
		
		new Thread(new Runnable() {
			
			@Override
			public void run() {
				while(true){
					try {
						Thread.sleep((long)(Math.random()*100));
						boundedBuffer.put((int)(Math.random()*100));
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
				
			}
		}).start();
		
		new Thread(new Runnable() {
			
			@Override
			public void run() {
				while(true){
					try {
						Thread.sleep((long)(Math.random()*100));
						boundedBuffer.take();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}
		}).start();
		
		
		
	}

}




class BoundedBuffer {
	   final Lock lock = new ReentrantLock();
	   final Condition notFull  = lock.newCondition(); // 生产队列
	   final Condition notEmpty = lock.newCondition(); // 消费队列

	   final Object[] items = new Object[20];
	   int putptr, takeptr, count;

	   public void put(int x) throws InterruptedException {
	     lock.lock(); // 生产面包上锁
	     try {
	       while (count == items.length)  // 当生产队列并发大到突然生产了100个面包时,生产队列等待
	         notFull.await();
			 // 否则执行生产面包操作, 不断向数组下一个单元格内放新面包
	       items[putptr] = x; 
	       System.out.println("向第" + putptr + "个篮子生产了一个面包,面包重量为 " + x + "g" );
	       if (++putptr == items.length) putptr = 0;// 当存放的面包到达数组最后位置时,篮子存放面包位置又从0开始
	       ++count; // 记录面包个数
	       
	       notEmpty.signal();// 生产了面包,就立即通知消费队列去取走面包
	     } finally {
	       lock.unlock();// 生产面包完成 解锁 让下个生产执行
	     }
	   }

	   public Object take() throws InterruptedException {
	     lock.lock();// 取面包上锁
	     try {
	       while (count == 0) // 当消费队列消费并发过大,或者刚开始没生产出面包时,消费队列等待
	         notEmpty.await();
	       Object x = items[takeptr]; 
	       System.out.println("从第" + takeptr + "个篮子拿走了一个面包,面包重量为 " + x + "g" );
	       if (++takeptr == items.length) takeptr = 0;// 当取走面包到篮子最后一个位置时,重置,再从篮子最开始位置取面包
	       --count;// 记录面包个数 取走一次面包 个数减一
	       notFull.signal(); // 取走面包, 立即通知生产队列生产面包
	       return x;
	     } finally {
	       lock.unlock(); // 取面包完成 解锁 让下个取面包动作执行 
	     }
	  
 
 5 使用阻塞队列方式,实现 打印日志提高效率:
 
// 原来写法,耗时16S
public class Thread16 {

	/**
	 * 打印16个日志  耗时16S	
	 */
	public static void main(String[] args) {

		System.out.println("begin: " + (System.currentTimeMillis()/1000));
		for(int i=0; i<16; i++){ // 不能改
			String log = "" + (i + 1);// 不能改   
			parseLog(log);
		}
	}
	
	public static void parseLog(String log){
		
		System.out.println("log: " + (System.currentTimeMillis()/1000));
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
	}

}


修改程序代码,开四个线程让这16个对象在4秒钟打完
思路: 
	0创建容量16的队列
	1 将16个任务增加到 阻塞队列中	
	2开启4个线程,每次从队列中获取数据
	
	这样主线程不停的放, 并发来的4个线程不停的取, 你可以理解为并发一次来了4个线程,每个线程取到后内部打印1S操作仍旧不变,
	执行4次,一共耗时4S完成原来16秒不用并发下的操作
	
	主线程放log 和 子线程取log 之间用condtion notEmpty notFull 来实现阻塞


public class Test {
	
	public static void main(String[] args){
		// 0 创建容量只为1的队列
        final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(16);
        // 2 开启4个线程,每次从队列中获取数据
		for(int i=0;i<4;i++){
			new Thread(new Runnable(){
				@Override
				public void run() {
					while(true){
						try {
							String log = queue.take();
							parseLog(log);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}
				
			}).start();
		}
		
		// 1 将16个任务在主线程中增加到阻塞队列中
		System.out.println("begin:"+(System.currentTimeMillis()/1000));
		for(int i=0;i<16;i++){  //这行代码不能改动
			final String log = ""+(i+1);//这行代码不能改动
			{
					try {
						queue.put(log);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
	     			//Test.parseLog(log);
			}
		}
	}
	
	//parseLog方法内部的代码不能改动
	public static void parseLog(String log){
		System.out.println(log+":"+(System.currentTimeMillis()/1000));
		
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}		
	}
	
}
 
 
脑图:


 
 
  • 大小: 11.8 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics