`

Java并发编程: 使用Exchanger实现线程间的数据交换

 
阅读更多
本文介绍Exchanger工具类, 然后采用Exchanger给出一个两个线程交换数值的简单实例。 

1. Exchanger介绍 
Java代码  收藏代码
  1. /** 
  2.  * A synchronization point at which two threads can exchange objects. 
  3.  * Each thread presents some object on entry to the {@link #exchange 
  4.  * exchange} method, and receives the object presented by the other 
  5.  * thread on return. 
  6. */  


从上面的注释中可以看出:Exchanger提供了一个同步点在这个同步点,两个线程可以交换数据。每个线程通过exchange()方法的入口提供数据给另外的线程,并接收其它线程提供的数据,并返回。 

Exchanger通过Lock和Condition来完成功能,Exchanger的一个重要的public方法是exchange方法,用于线程的数据交换, 相关的类图以及详细的Exchanger类内容如下: 

 

Java代码  收藏代码
  1. package java.util.concurrent;  
  2. import java.util.concurrent.locks.*;  
  3.   
  4. /** 
  5.  * A synchronization point at which two threads can exchange objects. 
  6.  * Each thread presents some object on entry to the {@link #exchange 
  7.  * exchange} method, and receives the object presented by the other 
  8.  * thread on return. 
  9.  * 
  10.  * <p><b>Sample Usage:</b> 
  11.  * Here are the highlights of a class that uses an <tt>Exchanger</tt> to 
  12.  * swap buffers between threads so that the thread filling the 
  13.  * buffer gets a freshly 
  14.  * emptied one when it needs it, handing off the filled one to 
  15.  * the thread emptying the buffer. 
  16.  * <pre> 
  17.  * class FillAndEmpty { 
  18.  *   Exchanger<DataBuffer> exchanger = new Exchanger(); 
  19.  *   DataBuffer initialEmptyBuffer = ... a made-up type 
  20.  *   DataBuffer initialFullBuffer = ... 
  21.  * 
  22.  *   class FillingLoop implements Runnable { 
  23.  *     public void run() { 
  24.  *       DataBuffer currentBuffer = initialEmptyBuffer; 
  25.  *       try { 
  26.  *         while (currentBuffer != null) { 
  27.  *           addToBuffer(currentBuffer); 
  28.  *           if (currentBuffer.full()) 
  29.  *             currentBuffer = exchanger.exchange(currentBuffer); 
  30.  *         } 
  31.  *       } catch (InterruptedException ex) { ... handle ... } 
  32.  *     } 
  33.  *   } 
  34.  * 
  35.  *   class EmptyingLoop implements Runnable { 
  36.  *     public void run() { 
  37.  *       DataBuffer currentBuffer = initialFullBuffer; 
  38.  *       try { 
  39.  *         while (currentBuffer != null) { 
  40.  *           takeFromBuffer(currentBuffer); 
  41.  *           if (currentBuffer.empty()) 
  42.  *             currentBuffer = exchanger.exchange(currentBuffer); 
  43.  *         } 
  44.  *       } catch (InterruptedException ex) { ... handle ...} 
  45.  *     } 
  46.  *   } 
  47.  * 
  48.  *   void start() { 
  49.  *     new Thread(new FillingLoop()).start(); 
  50.  *     new Thread(new EmptyingLoop()).start(); 
  51.  *   } 
  52.  * } 
  53.  * </pre> 
  54.  * 
  55.  * @since 1.5 
  56.  * @author Doug Lea 
  57.  * @param <V> The type of objects that may be exchanged 
  58.  */  
  59. public class Exchanger<V> {  
  60.     private final ReentrantLock lock = new ReentrantLock();  
  61.     private final Condition taken = lock.newCondition();  
  62.   
  63.     /** Holder for the item being exchanged */  
  64.     private V item;  
  65.       
  66.     /** 
  67.      * Arrival count transitions from 0 to 1 to 2 then back to 0 
  68.      * during an exchange. 
  69.      */  
  70.     private int arrivalCount;  
  71.   
  72.     /** 
  73.      * Main exchange function, handling the different policy variants. 
  74.      */  
  75.     private V doExchange(V x, boolean timed, long nanos) throws InterruptedException, TimeoutException {  
  76.         lock.lock();  
  77.         try {  
  78.             V other;  
  79.   
  80.             // If arrival count already at two, we must wait for  
  81.             // a previous pair to finish and reset the count;  
  82.             while (arrivalCount == 2) {  
  83.                 if (!timed)  
  84.                     taken.await();  
  85.                 else if (nanos > 0)   
  86.                     nanos = taken.awaitNanos(nanos);  
  87.                 else   
  88.                     throw new TimeoutException();  
  89.             }  
  90.   
  91.             int count = ++arrivalCount;  
  92.   
  93.             // If item is already waiting, replace it and signal other thread  
  94.             if (count == 2) {   
  95.                 other = item;  
  96.                 item = x;  
  97.                 taken.signal();  
  98.                 return other;  
  99.             }  
  100.   
  101.             // Otherwise, set item and wait for another thread to  
  102.             // replace it and signal us.  
  103.   
  104.             item = x;  
  105.             InterruptedException interrupted = null;  
  106.             try {   
  107.                 while (arrivalCount != 2) {  
  108.                     if (!timed)  
  109.                         taken.await();  
  110.                     else if (nanos > 0)   
  111.                         nanos = taken.awaitNanos(nanos);  
  112.                     else   
  113.                         break// timed out  
  114.                 }  
  115.             } catch (InterruptedException ie) {  
  116.                 interrupted = ie;  
  117.             }  
  118.   
  119.             // Get and reset item and count after the wait.  
  120.             // (We need to do this even if wait was aborted.)  
  121.             other = item;  
  122.             item = null;  
  123.             count = arrivalCount;  
  124.             arrivalCount = 0;   
  125.             taken.signal();  
  126.               
  127.             // If the other thread replaced item, then we must  
  128.             // continue even if cancelled.  
  129.             if (count == 2) {  
  130.                 if (interrupted != null)  
  131.                     Thread.currentThread().interrupt();  
  132.                 return other;  
  133.             }  
  134.   
  135.             // If no one is waiting for us, we can back out  
  136.             if (interrupted != null)   
  137.                 throw interrupted;  
  138.             else  // must be timeout  
  139.                 throw new TimeoutException();  
  140.         } finally {  
  141.             lock.unlock();  
  142.         }  
  143.     }  
  144.   
  145.     /** 
  146.      * Create a new Exchanger. 
  147.      **/  
  148.     public Exchanger() {  
  149.     }  
  150.   
  151.     /** 
  152.      * Waits for another thread to arrive at this exchange point (unless 
  153.      * it is {@link Thread#interrupt interrupted}), 
  154.      * and then transfers the given object to it, receiving its object 
  155.      * in return. 
  156.      * <p>If another thread is already waiting at the exchange point then 
  157.      * it is resumed for thread scheduling purposes and receives the object 
  158.      * passed in by the current thread. The current thread returns immediately, 
  159.      * receiving the object passed to the exchange by that other thread. 
  160.      * <p>If no other thread is already waiting at the exchange then the  
  161.      * current thread is disabled for thread scheduling purposes and lies 
  162.      * dormant until one of two things happens: 
  163.      * [list] 
  164.      * <li>Some other thread enters the exchange; or 
  165.      * <li>Some other thread {@link Thread#interrupt interrupts} the current 
  166.      * thread. 
  167.      * [/list] 
  168.      * <p>If the current thread: 
  169.      * [list] 
  170.      * <li>has its interrupted status set on entry to this method; or  
  171.      * <li>is {@link Thread#interrupt interrupted} while waiting 
  172.      * for the exchange,  
  173.      * [/list] 
  174.      * then {@link InterruptedException} is thrown and the current thread's  
  175.      * interrupted status is cleared.  
  176.      * 
  177.      * @param x the object to exchange 
  178.      * @return the object provided by the other thread. 
  179.      * @throws InterruptedException if current thread was interrupted  
  180.      * while waiting 
  181.      **/  
  182.     public V exchange(V x) throws InterruptedException {  
  183.         try {  
  184.             return doExchange(x, false0);  
  185.         } catch (TimeoutException cannotHappen) {   
  186.             throw new Error(cannotHappen);  
  187.         }  
  188.     }  
  189.   
  190.     /** 
  191.      * Waits for another thread to arrive at this exchange point (unless 
  192.      * it is {@link Thread#interrupt interrupted}, or the specified waiting 
  193.      * time elapses), 
  194.      * and then transfers the given object to it, receiving its object 
  195.      * in return. 
  196.      * 
  197.      * <p>If another thread is already waiting at the exchange point then 
  198.      * it is resumed for thread scheduling purposes and receives the object 
  199.      * passed in by the current thread. The current thread returns immediately, 
  200.      * receiving the object passed to the exchange by that other thread. 
  201.      * 
  202.      * <p>If no other thread is already waiting at the exchange then the  
  203.      * current thread is disabled for thread scheduling purposes and lies 
  204.      * dormant until one of three things happens: 
  205.      * [list] 
  206.      * <li>Some other thread enters the exchange; or 
  207.      * <li>Some other thread {@link Thread#interrupt interrupts} the current 
  208.      * thread; or 
  209.      * <li>The specified waiting time elapses. 
  210.      * [/list] 
  211.      * <p>If the current thread: 
  212.      * [list] 
  213.      * <li>has its interrupted status set on entry to this method; or  
  214.      * <li>is {@link Thread#interrupt interrupted} while waiting 
  215.      * for the exchange,  
  216.      * [/list] 
  217.      * then {@link InterruptedException} is thrown and the current thread's  
  218.      * interrupted status is cleared.  
  219.      * 
  220.      * <p>If the specified waiting time elapses then {@link TimeoutException} 
  221.      * is thrown. 
  222.      * If the time is  
  223.      * less than or equal to zero, the method will not wait at all. 
  224.      * 
  225.      * @param x the object to exchange 
  226.      * @param timeout the maximum time to wait 
  227.      * @param unit the time unit of the <tt>timeout</tt> argument. 
  228.      * @return the object provided by the other thread. 
  229.      * @throws InterruptedException if current thread was interrupted 
  230.      * while waiting 
  231.      * @throws TimeoutException if the specified waiting time elapses before 
  232.      * another thread enters the exchange. 
  233.      **/  
  234.     public V exchange(V x, long timeout, TimeUnit unit)   
  235.         throws InterruptedException, TimeoutException {  
  236.         return doExchange(x, true, unit.toNanos(timeout));  
  237.     }  
  238.   
  239. }  


2. Exchanger工具类的使用案例 
本文给出一个简单的例子,实现两个线程之间交换数据,用Exchanger来做非常简单。 

Java代码  收藏代码
  1. package my.concurrent.exchanger;  
  2.   
  3. import java.util.concurrent.Exchanger;  
  4. import java.util.concurrent.atomic.AtomicReference;  
  5.   
  6. public class ThreadA implements Runnable {  
  7.   
  8.     private final Exchanger<Integer> exchanger;  
  9.   
  10.     private final AtomicReference<Integer> last = new AtomicReference<Integer>(  
  11.             5);  
  12.   
  13.     public ThreadA(Exchanger<Integer> exchanger) {  
  14.         this.exchanger = exchanger;  
  15.     }  
  16.   
  17.     public void run() {  
  18.         try {  
  19.             while (true) {  
  20.                 last.set(exchanger.exchange(last.get()));  
  21.                 System.out.println(" After calling exchange. Thread A has value: " + last.get());  
  22.                 Thread.sleep(2000);  
  23.             }  
  24.         } catch (InterruptedException e) {  
  25.             e.printStackTrace();  
  26.         }  
  27.     }  
  28.   
  29. }  


Java代码  收藏代码
  1. package my.concurrent.exchanger;  
  2.   
  3. import java.util.concurrent.Exchanger;  
  4. import java.util.concurrent.atomic.AtomicReference;  
  5.   
  6. public class ThreadB implements Runnable {  
  7.   
  8.     private Exchanger<Integer> exchanger;  
  9.   
  10.     private final AtomicReference<Integer> last = new AtomicReference<Integer>(  
  11.             10);  
  12.   
  13.     public ThreadB(Exchanger<Integer> exchanger) {  
  14.         this.exchanger = exchanger;  
  15.     }  
  16.   
  17.     public void run() {  
  18.         try {  
  19.             while (true) {  
  20.                 last.set(exchanger.exchange(last.get()));  
  21.                 System.out.println(" After calling exchange. Thread B has value: " + last.get());  
  22.                 Thread.sleep(2000);  
  23.             }  
  24.         } catch (InterruptedException e) {  
  25.             e.printStackTrace();  
  26.         }  
  27.     }  
  28.   
  29. }  


Java代码  收藏代码
  1. package my.concurrent.exchanger;  
  2.   
  3. import java.util.concurrent.Exchanger;  
  4.   
  5. public class ExchangerTest {  
  6.   
  7.     public static void main(String[] args) {  
  8.         Exchanger<Integer> exchanger = new Exchanger<Integer>();  
  9.         new Thread(new ThreadA(exchanger)).start();  
  10.         new Thread(new ThreadB(exchanger)).start();  
  11.     }  
  12.   
  13. }  


运行一段时间之后的输出结果如下: 
After calling exchange. Thread B has value: 5 
After calling exchange. Thread A has value: 10 
After calling exchange. Thread B has value: 10 
After calling exchange. Thread A has value: 5 
After calling exchange. Thread A has value: 10 
After calling exchange. Thread B has value: 5 
After calling exchange. Thread B has value: 10 
After calling exchange. Thread A has value: 5 
After calling exchange. Thread A has value: 10 
After calling exchange. Thread B has value: 5 
After calling exchange. Thread B has value: 10 
After calling exchange. Thread A has value: 5 
After calling exchange. Thread A has value: 10 
After calling exchange. Thread B has value: 5 
After calling exchange. Thread A has value: 5 
After calling exchange. Thread B has value: 10 
After calling exchange. Thread A has value: 10 
After calling exchange. Thread B has value: 5 
After calling exchange. Thread B has value: 10 
After calling exchange. Thread A has value: 5 
After calling exchange. Thread B has value: 5 
After calling exchange. Thread A has value: 10 
After calling exchange. Thread A has value: 5 
After calling exchange. Thread B has value: 10 
After calling exchange. Thread B has value: 5 
After calling exchange. Thread A has value: 10 
After calling exchange. Thread B has value: 10 
After calling exchange. Thread A has value: 5 
After calling exchange. Thread B has value: 5 
After calling exchange. Thread A has value: 10 



可以看出:两个线程的数据一直都在相互交换。
 
分享到:
评论

相关推荐

    龙果 java并发编程原理实战

    龙果 java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四...

    Java并发编程原理与实战

    并发工具类Exchanger详解.mp4 CountDownLatch,CyclicBarrier,Semaphore源码解析.mp4 提前完成任务之FutureTask使用.mp4 Future设计模式实现(实现类似于JDK提供的Future).mp4 Future源码解读.mp4 ForkJoin框架详解....

    龙果java并发编程完整视频

    第40节并发工具类Exchanger详解00:13:47分钟 | 第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于...

    Java 并发编程原理与实战视频

    java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四个...

    汪文君高并发编程实战视频资源下载.txt

    │ 高并发编程第一阶段24讲、线程间通信快速入门,使用wait和notify进行线程间的数据通信.mp4 │ 高并发编程第一阶段25讲、多Produce多Consume之间的通讯导致出现程序假死的原因分析.mp4 │ 高并发编程第一阶段26...

    Java多线程编程之使用Exchanger数据交换实例

    主要介绍了Java多线程编程之使用Exchanger数据交换实例,本文直接给出实例代码,需要的朋友可以参考下

    Java并发编程(学习笔记).xmind

    Java并发编程 背景介绍 并发历史 必要性 进程 资源分配的最小单位 线程 CPU调度的最小单位 线程的优势 (1)如果设计正确,多线程程序可以通过提高处理器资源的利用率来提升系统吞吐率 ...

    java并发编程

    第40节并发工具类Exchanger详解00:13:47分钟 | 第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于...

    java并发Exchanger的使用

    Exchanger是java 5引入的并发类,Exchanger顾名思义就是用来做交换的。这里主要是两个线程之间交换持有的对象。当Exchanger在一个线程中调用exchange方法之后,会等待另外的线程调用同样的exchange方法。 两个线程都...

    汪文君高并发编程实战视频资源全集

    │ 高并发编程第一阶段24讲、线程间通信快速入门,使用wait和notify进行线程间的数据通信.mp4 │ 高并发编程第一阶段25讲、多Produce多Consume之间的通讯导致出现程序假死的原因分析.mp4 │ 高并发编程第一阶段26...

    JAVA并发编程-2-线程并发工具类

    JAVA并发编程-2-线程并发工具类一、Fork/Join1、分而治之与工作密取2、使用标准范式3、Fork/Join的同步用法4、Fork/Join的异步用法二、CountDownLatch三、CyclicBarrier四、Semaphore信号量五、Exchanger ...

    【2018最新最详细】并发多线程教程

    【2018最新最详细】并发多线程教程,课程结构如下 1.并发编程的优缺点 2.线程的状态转换以及基本操作 ...26.大白话说java并发工具类-Semaphore,Exchanger 27.一篇文章,让你彻底弄懂生产者--消费者问题

    Java编程线程同步工具Exchanger的使用实例解析

    主要介绍了Java编程线程同步工具Exchanger的使用实例解析,分享了相关代码示例,小编觉得还是挺不错的,具有一定借鉴价值,需要的朋友可以参考下

    Java:Exchanger类的作用.docx

    java.util.concurrent包中的Exchanger类可用于两个线程之间交换信息。可简单地将Exchanger对象理解为一个包含两个格子的容器,通过exchanger方法可以向两个格子中填充信息。当两个格子中的均被填充时,该对象会自动...

    java并发工具类(CountDownLatch+Semaphore+Exchanger)

    java并发工具类(CountDownLatch+Semaphore+Exchanger);java并发工具类(CountDownLatch+Semaphore+Exchanger);java并发工具类(CountDownLatch+Semaphore+Exchanger);java并发工具类(CountDownLatch+...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    java并发工具包详解

    1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

Global site tag (gtag.js) - Google Analytics