`
sosojustdo
  • 浏览: 8196 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

多线程共享ArrayBlockingQueue操作

阅读更多

             一直以来对Java多线程方面挺感兴趣的;在刚开始学习Java时,也只仅仅知道一些关于线程方面的基础知识,Thread t = new Thread(new RunnableImpl()); t.start(); 要么就是直接实现Runnabled接口,其实自己实现类继承Thread时,底层Thread类其实同样是实现了Runnable接口:public  class Thread implements Runnable ;

             我相信大家都会使用以上两种方式创建和启动线程,但是可能有些同学对线程的生命周期不太了解;首先提个问题,Thread().start()和Thread.run();两个方法的区别,start();方法是准备接受系统调度,一旦线程获取到CPU资源,线程立马运行起来,则此在原来主进程中又多了一个你刚刚新建的线程启动了;而run();方法则是Thread中一个普通的方法,当调用时其该run方法时,它的生命周期在当前主线程里执行,则此它的生命周期是伴随主线程的生命周期。

             当然了线程的基础知识还很多了,比如面试经常问到的Wait和Sleep区别,谈谈线程的Interrupter机制,以及Synchronized和Lock(Concurrent包下的锁),还有难点多线程编程以及性能调优等等,后续有时间我会再接下来的博客中,把自己的线程知识分享出来。第一个次在Java Eye上写博客,先前两篇算是摘要吧,往后会继续更新自己的博客。欢迎大家来拍砖,共同学习Java提高。

             啰嗦半天圆规正传,实际工作中队列算是用的比较多,有自己实现的比如LinkedList和Redis List数据结构Lpush和Rpop来实现,也有现成的比如:Concurrent包下各种队列,该包下的队列特性围绕在阻塞非阻塞以及是否双端。我个人实际工作中用到的队列,都是自己实现的如前面提到的。今天索性使用ArrayBlockingQueue写了一个Demo,主要功能是一个线程定时往队里仍数据,另一个线程消费该队列中数据,代码如下:

             

package com.test.mulit.thread;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

public class MulitThread
{	
	//写线程池容量
	public static final Integer writerCorePoolSize = 1;
	//读线程池容量
	public static final Integer readCorePoolSize = 1;
	
	//队列大小
	public static final Integer queueSize = 5;
	
	//定义使用日志
	public static final Logger logger  = Logger.getLogger("com.test.mulit.thread.MulitThread");
	
	//读写线程共享该队列
	private static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize, true);
	
	public static void main(String[] args)
	{	
		
		ScheduledExecutorService exec_w = Executors.newScheduledThreadPool(writerCorePoolSize);
		//满足周期性执行
		logger.info("-------------准备周期性往队列扔数字啦--------------------");
		exec_w.scheduleAtFixedRate(new WriterTask(queue), 0L, 4L, TimeUnit.SECONDS);
		
		ExecutorService exe_r = Executors.newFixedThreadPool(readCorePoolSize);
		while(true){
			logger.info("----------------开始消费队列中的数字---------------");
			//倘若队列为空则消费队列阻塞,在take()方法
			//休息2秒钟,与生产者节奏一致
			try
			{
				TimeUnit.SECONDS.sleep(1L);
			} catch (InterruptedException e)
			{
				logger.info(String.format("使用TimeUnit线程睡眠发生异常:", new Object[]{e.getLocalizedMessage()}));
				e.printStackTrace();
			}
			ReadTask readTask = new ReadTask(queue);
			exe_r.submit(readTask);
			readTask.setExec(new AtomicBoolean(true));
		}
	}
}

class WriterTask implements Runnable{
	
	private Logger logger = Logger.getLogger("com.test.mulit.thread.EriterTask");
	
	private ArrayBlockingQueue<Integer> queue_w;
	
	public WriterTask(ArrayBlockingQueue<Integer> queue){
		this.queue_w = queue;
	}
	
	@Override
	public void run()
	{	
		Random r = new Random();
		int random = r.nextInt(10000);
		boolean flage = queue_w.offer(random);
		System.out.println("队列大小:" + queue_w.size()+ " Add 返回结果:" + flage);
		logger.info(String.format("准备写入队列数字:[wwwwwwwwwwwwwwwwwwwww] %s", new Object[]{random}));
	}
}


class ReadTask implements Runnable {
	
	public static final Logger logger  = Logger.getLogger("com.test.mulit.thread.ReadTask");
		
	private ArrayBlockingQueue<Integer> queue_r;
	
	private volatile AtomicBoolean exec = new AtomicBoolean(false);
	
	public ReadTask(ArrayBlockingQueue<Integer> queue){
		this.queue_r = queue;
	}

	public AtomicBoolean getExec()
	{
		return exec;
	}

	public void setExec(AtomicBoolean exec)
	{
		this.exec = exec;
	}

	@Override
	public void run()
	{
		while(exec.get()){
			Integer result = 0;
			try
			{
				result = queue_r.take();
			} catch (InterruptedException e)
			{
				e.printStackTrace();
			}
			logger.info(String.format("读取队列数字: [rrrrrrrrrrrrrrrrrrr] %s", new Object[]{result}));
		}
	}
}

 

                注意红色标注的文字,写线程和读线程的操作队列周期都刚好是2秒,控制台打印的结果如下:

信息: ----------------开始消费队列中的数字---------------

2014-1-13 0:23:47 com.test.mulit.thread.ReadTask run

信息: 读取队列数字: [rrrrrrrrrrrrrrrrrrr] 3478

2014-1-13 0:23:47 com.test.mulit.thread.WriterTask run

信息: 准备写入队列数字:[wwwwwwwwwwwwwwwwwwwww] 3478

2014-1-13 0:23:47 com.test.mulit.thread.MulitThread main

 

                当我把读线程TimeUnit.MILLISECONDS.sleep(2L);调成2微妙时,刚好和入定时入队列操作周期2秒相差1000倍,我们再看控制台打印:

信息: ----------------开始消费队列中的数字---------------

2014-1-13 0:26:54 com.test.mulit.thread.MulitThread main

信息: ----------------开始消费队列中的数字---------------

2014-1-13 0:26:54 com.test.mulit.thread.MulitThread main

信息: ----------------开始消费队列中的数字---------------

2014-1-13 0:26:54 com.test.mulit.thread.MulitThread main

信息: ----------------开始消费队列中的数字---------------

2014-1-13 0:26:54 com.test.mulit.thread.MulitThread main

信息: ----------------开始消费队列中的数字---------------

2014-1-13 0:26:54 com.test.mulit.thread.MulitThread main

信息: ----------------开始消费队列中的数字---------------

2014-1-13 0:26:54 com.test.mulit.thread.MulitThread main

......................后面还有很多,看不到消费数字打印的日志信息,很简单的印证了当Queue为空时,调用queue.take();方法会阻塞。后来我又在WriterTask类中稍作改动代码,使用queue.add(),同时队列大小我设置为5,那么按道理队列一会就会满了,引用API中的话:“将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则抛出 IllegalStateException。“但是我发现后台没有抛异常,Debug发现,FutureTask里面innerRunAndReset讲该异常吃掉了。后来验证了poll() 获取并移除此队列的头,如果此队列为空,则返回 null;peek() 获取但不移除此队列的头;如果此队列为空,则返回 null;put()将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。等方法。

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics