`

《Java并发编程》之三:基础构建模块

    博客分类:
  • Java
阅读更多

将线程安全性委托给现有的线程安全类,委托时创建线程安全类一个最有效的策略:只需让现有的线程安全类管理所有的状态即可。

 

5.1  同步容器类

早起JDK的同步容器类包括Vector和Hashtable等,这些同步封装器类是由Collections.synchronizedXxx等工厂方法创建的。

同步容器类的问题是,但有复合操作:迭代、跳转、条件运算的时候,如果有其他线程并发地修改容器的时候,它们可能会出现意料之外的行为。

如果加锁的话,就会牺牲性能和伸缩性。

当容器作为另一个容器的元素或者键值时,就会出现调用hashCode equals等方法,间接调用迭代操作,而可能会产生ConcurrentModificationException异常。同样containsAll、removeAll、retainAll等方法以及把容器作为参数的构造函数都会对容器进行迭代。

 

5.2  并发容器

JDK5提供了多种并发容器来改进同步容器的性能。上面讲的Vector和Hashtable等是同步容器,它们对所有容器状态访问进行串行化,代价是性能严重低下。

而并发容器是针对多个线程并发访问设计的,在JDK5中,ConcurrentHashMap代替同步且基于散列的Map,CopyOnWriteArrayList用于在遍历操作为主要操作情况下代替同步的List。

注:通过并发容器来代替同步容器,可以极大提高伸缩性和性能并且降低风险

 

ConcurrentHashMap

CopyOnWriteArrayList

Queue

BlockingQueue:构建生产者-消费者模式的阻塞队列

LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,二者分别与LinkedList和ArrayList类似,但比同步的List拥有更好的并发性。

PriorityBlockingQueue是一个按优先级排序队列

SynchronizedQueue,实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间。它维护一组线程,这些线程在等待着把元素加入或者移出队列。  这种区别相当于将文件直接交给同事(SynchronizedQueue),还是将文件放入她的邮箱并希望她能尽快拿到文件。

 

 将桌面搜索问题分解为:文件遍历和建立索引两个独立操作,使用阻塞队列可简化编程模式:

public class ProducerConsumer {
    static class FileCrawler implements Runnable {
        private final BlockingQueue<File> fileQueue;
        private final FileFilter fileFilter;
        private final File root;

        public FileCrawler(BlockingQueue<File> fileQueue,
                           final FileFilter fileFilter,
                           File root) {
            this.fileQueue = fileQueue;
            this.root = root;
            this.fileFilter = new FileFilter() {
                public boolean accept(File f) {
                    return f.isDirectory() || fileFilter.accept(f);
                }
            };
        }

        private boolean alreadyIndexed(File f) {
            return false;
        }

        public void run() {
            try {
                crawl(root);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private void crawl(File root) throws InterruptedException {
            File[] entries = root.listFiles(fileFilter);
            if (entries != null) {
                for (File entry : entries)
                    if (entry.isDirectory())
                        crawl(entry);
                    else if (!alreadyIndexed(entry))
                        fileQueue.put(entry);
            }
        }
    }

    static class Indexer implements Runnable {
        private final BlockingQueue<File> queue;

        public Indexer(BlockingQueue<File> queue) {
            this.queue = queue;
        }

        public void run() {
            try {
                while (true)
                    indexFile(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public void indexFile(File file) {
            // Index the file...
        }
    }

    private static final int BOUND = 10;
    private static final int N_CONSUMERS = Runtime.getRuntime().availableProcessors();

    public static void startIndexing(File[] roots) {
        BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
        FileFilter filter = new FileFilter() {
            public boolean accept(File file) {
                return true;
            }
        };

        for (File root : roots)
            new Thread(new FileCrawler(queue, filter, root)).start();

        for (int i = 0; i < N_CONSUMERS; i++)
            new Thread(new Indexer(queue)).start();
    }
}

 爬虫程序负责往队列中添加File,属于生产者,而索引线程负责从队列中拿出File,然后建立索引,属于消费者。所有同步管理完全交由容器,相当的爽啊。

 

对象池利用了串行线程封闭,将对象借给一个请求线程,只要对象池包含足够的内部同步来安全发布池中对象,并且客户端不会发布池中对象或者使用完后返回池中对象,就可以安全地在线程间传递所有权了。

 

Latch的用法:门闩

public class TestHarness {
    public long timeTasks(int nThreads, final Runnable task)
            throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException ignored) {
                    }
                }
            };
            t.start();
        }

        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end - start;
    }
}

5.5.3  信号量:

计数信号量 counting semaphore用来控制同时访问某个特定资源的操作数量,或同时执行某个指定操作的数量。还可以用来实现某种资源池,或者对容器施加边界。

public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            if (!wasAdded)
                sem.release();
        }
    }

    public boolean remove(Object o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved)
            sem.release();
        return wasRemoved;
    }
}

 

5.5.4  栅栏 Barrier

上面演示的Latch是一次性对象,一旦进入终止状态,就不能被重置。 Barrier跟Latch类似,它能阻塞一组线程直到某个事件发生。Barrier与Latch的区别在于所有线程必须同时到达Barrier位置,才能继续执行。

Latch用来等待事件,而Barrier用来等待其他线程。

Barrier用来实现一些协议,例如几个家庭决定在某个地方集合: 所有人6:00在地铁洛溪站碰头,到了后要等其他人,之后再讨论下一步怎么做。

 

CyclicBarrier在并行迭代算法中非常有用,当线程到达barrier位置时候调用await方法,这个方法阻塞直到所有线程都达到barrier位置。如果所有线程都到达barrier的位置,那么barrier will open and release all threads,and then reset state for the next usage。如果对await调用超时或者await阻塞的线程被中断,那么barrier就被认为打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。

细胞自动衍生模拟:

public class CellularAutomata {
    private final Board mainBoard;
    private final CyclicBarrier barrier;
    private final Worker[] workers;

    public CellularAutomata(Board board) {
        this.mainBoard = board;
        int count = Runtime.getRuntime().availableProcessors();
        this.barrier = new CyclicBarrier(count,
                new Runnable() {
                    public void run() {
                        mainBoard.commitNewValues();
                    }
                });
        this.workers = new Worker[count];
        for (int i = 0; i < count; i++)
            workers[i] = new Worker(mainBoard.getSubBoard(count, i));
    }

    private class Worker implements Runnable {
        private final Board board;

        public Worker(Board board) {
            this.board = board;
        }

        public void run() {
            while (!board.hasConverged()) {
                for (int x = 0; x < board.getMaxX(); x++)
                    for (int y = 0; y < board.getMaxY(); y++)
                        board.setNewValue(x, y, computeValue(x, y));
                try {
                    barrier.await();
                } catch (InterruptedException ex) {
                    return;
                } catch (BrokenBarrierException ex) {
                    return;
                }
            }
        }

        private int computeValue(int x, int y) {
            // Compute the new value that goes in (x,y)
            return 0;
        }
    }

    public void start() {
        for (int i = 0; i < workers.length; i++)
            new Thread(workers[i]).start();
        mainBoard.waitForConvergence();
    }

    interface Board {
        int getMaxX();

        int getMaxY();

        int getValue(int x, int y);

        int setNewValue(int x, int y, int value);

        void commitNewValues();

        boolean hasConverged();

        void waitForConvergence();

        Board getSubBoard(int numPartitions, int index);
    }
}

 

5.6  构建高效可伸缩性的结果缓存:

public class Memoizer<A, V> implements Computable<A, V> {
    private final ConcurrentMap<A, Future<V>> cache
            = new ConcurrentHashMap<A, Future<V>>();
    private final Computable<A, V> c;

    public Memoizer(Computable<A, V> c) {
        this.c = c;
    }

    public V compute(final A arg) throws InterruptedException {
        while (true) {
            Future<V> f = cache.get(arg);
            if (f == null) {
                Callable<V> eval = new Callable<V>() {
                    public V call() throws InterruptedException {
                        return c.compute(arg);
                    }
                };
                FutureTask<V> ft = new FutureTask<V>(eval);
                f = cache.putIfAbsent(arg, ft);
                if (f == null) {
                    f = ft;
                    ft.run();
                }
            }
            try {
                return f.get();
            } catch (CancellationException e) {
                cache.remove(arg, f);
            } catch (ExecutionException e) {
                throw LaunderThrowable.launderThrowable(e.getCause());
            }
        }
    }
}

 有了这个之后,可以在因式分解中使用这个Memoizer来缓存之前的计算结果,不仅高效而且可扩展性更强:

@ThreadSafe
public class Factorizer extends GenericServlet implements Servlet {
    private final Computable<BigInteger, BigInteger[]> c =
            new Computable<BigInteger, BigInteger[]>() {
                public BigInteger[] compute(BigInteger arg) {
                    return factor(arg);
                }
            };
    private final Computable<BigInteger, BigInteger[]> cache
            = new Memoizer<BigInteger, BigInteger[]>(c);

    public void service(ServletRequest req,
                        ServletResponse resp) {
        try {
            BigInteger i = extractFromRequest(req);
            encodeIntoResponse(resp, cache.compute(i));
        } catch (InterruptedException e) {
            encodeError(resp, "factorization interrupted");
        }
    }

    void encodeIntoResponse(ServletResponse resp, BigInteger[] factors) {
    }

    void encodeError(ServletResponse resp, String errorString) {
    }

    BigInteger extractFromRequest(ServletRequest req) {
        return new BigInteger("7");
    }

    BigInteger[] factor(BigInteger i) {
        // Doesn't really factor
        return new BigInteger[]{i};
    }
}

 

本人博客已搬家,新地址为:http://yidao620c.github.io/

分享到:
评论

相关推荐

    Java并发编程实战

    第5章 基础构建模块 5.1 同步容器类 5.1.1 同步容器类的问题 5.1.2 迭代器与Concurrent-ModificationException 5.1.3 隐藏迭代器 5.2 并发容器 5.2.1 ConcurrentHashMap 5.2.2 额外的原子Map操作 5.2.3 ...

    《java并发编程实战》读书笔记-第5章-基础构建模块

    《java并发编程实战》读书笔记-第3章-对象的共享,脑图形式,使用xmind8制作 包括同步容器类、并发容器类、阻塞队列和生产者消费者模式、阻塞和中断方法、同步工具类。最后是构建高效且可伸缩的结果缓存

    Java 并发编程实战

    第5章 基础构建模块 5.1 同步容器类 5.1.1 同步容器类的问题 5.1.2 迭代器与Concurrent-ModificationException 5.1.3 隐藏迭代器 5.2 并发容器 5.2.1 ConcurrentHashMap 5.2.2 额外的原子Map操作 5.2.3 ...

    Java并发编程实战2019.zip

    Java并发编程实战,第1章 简介,第2章 线程安全性 第3章 对象的共享 第4章 对象的组合 第5章 基础构建模块 第6章 任务执行 第7章 取消与关闭 第8章 线程池的使用 第9章 图形用户界面应用程序 第10章 避免...

    Java并发编程-线程安全与基础构建模块

    NULL 博文链接:https://lemon-1227.iteye.com/blog/1493843

    Java并发编程实战-读书笔记

    《Java并发编程实战》个人读书笔记,非常详细: 1 简介 2 线程安全性 3 对象的共享 4 对象的组合 5 基础构建模块 6 任务执行 7 取消与关闭 8 线程池的使用 9 图形用户界面应用程序 10 避免活跃性危险 11 性能与可...

    Java并发编程(学习笔记).xmind

    Java并发编程 背景介绍 并发历史 必要性 进程 资源分配的最小单位 线程 CPU调度的最小单位 线程的优势 (1)如果设计正确,多线程程序可以通过提高处理器资源的利用率来提升系统吞吐率 ...

    Java是一种广泛使用的编程语言

    Java是一种静态类型的、类基础的、并发性的、面向对象的编程语言。以下是一些Java的主要特性: 平台独立性:这是Java最著名的特性。Java采用"一次编写,到处运行"的方法。Java程序在任何支持Java的平台上都可以运行...

    java8源码-baijia123:常用工具类及测试类

    JAVA并发编程实战的示例及其他 第5章:基础构建模块 对应类包com.baijia123.concurrent TestHarnes-&gt;在计时测试中使用CountDownLatch(闭锁)来启动和停止线程 Preloader-&gt;使用FutureTask来提前加载稍后需要的数据 ...

    JAVA_API1.6文档(中文)

    java.util.concurrent 在并发编程中很常用的实用工具类。 java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类...

    java源码包2

     Tcp服务端与客户端的JAVA实例源代码,一个简单的Java TCP服务器端程序,别外还有一个客户端的程序,两者互相配合可以开发出超多的网络程序,这是最基础的部分。 递归遍历矩阵 1个目标文件,简单! 多人聊天室 ...

    java源码包---java 源码 大量 实例

     Tcp服务端与客户端的JAVA实例源代码,一个简单的Java TCP服务器端程序,别外还有一个客户端的程序,两者互相配合可以开发出超多的网络程序,这是最基础的部分。 递归遍历矩阵 1个目标文件,简单! 多人聊天室 3...

    一些常见的Java面试题.docx

    Java是一种静态类型的、类基础的、并发性的、面向对象的编程语言。以下是一些Java的主要特性: 平台独立性:这是Java最著名的特性。Java采用"一次编写,到处运行"的方法。Java程序在任何支持Java的平台上都可以运行...

    JAVA上百实例源码以及开源项目

     Tcp服务端与客户端的JAVA实例源代码,一个简单的Java TCP服务器端程序,别外还有一个客户端的程序,两者互相配合可以开发出超多的网络程序,这是最基础的部分。 递归遍历矩阵 1个目标文件,简单! 多人聊天室 3...

    JAVA上百实例源码以及开源项目源代码

     Tcp服务端与客户端的JAVA实例源代码,一个简单的Java TCP服务器端程序,别外还有一个客户端的程序,两者互相配合可以开发出超多的网络程序,这是最基础的部分。 递归遍历矩阵 1个目标文件,简单! 多人聊天室 3...

    Java 1.6 API 中文 New

    java.util.concurrent 在并发编程中很常用的实用工具类。 java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类,...

    java源码包4

     Tcp服务端与客户端的JAVA实例源代码,一个简单的Java TCP服务器端程序,别外还有一个客户端的程序,两者互相配合可以开发出超多的网络程序,这是最基础的部分。 递归遍历矩阵 1个目标文件,简单! 多人聊天室 ...

    java源码包3

     Tcp服务端与客户端的JAVA实例源代码,一个简单的Java TCP服务器端程序,别外还有一个客户端的程序,两者互相配合可以开发出超多的网络程序,这是最基础的部分。 递归遍历矩阵 1个目标文件,简单! 多人聊天室 ...

    java api最新7.0

    java.util.concurrent 在并发编程中很常用的实用工具类。 java.util.concurrent.atomic 类的小工具包,支持在单个变量上解除锁的线程安全编程。 java.util.concurrent.locks 为锁和等待条件提供一个框架的接口和类,...

Global site tag (gtag.js) - Google Analytics