package com.disruptor.test3; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import org.junit.Test; public class ArrayBlockingQueueTest { @Test public void test() throws InterruptedException { long cost = System.currentTimeMillis(); final CountDownLatch l = new CountDownLatch(1); final BlockingQueue<Long> bq = new ArrayBlockingQueue<Long>(4096); Runnable p = new Runnable() { public void run() { for (int i = 0; i < ConstantsUtil.MAX_LOOP; i++) { try { bq.put((long) i); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Runnable c = new Runnable() { public void run() { while (true) { try { long i = bq.take(); //System.out.println(i); if (i == ConstantsUtil.MAX_LOOP - 1) { l.countDown(); } } catch (InterruptedException e) { e.printStackTrace(); } } } }; new Thread(c).start(); new Thread(p).start(); l.await(); System.out.println("cost:" + (System.currentTimeMillis() - cost)); } }
package com.disruptor.test3; public abstract class ConstantsUtil { static int MAX_LOOP = 1000000000; }
package com.disruptor.test3; import java.util.concurrent.CountDownLatch; import org.junit.Test; import com.lmax.disruptor.BatchEventProcessor; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.YieldingWaitStrategy; public class DisruptorTest { private static final int BUFFER_SIZE = 4096; @Test public void test() throws InterruptedException { long cost = System.currentTimeMillis(); CountDownLatch l = new CountDownLatch(1); //创建RingBuffer RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer( ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy()); //创建序列栅栏 SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); //创建消费者 MyEventHandler handler = new MyEventHandler(l); //事件执行者 BatchEventProcessor<ValueEvent> batchEventProcessor = new BatchEventProcessor<ValueEvent>( ringBuffer, sequenceBarrier, handler); //序列由栅栏统一计算 ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); new Thread(batchEventProcessor).start(); for (long i = 0; i < ConstantsUtil.MAX_LOOP; i++) { long next = ringBuffer.next(); //通过序列从环中,获取消息,没有则由ValueEvent.EVENT_FACTORY工厂创建空事件 ValueEvent event = ringBuffer.get(next); //填充数据 event.setValue(i); //将环中的数据发布出去,发布之后,实际也是直接通过事件消费 ringBuffer.publish(next); } l.await(); System.out.println("cost:" + (System.currentTimeMillis() - cost)); } }
package com.disruptor.test3; import java.util.concurrent.CountDownLatch; import com.lmax.disruptor.EventHandler; public class MyEventHandler implements EventHandler<ValueEvent> { private CountDownLatch l; public long count = 0; public MyEventHandler() { }; public MyEventHandler(CountDownLatch l) { this.l = l; }; public void onEvent(ValueEvent event, long arg1, boolean arg2) throws Exception { long i = event.getValue(); //System.out.println(i); if (i == ConstantsUtil.MAX_LOOP - 1) { l.countDown(); } } }
package com.disruptor.test3; import com.lmax.disruptor.EventFactory; public final class ValueEvent { private long value; public long getValue() { return value; } public void setValue(final long value) { this.value = value; } public final static EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() { public ValueEvent newInstance() { return new ValueEvent(); } }; }
1亿: Disrupter :3910 ms BlockQueue: 246211 ms 4s VS 25s 6倍 10亿 Disrupter :36767 ms BlockQueue: 231872 ms 37s vs 232s 6倍
相关推荐
Conversant ConcurrentQueue、Disruptor BlockingQueue 和 ConcurrentStack Disruptor是Java中性能最高的线程内传输机制。 Conversant Disruptor 是这种环形缓冲区中性能最高的实现,因为它几乎没有开销,并且采用了...
可以拿 JDK 的 BlockingQueue 做一个简单对比,以便更好地认识 Disruptor 是什么。 我们知道 BlockingQueue 是一个 FIFO 队列,生产者(Producer)往队列里发布(publish)一项事件(或称之为“消息”也可以)时,消费者...
Disruptor3.x Disruptor使用方式 EventHandler[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler()}; DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers); dp.start(); ...
Disruptor框架是由LMAX公司开发的一款高效的无锁内存队列。使用无锁的方式实现了一个环形队列。据官方描述,其性能要比BlockingQueue至少高一个数量级。根据GitHub上的最新版本源码打出的包,希望对大家有帮助。
Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.<init>(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...
disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)
赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...
赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...
可以拿 JDK 的 BlockingQueue 做一个简单对比,以便更好地认识 Disruptor 是什么。 我们知道 BlockingQueue 是一个 FIFO 队列,生产者(Producer)往队列里发布(publish)一项事件(或称之为“消息”也可以)时,消费者...
赠送jar包:disruptor-3.3.7.jar 赠送原API文档:disruptor-3.3.7-javadoc.jar 赠送源代码:disruptor-3.3.7-sources.jar 包含翻译后的API文档:disruptor-3.3.7-javadoc-API文档-中文(简体)-英语-对照版.zip ...
简单讲解disruptor并附上demo
Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用...
disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载
业务逻辑处理器的核心是Disruptor。 Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。 Disruptor是一个高性能的异步处理框架,或者可以认为是最...
disruptor-3.4.2.jar 工具jar包 及 disruptor-3.4.2-sources.jar, Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作,是 log4j2 引用的 jar 包
Disruptor它是一个开源的并发框架能够在无锁的情况下实现网络的Queue并发操作。同时,Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者...
disruptor 多个消费者 但是只消费一次 有时候会有这样的需求
Disruptor简单使用。完成多线程间并行、等待、先后执行等功能。
disruptor.jar 2018最新版本(包含disruptor-3.4.1.jar、disruptor-3.4.1-sources.jar、disruptor-3.4.1-javadoc.jar)
网上关于Disruptor的例子大部份是旧版本的, 其中集成spring的更少, 只好自已写个新版本简单的demo了。 该demo利用spring的定时器往Disruptor添加数据, 希望该demo能帮助到大家。