`
sha.tians
  • 浏览: 14460 次
社区版块
存档分类
最新评论

java.util.concurrent学习总结

    博客分类:
  • Java
 
阅读更多

        java.util.concurrent包,分为了三个部分:java.util.concurrent,java.util.concurrent.atomic,以及java.util.concurrent.lock,主要包含了五个方面:线程池机制、同步集合、锁、原子操作、以及辅助类。

        下面的图是网上搜到的一张J.U.C完整的API,根据这张图可以从整体上看出J.U.C包中包括了哪些部分:

          最近主要学习了其中的线程池机制和锁,包括其中使用到的辅助类,本篇主要记录了关于这两方面的学习总结,其他的部分后续再来学习和添加。

 

 1. 线程池机制

        首先,我们讨论一下使用线程池的好处,线程池为线程生命周期间接成本问题和资源崩溃问题提供了解决方案。通过对多个任务重新使用线程,创建线程的间接成本将分布到多个任务中。另外,当请求到达时,线程已经存在,从而可以消除由创建线程引起的延迟,使应用程序更易响应。而且,通过正确调整线程池中的线程数,可以强制超出特定限制的任何请求等待,直到有线程可以处理它,它们等待时所消耗的资源要少于使用额外线程所消耗的资源,这样可以防止资源崩溃。

 

 Executor:任务的执行者

 ExecutorService:一个线程池的管理者

 CompletionService:ExecutorService的扩展,可以获得线程执行后的结果

 Future:异步计算的结果

 ScheduledExecutorService:一个ExecutorService,可以安排线程在给定延迟后运行,或定时运行

 ExecutorService用法示例:

package concurrenttest;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 本示例使用了Executors的静态函数生成一个固定的线程池,只能执行4个线程,当执行完一个线程后,才会又执行一个新的线程
 * 这就会产生性能问题,比如如果线程池的大小为200,当全部使用完毕后,所有的线程会继续留在池中,相应的内存和线程切换(while(true)+sleep循环)都会增加
 * 
 * @author sha.tians
 */
public class ExecutorsTest extends Thread {

    private int index;

    public ExecutorsTest(int i){
        this.index = i;
    }

    public void run() {
        try {
            System.out.println("[" + this.index + "] start....");
            Thread.sleep((int) (Math.random() * 10000));
            System.out.println("[" + this.index + "] end.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String args[]) {
        ExecutorService service = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 10; i++) {
            service.submit(new ExecutorsTest(i));
        }
        System.out.println("submit finish");
        service.shutdown();
    }

}

  ExecutorService通过submit提交任务,通过shutdown结束任务。线程池通过辅助类Executors来创建,本例中创建了一个固定大小为4的线程池,而Executors还可以创建其他类型的线程池,查看其源码,Executors是通过new ThreadPoolExecutor来实现的,对应关系如下: 

Executors
newSingleThreadExecutor():
    new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)
newCachedThreadPool():
    new ThreadPoolExecutor(0, Integer.MAX_VALUE,  60L, TimeUnit.SECONDS,  new SynchronousQueue<Runnable>())
newFixedThreadPool(int nThreads):
    new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,  new LinkedBlockingQueue<Runnable>())

  ThreadPoolExecutor构造方法参数含义如下:

ThreadPoolExecutor
构造方法:
public ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,   long keepAliveTime,   TimeUnit unit,   BlockingQueue<Runnable> workQueue,  ThreadFactory threadFactory,  RejectedExecutionHandler handler)
入参:
corePoolSize:the number of threads to keep in the  pool, even if they are idle.
                         保持在池中的线程数,包括闲置的。
maximumPoolSize: the maximum number of threads to allow in the  pool.
                                    池中允许的最大线程数
keepAliveTime: when the number of threads is greater than  the core, this is the maximum time that excess idle threads  will wait for new tasks before terminating.
                            当线程数大于core,闲置线程在终止前等待新任务的最大时间
unit: the time unit for the keepAliveTime  argument.
            keepAliveTime的时间单位
workQueue: the queue to use for holding tasks before they  are executed. This queue will hold only the Runnable tasks submitted by the execute method.
                        在执行前holding任务的队列,这个队列只hold被execute方法提交的Runnable任务。
threadFactory: the factory to use when the executor  creates a new thread.
                            当executor创建新任务时使用的工厂
handler: the handler to use when execution is blocked  because the thread bounds and queue capacities are reached.
                  当execution因为线程闲置和队列达到容量阻塞时使用的handler

  CompletionService用法示例:

package concurrenttest;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务, 并按照完成这些任务的顺序处理它们的结果。
 * 
 * @author sha.tians
 */
public class CompletionServiceTest implements Callable<String> {

    private int id;

    public CompletionServiceTest(int i){
        this.id = i;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newCachedThreadPool();
        CompletionService<String> completion = new ExecutorCompletionService<String>(service);
        for (int i = 0; i < 10; i++) {
            completion.submit(new CompletionServiceTest(i));
        }
        for (int i = 0; i < 10; i++) {
            System.out.println(completion.take().get());
        }
        service.shutdown();
    }

    public String call() throws Exception {
        Integer time = (int) (Math.random() * 1000);
        try {
            System.out.println(this.id + " start");
            Thread.sleep(time);
            System.out.println(this.id + " end");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return this.id + ":" + time;
    }
}

 

    本例中,通过CompletionService扩展ExecutorService,获得了线程执行后结果。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。

   ExecutorCompletionSevice构造方法如下:

 

    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

  此处初始化了一个completionQueue,这个queue是用来做什么的呢?查看一下源代码:

 

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }    

 执行的过程中,应该是将结果加入了completionQueue中, take线程执行结果的时候就从这个queue中取得。

ScheduledExecutorService用法示例:

package concurrenttest;

import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
/**
 * schedule 方法使用各种延迟创建任务,并返回一个可用于取消或检查执行的任务对象。
 * scheduleAtFixedRate 和 scheduleWithFixedDelay 方法创建并执行某些在取消前一直定期运行的任务。
 * @author sha.tians
 *
 */
public class ScheduledExecutorServiceTest {

    public static void main(String[] args) {
        final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        final Runnable beeper = new Runnable() {

            int count = 0;

            public void run() {
                System.out.println(new Date() + " beep " + (++count));
            }
        };
        // 1秒钟后运行,并每隔2秒运行一次
        final ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate(beeper, 1, 2, SECONDS);
        // 2秒钟后运行,并每次在上次任务运行完后等待5秒后重新运行
        final ScheduledFuture beeperHandle2 = scheduler.scheduleWithFixedDelay(beeper, 2, 5, SECONDS);
        // 30秒后结束关闭任务,并且关闭Scheduler
        scheduler.schedule(new Runnable() {

            public void run() {
                beeperHandle.cancel(true);
                beeperHandle2.cancel(true);
                scheduler.shutdown();
            }
        }, 30, SECONDS);
    }
}

  ScheduleExecutorService通过scheduleAtFixedRate方法,和scheduleWihFixedDelay方法,定期运行任务,并返回了ScheduledFuure对象,可通过cancle方法取消任务。

 

 2. 锁机制

 ReentrantLock :一个可重入的互斥锁定Lock

 

package concurrenttest;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

/**
 * ReentrantLock 将由最近成功获得锁定,并且还没有释放该锁定的线程所拥有。当锁定没有被另一个线程所拥有时,调用 lock 的线程将成功获取该锁定并返回。
 * 
 * @author sha.tians
 */
public class ReentrantLockTest extends Thread {

    TestReentrantLock lock;
    private int       id;

    public ReentrantLockTest(int i, TestReentrantLock test){
        this.id = i;
        this.lock = test;
    }

    public void run() {
        lock.print(id);
    }

    public static void main(String args[]) {
        ExecutorService service = Executors.newCachedThreadPool();
        TestReentrantLock lock = new TestReentrantLock();
        for (int i = 0; i < 10; i++) {
            service.submit(new ReentrantLockTest(i, lock));
        }
        service.shutdown();
    }
}

class TestReentrantLock {

    private ReentrantLock lock = new ReentrantLock();

    public void print(int str) {
        try {
            lock.lock();
            System.out.println(str + "获得");
            Thread.sleep((int) (Math.random() * 1000));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println(str + "释放");
            lock.unlock();
        }
    }
}

  ReentrantLock由最近成功获得lock,且还没有释放该lock的线程拥有,当lock没有被另一个线程所拥有时,调用lock的线程会成功获得该锁并返回,否则就直接立即返回。可以适用isHeldBYCUrrentThread()和getHoldCount()方法来检查此情况是否发生。

 

    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }

    public boolean isLocked() {
        return sync.isLocked();
    }
 此类构造方法有两种,另一种可以接受一个可选的公平参数,当设置为true时,在多个线程竞争下,该锁定会倾向于将访问授权给等待时间最长的线程,否则将无法保证任何特定的访问顺序。
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = (fair)? new FairSync() : new NonfairSync();
    }
 其中,Sync是ReentrantLock的内部类,该类注释如下:
    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    static abstract class Sync extends AbstractQueuedSynchronizer {
}

  其中AQS是指AbstractQueuedSynchronizer,java并发同步器。它内部维护了一个阻塞线程的队列,队列的策略是先进先出(FIFO),它是一个双向列表,结点Node为AbstractQueueSnchronizer的内部类,包含熟悉prev,之前前一个结点,以及熟悉next,指向后一个结点,每个结点内部都持有一个线程Thread。AbstractQueuedSynchronizer内置一个state字段,用来表示某种意义,当ReentrantLock使用AQS的时候,state被用来表示锁被重入的次数。

  根据Doug Lea的论文,获取一个同步器(acquire)的流程如下:

if (!tryAcquire(arg)) {
    node = create and enqueue new node;//创建一个新的节点,入队列
    pred = node's effective predecessor;//pred 执行当前节点的前一个有效节点
    while (pred is not head node || !tryAcquire(arg)) {//当前一个节点不是头节点或者获取锁失败
        if (pred's signal bit is set)//前一个节点的信号位被设置
            park();//阻塞自己
        else
            compareAndSet pred's signal bit to true;//将前一个节点的信号量设置位true
       pred = node's effective predecessor;//执行当前节点的前一个有效节点
    }
   head = node;
}

  释放一个同步器(release)的流程如下:

if (tryRelease(arg) && head node's signal bit is set) {
compareAndSet head's signal bit to false;//head节点信号位设置位false
unpark head's successor, if one exists//唤醒head节点的下个节点
}

 这个部分的逻辑比较复杂,目前只做了一个较粗的了解,后续还需要继续深入。

Doug Lea论文原文见:http://dl2.iteye.com/upload/attachment/0060/7399/3f8f3dd3-bc31-3d60-8338-b418e9dea36d.pdf

 
 
分享到:
评论

相关推荐

    java并发工具包 java.util.concurrent中文版用户指南pdf

    1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    Tomcat内存溢出的解决方法(java.util.concurrent.ExecutionException)

    "java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError" 是一个典型的错误提示,它表明在并发执行过程中遇到了内存不足的问题。下面我们将深入探讨这个问题的原因、影响以及如何解决。 内存溢出...

    java.util.concurrent 学习ppt

    Java.util.concurrent是Java 5.0引入的一个重要包,它为多线程编程提供了一组高级并发工具。这个包的设计者是Doug Lea,它的出现是JSR-166的一部分,也被称作Tiger更新。Java.util.concurrent的引入是为了解决传统...

    java.util.concurrent

    java.util.concurrent总体概览图。 收取资源分3分。需要的同学可以下载一下。 java.util.concurrent主要包括5个部分executor,colletions,locks,atomic,tools。 该图详细的列举了并发包下面的结构,包含所有接口和...

    java并发工具包 java.util.concurrent中文版pdf

    import java.util.concurrent.ArrayBlockingQueue; public class BlockingQueueExample { public static void main(String[] args) { // 创建一个容量为 10 的阻塞队列 BlockingQueue&lt;String&gt; queue = new ...

    java.util.concurrent系列文章(1)

    ### Java.util.concurrent 系列文章知识点总结 #### 一、引言 随着多核处理器的普及,多线程编程已成为现代软件开发中的一个重要组成部分。Java 5 引入了 `java.util.concurrent` 包,该包提供了丰富的 API 来简化...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    The java.util.concurrent Synchronizer Framework

    ### Java.util.concurrent.Synchronizer框架详解 #### 一、引言与背景 随着Java技术的发展,多线程编程成为了一项重要的技术需求。为了更好地支持并发编程,Java平台在J2SE 1.5版本中引入了`java.util.concurrent`...

    JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用

    "JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用" JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用是Java多线程编程中的一种重要概念。随着多线程编程的普及,线程池的使用变得...

    Java高性能线程库(java.util.concurrent包的补充)

    一个高性能的Java线程库,该库是 JDK 1.5 中的 java.util.concurrent 包的补充,可用于基于并发消息机制的应用。该类库不提供远程的消息功能,其设计的宗旨是实现一个内存中的消息传递机制. 主要特点有: * All ...

    动画学习 java.util.concurrent并发工具包

    如何启动:以win7系统为例,最好jdk8 1.打开cmd,cd到jdk的path,本机是:cd C:\Java\jdk6\bin ...java -cp D:\javaConcurrentAnimated.jar vgrazi.concurrent.samples.launcher.ConcurrentExampleLauncher

    java.util.concurrent-多线程框架.docx

    java.util.concurrent 多线程框架 java.util.concurrent 多线程框架是 Java 语言中用于多线程编程的库。该库提供了多种线程池实现、并发集合、同步器、lock 等多种机制,以便开发者更方便地编写高效、可靠的多线程...

    java.util.concurrent.uml.pdf

    标题中提到了“java.util.concurrent.uml.pdf”,这表明文件是一份Java并发编程工具包java.util.concurrent的UML(统一建模语言)类结构图的PDF格式文件。UML图能够帮助开发者理解Java并发包中的类、接口及其关系,...

    The java. util. concurrent synchronizer framework.pdf

    AQS(AbstractQueuedSynchronizer)是Java.util.concurrent包中同步器的基础框架,它的核心设计思想与实现方法在Doug Lea先生的这篇论文中有详细的介绍。论文详细阐述了AQS框架的原理、设计、实现、应用以及性能等...

    The java.util.concurrent synchronizer framework.pdf

    文档标题“java.util.concurrent同步器框架”和描述“Doug Lea的java.util.concurrent同步器框架”表明本文将探讨由Doug Lea所撰写的关于Java并发编程中同步器框架的内容。文档中提到了AbstractQueuedSynchronizer类...

    java.util.concurrent_您不知道的5件事

    ### Java.util.concurrent_您不知道的5件事 #### 1. Semaphore(信号量) - **定义与作用**:`Semaphore` 类是一种控制多个线程访问共享资源的机制,它通过内部维护一个整数计数器(许可的数量)以及一组等待线程...

    java.util.concurrent 实现线程池队列

    在Java中,`java.util.concurrent.ExecutorService` 接口代表了一个线程池服务,而`ThreadPoolExecutor` 是它的具体实现,我们可以自定义线程池的核心参数,如核心线程数、最大线程数、线程存活时间、线程队列等。...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版

    本资源包含两个 pdf 文档,一本根据 Jakob Jenkov 最新博客 (http://tutorials.jenkov.com/java-util-concurrent/index.html) 整理的 java_util_concurrent_user_guide_en.pdf,一个中文翻译的 java_util_concurrent...

Global site tag (gtag.js) - Google Analytics