`
benx
  • 浏览: 272161 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

java concurrent包理解

    博客分类:
  • java
 
阅读更多

Java并发处理

<!--[if !supportLists]-->1、  <!--[endif]-->简介

了解java并发之前先了解java内存模型,java内存有主内存和工作内存,比有个对象Person,有实例变量name,那么Person的实例中name属性就是在主内存中,如果多个线程同时操作Person,那么每个线程会有Person属性name的副本放在每个线程的工作内存中,每个工作内存修改后会同步到主内存中,但是这里就有问题:一致性问题和可见性问题,导致数据丢失或脏数据。

为了解决这个问题,引入了同步机制synchronized,是多个线程同时只有一个线程可以操作共享变量(主内存对象)

 

<!--[if !supportLists]-->2、  <!--[endif]-->java5sun引入了concurrent包的一些同步机制,要了解这个首先了解AbstractQueuedSynchronizer

 

<!--[if !supportLists]-->3、  <!--[endif]-->AbstractQueuedSynchronizer了解

实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架,此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。子类必须定义更改此状态的受保护方法,并定义哪种状态对于此对象意味着被获取或被释放。假定这些条件之后,此类中的其他方法就可以实现所有排队和阻塞机制

以上是java API的描述 ,简单就是提供线程阻塞和同步的对象,子类需要实现tryAcquiretryReleasetryAcquireSharedtryReleaseSharedisHeldExclusively等方法

 

下面是常用方法介绍

public final void acquire(int arg)

以独占模式获取对象,忽略中断。通过至少调用一次 tryAcquire(int) 来实现此方法,并在成功时返回。否则在成功之前,一直调用 tryAcquire(int) 将线程加入队列,线程可能重复被阻塞或不被阻塞。可以使用此方法来实现 Lock.lock() 方法。

参数:

arg - acquire 参数。此值被传送给 tryAcquire(int),但它是不间断的,并且可以表示任何内容。

 

protected boolean tryAcquire(int arg)

试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。

此方法总是由执行 acquire 的线程来调用。如果此方法报告失败,则 acquire 方法可以将线程加入队列(如果还没有将它加入队列),直到获得其他某个线程释放了该线程的信号。可以用此方法来实现 Lock.tryLock() 方法。

默认实现将抛出 UnsupportedOperationException

参数:

arg - acquire 参数。该值总是传递给 acquire 方法的那个值,或者是因某个条件等待而保存在条目上的值。该值是不间断的,并且可以表示任何内容。

返回:

如果成功,则返回 true。在成功的时候,此对象已经被获取。

抛出:

IllegalMonitorStateException - 如果正在进行的获取操作将在非法状态下放置此同步器。必须以一致的方式抛出此异常,以便同步正确运行。

UnsupportedOperationException - 如果不支持独占模式

 

public final boolean release(int arg)

以独占模式释放对象。如果 tryRelease(int) 返回 true,则通过消除一个或多个线程的阻塞来实现此方法。可以使用此方法来实现 Lock.unlock() 方法

参数:

arg - release 参数。此值被传送给 tryRelease(int),但它是不间断的,并且可以表示任何内容。

返回:

tryRelease(int) 返回的值

 

protected boolean tryRelease(int arg)

试图设置状态来反映独占模式下的一个释放。

此方法总是由正在执行释放的线程调用。

默认实现将抛出 UnsupportedOperationException

参数:

arg - release 参数。该值总是传递给 release 方法的那个值,或者是因某个条件等待而保存在条目上的当前状态值。该值是不间断的,并且可以表示任何内容。

返回:

如果此对象现在处于完全释放状态,从而使等待的线程都可以试图获得此对象,则返回 true;否则返回 false

 

public final void acquireShared(int arg)

以共享模式获取对象,忽略中断。通过至少先调用一次 tryAcquireShared(int) 来实现此方法,并在成功时返回。否则在成功之前,一直调用 tryAcquireShared(int) 将线程加入队列,线程可能重复被阻塞或不被阻塞。

参数:

arg - acquire 参数。此值被传送给 tryAcquireShared(int),但它是不间断的,并且可以表示任何内容。

 

 

public final boolean releaseShared(int arg)

以共享模式释放对象。如果 tryReleaseShared(int) 返回 true,则通过消除一个或多个线程的阻塞来实现该方法。

参数:

arg - release 参数。此值被传送给 tryReleaseShared(int),但它是不间断的,并且可以表示任何内容。

 

protected int tryAcquireShared(int arg)

试图在共享模式下获取对象状态。此方法应该查询是否允许它在共享模式下获取对象状态,如果允许,则获取它。

此方法总是由执行 acquire 线程来调用。如果此方法报告失败,则 acquire 方法可以将线程加入队列(如果还没有将它加入队列),直到获得其他某个线程释放了该线程的信号。

默认实现将抛出 UnsupportedOperationException

参数:

arg - acquire 参数。该值总是传递给 acquire 方法的那个值,或者是因某个条件等待而保存在条目上的值。该值是不间断的,并且可以表示任何内容。

返回:

在失败时返回负值;如果共享模式下的获取成功但其后续共享模式下的获取不能成功,则返回 0;如果共享模式下的获取成功并且其后续共享模式下的获取可能够成功,则返回正值,在这种情况下,后续等待线程必须检查可用性。(对三种返回值的支持使得此方法可以在只是有时候以独占方式获取对象的上下文中使用。)在成功的时候,此对象已被获取。

抛出:

IllegalMonitorStateException - 如果正在进行的获取操作将在非法状态下放置此同步器。必须以一致的方式抛出此异常,以便同步正确运行。

UnsupportedOperationException - 如果不支持共享模式

 

上面几个方法总结就是获取对象,当可以获取时不阻塞,否则会阻塞,使用LockSupport.park()---unsafe.park(),释放对象

 

<!--[if !supportLists]-->4、  <!--[endif]-->CountDownLatch

 

/**

 * 原理是内部维护一个大小i的信号量,

 * 使用await方法会一直等待直到信号量为0

 * 使用countDown方法会时信号量-1,当信号量为0await取消阻塞

 * @author jin.xiong

 *

 */

public class CountDownLatch

{

    private static final class Sync extends AbstractQueuedSynchronizer

    {

 

    /**

     * 获取状态

     * @return

     */

        int getCount()

        {

            return getState();

        }

 

        /**

         * state0是才可以获取锁,否则一直等待

         */

        public int tryAcquireShared(int i)

        {

            return getState() != 0 ? -1 : 1;

        }

 

        /**

         * stage通过CAS减一

         */

        public boolean tryReleaseShared(int i)

        {

            int j;

            int k;

            do

            {

                j = getState();

                if(j == 0)

                    return false;

                k = j - 1;

            } while(!compareAndSetState(j, k));

            return k == 0;

        }

 

        Sync(int i)

        {

            setState(i);

        }

    }

 

 

    public CountDownLatch(int i)

    {

        if(i < 0)

        {

            throw new IllegalArgumentException("count < 0");

        } else

        {

            sync = new Sync(i);

            return;

        }

    }

 

    public void await()

        throws InterruptedException

    {

        sync.acquireSharedInterruptibly(1);

    }

 

    public boolean await(long l, TimeUnit timeunit)

        throws InterruptedException

    {

        return sync.tryAcquireSharedNanos(1, timeunit.toNanos(l));

    }

 

    public void countDown()

    {

        sync.releaseShared(1);

    }

 

    public long getCount()

    {

        return (long)sync.getCount();

    }

 

    public String toString()

    {

        return (new StringBuilder()).append(super.toString()).append("[Count = ").append(sync.getCount()).append("]").toString();

    }

   

   

    public  static void main(String[] args) throws InterruptedException{

    final CountDownLatch down = new CountDownLatch(10);

    System.out.println(down.getCount());

   

    new Thread(){

        public void run(){

            try {

                  down.await();

              } catch (InterruptedException e) {

              }

            System.out.println("CountDownLatch Stage 0   " +down.getCount() );

        }

    }.start();

    for(int i=0;i<10;i++){

        Thread.sleep(1000);

        System.out.println(i);

        down.countDown();

    }

    System.out.println(down.getCount());

    }

 

    private final Sync sync;

}

 

 

5Semaphore

/**

 * 计数信号量,原理内部维护一个大小为i的许可数量

 * acquire(s)方法把当前许可号-s

 * release(s)方法吧当前许可号+s

 * @author jin.xiong

 *

 */

public class Semaphore

    implements Serializable

{

    /**

     * 公平的Sync

     * 有两个判断条件,如果当前的线程不在等待FIFO线程队列的首部,将继续等待,且stage必须大于0

     * @author jin.xiong

     *

     */

    static final class FairSync extends Sync

    {

 

        protected int tryAcquireShared(int i)

        {

            Thread thread = Thread.currentThread();

            int j;

            int k;

            do

            {

                Thread thread1 = getFirstQueuedThread();

                if(thread1 != null && thread1 != thread)

                    return -1;

                j = getState();

                k = j - i;

            } while(k >= 0 && !compareAndSetState(j, k));

            return k;

        }

 

        private static final long serialVersionUID = 2014338818796000944L;

 

        FairSync(int i)

        {

            super(i);

        }

    }

 

    /**

     * 非公平的Sync

     * @author jin.xiong

     *

     */

    static final class NonfairSync extends Sync

    {

 

        protected int tryAcquireShared(int i)

        {

            return nonfairTryAcquireShared(i);

        }

 

        private static final long serialVersionUID = -2694183684443567898L;

 

        NonfairSync(int i)

        {

            super(i);

        }

    }

 

    static abstract class Sync extends AbstractQueuedSynchronizer

    {

 

    /**

     * 获取状态

     * @return

     */

        final int getPermits()

        {

            return getState();

        }

 

        /**

         * 把状态-i

         * @param i

         * @return

         */

        final int nonfairTryAcquireShared(int i)

        {

            int j;

            int k;

            do

            {

                j = getState();

                k = j - i;

            } while(k >= 0 && !compareAndSetState(j, k));

            return k;

        }

 

        /**

         * 把状态+i

         */

        protected final boolean tryReleaseShared(int i)

        {

            int j;

            do

                j = getState();

            while(!compareAndSetState(j, j + i));

            return true;

        }

 

        /**

         * 状态-i nonfairTryAcquireShared作用一样

         * @param i

         */

        final void reducePermits(int i)

        {

            int j;

            int k;

            do

            {

                j = getState();

                k = j - i;

            } while(!compareAndSetState(j, k));

        }

 

        /**

         * 把状态设为0

         * @return

         */

        final int drainPermits()

        {

            int i;

            do

                i = getState();

            while(i != 0 && !compareAndSetState(i, 0));

            return i;

        }

 

        private static final long serialVersionUID = 1192457210091910933L;

 

        Sync(int i)

        {

            setState(i);

        }

    }

 

 

    /**

     * 初始化容量为i的信号量

     * @param i

     */

    public Semaphore(int i)

    {

        sync = new NonfairSync(i);

    }

 

    public Semaphore(int i, boolean flag)

    {

        sync = ((Sync) (flag ? ((Sync) (new FairSync(i))) : ((Sync) (new NonfairSync(i)))));

    }

 

    public void acquire()

        throws InterruptedException

    {

        sync.acquireSharedInterruptibly(1);

    }

 

    public void acquireUninterruptibly()

    {

        sync.acquireShared(1);

    }

 

    public boolean tryAcquire()

    {

        return sync.nonfairTryAcquireShared(1) >= 0;

    }

 

    public boolean tryAcquire(long l, TimeUnit timeunit)

        throws InterruptedException

    {

        return sync.tryAcquireSharedNanos(1, timeunit.toNanos(l));

    }

 

    public void release()

    {

        sync.releaseShared(1);

    }

 

    public void acquire(int i)

        throws InterruptedException

    {

        if(i < 0)

        {

            throw new IllegalArgumentException();

        } else

        {

            sync.acquireSharedInterruptibly(i);

            return;

        }

    }

 

    public void acquireUninterruptibly(int i)

    {

        if(i < 0)

        {

            throw new IllegalArgumentException();

        } else

        {

            sync.acquireShared(i);

            return;

        }

    }

 

    public boolean tryAcquire(int i)

    {

        if(i < 0)

            throw new IllegalArgumentException();

        else

            return sync.nonfairTryAcquireShared(i) >= 0;

    }

 

    /**

     * 获取信号量,如果没有则等待时间l

     * @param i

     * @param l

     * @param timeunit

     * @return

     * @throws InterruptedException

     */

    public boolean tryAcquire(int i, long l, TimeUnit timeunit)

        throws InterruptedException

    {

        if(i < 0)

            throw new IllegalArgumentException();

        else

            return sync.tryAcquireSharedNanos(i, timeunit.toNanos(l));

    }

 

   /**

    * 释放i个信号量

    * @param i

    */

    public void release(int i)

    {

        if(i < 0)

        {

            throw new IllegalArgumentException();

        } else

        {

            sync.releaseShared(i);

            return;

        }

    }

 

    public int availablePermits()

    {

        return sync.getPermits();

    }

 

    /**

     * 释放所有信号量

     * @return

     */

    public int drainPermits()

    {

        return sync.drainPermits();

    }

 

    protected void reducePermits(int i)

    {

        if(i < 0)

        {

            throw new IllegalArgumentException();

        } else

        {

            sync.reducePermits(i);

            return;

        }

    }

 

    public boolean isFair()

    {

        return sync instanceof FairSync;

    }

 

    public final boolean hasQueuedThreads()

    {

        return sync.hasQueuedThreads();

    }

 

    public final int getQueueLength()

    {

        return sync.getQueueLength();

    }

 

    protected Collection getQueuedThreads()

    {

        return sync.getQueuedThreads();

    }

 

    public String toString()

    {

        return (new StringBuilder()).append(super.toString()).append("[Permits = ").append(sync.getPermits()).append("]").toString();

    }

 

    private static final long serialVersionUID = -3222578661600680210L;

    private final Sync sync;

}

 

 

分享到:
评论

相关推荐

    Java的concurrent包动画演示

    Java.util.concurrent包下主要的类进行了动画演示, 有助于初学者理解

    Scalable IO in Java.zip

    Scalable IO in Java是java.util.concurrent包的作者,大师Doug Lea关于分析与构建可伸缩的高性能IO服务的一篇经典文章,在文章中Doug Lea通过各个角度,循序渐进的梳理了服务开发中的相关问题,以及在解决问题的...

    深入理解Java内存模型.程晓明(带书签文字版).pdf

    concurrent 包的实现 52 FINAL 55 写 final 域的重排序规则 56 读 final 域的重排序规则 57 如果 final 域是引用类型 60 为什么 final 引用不能从构造函数内“逸出” 63 final 语义在处理器中的实现 65 JSR-...

    Java八股文的面试题

    Java中的并发编程机制包括线程、同步、锁等,以及java.util.concurrent包提供的高级并发功能。 异常处理: Java通过异常处理机制提供了一种结构化的错误处理方式,允许程序在遇到错误时优雅地恢复或终止。 Java新特性...

    图解java多线程设计模式

    java.util.concurrent包、synchronized关键字、Swing框架、Java内存模型等内容也均有涉及,不仅能够了解Java多线程的相关知识,还可加深对Java语言的理解。 本书适合以下读者阅读 a.对多线程感兴趣的人 b.对Java...

    Java并发编程基础.pdf

    线程与线程状态:理解Java中线程的基本概念,包括线程的创建、启动、暂停、恢复和终止。熟悉线程的生命周期及其不同状态,如新建、就绪、运行、阻塞和死亡。 线程同步与通信:掌握Java中的同步机制,如synchronized...

    深入理解java内存模型

    concurrent包的实现 FINAL 写final域的重排序规则 读final域的重排序规则 如果final域是引用类型 为什么final引用不能从构造函数内“逸出” final语义在处理器中的实现 JSR-133为什么要增强final的语义 总结 处理器...

    java8源码-java-high-concurrent-programming:《实战Java高并发程序设计》

    java-high-concurrent-programming 《实战Java高并发程序设计》源码整理,github上已经有人整理了,但是最近在看这本书,还是自己把每章代码整理下,也加深下自己的理解. 目录 第一章:走入并行世界 第二章:Java...

    java高级特性整理资料(反射+并发+jvm)

    包含java三大高级特性的文档,《Java Reflection in Action》、《JAVA并发编程实战》、《JVM调优总结》、《深入理解Java虚拟机JVM高级特性与最佳实践》、《concurrent programming in java》,适合想深入java技术的...

    Java面试宝典-经典

    54、简述synchronized和java.util.concurrent.locks.Lock的异同 ? 34 55、设计4个线程,其中两个线程每次对j增加1,另外两个线程对j每次减少1。写出程序。 36 56、子线程循环10次,接着主线程循环100,接着又回到...

    Java面试宝典2010版

    54、简述synchronized和java.util.concurrent.locks.Lock的异同 ? 34 55、设计4个线程,其中两个线程每次对j增加1,另外两个线程对j每次减少1。写出程序。 36 56、子线程循环10次,接着主线程循环100,接着又回到...

    java面试题大全(2012版)

    54、简述synchronized和java.util.concurrent.locks.Lock的异同 ? 34 55、设计4个线程,其中两个线程每次对j增加1,另外两个线程对j每次减少1。写出程序。 36 56、子线程循环10次,接着主线程循环100,接着又回到...

    java面试题

    14. 简述synchronized和java.util.concurrent.locks.Lock的异同 ? 11 15. 当一个线程进入一个对象的一个synchronized方法后,其它线程是否可进入此对象的其它方法? 11 16. abstract class和interface有什么区别? 12...

    最新Java面试宝典pdf版

    54、简述synchronized和java.util.concurrent.locks.Lock的异同 ? 34 55、设计4个线程,其中两个线程每次对j增加1,另外两个线程对j每次减少1。写出程序。 36 56、子线程循环10次,接着主线程循环100,接着又回到...

    Java并发编程:核心理论

    在此基础上,我们会进一步分析java.util.concurrent包的工具类,包括其使用方式、实现源码及其背后的原理。本文是该系列的第一篇文章,是这系列中核心的理论部分,之后的文章都会以此为基础来分析和解释。  一、...

    java面试宝典2012

    54、简述synchronized和java.util.concurrent.locks.Lock的异同 ? 38 55、设计4个线程,其中两个线程每次对j增加1,另外两个线程对j每次减少1。写出程序。 40 56、子线程循环10次,接着主线程循环100,接着又回到...

    Java面试笔试资料大全

    54、简述synchronized和java.util.concurrent.locks.Lock的异同 ? 34 55、设计4个线程,其中两个线程每次对j增加1,另外两个线程对j每次减少1。写出程序。 36 56、子线程循环10次,接着主线程循环100,接着又回到...

    Java学习题答案

    } } } 本题在编译时会发生错误(错误描述不同的JVM有不同的信息,意思就是未明确的x调用, 两个x都匹配,就象在同时import java.util和java.sql两个包时直接声明Date一样) 本题主要考察对接口和类的...

    JAVA面试宝典2010

    54、简述synchronized和java.util.concurrent.locks.Lock的异同 ? 34 55、设计4个线程,其中两个线程每次对j增加1,另外两个线程对j每次减少1。写出程序。 36 56、子线程循环10次,接着主线程循环100,接着又回到...

Global site tag (gtag.js) - Google Analytics