`

并发编程回顾:队列

 
阅读更多

原先多线程并发编程的学习笔记和代码整理一下贴上来。

---------------------------------

队列

可以使用同步队列来解决任务协作问题,同步队列在任意时刻都只允许一个任务插入或移除元素。

同步队列的实现:

1、java.util.concurrent包中的BlockingQueue接口提供了这个队列,且该接口有大量实现,举例如下:

首先定义一个任务Task类,该任务有3步操作:

class Task{
	public enum Status {FIRST,SECOND,THIRD};
	private Status status = Status.FIRST; //default
	private final int id;
	public Task(int id){
		this.id=id;
	}
	public void doFirst() throws Exception{
		System.out.println("taskId="+id+" doFirst!");
		TimeUnit.MILLISECONDS.sleep(1000);//执行第一步操作需要1s
		status=Status.FIRST;
	}
	public void doSecond() throws Exception{
		System.out.println("taskId="+id+" doSecond!");
		TimeUnit.MILLISECONDS.sleep(200);//执行第二步操作需要0.2s
		status=Status.SECOND;
	}
	public void doThird() throws Exception{
		System.out.println("taskId="+id+" doThird!");
		TimeUnit.MILLISECONDS.sleep(2000);//执行第三步操作需要2s
		status=Status.THIRD;
	}
	public Status getStatus(){
		return status;
	}
	public int getId(){
		return this.id;
	}
}

然后,对这个task进行处理,每一步都定义一个处理器,这里用到了LinkedBlockingQueue。

第一个任务处理器,启动task,并执行第一步操作doFirst,执行完后放入队列。

class HandlerFirst implements Runnable{
	private LinkedBlockingQueue<Task> firstQueue;
	private int count=0;//任务数
	public HandlerFirst(LinkedBlockingQueue<Task> firstQueue){
		this.firstQueue=firstQueue;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Task task=new Task(count++);
				task.doFirst();
				firstQueue.put(task);
			}
		}catch(InterruptedException e){
			e.printStackTrace();
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}

第二个处理器,从第一步完成的队列中取出任务,然后继续执行第二步操作,完成后再放入第二步完成队列:

class HandlerSecond implements Runnable{
	private LinkedBlockingQueue<Task> firstQueue,secondQueue;
	public HandlerSecond(LinkedBlockingQueue<Task> firstQueue,LinkedBlockingQueue<Task> secondQueue){
		this.firstQueue=firstQueue;
		this.secondQueue=secondQueue;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Task task = firstQueue.take();
				task.doSecond();
				secondQueue.put(task);
			}
		}catch(InterruptedException e){
			e.printStackTrace();
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}

第三步,从完成第二步的队列中取出任务继续处理,之后放入全部完成的队列:

class HandlerThird implements Runnable{
	private LinkedBlockingQueue<Task> secondQueue,finishQueue;
	public HandlerThird(LinkedBlockingQueue<Task> secondQueue,LinkedBlockingQueue<Task> finishQueue){
		this.secondQueue=secondQueue;
		this.finishQueue=finishQueue;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Task task = secondQueue.take();
				task.doThird();
				finishQueue.put(task);
			}
		}catch(InterruptedException e){
			e.printStackTrace();
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}

任务所有步骤都完成时的处理器,打印出taskid:

class FinishHandler implements Runnable{
	private LinkedBlockingQueue<Task> finishQueue;
	public FinishHandler(LinkedBlockingQueue<Task> finishQueue){
		this.finishQueue=finishQueue;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Task task = finishQueue.take();
				System.out.println("******* finish! taskId="+task.getId());
			}
		}catch(InterruptedException e){
			e.printStackTrace();
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}

最后,测试一下这个程序:

LinkedBlockingQueue<Task> firstQueue=new LinkedBlockingQueue<Task>(),
		          secondQueue=new LinkedBlockingQueue<Task>(),
		          finishQueue=new LinkedBlockingQueue<Task>();
ExecutorService es = Executors.newCachedThreadPool();
es.execute(new HandlerFirst(firstQueue));
es.execute(new HandlerSecond(firstQueue,secondQueue));
es.execute(new HandlerThird(secondQueue,finishQueue));
es.execute(new FinishHandler(finishQueue));
es.shutdown();

定义3个阻塞队列,分别存放第一步、第二步、第三步完成后任务的队列。

然后分别启动他们,任务会不断的创建并将各个步骤完成的结果放入不同的队列中。

由于BlockingQueue内部已经进行了同步处理,所以并发访问时不再需要同步代码。

2、管道

主要就是使用管道流实现该功能:

首相,定义一个Sender,该sender不停的把字符写入流中:

class Sender implements Runnable {
  private Random rand = new Random();
  private PipedWriter out = new PipedWriter();
  public PipedWriter getPipedWriter() { return out; }
  public void run() {
    while(true) {
      for(char c = 'A'; c <= 'z'; c++) {
        try {
          out.write(c);
          TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
        } catch(Exception e) {
          throw new RuntimeException(e);
        }
      }
    }
  }
}
然后定义一个Receiver,不断的从管道中读取数据:
class Receiver implements Runnable {
  private PipedReader in;
  public Receiver(Sender sender) throws IOException {
    in = new PipedReader(sender.getPipedWriter());
  }
  public void run() {
    try {
      while(true) {
        // Blocks until characters are there:
        System.out.println("Read: " + (char)in.read());
      }
    } catch(IOException e) {
      throw new RuntimeException(e);
    }
  }
}
最后,测试一下这个程序,首先启动sender,然后启动receiver:
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
ExecutorService es = Executors.newCachedThreadPool();
es.execute(sender);
es.execute(receiver);

sender会不停的向管道中写入,receiver不停的从管道中读取,实现了一个队列的功能。

分享到:
评论

相关推荐

    汪文君高并发编程实战视频资源下载.txt

    │ 高并发编程第二阶段41讲、多线程设计模式内容回顾与总结.mp4 │ 高并发编程第二阶段42讲、ClassLoader课程大纲介绍.mp4 │ 高并发编程第二阶段43讲、类加载的过程以及类主动使用的六种情况详细介绍.mp4 │ 高...

    汪文君高并发编程实战视频资源全集

    │ 高并发编程第二阶段41讲、多线程设计模式内容回顾与总结.mp4 │ 高并发编程第二阶段42讲、ClassLoader课程大纲介绍.mp4 │ 高并发编程第二阶段43讲、类加载的过程以及类主动使用的六种情况详细介绍.mp4 │ 高...

    网络运维技术复习大纲-朔方鸟编辑.pdf

    进程线程与协程:这个主题涉及到并发编程和并行计算的概念。了解进程、线程和协程之间的区别和如何使用它们可以帮助优化网络应用的性能和资源利用率。 定时任务模块:定时任务模块是一种常见的网络运维工具,用于在...

    网络运维总复习.xmind

    进程线程与协程:这个主题涉及到并发编程和并行计算的概念。了解进程、线程和协程之间的区别和如何使用它们可以帮助优化网络应用的性能和资源利用率。 定时任务模块:定时任务模块是一种常见的网络运维工具,用于在...

    网络编程教程,很好的一本写linux网络编程书,这是我上传的源码

     8.1.1 域名系统的回顾  8.1.2 通过地址获取主机信息  8.1.3 通过主机名获取主机信息  8.1.4 获取本地主机的信息  8.1.5 通过服务名获取服务端口  8.1.6 通过端口号获取服务名  8.2 套接...

    linux programming instances网络编程教程 附源代码

    第1章 linux平台环境简单回顾 1.1 文件系统及其操作 1.1.1 文件系统结构 1.1.2 文件i/o操作 1.1.3 文件、目录及操作 1.2 标推输入输出 1.2.1 流和buffer 1.2.2 i/o类型 1.3 进程概念及控制 1.3.1 ...

    Go语言企业级数据处理项目实战案例 Golang实现企业级数据可视化 Go语言项目案例课程

    ├─(8) 2-1 并发编程启蒙.mp4 ├─(9) 2-2 并发编程--协程.mp4 ├─(10) 2-3 Golang协程基本示例.mp4 ├─(11) 3-1 Golang协程特性实践.mp4 ├─(12) 3-2 golang select多队列选择器.mp4 ├─(13) 3-3 selete等待...

    【Java学习+面试指南】 一份涵盖大部分Java程序员所需要掌握的核心知识.rar

    JavaGuide 【Java学习+面试指南】 一份...数据通信(消息队列、Dubbo ... ) 网站架构 面试指南 备战面试 常见面试题总结 面经 工具 Git Docker 资源 书单 Github榜单 Java基础 Java 基础知识回顾 Java 基础知识疑难点/

    最新Python3.5零基础+高级+完整项目(28周全)培训视频学习资料

    SocketServer多并发 多用户在线Ftp程序 第9周 上节回顾 paramiko模块详解 ssh密钥讲解 进程与线程 多线程 多线程案例 主线程与子线程 线程锁 线程之信号量 线程之Event 队列Queue 作业之主机批量管理 第10周 ...

    python入门到高级全栈工程师培训 第3期 附课件代码

    08 线程队列 09 生产者消费者模型 10 多进程的调用 第35章 01 进程通信 02 进程池 03 协程 04 事件驱动模型 05 IO模型前戏 06 阻塞IO与非阻塞IO 07 select及触发方式 08 select监听多连接 09 select与epoll的实现...

    UNIX 高级教程系统技术内幕

    1.3 回顾与展望 1.3.1 UNIX 好在哪里 1.3.2 UNIX 的误区在哪儿 1.4 本书的范围 1.5 参考文献 第2 章 进程与内核(17) 2.1 简介 2.2 模式.空间和上下文 2.3 进程抽象 2.3.1 进程状态 2.3.2 进程上下文 2.3.3 用户凭证 ...

Global site tag (gtag.js) - Google Analytics