本文介绍Exchanger工具类, 然后采用Exchanger给出一个两个线程交换数值的简单实例。
1. Exchanger介绍
从上面的注释中可以看出:Exchanger提供了一个同步点,在这个同步点,两个线程可以交换数据。每个线程通过exchange()方法的入口提供数据给另外的线程,并接收其它线程提供的数据,并返回。
Exchanger通过Lock和Condition来完成功能,Exchanger的一个重要的public方法是exchange方法,用于线程的数据交换, 相关的类图以及详细的Exchanger类内容如下:
2. Exchanger工具类的使用案例
本文给出一个简单的例子,实现两个线程之间交换数据,用Exchanger来做非常简单。
运行一段时间之后的输出结果如下:
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 10
可以看出:两个线程的数据一直都在相互交换。
1. Exchanger介绍
- /**
- * A synchronization point at which two threads can exchange objects.
- * Each thread presents some object on entry to the {@link #exchange
- * exchange} method, and receives the object presented by the other
- * thread on return.
- */
从上面的注释中可以看出:Exchanger提供了一个同步点,在这个同步点,两个线程可以交换数据。每个线程通过exchange()方法的入口提供数据给另外的线程,并接收其它线程提供的数据,并返回。
Exchanger通过Lock和Condition来完成功能,Exchanger的一个重要的public方法是exchange方法,用于线程的数据交换, 相关的类图以及详细的Exchanger类内容如下:
- package java.util.concurrent;
- import java.util.concurrent.locks.*;
- /**
- * A synchronization point at which two threads can exchange objects.
- * Each thread presents some object on entry to the {@link #exchange
- * exchange} method, and receives the object presented by the other
- * thread on return.
- *
- * <p><b>Sample Usage:</b>
- * Here are the highlights of a class that uses an <tt>Exchanger</tt> to
- * swap buffers between threads so that the thread filling the
- * buffer gets a freshly
- * emptied one when it needs it, handing off the filled one to
- * the thread emptying the buffer.
- * <pre>
- * class FillAndEmpty {
- * Exchanger<DataBuffer> exchanger = new Exchanger();
- * DataBuffer initialEmptyBuffer = ... a made-up type
- * DataBuffer initialFullBuffer = ...
- *
- * class FillingLoop implements Runnable {
- * public void run() {
- * DataBuffer currentBuffer = initialEmptyBuffer;
- * try {
- * while (currentBuffer != null) {
- * addToBuffer(currentBuffer);
- * if (currentBuffer.full())
- * currentBuffer = exchanger.exchange(currentBuffer);
- * }
- * } catch (InterruptedException ex) { ... handle ... }
- * }
- * }
- *
- * class EmptyingLoop implements Runnable {
- * public void run() {
- * DataBuffer currentBuffer = initialFullBuffer;
- * try {
- * while (currentBuffer != null) {
- * takeFromBuffer(currentBuffer);
- * if (currentBuffer.empty())
- * currentBuffer = exchanger.exchange(currentBuffer);
- * }
- * } catch (InterruptedException ex) { ... handle ...}
- * }
- * }
- *
- * void start() {
- * new Thread(new FillingLoop()).start();
- * new Thread(new EmptyingLoop()).start();
- * }
- * }
- * </pre>
- *
- * @since 1.5
- * @author Doug Lea
- * @param <V> The type of objects that may be exchanged
- */
- public class Exchanger<V> {
- private final ReentrantLock lock = new ReentrantLock();
- private final Condition taken = lock.newCondition();
- /** Holder for the item being exchanged */
- private V item;
- /**
- * Arrival count transitions from 0 to 1 to 2 then back to 0
- * during an exchange.
- */
- private int arrivalCount;
- /**
- * Main exchange function, handling the different policy variants.
- */
- private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException {
- lock.lock();
- try {
- V other;
- // If arrival count already at two, we must wait for
- // a previous pair to finish and reset the count;
- while (arrivalCount == 2) {
- if (!timed)
- taken.await();
- else if (nanos > 0)
- nanos = taken.awaitNanos(nanos);
- else
- throw new TimeoutException();
- }
- int count = ++arrivalCount;
- // If item is already waiting, replace it and signal other thread
- if (count == 2) {
- other = item;
- item = x;
- taken.signal();
- return other;
- }
- // Otherwise, set item and wait for another thread to
- // replace it and signal us.
- item = x;
- InterruptedException interrupted = null;
- try {
- while (arrivalCount != 2) {
- if (!timed)
- taken.await();
- else if (nanos > 0)
- nanos = taken.awaitNanos(nanos);
- else
- break; // timed out
- }
- } catch (InterruptedException ie) {
- interrupted = ie;
- }
- // Get and reset item and count after the wait.
- // (We need to do this even if wait was aborted.)
- other = item;
- item = null;
- count = arrivalCount;
- arrivalCount = 0;
- taken.signal();
- // If the other thread replaced item, then we must
- // continue even if cancelled.
- if (count == 2) {
- if (interrupted != null)
- Thread.currentThread().interrupt();
- return other;
- }
- // If no one is waiting for us, we can back out
- if (interrupted != null)
- throw interrupted;
- else // must be timeout
- throw new TimeoutException();
- } finally {
- lock.unlock();
- }
- }
- /**
- * Create a new Exchanger.
- **/
- public Exchanger() {
- }
- /**
- * Waits for another thread to arrive at this exchange point (unless
- * it is {@link Thread#interrupt interrupted}),
- * and then transfers the given object to it, receiving its object
- * in return.
- * <p>If another thread is already waiting at the exchange point then
- * it is resumed for thread scheduling purposes and receives the object
- * passed in by the current thread. The current thread returns immediately,
- * receiving the object passed to the exchange by that other thread.
- * <p>If no other thread is already waiting at the exchange then the
- * current thread is disabled for thread scheduling purposes and lies
- * dormant until one of two things happens:
- * [list]
- * <li>Some other thread enters the exchange; or
- * <li>Some other thread {@link Thread#interrupt interrupts} the current
- * thread.
- * [/list]
- * <p>If the current thread:
- * [list]
- * <li>has its interrupted status set on entry to this method; or
- * <li>is {@link Thread#interrupt interrupted} while waiting
- * for the exchange,
- * [/list]
- * then {@link InterruptedException} is thrown and the current thread's
- * interrupted status is cleared.
- *
- * @param x the object to exchange
- * @return the object provided by the other thread.
- * @throws InterruptedException if current thread was interrupted
- * while waiting
- **/
- public V exchange(V x) throws InterruptedException {
- try {
- return doExchange(x, false, 0);
- } catch (TimeoutException cannotHappen) {
- throw new Error(cannotHappen);
- }
- }
- /**
- * Waits for another thread to arrive at this exchange point (unless
- * it is {@link Thread#interrupt interrupted}, or the specified waiting
- * time elapses),
- * and then transfers the given object to it, receiving its object
- * in return.
- *
- * <p>If another thread is already waiting at the exchange point then
- * it is resumed for thread scheduling purposes and receives the object
- * passed in by the current thread. The current thread returns immediately,
- * receiving the object passed to the exchange by that other thread.
- *
- * <p>If no other thread is already waiting at the exchange then the
- * current thread is disabled for thread scheduling purposes and lies
- * dormant until one of three things happens:
- * [list]
- * <li>Some other thread enters the exchange; or
- * <li>Some other thread {@link Thread#interrupt interrupts} the current
- * thread; or
- * <li>The specified waiting time elapses.
- * [/list]
- * <p>If the current thread:
- * [list]
- * <li>has its interrupted status set on entry to this method; or
- * <li>is {@link Thread#interrupt interrupted} while waiting
- * for the exchange,
- * [/list]
- * then {@link InterruptedException} is thrown and the current thread's
- * interrupted status is cleared.
- *
- * <p>If the specified waiting time elapses then {@link TimeoutException}
- * is thrown.
- * If the time is
- * less than or equal to zero, the method will not wait at all.
- *
- * @param x the object to exchange
- * @param timeout the maximum time to wait
- * @param unit the time unit of the <tt>timeout</tt> argument.
- * @return the object provided by the other thread.
- * @throws InterruptedException if current thread was interrupted
- * while waiting
- * @throws TimeoutException if the specified waiting time elapses before
- * another thread enters the exchange.
- **/
- public V exchange(V x, long timeout, TimeUnit unit)
- throws InterruptedException, TimeoutException {
- return doExchange(x, true, unit.toNanos(timeout));
- }
- }
2. Exchanger工具类的使用案例
本文给出一个简单的例子,实现两个线程之间交换数据,用Exchanger来做非常简单。
- package my.concurrent.exchanger;
- import java.util.concurrent.Exchanger;
- import java.util.concurrent.atomic.AtomicReference;
- public class ThreadA implements Runnable {
- private final Exchanger<Integer> exchanger;
- private final AtomicReference<Integer> last = new AtomicReference<Integer>(
- 5);
- public ThreadA(Exchanger<Integer> exchanger) {
- this.exchanger = exchanger;
- }
- public void run() {
- try {
- while (true) {
- last.set(exchanger.exchange(last.get()));
- System.out.println(" After calling exchange. Thread A has value: " + last.get());
- Thread.sleep(2000);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- package my.concurrent.exchanger;
- import java.util.concurrent.Exchanger;
- import java.util.concurrent.atomic.AtomicReference;
- public class ThreadB implements Runnable {
- private Exchanger<Integer> exchanger;
- private final AtomicReference<Integer> last = new AtomicReference<Integer>(
- 10);
- public ThreadB(Exchanger<Integer> exchanger) {
- this.exchanger = exchanger;
- }
- public void run() {
- try {
- while (true) {
- last.set(exchanger.exchange(last.get()));
- System.out.println(" After calling exchange. Thread B has value: " + last.get());
- Thread.sleep(2000);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- package my.concurrent.exchanger;
- import java.util.concurrent.Exchanger;
- public class ExchangerTest {
- public static void main(String[] args) {
- Exchanger<Integer> exchanger = new Exchanger<Integer>();
- new Thread(new ThreadA(exchanger)).start();
- new Thread(new ThreadB(exchanger)).start();
- }
- }
运行一段时间之后的输出结果如下:
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread B has value: 10
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 10
After calling exchange. Thread B has value: 10
After calling exchange. Thread A has value: 5
After calling exchange. Thread B has value: 5
After calling exchange. Thread A has value: 10
可以看出:两个线程的数据一直都在相互交换。
相关推荐
龙果 java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四...
并发工具类Exchanger详解.mp4 CountDownLatch,CyclicBarrier,Semaphore源码解析.mp4 提前完成任务之FutureTask使用.mp4 Future设计模式实现(实现类似于JDK提供的Future).mp4 Future源码解读.mp4 ForkJoin框架详解....
第40节并发工具类Exchanger详解00:13:47分钟 | 第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于...
java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四个...
│ 高并发编程第一阶段24讲、线程间通信快速入门,使用wait和notify进行线程间的数据通信.mp4 │ 高并发编程第一阶段25讲、多Produce多Consume之间的通讯导致出现程序假死的原因分析.mp4 │ 高并发编程第一阶段26...
主要介绍了Java多线程编程之使用Exchanger数据交换实例,本文直接给出实例代码,需要的朋友可以参考下
Java并发编程 背景介绍 并发历史 必要性 进程 资源分配的最小单位 线程 CPU调度的最小单位 线程的优势 (1)如果设计正确,多线程程序可以通过提高处理器资源的利用率来提升系统吞吐率 ...
第40节并发工具类Exchanger详解00:13:47分钟 | 第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于...
Exchanger是java 5引入的并发类,Exchanger顾名思义就是用来做交换的。这里主要是两个线程之间交换持有的对象。当Exchanger在一个线程中调用exchange方法之后,会等待另外的线程调用同样的exchange方法。 两个线程都...
│ 高并发编程第一阶段24讲、线程间通信快速入门,使用wait和notify进行线程间的数据通信.mp4 │ 高并发编程第一阶段25讲、多Produce多Consume之间的通讯导致出现程序假死的原因分析.mp4 │ 高并发编程第一阶段26...
JAVA并发编程-2-线程并发工具类一、Fork/Join1、分而治之与工作密取2、使用标准范式3、Fork/Join的同步用法4、Fork/Join的异步用法二、CountDownLatch三、CyclicBarrier四、Semaphore信号量五、Exchanger ...
【2018最新最详细】并发多线程教程,课程结构如下 1.并发编程的优缺点 2.线程的状态转换以及基本操作 ...26.大白话说java并发工具类-Semaphore,Exchanger 27.一篇文章,让你彻底弄懂生产者--消费者问题
主要介绍了Java编程线程同步工具Exchanger的使用实例解析,分享了相关代码示例,小编觉得还是挺不错的,具有一定借鉴价值,需要的朋友可以参考下
java.util.concurrent包中的Exchanger类可用于两个线程之间交换信息。可简单地将Exchanger对象理解为一个包含两个格子的容器,通过exchanger方法可以向两个格子中填充信息。当两个格子中的均被填充时,该对象会自动...
java并发工具类(CountDownLatch+Semaphore+Exchanger);java并发工具类(CountDownLatch+Semaphore+Exchanger);java并发工具类(CountDownLatch+Semaphore+Exchanger);java并发工具类(CountDownLatch+...
1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...