`

Disruptor PK BlockingQueue

阅读更多

 

package com.disruptor.test3;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

import org.junit.Test;

public class ArrayBlockingQueueTest {

    @Test
    public void test() throws InterruptedException {

        long cost = System.currentTimeMillis();

        final CountDownLatch l = new CountDownLatch(1);
        final BlockingQueue<Long> bq = new ArrayBlockingQueue<Long>(4096);

        Runnable p = new Runnable() {
            public void run() {
                for (int i = 0; i < ConstantsUtil.MAX_LOOP; i++) {
                    try {
                        bq.put((long) i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        Runnable c = new Runnable() {
            public void run() {
                while (true) {
                    try {
                        long i = bq.take();
                        //System.out.println(i);
                        if (i == ConstantsUtil.MAX_LOOP - 1) {
                            l.countDown();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        new Thread(c).start();

        new Thread(p).start();

        l.await();
        System.out.println("cost:" + (System.currentTimeMillis() - cost));
    }

}

 

 

package com.disruptor.test3;

public abstract class ConstantsUtil {
    static int MAX_LOOP = 1000000000;
}

 

 

package com.disruptor.test3;

import java.util.concurrent.CountDownLatch;

import org.junit.Test;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.YieldingWaitStrategy;

public class DisruptorTest {

    private static final int BUFFER_SIZE = 4096;

    @Test
    public void test() throws InterruptedException {
        long cost = System.currentTimeMillis();
        CountDownLatch l = new CountDownLatch(1);
        //创建RingBuffer
        RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer(
            ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());

        //创建序列栅栏
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

        //创建消费者
        MyEventHandler handler = new MyEventHandler(l);

        //事件执行者
        BatchEventProcessor<ValueEvent> batchEventProcessor = new BatchEventProcessor<ValueEvent>(
            ringBuffer, sequenceBarrier, handler);

        //序列由栅栏统一计算
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());

        new Thread(batchEventProcessor).start();

        for (long i = 0; i < ConstantsUtil.MAX_LOOP; i++) {
            long next = ringBuffer.next();
            //通过序列从环中,获取消息,没有则由ValueEvent.EVENT_FACTORY工厂创建空事件
            ValueEvent event = ringBuffer.get(next);
            //填充数据
            event.setValue(i);
            //将环中的数据发布出去,发布之后,实际也是直接通过事件消费
            ringBuffer.publish(next);
        }
        l.await();
        System.out.println("cost:" + (System.currentTimeMillis() - cost));
    }
}

 

 

package com.disruptor.test3;

import java.util.concurrent.CountDownLatch;

import com.lmax.disruptor.EventHandler;

public class MyEventHandler implements EventHandler<ValueEvent> {
    private CountDownLatch l;

    public long            count = 0;

    public MyEventHandler() {
    };

    public MyEventHandler(CountDownLatch l) {
        this.l = l;
    };

    public void onEvent(ValueEvent event, long arg1, boolean arg2) throws Exception {
        long i = event.getValue();
        //System.out.println(i);
        if (i == ConstantsUtil.MAX_LOOP - 1) {
            l.countDown();
        }
    }
}

 

 

package com.disruptor.test3;

import com.lmax.disruptor.EventFactory;

public final class ValueEvent {
    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(final long value) {
        this.value = value;
    }

    public final static EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() {
                                                                   public ValueEvent newInstance() {
                                                                       return new ValueEvent();
                                                                   }
                                                               };
}

 

 

1亿:
Disrupter :3910 ms       
BlockQueue: 246211 ms

4s VS 25s   6倍

10亿
Disrupter :36767 ms       
BlockQueue: 231872 ms

37s vs 232s 6倍


 

 

 

分享到:
评论

相关推荐

    disruptor:Disruptor BlockingQueue

    Conversant ConcurrentQueue、Disruptor BlockingQueue 和 ConcurrentStack Disruptor是Java中性能最高的线程内传输机制。 Conversant Disruptor 是这种环形缓冲区中性能最高的实现,因为它几乎没有开销,并且采用了...

    Disruptor 入门 - v1.0

    可以拿 JDK 的 BlockingQueue 做一个简单对比,以便更好地认识 Disruptor 是什么。 我们知道 BlockingQueue 是一个 FIFO 队列,生产者(Producer)往队列里发布(publish)一项事件(或称之为“消息”也可以)时,消费者...

    Disruptor3.x Disruptor使用方式

    Disruptor3.x Disruptor使用方式 EventHandler[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler()}; DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers); dp.start(); ...

    LMAX-Disruptor框架jar包

    Disruptor框架是由LMAX公司开发的一款高效的无锁内存队列。使用无锁的方式实现了一个环形队列。据官方描述,其性能要比BlockingQueue至少高一个数量级。根据GitHub上的最新版本源码打出的包,希望对大家有帮助。

    disruptor-3.3.8.jar

    Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.&lt;init&gt;(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...

    disruptor-3.4.4.jar disruptor 3.4.4 jar 官方github下载

    disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)

    disruptor-3.3.0-API文档-中英对照版.zip

    赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...

    disruptor-3.3.0-API文档-中文版.zip

    赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...

    Disruptor 极速体验.docx

    可以拿 JDK 的 BlockingQueue 做一个简单对比,以便更好地认识 Disruptor 是什么。 我们知道 BlockingQueue 是一个 FIFO 队列,生产者(Producer)往队列里发布(publish)一项事件(或称之为“消息”也可以)时,消费者...

    disruptor-3.3.7-API文档-中英对照版.zip

    赠送jar包:disruptor-3.3.7.jar 赠送原API文档:disruptor-3.3.7-javadoc.jar 赠送源代码:disruptor-3.3.7-sources.jar 包含翻译后的API文档:disruptor-3.3.7-javadoc-API文档-中文(简体)-英语-对照版.zip ...

    disruptor案例加简单说明

    简单讲解disruptor并附上demo

    Disruptor资料合集

    Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用...

    disruptor-3.2.0.jar

    disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载

    Disruptor示例

    业务逻辑处理器的核心是Disruptor。 Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。 Disruptor是一个高性能的异步处理框架,或者可以认为是最...

    disruptor-3.4.2.jar 及 disruptor-3.4.2-sources.jar

    disruptor-3.4.2.jar 工具jar包 及 disruptor-3.4.2-sources.jar, Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作,是 log4j2 引用的 jar 包

    disruptor框架案例.rar

    Disruptor它是一个开源的并发框架能够在无锁的情况下实现网络的Queue并发操作。同时,Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者...

    disruptor 多个消费者

    disruptor 多个消费者 但是只消费一次 有时候会有这样的需求

    Disruptor demo

    Disruptor简单使用。完成多线程间并行、等待、先后执行等功能。

    disruptor.jar 3.4 2018最新版本

    disruptor.jar 2018最新版本(包含disruptor-3.4.1.jar、disruptor-3.4.1-sources.jar、disruptor-3.4.1-javadoc.jar)

    spring集成disruptor

    网上关于Disruptor的例子大部份是旧版本的, 其中集成spring的更少, 只好自已写个新版本简单的demo了。 该demo利用spring的定时器往Disruptor添加数据, 希望该demo能帮助到大家。

Global site tag (gtag.js) - Google Analytics