代码:
package conSet; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; /** * 并发单向队列简单实现 * * @author dingchd * * @param <T> */ public class NoBlockQueue<T> { private Node<T> header; private AtomicReference<Node<T>> tail; private AtomicInteger size; public NoBlockQueue() { header = new Node<T>(); tail = new AtomicReference<Node<T>>(header); size = new AtomicInteger(0); } /** * 存元素的过程分两步骤:原子更新尾节点的next、原子更新尾节点 如果第二部更新失败 则原子还原尾节点的next * * @return */ public void add(T t) { // 创建一个节点 Node<T> node = new Node<T>(); node.value = t; Node<T> curTail = null; for (;;) { curTail = tail.get(); if (curTail.next.get() == null) { if (casNext(curTail, null, node)) { if (casTail(curTail, node)) { size.incrementAndGet(); return; } else { curTail.next.getAndSet(null); } } } } } /** * 取元素分两部:原子更新header的next、第一个元素为尾节点,则将尾节点原子更新到header 如果第二部失败,则原则还原第一步 * * @return */ public T poll() { Node<T> first = null; T value = null; for (;;) { first = header.next.get(); Node<T> curTail = tail.get(); // 队列空 if (curTail == header && first == null) { break; } // 中间状态 if ((first != null && curTail == header) || (first == null && curTail != header)) { continue; } if (first != null) { // 如果tail指向第一个元素,则取队首后将tail更新至header if (curTail == first) { if (casHeaderNext(first, null)) { if (casTail(curTail, header)) { value = first.value; break; } else { header.next.getAndSet(first); } } } else { Node<T> second = first.next.get(); // 如果second为null,则说明当前获得的first已经被其他线程取走 if (second != null) { if (casHeaderNext(first, second)) { value = first.value; break; } } } } } if (value != null) { size.decrementAndGet(); } return value; } public boolean isEmpty() { return tail.get().value == null; } public T top() { Node<T> first = header.next.get(); return first == null ? null : first.value; } public int size() { return size.get(); } private final boolean casHeaderNext(Node<T> before, Node<T> after) { return header.next.compareAndSet(before, after); } private final boolean casTail(Node<T> before, Node<T> after) { return tail.compareAndSet(before, after); } private final boolean casNext(Node<T> node, Node<T> before, Node<T> after) { return node.next.compareAndSet(before, after); } static class Node<T> { T value; AtomicReference<Node<T>> next = new AtomicReference<Node<T>>(); } }
测试代码:
package conSet; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; public class NoBlockQueueTest2 { public static int SIZE = 10000; public static int C_NUM = 10; /** * @param args */ public static void main(String[] args) { for (int i = 0; i < 10000; i++) { test(); } } public static void test() { NoBlockQueue<String> queue = new NoBlockQueue<String>(); Queue<String> input = new ConcurrentLinkedQueue<String>(); Queue<String> output = new ConcurrentLinkedQueue<String>(); for (int i = 0; i < C_NUM; i++) { Runnable mp = new MP(queue, input); new Thread(mp).start(); } List<Thread> list = new ArrayList<Thread>(); for (int i = 0; i < C_NUM; i++) { Runnable mc = new MC(queue, output); Thread t = new Thread(mc); t.start(); list.add(t); } for (Thread t : list) { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } } ArrayList<String> sort1 = new ArrayList<String>(); ArrayList<String> sort2 = new ArrayList<String>(); while (!input.isEmpty()) { sort1.add(input.poll()); } while (!output.isEmpty()) { sort2.add(output.poll()); } Collections.sort(sort1); Collections.sort(sort2); if (sort1.size() != sort2.size()) { throw new RuntimeException("test error,size not equal"); } for (int i = 0; i < sort1.size(); i++) { String left = sort1.get(i); String right = sort2.get(i); if (!left.equals(right)) { throw new RuntimeException("test error,data wrong"); } } System.out.println("test ok size=" + queue.size()); } static class MP implements Runnable { NoBlockQueue<String> queue; Queue<String> input; public MP(NoBlockQueue<String> queue, Queue<String> input) { super(); this.queue = queue; this.input = input; } public void run() { for (int i = 0; i < NoBlockQueueTest2.SIZE; i++) { String s = UUID.randomUUID().toString(); input.add(s); queue.add(s); } } } static class MC implements Runnable { NoBlockQueue<String> queue; Queue<String> output; public MC(NoBlockQueue<String> queue, Queue<String> output) { super(); this.queue = queue; this.output = output; } public void run() { final int count = NoBlockQueueTest2.C_NUM * NoBlockQueueTest2.SIZE; for (;;) { String s = queue.poll(); if (s != null) { output.add(s); } else { if (output.size() == count) { break; } } } } } }
因为没有实现remove和itr功能,因此复杂度甚微,经过10000次的不断测试,尚未发现测试失败
相关推荐
java并发编程的联系代码,由低到高,包含线程安全、锁、线程通讯、线程池、Concurrent包内容的使用、各种阻塞队列的联系等知识。
│ 高并发编程第二阶段44讲、被动引用和类加载过程的练习巩固训练题.mp4 │ 高并发编程第二阶段45讲、ClassLoader加载阶段发生的故事.mp4 │ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)...
练习学习 java 包 java.util.concurrent 并直观地展示并发双端队列的工作原理。
│ 高并发编程第二阶段44讲、被动引用和类加载过程的练习巩固训练题.mp4 │ 高并发编程第二阶段45讲、ClassLoader加载阶段发生的故事.mp4 │ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)...
练习实现网络并发服务的编程技术。 学习如何实现多线程间的相互同步和相互协作。 理解什么是线程安全。 二. 设计要求 功能概述:实现一个支持并发服务的网络运算服务器程序。该服务器能够同时接收来自 于多个客户端...
实现一个线程安全的队列 实现一个无锁队列 实现一个线程池 Go 连接数据库 Go Web 开发 Web 框架 Gin 框架 Beego 框架 Go 可视化库 Echarts C++ 攻略 STL vector list stack queue deque priority_queue set ...
为了减小学员考试交卷时大量并发带来的系统风险,我们尝试采用成熟的消息队列框架RabbitMQ来解决这一问题,因此我们的数据库以及系统的架构同第一版相比,发生了不小的变化。 **系统架构** 1. 管理后台现在独立成...
在单个数据库上运行的服务器集群、限制并发执行作业的数量和崩溃恢复等功能都超出了本练习的范围。要求除了package.json列出的依赖项之外,唯一的要求是 MongoDB。 可以在config.json更改 mongo 的连接设置。应用...
价值过亿的架构师训练营课面试题和答案.pptx 架构师职责 听课总结 – 第一课 架构视图,设计文档 – 第二课 编程的本质与未来 第三课 听课总结 框架设计、设计原则、设计模式 第四课 听课总结 框架开发 设计原则 ...
詹科夫以下是Jakob Jenkov的教程的练习示例,该教程位于Java并发这是Java并发部分中用于修改某些内容的示例并学习新的。 在我的示例中实现了以下主题: Java同步块。 (2个示例使用一个实例和两个实例)。 线程本地...
02_互联网Java工程师面试突击训练课程第一季的内容说明 03_关于互联网Java工程师面试突击训练课程的几点说明 04_体验一下面试官对于消息队列的7个连环炮 05_知其然而知其所以然:如何进行消息队列的技术选型? 06_...
---【课时12】12-进程内线程通讯实现(手写阻塞式队列).mp4 ---【课时2】02-如何理解线程安全与不安全.mp4 ---【课时3】03-导致线程不安全的因素.mp4 ---【课时4】04-如何保证线程安全.mp4 ---【课时5】05-...
3.1.3 并发和并行 3.2 基本抽象概念 3.2.1 内核线程 3.2.2 轻量级进程 3.2.3 用户线程 3.3 轻量级进程设计——要考虑的问题 3.3.1 fork 的语义 3.3.2 其他的系统调用 3.3.3 信号传递和处理 3.3.4 可视性 3.3.5 堆栈...
练习答案 下列操作系统中,交互性最强的是(分时系统);批处理操作系统的缺点是(交互能力弱);(多样性)不是分时操作系统的特征;下列系统中,(火炮的自动化控制系统)是实时系统;在操作系统中,(进程)是竞争和分配计算机系统...
以VC++为平台电传训练系统研究与实现.pdf 使用MFC和ADO实现不规则窗口通讯录.pdf 分布式软件动态配置环境可视化的研究与实现.pdf 利用Debug探索VisualC_编程原理.pdf 利用MFC实现Windows下工业控制的高精度定时.pdf ...
以VC++为平台电传训练系统研究与实现.pdf 使用MFC和ADO实现不规则窗口通讯录.pdf 分布式软件动态配置环境可视化的研究与实现.pdf 利用Debug探索VisualC_编程原理.pdf 利用MFC实现Windows下工业控制的高精度定时.pdf ...
以VC++为平台电传训练系统研究与实现.pdf 使用MFC和ADO实现不规则窗口通讯录.pdf 分布式软件动态配置环境可视化的研究与实现.pdf 利用Debug探索VisualC_编程原理.pdf 利用MFC实现Windows下工业控制的高精度定时.pdf ...
这是在( TAOCP )第1卷中描述的电梯模拟器的实现。在阅读该书的同时,我发现他的示例应用是如此之长和荒谬,以至于值得进一步探索。 。 Knuth的示例是一个其中涉及并发执行的彼此交互的实体。 它演示了排序的未决...
为了解决大家这个困惑,小编整理了从Linux基础到大型网站高并发处理项目实战的学习路线和知识点,希望大家能够喜欢,文末还有小编整理的视频和电子书籍,也希望大家能够喜欢。 Linux大纲 1Linux的介绍,Linu的安装: ...
l Akka:JVM上的消息队列库,负责参数服务器和工作节点之间的并发消息处理。 l Redis:基于内存的高效并行Key-Value数据库。主要用于在参数服务器和工作节点之间传递训练的模型。这些模型一般比较大(几十至上千...