`
wbj0110
  • 浏览: 1550158 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

如何让Java以光的速度跨线程通信?

阅读更多

一个比Disruptor吞吐量等性能指标更好的框架,使用Railway算法,将线程之间的消费发送参考现实生活中火车在站点之间搬运货物。

目标起始于一个简单的想法:创建一个开发人员友好的,简单的,轻量级线程间的通信框架,无需使用任何锁,同步器,信号量,等待,通知以及没有队列,消息,事件或任何其它并发特定的语法或工具。

只是一个Java接口接受到POJO以后在其背后实现这个通信,这个主意很类似Akka的Actors,但是它也许是有点矫枉过正,特别是对于单个多核计算机上线程间的通信优化必须是轻量的。

Akka的伟大之处是跨进程通信,特别是Actor是能够跨越不同JVM节点实现分布式通信。

无论如何,你可能觉得使用Akka在一个小型项目上有些过度,因为你只需要线程之间的通信,但是你还是想使用类似Actor这种做法模式。

该文章作者使用了动态代理 堵塞队列和一个缓存的线程池创建了这个解决方案,如图:


SPSC队列是一个Single Producer/Single Consumer 队列(单生产者/单消费者),而MPSC是一个Multi Producer/Single Consumer队列。

Dispatcher线程从Actor线程接受到消息,然后发送到相应的SPSC中。

Actor线程从接受的消息中使用数据,调用相应的actor类的方法,Actor实例都是发送消息给MPSC队列,然后再从Actor线程那里得到消息。

下面是ping-pong案例:

public interface PlayerA (
  void pong(long ball); //send and forget method call 
}
public interface PlayerB {   
  void ping(PlayerA playerA, long ball); //send and forget method call    
}    
 public class PlayerAImpl implements PlayerA {    
  @Override    
  @ublic void pong(long ball) {    
  }    
}
public class PlayerBImpl implements PlayerB {   
  @Override    
  public void ping(PlayerA playerA, long ball) {    
    playerA.pong(ball);    
  }    
}
public class PingPongExample {   
  public void testPingPong() {
// this manager hides the complexity of inter-thread communications   // and it takes control over actor proxies, actor implementations and threads    
    ActorManager manager = new ActorManager();
// registers actor implementations inside the manager   
    manager.registerImpl(PlayerAImpl.class);    
    manager.registerImpl(PlayerBImpl.class);
//Create actor proxies. Proxies convert method calls into internal messages    //which would be sent between threads to a specific actor instance.    
   PlayerA playerA = manager.createActor(PlayerA.class);    
   PlayerB playerB = manager.createActor(PlayerB.class);    
    for(int i = 0; i < 1000000; i++) {    
       playerB.ping(playerA, i);     
   }    
}


这两个play能够每秒打500,000个乒乓。但是如果和单个线程执行速度相比,还是很差的,同样代码在单个线程可以到达每秒两百万个。

作者开始研究缓慢的原因,在一些校验和测试以后,他认为是Actors之间发送消息影响了整体性能:


作者找到一个SPSC单生产者和单消费者的无锁队列,http://www.infoq.com/presentations/Lock-Free-Algorithms

无锁队列提供比锁队列更好的性能。锁队列中在当一个线程获得锁,其他线程将被阻塞,直到该锁被释放的。在无锁算法的情况下,生产者线程可以产生消息,但不阻止其他生产者线程,以及其他消费者,而从队列中读取的消费者不会被阻塞。

这个无锁队列据测试结果是超过每秒100M ops,是JDK的并发队列实现的10倍。

但是作者使用这个无锁队列提到SPSC 以后,并没有产生明显性能提升,他立即意识到这个框架的性能瓶颈不是在SPSC,而是在多个生产者/单个消费者(MPSC)那里。

多个生产者如果使用SPSC会覆盖彼此的值,因为SPSC并没有一个对生产者的控制机制,即使最快的SPSC也不适合。

对于MPSC作者找到了LMAX的disruptor,一个通过Ringbuffer实现的高性能线程间通信库包。



使用Disruptor很容易实现非常低延迟,高吞吐量的线程间消息通信。它还提供了用例对生产者和消费者的不同组合。多个线程可以从环形缓冲区中读取而不会阻塞对方:



多生产者和多消费者:


三个生产者/一个消费者测试结果显示,Disruptor都是两倍于LinkedBlockingQueue 。

但是使用Disruptor后的这个框架性能还是没有达到预期,作者从上下班的地铁中得到灵感,在某个站点同一车厢出来的人是生产者,进去的是消费者。

建立一个Railway类,使用AtomicLong来跟踪地铁在站与站之间的传递,下面是一个single-train railway:

public class RailWay {  
 private final Train train = new Train();  
 //站台号码stationNo 跟踪火车,定义哪个站点接受火车
 private final AtomicInteger stationIndex = new AtomicInteger();
//多线程访问这个方法,也就是在特定站点等待火车
public Train waitTrainOnStation(final int stationNo) {
  
   while (stationIndex.get() % stationCount != stationNo) {
    Thread.yield(); // this is necessary to keep a high throughput of message passing.   //But it eats CPU cycles while waiting for a train  
   }  
   // the busy loop returns only when the station number will match  // stationIndex.get() % stationCount condition

   return train;
 }
//这个方法通过增加火车站台号将火车移到下一个站点。  public void sendTrain() {
    stationIndex.getAndIncrement();
   }
  }



参考Disruptor,创建线程间传递long值:

public class Train {   
  //   
  public static int CAPACITY = 2*1024;
  private final long[] goodsArray; // array to transfer freight goods

  private int index;

  public Train() {   
      goodsArray = new long[CAPACITY];     
 }

 public int goodsCount() { // returns the count of goods    
  return index;    
 }    
 public void addGoods(long i) { // adds item to the train    
  goodsArray[index++] = i;    
 }    
 public long getGoods(int i) { //removes the item from the train    
  index--;    
  return goodsArray[i];    
 }    
}


如下图两个线程传递long:


使用一列火车实现单个生产者单个消费者:

public void testRailWay() {   
  final Railway railway = new Railway();    
  final long n = 20000000000l;    
  //starting a consumer thread    
  new Thread() {    
   long lastValue = 0;
   @Override   
   public void run() {    
    while (lastValue < n) {    
      Train train = railway.waitTrainOnStation(1); //waits for the train at the station #1    
      int count = train.goodsCount();    
      for (int i = 0; i < count; i++) {    
        lastValue = train.getGoods(i); // unload goods    
      }    
      railway.sendTrain(); //sends the current train to the first station.    
     }    
   }    
 }.start();

 final long start = System.nanoTime();
long i = 0;   
while (i < n) {    
 Train train = railway.waitTrainOnStation(0); // waits for the train on the station #0    
 int capacity = train.getCapacity();    
 for (int j = 0; j < capacity; j++) {    
   train.addGoods((int)i++); // adds goods to the train    
 }    
 railway.sendTrain();
 if (i % 100000000 == 0) { //measures the performance per each 100M items   
    final long duration = System.nanoTime() - start;|    
    final long ops = (i * 1000L * 1000L * 1000L) / duration;    
    System.out.format("ops/sec = %,d\n", ops);    
    System.out.format("trains/sec = %,d\n", ops / Train.CAPACITY);    
    System.out.format("latency nanos = %.3f%n\n", 
                       duration / (float)(i) * (float) Train.CAPACITY);    
  }    
 }    
}


通过测试,它达到 767,028,751 ops/sec ,是Nitsan’s blog.(第一个采用)的SPSC队列的几倍。

下面假设如果能有两列火车,每个站点有自己的火车,一个火车在第一个站点加载货物,第二列火车在第二个站点加载货物:


经过测试吞吐量是单列火车的1.4被,延迟从192.6纳秒降低到133.5纳秒。

但是线程间传输消息延迟是因为火车容量2048导致2178.4纳秒,通过增加火车降低这个延迟,如下图:


当在两个线程之间使用32,768 列火车传递一个long值,其延迟降低到13.9纳秒。到此吞吐量和延迟达到了一个平衡。

这只是SPSC的实现,纳秒多个生产者如何实现呢?答案是加入更多站点。



每个线程等待下一列火车,然后加载卸装消息,再把火车发到下一个站,而生产者线程放入消息到火车而消费者是从其中获得,火车总是从一个站到另外一个站循环不断移动。

测试了SPMC单个生产者和多个消费者,使用8个站点,一个属于生产者,剩余7个是消费者。

火车数量是256 火车容量是32时,测试结果是:吞吐量和延迟:
ops/sec = 116,604,397
latency nanos = 274.4
而火车数量是32而火车容量是256时:
ops/sec = 432,055,469
latency nanos = 592.5

后者相对是一个好的结果,延迟虽然提高,但是吞吐量提高的倍数要高得多。

分享到:
评论

相关推荐

    Java毕业设计-JAVA网络通信系统的研究与开发(源代码+论文+开题报告).rar

    通过采用Java的Socket编程和多线程技术,系统能够实现客户端与服务器之间的即时通信,同时支持多人在线聊天、文件传输以及音视频通话等多种功能。 在开发过程中,我们充分利用了Java的跨平台特性,使得该系统可以在...

    JAVA多线程编程详解-详细操作例子

    线程又称为轻量级进程,它和进程一样拥有独立的执行控制,由操作系统负责调度,区别在于线程没有独立的存储空间,而是和所属进程中的其它线程共享一个存储空间,这使得线程间的通信远较进程简单。 具体到java内存...

    JAVA_API1.6文档(中文)

    java.applet 提供创建 applet 所必需的类和 applet 用来与其 applet 上下文通信的类。 java.awt 包含用于创建用户界面和绘制图形图像的所有类。 java.awt.color 提供用于颜色空间的类。 java.awt.datatransfer ...

    JAVA上百实例源码以及开源项目

     Java局域网通信——飞鸽传书源代码,大家都知道VB版、VC版还有Delphi版的飞鸽传书软件,但是Java版的确实不多,因此这个Java文件传输实例不可错过,Java网络编程技能的提升很有帮助。 Java聊天程序,包括服务端和...

    JAVA上百实例源码以及开源项目源代码

     Java局域网通信——飞鸽传书源代码,大家都知道VB版、VC版还有Delphi版的飞鸽传书软件,但是Java版的确实不多,因此这个Java文件传输实例不可错过,Java网络编程技能的提升很有帮助。 Java聊天程序,包括服务端和...

    Java 1.6 API 中文 New

    java.applet 提供创建 applet 所必需的类和 applet 用来与其 applet 上下文通信的类。 java.awt 包含用于创建用户界面和绘制图形图像的所有类。 java.awt.color 提供用于颜色空间的类。 java.awt.datatransfer 提供...

    java开源包4

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包101

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包11

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包6

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包9

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包5

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包8

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包10

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包3

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    java开源包1

    MyBatchFramework 是一个开源的轻量级的用以创建可靠的易管理的批量作业的Java包,主要特点是多线程、调度、JMX管理和批量执行报表,执行历史等。 SIP协议包 jSIP.tar jSIP这个Java包目标是用Java实现SIP(SIP:...

    一款分布式的java游戏服务器框架,具备高性能、可伸缩、分布式、多线程等特点,java 8 +gradle 4.0

    Disruptor 高性能线程间消息传递库,通过它来实现“消息中心”,跨线程消息传递so easy! HikariCP 稳定、高性能的JDBC连接池。github star破11k! logback 快速、灵活的日志库,log4j作者的续作。 fastjson 马爸爸家...

    java api最新7.0

    java.applet 提供创建 applet 所必需的类和 applet 用来与其 applet 上下文通信的类。 java.awt 包含用于创建用户界面和绘制图形图像的所有类。 java.awt.color 提供用于颜色空间的类。 java.awt.datatransfer 提供...

    用java实现安卓handler消息机制

    使用java实现安卓的handler消息机制,支持跨线程通信和发送延迟消息

Global site tag (gtag.js) - Google Analytics