一个仅仅部署在4台服务器上的服务,每秒向Database写入数据超过100万行数据,每分钟产生超过1G的数据。而每台服务器(8核12G)上CPU占用不到100%,load不超过5。这是怎么做到呢?下面将给你描述这个架构,它的核心是一个高效缓冲区设计,我们对它的要求是:
1,该缓存区要尽量简单
2,尽量避免生产者线程和消费者线程锁
3,尽量避免大量GC
缓冲 vs 性能瓶颈
提高硬盘写入IO的银弹无疑是批量顺序写,无论是在业界流行的分布式文件系统或数据,HBase,GFS和HDFS,还是以磁盘文件为持久化方式的消息队列Kafka都采用了在内存缓存数据然后再批量写入的策略。这一个策略的性能核心就是内存中缓冲区设计。这是一个经典的数据产生者和消费者场景,缓冲区的要求是当同步写入和读出时:(1)写满则不写(2)读空则不读(3)不丢失数据(4)不读重复数据。最直接也是常用的方式就是JDK自带的LinkedBlockingQueue。LinkedBlockingQueue是一个带锁的消息队列,写入和读出时加锁,完全满缓冲区上面的四个要求。但是当你的程序跑起来之后,看看那个线程CPU消耗最高?往往就是在线程读LinkedBlockingQueue锁的时候,这也成为很多对吞吐要求很高的程序的性能瓶颈。
Disruptor
解决加锁队列产生的性能问题?Disruptor是一个选择。Disruptor是什么?看看开源它的公司LMAX自己是怎么介绍的:
我们花费了大量的精力去实现更高性能的队列,但是,事实证明队列作为一种基础的数据结构带有它的局限性——在生产者、消费者、以及它们的数据存储之间的合并设计问题。Disruptor就是我们在构建这样一种能够清晰地分割这些关注问题的数据结构过程中所诞生的成果。
OK,Disruptor是用来解决我们这个场景的问题的,而且它不是队列。那么它是什么并且如何实现高效呢?我这里不做过多介绍,网上类似资料很多,简单的总结:
1,Disruptor使用了一个RingBuffer替代队列,用生产者消费者指针替代锁。
2,生产者消费者指针使用CPU支持的整数自增,无需加锁并且速度很快。Java的实现在Unsafe package中。
使用Disruptor,首先需要构建一个RingBuffer,并指定一个大小,注意如果RingBuffer里面数据超过了这个大小则会覆盖旧数据。这可能是一个风险,但Disruptor提供了检查RingBuffer是否写满的机制用于规避这个问题。而且根据maoyidao测试结果,写满的可能性不大,因为Disrutpor确实高效,除非你的消费线程太慢。
并且使用一个单独的线程去处理RingBuffer中的数据:
RingBuffer ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY,
new SingleThreadedClaimStrategy(RING_SIZE),
new SleepingWaitStrategy());
SequenceBarrier barrier = ringBuffer.newBarrier();
BatchEventProcessor<ValueEvent> eventProcessor = new BatchEventProcessor<ValueEvent>(ringBuffer, barrier, handler);
ringBuffer.setGatingSequences(eventProcessor.getSequence());
// only support single thread
new Thread(eventProcessor).start();
ValueEvent通常是个自定义的类,用于封装你自己的数据:
public class ValueEvent {
private byte[] packet;
public byte[] getValue()
{
return packet;
}
public void setValue(final byte[] packet)
{
this.packet = packet;
}
public final static EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>()
{
public ValueEvent newInstance()
{
return new ValueEvent();
}
};
}
生产者通过RingBuffer.publish方法向buffer中添加数据,同时发出一个事件通知消费者有新数据达到,并且,,,注意我们是怎么规避数据覆盖问题的:
// Publishers claim events in sequence
long sequence = ringBuffer.next();
// if capacity less than 10%, don't use ringbuffer anymore
if(ringBuffer.remainingCapacity() < RING_SIZE * 0.1) {
log.warn("disruptor:ringbuffer avaliable capacity is less than 10 %");
// do something
}
else {
ValueEvent event = ringBuffer.get(sequence);
event.setValue(packet); // this could be more complex with multiple fields
// make the event available to EventProcessors
ringBuffer.publish(sequence);
}
数据消费者代码在EventHandler中实现:
final EventHandler<ValueEvent> handler = new EventHandler<ValueEvent>()
{
public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
{
byte[] packet = event.getValue();
// do something
}
};
很好,完成!用以上代码跑个压测,结果果然比加锁队列快很多(Disruptor官网上有benchmark数据,我这里就不提供对比数据)。好,用到线上环境。。。。结果是。。。CPU反而飙升了!??
Disruptor的坑
书接上文,Disruptor压测良好,但上线之后CPU使用达到650%,LOAD接近300!分析diruptor源码可知,造成cpu过高的原因是 RingBuffer 的waiting策略,Disruptor官网例子使用的策略是 SleepingWaitStrategy ,这个类的策略是当没有新数据写入RingBuffer时,每1ns检查一次RingBuffer cursor。1ns!跟死循环没什么区别,因此CPU暴高。改成每100ms检查一次,CPU立刻降为7.8%。
为什么Disruptor官网例子使用这种有如此风险的SleepingWaitStrategy呢?原因是此策略完全不使用锁,当吞吐极高时,RingBuffer中始终有数据存在,通过轮询策略就能最大程度的把它的性能优势发挥出来。但这显然是理想状态,互联网应用有明显的高峰低谷,不可能总处于满负荷状态。因此还是BlockingWaitStrategy 这种锁通知机制更好:
RingBuffer ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY,
new SingleThreadedClaimStrategy(RING_SIZE),
new BlockingWaitStrategy());
这样写入不加锁,读出加锁。相对加锁队列少了一半,性能还是有显著提高。
还有没有更好的方法?
Disruptor是实现缓冲区的很好选择。但它本质的目的是提供线程间交换数据的高效实现,这是一个很好的通用选择。那么真对我们数据异步批量落地的场景,还有没有更好的选择呢?答案是:Yes,we have!我最终设计了一个非常简单的buffer,原因是:
1,Disruptor很好,但毕竟多引入了一个依赖,对于新同学也有学习成本。
2,Disruptor不能很好的解决GC过多的问题。
那么更好的缓存是什么呢?这首先要从场景说起。
首先的问题是:我需要一个buffer,但为啥要一个跨线程buffer呢?如果我用同一个线程读,再用这个线程去写,这个buffer完全是线程本地buffer,锁本身就无意义。同时异步Database落地没有严格的顺序要求,因此我是多线程同步读写,也不需要集中时的buffer来维护顺序,因此一个内置于线程中的二维byte[][]数组就可以解决全部问题!
public class ThreadLocalBoundedMQ {
private long lastFlushTime=0L;
private byte[][] msgs=new byte[Constants.BATCH_INS_COUNT][];
private int offset=0;
public byte[][] getMsgs(){
return msgs;
}
public void addMsg(byte[] msg)
{
msgs[offset++]=msg;
}
public int size() {
return offset;
}
public void clear() {
offset=0;
lastFlushTime=System.currentTimeMillis();
}
public boolean needFlush(){
return (System.currentTimeMillis()-lastFlushTime > Constants.MAX_BUFFER_TIME)
&& offset>0;
}
}
实际测试和上线效果良好(效果见本文第一节)!
总结
能够使用最简化的代码完成性能和业务要求,是最完美的方法。根据使用场景,你可以有很多假设,但不要被眼花缭乱的新技术迷惑而拿你自己的服务做小白鼠,最适合的,最简单的,就是最好的。
本文系maoyidao原创,转载请引用原链接:
同时推荐本系列前2篇
构建高性能服务(一)ConcurrentSkipListMap和链表构建高性能Java Memcached
http://maoyidao.iteye.com/blog/1559420
构建高性能服务(二)java高并发锁的3种实现
http://maoyidao.iteye.com/blog/1563523
分享到:
相关推荐
disruptor:高性能Java线程间通讯库
Java工具:高性能并发工具Disruptor简单使用
本文翻译自LMAX关于Disruptor的...目前Disruptor版本已经迭代至3.0,本论文是基于Disruptor1.0写就,在新版本中,相对与1.0版本,其核心设计思想没有变,只是实现细节有所调整和优化,因此,此论文仍然很有研读意义。
disruptor 缓冲队列 高效
Disruptor 高性能线程间消息传递库,通过它来实现“消息中心”,跨线程消息传递so easy! HikariCP 稳定、高性能的JDBC连接池。github star破11k! logback 快速、灵活的日志库,log4j作者的续作。 fastjson 马爸爸家...
Okra是一个简单的使用JAVA开发的高性能,高扩展,高并发,低延迟的服务器框架。 主要目的是帮助中小团队快速开发实现网络游戏服务端。 本项目包含完整的MMORPG等游戏服务器的DEMO. Dependencies: JDK 1.8 : 框架...
CPU缓存架构详解&高性能内存队列Disruptor实战
Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用...
11-线程池 ThreadPoolExecutor 底层原理源码分析(上)-...15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1).pdf designpattern.zip disruptor.zip forkjoin.zip jmm(1).zip
Conversant Disruptor 是这种环形缓冲区中性能最高的实现,因为它几乎没有开销,并且采用了特别简单的设计。 2017 Conversant Disruptor - 仍然是世界上最快的入门运行 maven build 来构建和使用包。 $ mvn -U ...
Disruptor:一种高性能的、在并发线程间数据交换领域用于替换有界限队列的方案
Disruptor3.x Disruptor使用方式 EventHandler[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler()}; DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers); dp.start(); ...
Netty整合并发编程框架Disruptor实战百万长链接服务构建源码.zip Netty整合并发编程框架Disruptor实战百万长链接服务构建源码.zip Netty整合并发编程框架Disruptor实战百万长链接服务构建源码.zip
Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他...Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。
标签:disruptor、lmax、jar包、java、中文文档; 使用方法:解压翻译后的API文档,用浏览器打开“index.html”文件,即可纵览文档内容。 人性化翻译,文档中的代码和结构保持不变,注释和说明精准翻译,请放心使用...
Disruptor它是一个开源的并发框架能够在无锁的情况下实现...同时,Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。
Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.<init>(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...
High performance alternative to bounded queues for exchanging data between concurrent threads Disruptor 框架作者的论文。
简单讲解disruptor并附上demo
disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)