`
dingchd
  • 浏览: 15030 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

并发队列实现练习

    博客分类:
  • java
阅读更多

代码:

package conSet;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * 并发单向队列简单实现
 * 
 * @author dingchd
 * 
 * @param <T>
 */
public class NoBlockQueue<T> {
	private Node<T> header;
	private AtomicReference<Node<T>> tail;

	private AtomicInteger size;

	public NoBlockQueue() {
		header = new Node<T>();
		tail = new AtomicReference<Node<T>>(header);
		size = new AtomicInteger(0);
	}

	/**
	 * 存元素的过程分两步骤:原子更新尾节点的next、原子更新尾节点 如果第二部更新失败 则原子还原尾节点的next
	 * 
	 * @return
	 */
	public void add(T t) {
		// 创建一个节点
		Node<T> node = new Node<T>();
		node.value = t;

		Node<T> curTail = null;
		for (;;) {
			curTail = tail.get();

			if (curTail.next.get() == null) {
				if (casNext(curTail, null, node)) {
					if (casTail(curTail, node)) {
						size.incrementAndGet();
						return;
					} else {
						curTail.next.getAndSet(null);
					}
				}
			}
		}
	}

	/**
	 * 取元素分两部:原子更新header的next、第一个元素为尾节点,则将尾节点原子更新到header 如果第二部失败,则原则还原第一步
	 * 
	 * @return
	 */
	public T poll() {
		Node<T> first = null;
		T value = null;
		for (;;) {
			first = header.next.get();
			Node<T> curTail = tail.get();

			// 队列空
			if (curTail == header && first == null) {
				break;
			}

			// 中间状态
			if ((first != null && curTail == header)
					|| (first == null && curTail != header)) {
				continue;
			}

			if (first != null) {
				// 如果tail指向第一个元素,则取队首后将tail更新至header
				if (curTail == first) {
					if (casHeaderNext(first, null)) {
						if (casTail(curTail, header)) {
							value = first.value;
							break;
						} else {
							header.next.getAndSet(first);
						}
					}
				} else {
					Node<T> second = first.next.get();

					// 如果second为null,则说明当前获得的first已经被其他线程取走
					if (second != null) {
						if (casHeaderNext(first, second)) {
							value = first.value;
							break;
						}
					}
				}
			}
		}

		if (value != null) {
			size.decrementAndGet();
		}

		return value;
	}

	public boolean isEmpty() {
		return tail.get().value == null;
	}

	public T top() {
		Node<T> first = header.next.get();
		return first == null ? null : first.value;
	}

	public int size() {
		return size.get();
	}

	private final boolean casHeaderNext(Node<T> before, Node<T> after) {
		return header.next.compareAndSet(before, after);
	}

	private final boolean casTail(Node<T> before, Node<T> after) {
		return tail.compareAndSet(before, after);
	}

	private final boolean casNext(Node<T> node, Node<T> before, Node<T> after) {
		return node.next.compareAndSet(before, after);
	}

	static class Node<T> {
		T value;
		AtomicReference<Node<T>> next = new AtomicReference<Node<T>>();
	}
}

 
测试代码:

package conSet;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;

public class NoBlockQueueTest2 {
	public static int SIZE = 10000;
	public static int C_NUM = 10;

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		for (int i = 0; i < 10000; i++) {
			test();
		}
	}

	public static void test() {
		NoBlockQueue<String> queue = new NoBlockQueue<String>();

		Queue<String> input = new ConcurrentLinkedQueue<String>();
		Queue<String> output = new ConcurrentLinkedQueue<String>();

		for (int i = 0; i < C_NUM; i++) {
			Runnable mp = new MP(queue, input);
			new Thread(mp).start();
		}

		List<Thread> list = new ArrayList<Thread>();
		for (int i = 0; i < C_NUM; i++) {
			Runnable mc = new MC(queue, output);
			Thread t = new Thread(mc);
			t.start();
			list.add(t);
		}

		for (Thread t : list) {
			try {
				t.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

		ArrayList<String> sort1 = new ArrayList<String>();
		ArrayList<String> sort2 = new ArrayList<String>();
		while (!input.isEmpty()) {
			sort1.add(input.poll());
		}
		while (!output.isEmpty()) {
			sort2.add(output.poll());
		}

		Collections.sort(sort1);
		Collections.sort(sort2);

		if (sort1.size() != sort2.size()) {
			throw new RuntimeException("test error,size not equal");
		}

		for (int i = 0; i < sort1.size(); i++) {
			String left = sort1.get(i);
			String right = sort2.get(i);
			if (!left.equals(right)) {
				throw new RuntimeException("test error,data wrong");
			}
		}

		System.out.println("test ok size=" + queue.size());
	}

	static class MP implements Runnable {
		NoBlockQueue<String> queue;
		Queue<String> input;

		public MP(NoBlockQueue<String> queue, Queue<String> input) {
			super();
			this.queue = queue;
			this.input = input;
		}

		public void run() {
			for (int i = 0; i < NoBlockQueueTest2.SIZE; i++) {
				String s = UUID.randomUUID().toString();
				input.add(s);
				queue.add(s);
			}
		}
	}

	static class MC implements Runnable {
		NoBlockQueue<String> queue;
		Queue<String> output;

		public MC(NoBlockQueue<String> queue, Queue<String> output) {
			super();
			this.queue = queue;
			this.output = output;
		}

		public void run() {
			final int count = NoBlockQueueTest2.C_NUM * NoBlockQueueTest2.SIZE;
			for (;;) {
				String s = queue.poll();
				if (s != null) {
					output.add(s);
				} else {
					if (output.size() == count) {
						break;
					}
				}
			}
		}
	}
}

 
因为没有实现remove和itr功能,因此复杂度甚微,经过10000次的不断测试,尚未发现测试失败

 

分享到:
评论

相关推荐

    Java并发编程进阶练习代码

    java并发编程的联系代码,由低到高,包含线程安全、锁、线程通讯、线程池、Concurrent包内容的使用、各种阻塞队列的联系等知识。

    汪文君高并发编程实战视频资源下载.txt

    │ 高并发编程第二阶段44讲、被动引用和类加载过程的练习巩固训练题.mp4 │ 高并发编程第二阶段45讲、ClassLoader加载阶段发生的故事.mp4 │ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)...

    Concurrent Queue Exerciser:展示并发双端队列是如何工作的。-开源

    练习学习 java 包 java.util.concurrent 并直观地展示并发双端队列的工作原理。

    汪文君高并发编程实战视频资源全集

    │ 高并发编程第二阶段44讲、被动引用和类加载过程的练习巩固训练题.mp4 │ 高并发编程第二阶段45讲、ClassLoader加载阶段发生的故事.mp4 │ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)...

    UDP+阻塞队列+多线程

    练习实现网络并发服务的编程技术。 学习如何实现多线程间的相互同步和相互协作。 理解什么是线程安全。 二. 设计要求 功能概述:实现一个支持并发服务的网络运算服务器程序。该服务器能够同时接收来自 于多个客户端...

    go-web:后端开发指南(笔记)

    实现一个线程安全的队列 实现一个无锁队列 实现一个线程池 Go 连接数据库 Go Web 开发 Web 框架 Gin 框架 Beego 框架 Go 可视化库 Echarts C++ 攻略 STL vector list stack queue deque priority_queue set ...

    基于Java的在线考试系统,优秀毕业设计,小白必看!

    为了减小学员考试交卷时大量并发带来的系统风险,我们尝试采用成熟的消息队列框架RabbitMQ来解决这一问题,因此我们的数据库以及系统的架构同第一版相比,发生了不小的变化。 **系统架构** 1. 管理后台现在独立成...

    job-queue-challenge:作业队列编码挑战

    在单个数据库上运行的服务器集群、限制并发执行作业的数量和崩溃恢复等功能都超出了本练习的范围。要求除了package.json列出的依赖项之外,唯一的要求是 MongoDB。 可以在config.json更改 mongo 的连接设置。应用...

    价值过亿的架构师训练营课面试题和答案.pptx

    价值过亿的架构师训练营课面试题和答案.pptx 架构师职责 听课总结 – 第一课 架构视图,设计文档 – 第二课 编程的本质与未来 第三课 听课总结 框架设计、设计原则、设计模式 第四课 听课总结 框架开发 设计原则 ...

    JenkovTutorial:这是http上Jenkov教程的练习示例

    詹科夫以下是Jakob Jenkov的教程的练习示例,该教程位于Java并发这是Java并发部分中用于修改某些内容的示例并学习新的。 在我的示例中实现了以下主题: Java同步块。 (2个示例使用一个实例和两个实例)。 线程本地...

    2021互联网大厂Java架构师面试题突击视频教程

    02_互联网Java工程师面试突击训练课程第一季的内容说明 03_关于互联网Java工程师面试突击训练课程的几点说明 04_体验一下面试官对于消息队列的7个连环炮 05_知其然而知其所以然:如何进行消息队列的技术选型? 06_...

    在线 Java 硕士加薪课程 Term-05 班 (10.68G)

    ---【课时12】12-进程内线程通讯实现(手写阻塞式队列).mp4 ---【课时2】02-如何理解线程安全与不安全.mp4 ---【课时3】03-导致线程不安全的因素.mp4 ---【课时4】04-如何保证线程安全.mp4 ---【课时5】05-...

    UNIX 高级教程系统技术内幕

    3.1.3 并发和并行 3.2 基本抽象概念 3.2.1 内核线程 3.2.2 轻量级进程 3.2.3 用户线程 3.3 轻量级进程设计——要考虑的问题 3.3.1 fork 的语义 3.3.2 其他的系统调用 3.3.3 信号传递和处理 3.3.4 可视性 3.3.5 堆栈...

    操作系统 c程序-进程调度1.txt

    练习答案 下列操作系统中,交互性最强的是(分时系统);批处理操作系统的缺点是(交互能力弱);(多样性)不是分时操作系统的特征;下列系统中,(火炮的自动化控制系统)是实时系统;在操作系统中,(进程)是竞争和分配计算机系统...

    VC与Labview、Matlab编程论文资料[2].rar

    以VC++为平台电传训练系统研究与实现.pdf 使用MFC和ADO实现不规则窗口通讯录.pdf 分布式软件动态配置环境可视化的研究与实现.pdf 利用Debug探索VisualC_编程原理.pdf 利用MFC实现Windows下工业控制的高精度定时.pdf ...

    VC与Labview、Matlab编程论文资料

    以VC++为平台电传训练系统研究与实现.pdf 使用MFC和ADO实现不规则窗口通讯录.pdf 分布式软件动态配置环境可视化的研究与实现.pdf 利用Debug探索VisualC_编程原理.pdf 利用MFC实现Windows下工业控制的高精度定时.pdf ...

    VC与Labview、Matlab编程论文资料[4].rar

    以VC++为平台电传训练系统研究与实现.pdf 使用MFC和ADO实现不规则窗口通讯录.pdf 分布式软件动态配置环境可视化的研究与实现.pdf 利用Debug探索VisualC_编程原理.pdf 利用MFC实现Windows下工业控制的高精度定时.pdf ...

    knuth-elevator:Donald E. Knuth描述的电梯模拟器的Go实现

    这是在( TAOCP )第1卷中描述的电梯模拟器的实现。在阅读该书的同时,我发现他的示例应用是如此之长和荒谬,以至于值得进一步探索。 。 Knuth的示例是一个其中涉及并发执行的彼此交互的实体。 它演示了排序的未决...

    大数据学习路线从Linux基础到大型网站高并发处理项目实战

    为了解决大家这个困惑,小编整理了从Linux基础到大型网站高并发处理项目实战的学习路线和知识点,希望大家能够喜欢,文末还有小编整理的视频和电子书籍,也希望大家能够喜欢。 Linux大纲 1Linux的介绍,Linu的安装: ...

    并行深度学习系统SpeeDO.zip

    l Akka:JVM上的消息队列库,负责参数服务器和工作节点之间的并发消息处理。 l Redis:基于内存的高效并行Key-Value数据库。主要用于在参数服务器和工作节点之间传递训练的模型。这些模型一般比较大(几十至上千...

Global site tag (gtag.js) - Google Analytics