`
maosheng
  • 浏览: 550287 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Java 并发编程_ConcurrentLinkedQueue

    博客分类:
  • Java
 
阅读更多
ConcurrentLinkedQueue 的分析和使用:

在并发编程中我们有时候需要使用线程安全的队列。如果我们要实现一个线程安全的队列有两种实现方式一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现,而非阻塞的实现方式则可以使用循环 CAS 的方式来实现。


ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素,它会返回队列头部的元素。它采用了“wait-free”算法(即 CAS 算法)来实现。

ConcurrentLinkedQueue 的结构:






public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {

   ........

private static class Node<E> {
        volatile E item;
        volatile Node<E> next;

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    /**
     * A node from which the first live (non-deleted) node (if any)
     * can be reached in O(1) time.
     * Invariants:
     * - all live nodes are reachable from head via succ()
     * - head != null
     * - (tmp = head).next != tmp || tmp != head
     * Non-invariants:
     * - head.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     */
    private transient volatile Node<E> head;

    /**
     * A node from which the last node on list (that is, the unique
     * node with node.next == null) can be reached in O(1) time.
     * Invariants:
     * - the last node is always reachable from tail via succ()
     * - tail != null
     * Non-invariants:
     * - tail.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     * - tail.next may or may not be self-pointing to tail.
     */
    private transient volatile Node<E> tail;

   .......


}

ConcurrentLinkedQueue 由 head 节点和 tair 节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个 next 关联起来,从而组成一张链表结构的队列。默认情况下 head 节点存储的元素为空,tair 节点等于 head 节点。


入队列:

入队列就是将入队节点添加到队列的尾部。入队主要做两件事情,第一是将入队节点设置成当前队列尾节点的下一个节点。第二是更新 tail 节点,如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点,如果 tail 节点的 next 节点为空,则将入队节点设置成 tail的 next 节点,所以 tail 节点不总是尾节点。





步骤1 添加元素 1。队列更新 head 节点的 next 节点为元素 1 节点。又因为tail 节点默认情况下等于 head 节点,所以它们的 next 节点都指向元素 1 节点。
步骤2 添加元素 2。队列首先设置元素 1 节点的 next 节点为元素 2 节点,然后更新 tail 节点指向元素 2 节点。
步骤3 添加元素 3,设置 tail 节点的 next 节点为元素 3 节点。
步骤4 添加元素 4,设置元素 3 的 next 节点为元素 4 节点,然后将 tail 节点指向元素 4 节点。


    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }


    /**
     * Throws NullPointerException if argument is null.
     *
     * @param v the element
     */
    private static void checkNotNull(Object v) {
        if (v == null)
            throw new NullPointerException();
    }


定位尾节点:tail 节点并不总是尾节点,所以每次入队都必须先通过 tail 节点来找到尾节点,尾节点可能就是 tail 节点,也可能是 tail 节点的 next 节点。代码中循环体中的第一个 if 就是判断 tail是否有 next 节点,有则表示 next 节点可能是尾节点。获取 tail 节点的 next 节点需要注意的是 p 节点等于 p 的 next 节点的情况,只有一种可能就是 p 节点和 p 的 next 节点都等于空,表示这个队列刚初始化,正准备添加第一次节点,所以需要返回 head 节点。


设置入队节点为尾节点:p.casNext(null, n)方法用于将入队节点设置为当前队列尾节点的next 节点,p 如果是 null 表示 p 是当前队列的尾节点,如果不为 null 表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。


出队列:

出队列的就是 从队列里返回一个节点元素,并清空该节点对元素的引用。




并不是每次出队时都更新 head 节点,当 head 节点里有元素时,直接弹出 head节点里的元素,而不会更新 head 节点。只有当 head 节点里没有元素时,出队操作才会更新head节点。这种做法也是通过 hops 变量来减少使用 CAS 更新 head 节点的消耗,从而提高出队效率。


public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }



首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用 CAS 的方式将头节点的引用设置成null,如果 CAS 成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了 head 节点,导致元素发生了变化,需要重新获取头节点。








  • 大小: 115.6 KB
  • 大小: 266.3 KB
  • 大小: 249.8 KB
分享到:
评论

相关推荐

    Java并发编程原理与实战

    并发容器ConcurrentLinkedQueue原理与使用.mp4 Java中的阻塞队列原理与使用.mp4 实战:简单实现消息队列.mp4 并发容器ConcurrentHashMap原理与使用.mp4 线程池的原理与使用.mp4 Executor框架详解.mp4 实战:简易web...

    龙果 java并发编程原理实战

    龙果 java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四...

    java并发编程综合讲解

    这份资源为您提供了关于 Java 并发编程的全面讲解,着重介绍了 JUC(java.util.concurrent)库中的核心概念、工具和最佳实践。通过深入学习,您将能够更好地理解并发编程的挑战,掌握构建高性能、高可伸缩性的并发...

    汪文君高并发编程实战视频资源下载.txt

    │ Java并发编程.png │ ppt+源码.rar │ 高并发编程第二阶段01讲、课程大纲及主要内容介绍.wmv │ 高并发编程第二阶段02讲、介绍四种Singleton方式的优缺点在多线程情况下.wmv │ 高并发编程第二阶段03讲、...

    龙果java并发编程完整视频

    第48节并发容器ConcurrentLinkedQueue原理与使用00:31:03分钟 | 第49节Java中的阻塞队列原理与使用00:26:18分钟 | 第50节实战:简单实现消息队列00:11:07分钟 | 第51节并发容器ConcurrentHashMap原理与使用00:38:...

    Java 并发编程原理与实战视频

    java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四个...

    Java并发编程相关源码集 包括多任务线程,线程池等.rar

    Java并发编程常见知识点源码集锦,涉及到对象锁,Executors多任务线程框架,线程池等示例,列出一些源码包中包括的内容:  volatile关键字的非原子性、volatile关键字的使用、AtomicInteger原子性操作、线程安全小...

    Java并发编程(学习笔记).xmind

    Java并发编程 背景介绍 并发历史 必要性 进程 资源分配的最小单位 线程 CPU调度的最小单位 线程的优势 (1)如果设计正确,多线程程序可以通过提高处理器资源的利用率来提升系统吞吐率 ...

    并发容器和线程池,java并发编程3

    JDK提供的这些容器⼤部分在java.util.concurrent包中。我们挑选出⼀些⽐较有代表性的并发容器 1 类,来感受⼀下JDK⾃带的并发集合带来的“快感”。 ConcurrentLinkedQueue是⼀个基于链接节点的⽆界线程安全队列,它...

    汪文君高并发编程实战视频资源全集

    │ Java并发编程.png │ ppt+源码.rar │ 高并发编程第二阶段01讲、课程大纲及主要内容介绍.wmv │ 高并发编程第二阶段02讲、介绍四种Singleton方式的优缺点在多线程情况下.wmv │ 高并发编程第二阶段03讲、...

    java并发编程

    第48节并发容器ConcurrentLinkedQueue原理与使用00:31:03分钟 | 第49节Java中的阻塞队列原理与使用00:26:18分钟 | 第50节实战:简单实现消息队列00:11:07分钟 | 第51节并发容器ConcurrentHashMap原理与使用00:38:...

    java编发编程:JUC综合讲解

    Java 并发编程在现代软件开发中占据重要地位,尤其是在多核处理器的时代。JUC(java.util.concurrent)库是 Java 标准库的一部分,提供了丰富的多线程并发工具,旨在帮助开发者编写高性能、高可伸缩性的并发程序。...

    【2018最新最详细】并发多线程教程

    1.并发编程的优缺点 2.线程的状态转换以及基本操作 3.java内存模型以及happens-before规则 4.彻底理解synchronized 5.彻底理解volatile 6.你以为你真的了解final吗? 7.三大性质总结:原子性、可见性以及有序性 8....

    JUC多线程学习个人笔记

    JUC(Java Util Concurrent)是Java中用于并发编程的工具包,提供了一组接口和类,用于处理多线程和并发操作。JUC提供了一些常用的并发编程模式和工具,如线程池、并发集合、原子操作等。 JUC的主要特点包括: ...

Global site tag (gtag.js) - Google Analytics