- 浏览: 460334 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
ty1972873004:
sunwang810812 写道我运行了这个例子,怎么结果是这 ...
Java并发编程: 使用Semaphore限制资源并发访问的线程数 -
lgh1992314:
simpleDean 写道请问,Logger.setLevel ...
Java内置Logger详解 -
sunwang810812:
我运行了这个例子,怎么结果是这样的:2号车泊车6号车泊车5号车 ...
Java并发编程: 使用Semaphore限制资源并发访问的线程数 -
jp260715007:
nanjiwubing123 写道参考你的用法,用如下方式实现 ...
面试题--三个线程循环打印ABC10次的几种解决方法 -
cb_0312:
SurnameDictionary文章我没看完,现在懂了
中文排序
本文介绍Exchanger工具类, 然后采用Exchanger给出一个两个线程交换数值的简单实例。
1. Exchanger介绍
/** * A synchronization point at which two threads can exchange objects. * Each thread presents some object on entry to the {@link #exchange * exchange} method, and receives the object presented by the other * thread on return. */
从上面的注释中可以看出:Exchanger提供了一个同步点,在这个同步点,两个线程可以交换数据。每个线程通过exchange()方法的入口提供数据给另外的线程,并接收其它线程提供的数据,并返回。
Exchanger通过Lock和Condition来完成功能,Exchanger的一个重要的public方法是exchange方法,用于线程的数据交换, 相关的类图以及详细的Exchanger类内容如下:
package java.util.concurrent; import java.util.concurrent.locks.*; /** * A synchronization point at which two threads can exchange objects. * Each thread presents some object on entry to the {@link #exchange * exchange} method, and receives the object presented by the other * thread on return. * * <p><b>Sample Usage:</b> * Here are the highlights of a class that uses an <tt>Exchanger</tt> to * swap buffers between threads so that the thread filling the * buffer gets a freshly * emptied one when it needs it, handing off the filled one to * the thread emptying the buffer. * <pre> * class FillAndEmpty { * Exchanger<DataBuffer> exchanger = new Exchanger(); * DataBuffer initialEmptyBuffer = ... a made-up type * DataBuffer initialFullBuffer = ... * * class FillingLoop implements Runnable { * public void run() { * DataBuffer currentBuffer = initialEmptyBuffer; * try { * while (currentBuffer != null) { * addToBuffer(currentBuffer); * if (currentBuffer.full()) * currentBuffer = exchanger.exchange(currentBuffer); * } * } catch (InterruptedException ex) { ... handle ... } * } * } * * class EmptyingLoop implements Runnable { * public void run() { * DataBuffer currentBuffer = initialFullBuffer; * try { * while (currentBuffer != null) { * takeFromBuffer(currentBuffer); * if (currentBuffer.empty()) * currentBuffer = exchanger.exchange(currentBuffer); * } * } catch (InterruptedException ex) { ... handle ...} * } * } * * void start() { * new Thread(new FillingLoop()).start(); * new Thread(new EmptyingLoop()).start(); * } * } * </pre> * * @since 1.5 * @author Doug Lea * @param <V> The type of objects that may be exchanged */ public class Exchanger<V> { private final ReentrantLock lock = new ReentrantLock(); private final Condition taken = lock.newCondition(); /** Holder for the item being exchanged */ private V item; /** * Arrival count transitions from 0 to 1 to 2 then back to 0 * during an exchange. */ private int arrivalCount; /** * Main exchange function, handling the different policy variants. */ private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException { lock.lock(); try { V other; // If arrival count already at two, we must wait for // a previous pair to finish and reset the count; while (arrivalCount == 2) { if (!timed) taken.await(); else if (nanos > 0) nanos = taken.awaitNanos(nanos); else throw new TimeoutException(); } int count = ++arrivalCount; // If item is already waiting, replace it and signal other thread if (count == 2) { other = item; item = x; taken.signal(); return other; } // Otherwise, set item and wait for another thread to // replace it and signal us. item = x; InterruptedException interrupted = null; try { while (arrivalCount != 2) { if (!timed) taken.await(); else if (nanos > 0) nanos = taken.awaitNanos(nanos); else break; // timed out } } catch (InterruptedException ie) { interrupted = ie; } // Get and reset item and count after the wait. // (We need to do this even if wait was aborted.) other = item; item = null; count = arrivalCount; arrivalCount = 0; taken.signal(); // If the other thread replaced item, then we must // continue even if cancelled. if (count == 2) { if (interrupted != null) Thread.currentThread().interrupt(); return other; } // If no one is waiting for us, we can back out if (interrupted != null) throw interrupted; else // must be timeout throw new TimeoutException(); } finally { lock.unlock(); } } /** * Create a new Exchanger. **/ public Exchanger() { } /** * Waits for another thread to arrive at this exchange point (unless * it is {@link Thread#interrupt interrupted}), * and then transfers the given object to it, receiving its object * in return. * <p>If another thread is already waiting at the exchange point then * it is resumed for thread scheduling purposes and receives the object * passed in by the current thread. The current thread returns immediately, * receiving the object passed to the exchange by that other thread. * <p>If no other thread is already waiting at the exchange then the * current thread is disabled for thread scheduling purposes and lies * dormant until one of two things happens: * [list] * <li>Some other thread enters the exchange; or * <li>Some other thread {@link Thread#interrupt interrupts} the current * thread. * [/list] * <p>If the current thread: * [list] * <li>has its interrupted status set on entry to this method; or * <li>is {@link Thread#interrupt interrupted} while waiting * for the exchange, * [/list] * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * @param x the object to exchange * @return the object provided by the other thread. * @throws InterruptedException if current thread was interrupted * while waiting **/ public V exchange(V x) throws InterruptedException { try { return doExchange(x, false, 0); } catch (TimeoutException cannotHappen) { throw new Error(cannotHappen); } } /** * Waits for another thread to arrive at this exchange point (unless * it is {@link Thread#interrupt interrupted}, or the specified waiting * time elapses), * and then transfers the given object to it, receiving its object * in return. * * <p>If another thread is already waiting at the exchange point then * it is resumed for thread scheduling purposes and receives the object * passed in by the current thread. The current thread returns immediately, * receiving the object passed to the exchange by that other thread. * * <p>If no other thread is already waiting at the exchange then the * current thread is disabled for thread scheduling purposes and lies * dormant until one of three things happens: * [list] * <li>Some other thread enters the exchange; or * <li>Some other thread {@link Thread#interrupt interrupts} the current * thread; or * <li>The specified waiting time elapses. * [/list] * <p>If the current thread: * [list] * <li>has its interrupted status set on entry to this method; or * <li>is {@link Thread#interrupt interrupted} while waiting * for the exchange, * [/list] * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * <p>If the specified waiting time elapses then {@link TimeoutException} * is thrown. * If the time is * less than or equal to zero, the method will not wait at all. * * @param x the object to exchange * @param timeout the maximum time to wait * @param unit the time unit of the <tt>timeout</tt> argument. * @return the object provided by the other thread. * @throws InterruptedException if current thread was interrupted * while waiting * @throws TimeoutException if the specified waiting time elapses before * another thread enters the exchange. **/ public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { return doExchange(x, true, unit.toNanos(timeout)); } }
2. Exchanger工具类的使用案例
本文给出一个简单的例子,实现两个线程之间交换数据,用Exchanger来做非常简单。
package my.concurrent.exchanger; import java.util.concurrent.Exchanger; import java.util.concurrent.atomic.AtomicReference; public class ThreadA implements Runnable { private final Exchanger<Integer> exchanger; private final AtomicReference<Integer> last = new AtomicReference<Integer>( 5); public ThreadA(Exchanger<Integer> exchanger) { this.exchanger = exchanger; } public void run() { try { while (true) { last.set(exchanger.exchange(last.get())); System.out.println(" After calling exchange. Thread A has value: " + last.get()); Thread.sleep(2000); } } catch (InterruptedException e) { e.printStackTrace(); } } }
package my.concurrent.exchanger; import java.util.concurrent.Exchanger; import java.util.concurrent.atomic.AtomicReference; public class ThreadB implements Runnable { private Exchanger<Integer> exchanger; private final AtomicReference<Integer> last = new AtomicReference<Integer>( 10); public ThreadB(Exchanger<Integer> exchanger) { this.exchanger = exchanger; } public void run() { try { while (true) { last.set(exchanger.exchange(last.get())); System.out.println(" After calling exchange. Thread B has value: " + last.get()); Thread.sleep(2000); } } catch (InterruptedException e) { e.printStackTrace(); } } }
package my.concurrent.exchanger; import java.util.concurrent.Exchanger; public class ExchangerTest { public static void main(String[] args) { Exchanger<Integer> exchanger = new Exchanger<Integer>(); new Thread(new ThreadA(exchanger)).start(); new Thread(new ThreadB(exchanger)).start(); } }
运行一段时间之后的输出结果如下:
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 10
可以看出:两个线程的数据一直都在相互交换。
发表评论
-
工厂类中移除if/else语句
2016-07-10 19:52 852面向对象语言的一个强大的特性是多态,它可以用来在代码中移除 ... -
Java编程练手100题
2014-12-11 17:13 6672本文给出100道Java编程练手的程序。 列表如下: 面 ... -
数组复制的三种方法
2014-11-30 12:57 2178本文将给出三种实现数组复制的方法 (以复制整数数组为例)。 ... -
数组复制的三种方法
2014-11-30 12:54 0本文将给出三种实现数组复制的方法 (以复制整数数组为例)。 ... -
四种复制文件的方法
2014-11-29 13:21 1688尽管Java提供了一个类ava.io.File用于文件的操 ... -
判断一个字符串中的字符是否都只出现一次
2014-11-25 12:58 2660本篇博文将给大家带来几个判断一个字符串中的字符是否都只出现一 ... -
使用正则表达式判断一个数是否为素数
2014-11-23 13:35 2106正则表达式能够用于判断一个数是否为素数,这个以前完全没有想过 ... -
几个可以用英文单词表达的正则表达式
2014-11-21 13:12 3703本文,我们将来看一下几个可以用英文单词表达的正则表达式。这些 ... -
(广度优先搜索)打印所有可能的括号组合
2014-11-20 11:58 1920问题:给定一个正整n,作为括号的对数,输出所有括号可能 ... -
随机产生由特殊字符,大小写字母以及数字组成的字符串,且每种字符都至少出现一次
2014-11-19 14:48 3943题目:随机产生字符串,字符串中的字符只能由特殊字符 (! ... -
找出1到n缺失的一个数
2014-11-18 12:57 3119题目:Problem description: You h ... -
EnumSet的几个例子
2014-11-14 16:24 8708EnumSet 是一个与枚举类型一起使用的专用 Set 实现 ... -
给定两个有序数组和一个指定的sum值,从两个数组中各找一个数使得这两个数的和与指定的sum值相差最小
2014-11-12 11:24 3286题目:给定两个有序数组和一个指定的sum值,从两个数组 ... -
Java面试编程题练手
2014-11-04 22:49 6657面试编程 写一个程序,去除有序数组中的重复数字 编 ... -
Collections用法整理
2014-10-22 20:55 9799Collections (java.util.Collect ... -
The Code Sample 代码实例 个人博客开通
2014-09-04 18:48 1375个人博客小站开通 http://thecodesample. ... -
Collections.emptyXXX方法
2014-06-08 13:37 2105从JDK 1.5开始, Collections集合工具类中预先 ... -
这代码怎么就打印出"hello world"了呢?
2014-06-08 00:37 7356for (long l = 4946144450195624L ... -
最短时间过桥
2014-04-21 22:03 4073本文用代码实现最短时间过桥,并且打印如下两个例子的最小过桥时间 ... -
将数组分割成差值最小的子集
2014-04-20 22:34 2840本文使用位掩码实现一个功能 ==》将数组分割成差值最小的子集 ...
相关推荐
龙果 java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四...
并发工具类Exchanger详解.mp4 CountDownLatch,CyclicBarrier,Semaphore源码解析.mp4 提前完成任务之FutureTask使用.mp4 Future设计模式实现(实现类似于JDK提供的Future).mp4 Future源码解读.mp4 ForkJoin框架详解....
第40节并发工具类Exchanger详解00:13:47分钟 | 第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于...
java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四个...
│ 高并发编程第一阶段24讲、线程间通信快速入门,使用wait和notify进行线程间的数据通信.mp4 │ 高并发编程第一阶段25讲、多Produce多Consume之间的通讯导致出现程序假死的原因分析.mp4 │ 高并发编程第一阶段26...
主要介绍了Java多线程编程之使用Exchanger数据交换实例,本文直接给出实例代码,需要的朋友可以参考下
Java并发编程 背景介绍 并发历史 必要性 进程 资源分配的最小单位 线程 CPU调度的最小单位 线程的优势 (1)如果设计正确,多线程程序可以通过提高处理器资源的利用率来提升系统吞吐率 ...
第40节并发工具类Exchanger详解00:13:47分钟 | 第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于...
Exchanger是java 5引入的并发类,Exchanger顾名思义就是用来做交换的。这里主要是两个线程之间交换持有的对象。当Exchanger在一个线程中调用exchange方法之后,会等待另外的线程调用同样的exchange方法。 两个线程都...
│ 高并发编程第一阶段24讲、线程间通信快速入门,使用wait和notify进行线程间的数据通信.mp4 │ 高并发编程第一阶段25讲、多Produce多Consume之间的通讯导致出现程序假死的原因分析.mp4 │ 高并发编程第一阶段26...
JAVA并发编程-2-线程并发工具类一、Fork/Join1、分而治之与工作密取2、使用标准范式3、Fork/Join的同步用法4、Fork/Join的异步用法二、CountDownLatch三、CyclicBarrier四、Semaphore信号量五、Exchanger ...
【2018最新最详细】并发多线程教程,课程结构如下 1.并发编程的优缺点 2.线程的状态转换以及基本操作 ...26.大白话说java并发工具类-Semaphore,Exchanger 27.一篇文章,让你彻底弄懂生产者--消费者问题
主要介绍了Java编程线程同步工具Exchanger的使用实例解析,分享了相关代码示例,小编觉得还是挺不错的,具有一定借鉴价值,需要的朋友可以参考下
java.util.concurrent包中的Exchanger类可用于两个线程之间交换信息。可简单地将Exchanger对象理解为一个包含两个格子的容器,通过exchanger方法可以向两个格子中填充信息。当两个格子中的均被填充时,该对象会自动...
java并发工具类(CountDownLatch+Semaphore+Exchanger);java并发工具类(CountDownLatch+Semaphore+Exchanger);java并发工具类(CountDownLatch+Semaphore+Exchanger);java并发工具类(CountDownLatch+...
1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...