LMAX 开源了一个高性能并发编程框架。可以理解为消费者-生产者的消息发布订阅模式。本文下载了官方示例代码,进行实验。
longEvent事件数据
public class LongEvent { private long value; public void set(long value) { this.value = value; } public long get(){ return this.value; } }
LongEventFactory事件工厂
import com.lmax.disruptor.EventFactory; /** * 事件生产工厂 * @author wanghao * */ public class LongEventFactory implements EventFactory<LongEvent> { @Override public LongEvent newInstance() { return new LongEvent(); } }
LongEventProducer事件生产者
import java.nio.ByteBuffer; import com.lmax.disruptor.RingBuffer; /** * 生产者,生产longEvent事件 * @author harry * */ public class LongEventProducer { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void product(ByteBuffer bb) { long sequence = ringBuffer.next(); // Grab the next sequence try { LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor // for the sequence event.set(bb.getLong(0)); // Fill with data } finally { ringBuffer.publish(sequence); } } }
RingBuffer是消息存储结构,为环形存储结构,每个单元存储一条消息。类似于队列。当ringbuffer中数据填满后,环就会阻塞,等待消费者消费掉数据。当所有消费者消费掉环中一个数据,新的消息才可以加入环中。每个环插入数据后,都会分配下一个位置的编号,即sequence 。
消息者事件处理器
为消费者消费处理器,这处需要执行速度足够快。否则,会影响ringbuffer后续没空间加入新的数据。因此,不能做业务耗时操作。建议另外开始java 线程池处理消息。
import com.lmax.disruptor.EventHandler; /** * 消息者事件处理器,打印输出到控制台 * @author harry * */ public class LongEventHandler implements EventHandler<LongEvent>{ public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println("consumer:"+Thread.currentThread().getName()+" Event: value=" + event.get()+",sequence="+sequence+",endOfBatch="+endOfBatch); } }
LongEventProducerWithTranslator
import java.nio.ByteBuffer; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; /** * post-3.0 the preferred approach for publishing messages is * via the Event Publisher/Event Translator portion of the API. E.g. * @author harry * */ public class LongEventProducerWithTranslator { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() { public void translateTo(LongEvent event, long sequence, ByteBuffer bb) { event.set(bb.getLong(0)); } }; public void product(ByteBuffer bb) { ringBuffer.publishEvent(TRANSLATOR, bb); } }
translateTo方法将ringbuffer中的消息,转换成java对象格式。示例 为LongEvent对象,后续消费者LongEventHandler 处理器,直接操作LongEvent对象,获取消息各属性信息,本示例 为value属性。
product方法,将生产者生产的消息放入ringbuffer中。
LongEventMain
消费者-生产者启动类,其依靠构造Disruptor对象,调用start()方法完成启动线程。Disruptor 需要ringbuffer环,消费者数据处理工厂,WaitStrategy等
ByteBuffer 类字节buffer,用于包装消息。
ProducerType.SINGLE为单线程 ,可以提高性能。
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; import java.nio.ByteBuffer; import java.util.concurrent.Executor; import java.util.concurrent.Executors; public class LongEventMain { @SuppressWarnings("unchecked") public static void main(String[] args) throws Exception { // 执行器,用于构造消费者线程 Executor executor = Executors.newCachedThreadPool(); // 指定事件工厂 LongEventFactory factory = new LongEventFactory(); // 指定 ring buffer字节大小, must be power of 2. int bufferSize = 1024; //单线程模式,获取额外的性能 Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize,executor, ProducerType.SINGLE, new BlockingWaitStrategy()); //设置事件业务处理器---消费者 disruptor.handleEventsWith(new LongEventHandler()); //启动disruptor线程 disruptor.start(); // 获取 ring buffer环,用于接取生产者生产的事件 RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); //为 ring buffer指定事件生产者 //LongEventProducer producer = new LongEventProducer(ringBuffer); LongEventProducerWithTranslator producer=new LongEventProducerWithTranslator(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8);//预置8字节长整型字节缓存 for (long l = 0; true; l++) { bb.putLong(0, l); producer.product(bb);//生产者生产数据 Thread.sleep(1000); } } }
实验结果:
consumer:pool-1-thread-1 Event: value=0,sequence=0,endOfBatch=true
consumer:pool-1-thread-1 Event: value=1,sequence=1,endOfBatch=true
consumer:pool-1-thread-1 Event: value=2,sequence=2,endOfBatch=true
consumer:pool-1-thread-1 Event: value=3,sequence=3,endOfBatch=true
consumer:pool-1-thread-1 Event: value=4,sequence=4,endOfBatch=true
consumer:pool-1-thread-1 Event: value=5,sequence=5,endOfBatch=true
consumer:pool-1-thread-1 Event: value=6,sequence=6,endOfBatch=true
Event: value = 为消费者接收到的数据,sequence为数据在ringbuffer环的位置。
相关推荐
业务逻辑处理器完全是运行在内存中,使用事件源驱动方式。业务逻辑处理器的核心是Disruptor。 Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。 ...
NULL 博文链接:https://yanbingwei.iteye.com/blog/1985778
网上关于Disruptor的例子大部份是旧版本的, 其中集成spring的更少, 只好自已写个新版本简单的demo了。 该demo利用spring的定时器往Disruptor添加数据, 希望该demo能帮助到大家。
Disruptor C++版,本人已在windows下成功使用,参照例子使用即可。
Disruptor 简单示例,涵盖多种模型。适合Disruptor 入门学习。
disruptor 的jar包和api 自带了一个简单的例子。带注释 不是官方的例子。 不过是转别人的。 看了之后很容易明白。
NULL 博文链接:https://bijian1013.iteye.com/blog/2435671
disruptor3.3.2的jar包 和一个简单实用disruptor的例子
最快的java并发框架disruptor已经将编译好的jar包集成在里面过阵子写个完整版本的示例再上传,本人其他资源也不错请大家多多下载
LMAX Disruptor环形缓冲区示例 使用LMAX破坏者框架的示例: 受此博客文章的启发: 这是“钻石配置”的极其简化的版本,如以下内容所述: 在此实现中,日志记录和复制步骤同时发生,并且都必须成功才能执行发布...
本篇文章主要介绍了spring与disruptor集成的简单示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
破坏者启动者干扰器的使用示例
212,598 ops/secRun 1, Disruptor=51,921,079 ops/secRun 2, Disruptor=50,864,699 ops/secRun 3, Disruptor=59,630,292 ops/secRun 4, Disruptor=62,227,753 ops/secRun 5, Disruptor=59,988,002 ops/secRun 6, ...
Dispatcher 是一个以 LMAX Disruptor 为核心的事件处理器。 使用批量写入,可以轻松获得超过每秒 100 万个事件的速率,在 MacBook Pro 上的上限仅为每秒 2000 万个事件(每个事件约 50ns)。 发出单个事件的性能仍然...
java网络技术示例代码,用于初学者学习 多线程,循环buffer,参考博客上的代码,实现内存的读写功能
单线程读多线程写方案,该方案使用了两种不同的线程池实现:ThreadPoolExcutor和ForkJoinPool,分别对应NORMAL和FORKJOIN两种执行模式; 生产者-消费者模式下的多线程读/写方案,对应PRODUCERCONSUMER执行模式。 ...
Spring Boot管理的LMAX Disruptor示例项目 该项目使用来创建interruptor spring bean和执行消息事务。 该项目使用spring boot来将应用程序加载为JMS侦听器。 要运行此项目,必须与IBM Websphere MQ集成。 当然,您...
介绍flow-disruptor是确定性的每流网络状况模拟器。 为了稍微简化一下描述,每个流意味着针对每个TCP连接(实际上是连接的每个流)分别维护和模拟... 示例:以下配置设置了到10.0.1.56:45004连接的流量配置文件,其中
最近由于业务需要监控一些数据,虽然市面上有很多优秀的爬虫框架,但是我仍然打算从头开始实现一套完整的爬虫框架。 在技术选型上,我没有选择Spring来搭建项目,而是选择了更轻量级的Vert.x。一方面感觉Spring太重...
Chronique队列代码示例 这是在Devoxx France 2016上有关Java库的演讲(“工具在... 原始代码演示了如何使用Disruptor。 作者: 非常感谢: ( 的初始申请 ( 为我们的《编年史》(Chronicle Queue)版本做出的贡献。