原先多线程并发编程的学习笔记和代码整理一下贴上来。
---------------------------------
队列
可以使用同步队列来解决任务协作问题,同步队列在任意时刻都只允许一个任务插入或移除元素。
同步队列的实现:
1、java.util.concurrent包中的BlockingQueue接口提供了这个队列,且该接口有大量实现,举例如下:
首先定义一个任务Task类,该任务有3步操作:
class Task{ public enum Status {FIRST,SECOND,THIRD}; private Status status = Status.FIRST; //default private final int id; public Task(int id){ this.id=id; } public void doFirst() throws Exception{ System.out.println("taskId="+id+" doFirst!"); TimeUnit.MILLISECONDS.sleep(1000);//执行第一步操作需要1s status=Status.FIRST; } public void doSecond() throws Exception{ System.out.println("taskId="+id+" doSecond!"); TimeUnit.MILLISECONDS.sleep(200);//执行第二步操作需要0.2s status=Status.SECOND; } public void doThird() throws Exception{ System.out.println("taskId="+id+" doThird!"); TimeUnit.MILLISECONDS.sleep(2000);//执行第三步操作需要2s status=Status.THIRD; } public Status getStatus(){ return status; } public int getId(){ return this.id; } }
然后,对这个task进行处理,每一步都定义一个处理器,这里用到了LinkedBlockingQueue。
第一个任务处理器,启动task,并执行第一步操作doFirst,执行完后放入队列。
class HandlerFirst implements Runnable{ private LinkedBlockingQueue<Task> firstQueue; private int count=0;//任务数 public HandlerFirst(LinkedBlockingQueue<Task> firstQueue){ this.firstQueue=firstQueue; } @Override public void run() { try{ while(!Thread.interrupted()){ Task task=new Task(count++); task.doFirst(); firstQueue.put(task); } }catch(InterruptedException e){ e.printStackTrace(); }catch(Exception e){ e.printStackTrace(); } } }
第二个处理器,从第一步完成的队列中取出任务,然后继续执行第二步操作,完成后再放入第二步完成队列:
class HandlerSecond implements Runnable{ private LinkedBlockingQueue<Task> firstQueue,secondQueue; public HandlerSecond(LinkedBlockingQueue<Task> firstQueue,LinkedBlockingQueue<Task> secondQueue){ this.firstQueue=firstQueue; this.secondQueue=secondQueue; } @Override public void run() { try{ while(!Thread.interrupted()){ Task task = firstQueue.take(); task.doSecond(); secondQueue.put(task); } }catch(InterruptedException e){ e.printStackTrace(); }catch(Exception e){ e.printStackTrace(); } } }
第三步,从完成第二步的队列中取出任务继续处理,之后放入全部完成的队列:
class HandlerThird implements Runnable{ private LinkedBlockingQueue<Task> secondQueue,finishQueue; public HandlerThird(LinkedBlockingQueue<Task> secondQueue,LinkedBlockingQueue<Task> finishQueue){ this.secondQueue=secondQueue; this.finishQueue=finishQueue; } @Override public void run() { try{ while(!Thread.interrupted()){ Task task = secondQueue.take(); task.doThird(); finishQueue.put(task); } }catch(InterruptedException e){ e.printStackTrace(); }catch(Exception e){ e.printStackTrace(); } } }
任务所有步骤都完成时的处理器,打印出taskid:
class FinishHandler implements Runnable{ private LinkedBlockingQueue<Task> finishQueue; public FinishHandler(LinkedBlockingQueue<Task> finishQueue){ this.finishQueue=finishQueue; } @Override public void run() { try{ while(!Thread.interrupted()){ Task task = finishQueue.take(); System.out.println("******* finish! taskId="+task.getId()); } }catch(InterruptedException e){ e.printStackTrace(); }catch(Exception e){ e.printStackTrace(); } } }
最后,测试一下这个程序:
LinkedBlockingQueue<Task> firstQueue=new LinkedBlockingQueue<Task>(), secondQueue=new LinkedBlockingQueue<Task>(), finishQueue=new LinkedBlockingQueue<Task>(); ExecutorService es = Executors.newCachedThreadPool(); es.execute(new HandlerFirst(firstQueue)); es.execute(new HandlerSecond(firstQueue,secondQueue)); es.execute(new HandlerThird(secondQueue,finishQueue)); es.execute(new FinishHandler(finishQueue)); es.shutdown();
定义3个阻塞队列,分别存放第一步、第二步、第三步完成后任务的队列。
然后分别启动他们,任务会不断的创建并将各个步骤完成的结果放入不同的队列中。
由于BlockingQueue内部已经进行了同步处理,所以并发访问时不再需要同步代码。
2、管道
主要就是使用管道流实现该功能:
首相,定义一个Sender,该sender不停的把字符写入流中:
class Sender implements Runnable { private Random rand = new Random(); private PipedWriter out = new PipedWriter(); public PipedWriter getPipedWriter() { return out; } public void run() { while(true) { for(char c = 'A'; c <= 'z'; c++) { try { out.write(c); TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000)); } catch(Exception e) { throw new RuntimeException(e); } } } } }然后定义一个Receiver,不断的从管道中读取数据:
class Receiver implements Runnable { private PipedReader in; public Receiver(Sender sender) throws IOException { in = new PipedReader(sender.getPipedWriter()); } public void run() { try { while(true) { // Blocks until characters are there: System.out.println("Read: " + (char)in.read()); } } catch(IOException e) { throw new RuntimeException(e); } } }最后,测试一下这个程序,首先启动sender,然后启动receiver:
Sender sender = new Sender(); Receiver receiver = new Receiver(sender); ExecutorService es = Executors.newCachedThreadPool(); es.execute(sender); es.execute(receiver);
sender会不停的向管道中写入,receiver不停的从管道中读取,实现了一个队列的功能。
相关推荐
│ 高并发编程第二阶段41讲、多线程设计模式内容回顾与总结.mp4 │ 高并发编程第二阶段42讲、ClassLoader课程大纲介绍.mp4 │ 高并发编程第二阶段43讲、类加载的过程以及类主动使用的六种情况详细介绍.mp4 │ 高...
│ 高并发编程第二阶段41讲、多线程设计模式内容回顾与总结.mp4 │ 高并发编程第二阶段42讲、ClassLoader课程大纲介绍.mp4 │ 高并发编程第二阶段43讲、类加载的过程以及类主动使用的六种情况详细介绍.mp4 │ 高...
进程线程与协程:这个主题涉及到并发编程和并行计算的概念。了解进程、线程和协程之间的区别和如何使用它们可以帮助优化网络应用的性能和资源利用率。 定时任务模块:定时任务模块是一种常见的网络运维工具,用于在...
进程线程与协程:这个主题涉及到并发编程和并行计算的概念。了解进程、线程和协程之间的区别和如何使用它们可以帮助优化网络应用的性能和资源利用率。 定时任务模块:定时任务模块是一种常见的网络运维工具,用于在...
8.1.1 域名系统的回顾 8.1.2 通过地址获取主机信息 8.1.3 通过主机名获取主机信息 8.1.4 获取本地主机的信息 8.1.5 通过服务名获取服务端口 8.1.6 通过端口号获取服务名 8.2 套接...
第1章 linux平台环境简单回顾 1.1 文件系统及其操作 1.1.1 文件系统结构 1.1.2 文件i/o操作 1.1.3 文件、目录及操作 1.2 标推输入输出 1.2.1 流和buffer 1.2.2 i/o类型 1.3 进程概念及控制 1.3.1 ...
├─(8) 2-1 并发编程启蒙.mp4 ├─(9) 2-2 并发编程--协程.mp4 ├─(10) 2-3 Golang协程基本示例.mp4 ├─(11) 3-1 Golang协程特性实践.mp4 ├─(12) 3-2 golang select多队列选择器.mp4 ├─(13) 3-3 selete等待...
JavaGuide 【Java学习+面试指南】 一份...数据通信(消息队列、Dubbo ... ) 网站架构 面试指南 备战面试 常见面试题总结 面经 工具 Git Docker 资源 书单 Github榜单 Java基础 Java 基础知识回顾 Java 基础知识疑难点/
SocketServer多并发 多用户在线Ftp程序 第9周 上节回顾 paramiko模块详解 ssh密钥讲解 进程与线程 多线程 多线程案例 主线程与子线程 线程锁 线程之信号量 线程之Event 队列Queue 作业之主机批量管理 第10周 ...
08 线程队列 09 生产者消费者模型 10 多进程的调用 第35章 01 进程通信 02 进程池 03 协程 04 事件驱动模型 05 IO模型前戏 06 阻塞IO与非阻塞IO 07 select及触发方式 08 select监听多连接 09 select与epoll的实现...
1.3 回顾与展望 1.3.1 UNIX 好在哪里 1.3.2 UNIX 的误区在哪儿 1.4 本书的范围 1.5 参考文献 第2 章 进程与内核(17) 2.1 简介 2.2 模式.空间和上下文 2.3 进程抽象 2.3.1 进程状态 2.3.2 进程上下文 2.3.3 用户凭证 ...