`
wh0426
  • 浏览: 55081 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
Group-logo
架构师的知识与实践
浏览量:55072
社区版块
存档分类
最新评论

disruptor使用示例

阅读更多


LMAX
开源了一个高性能并发编程框架。可以理解为消费者-生产者的消息发布订阅模式。本文下载了官方示例代码,进行实验。

longEvent事件数据

 

public class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }
    
    public long get(){
        return this.value;
    }
}

 

 

LongEventFactory事件工厂

 

import com.lmax.disruptor.EventFactory;
/**
 * 事件生产工厂
 * @author wanghao
 *
 */
public class LongEventFactory implements EventFactory<LongEvent>
{

	@Override
	public LongEvent newInstance() {
		return new LongEvent();
	}

}

 

 

LongEventProducer事件生产者

import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;

/**
 * 生产者,生产longEvent事件
 * @author harry
 *
 */
public class LongEventProducer {
	private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void product(ByteBuffer bb)
    {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try
        {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
                                                        // for the sequence
            event.set(bb.getLong(0));  // Fill with data
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
    }
}

RingBuffer是消息存储结构,为环形存储结构,每个单元存储一条消息。类似于队列。当ringbuffer中数据填满后,环就会阻塞,等待消费者消费掉数据。当所有消费者消费掉环中一个数据,新的消息才可以加入环中。每个环插入数据后,都会分配下一个位置的编号,即sequence 。

消息者事件处理器

为消费者消费处理器,这处需要执行速度足够快。否则,会影响ringbuffer后续没空间加入新的数据。因此,不能做业务耗时操作。建议另外开始java 线程池处理消息。

 

import com.lmax.disruptor.EventHandler;
/**
 * 消息者事件处理器,打印输出到控制台
 * @author harry
 *
 */
public class LongEventHandler  implements EventHandler<LongEvent>{
	  public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
	    {
	        System.out.println("consumer:"+Thread.currentThread().getName()+" Event: value=" + event.get()+",sequence="+sequence+",endOfBatch="+endOfBatch);
	    }
}

 

LongEventProducerWithTranslator

 

import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

/**
 * post-3.0 the preferred approach for publishing messages is 
 * via the Event Publisher/Event Translator portion of the API. E.g.
 * @author harry
 *
 */
public class LongEventProducerWithTranslator {
	private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
        new EventTranslatorOneArg<LongEvent, ByteBuffer>()
        {
            public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
            {
                event.set(bb.getLong(0));
            }
        };

    public void product(ByteBuffer bb)
    {
        ringBuffer.publishEvent(TRANSLATOR, bb);
    }
}


translateTo方法将ringbuffer中的消息,转换成java对象格式。示例 为LongEvent对象,后续消费者LongEventHandler 处理器,直接操作LongEvent对象,获取消息各属性信息,本示例 为value属性。

 

product方法,将生产者生产的消息放入ringbuffer中。

LongEventMain

 

消费者-生产者启动类,其依靠构造Disruptor对象,调用start()方法完成启动线程。Disruptor 需要ringbuffer环,消费者数据处理工厂,WaitStrategy等

ByteBuffer 类字节buffer,用于包装消息。

ProducerType.SINGLE为单线程 ,可以提高性能。

 

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class LongEventMain {
	@SuppressWarnings("unchecked")
	public static void main(String[] args) throws Exception
	    {
	        // 执行器,用于构造消费者线程
	        Executor executor = Executors.newCachedThreadPool();

	        // 指定事件工厂
	        LongEventFactory factory = new LongEventFactory();

	        // 指定 ring buffer字节大小, must be power of 2.
	        int bufferSize = 1024;

	        //单线程模式,获取额外的性能
	        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, 
                    bufferSize,executor,
                    ProducerType.SINGLE,
                    new BlockingWaitStrategy());
	        //设置事件业务处理器---消费者
	        disruptor.handleEventsWith(new LongEventHandler());
	        //启动disruptor线程
	        disruptor.start();

	        // 获取 ring buffer环,用于接取生产者生产的事件
	        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
	        //为 ring buffer指定事件生产者
	        //LongEventProducer producer = new LongEventProducer(ringBuffer);
	        LongEventProducerWithTranslator producer=new LongEventProducerWithTranslator(ringBuffer);
	        ByteBuffer bb = ByteBuffer.allocate(8);//预置8字节长整型字节缓存
	        for (long l = 0; true; l++)
	        {
	            bb.putLong(0, l);
	            producer.product(bb);//生产者生产数据
	            Thread.sleep(1000);
	        }
	        
	    }
}

 

 

实验结果:

consumer:pool-1-thread-1 Event: value=0,sequence=0,endOfBatch=true
consumer:pool-1-thread-1 Event: value=1,sequence=1,endOfBatch=true
consumer:pool-1-thread-1 Event: value=2,sequence=2,endOfBatch=true
consumer:pool-1-thread-1 Event: value=3,sequence=3,endOfBatch=true
consumer:pool-1-thread-1 Event: value=4,sequence=4,endOfBatch=true
consumer:pool-1-thread-1 Event: value=5,sequence=5,endOfBatch=true
consumer:pool-1-thread-1 Event: value=6,sequence=6,endOfBatch=true

 

Event: value = 为消费者接收到的数据,sequence为数据在ringbuffer环的位置。

 

分享到:
评论
1 楼 ClyenLiang 2016-08-24  

相关推荐

    Disruptor示例

    业务逻辑处理器完全是运行在内存中,使用事件源驱动方式。业务逻辑处理器的核心是Disruptor。 Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。 ...

    Disruptor3.2官方例子测试

    NULL 博文链接:https://yanbingwei.iteye.com/blog/1985778

    spring集成disruptor

    网上关于Disruptor的例子大部份是旧版本的, 其中集成spring的更少, 只好自已写个新版本简单的demo了。 该demo利用spring的定时器往Disruptor添加数据, 希望该demo能帮助到大家。

    Disruptor C++版(仅支持单生产者)

    Disruptor C++版,本人已在windows下成功使用,参照例子使用即可。

    share-disruptor.zip

    Disruptor 简单示例,涵盖多种模型。适合Disruptor 入门学习。

    disruptor jar包+Demo+Api

    disruptor 的jar包和api 自带了一个简单的例子。带注释 不是官方的例子。 不过是转别人的。 看了之后很容易明白。

    Disruptor应用实例

    NULL 博文链接:https://bijian1013.iteye.com/blog/2435671

    disruptror的jar包和例子

    disruptor3.3.2的jar包 和一个简单实用disruptor的例子

    disruptor-3.2.1源码带jar包20140321

    最快的java并发框架disruptor已经将编译好的jar包集成在里面过阵子写个完整版本的示例再上传,本人其他资源也不错请大家多多下载

    disruptor-example:使用LMAX Disruptor环形缓冲区的示例

    LMAX Disruptor环形缓冲区示例 使用LMAX破坏者框架的示例: 受此博客文章的启发: 这是“钻石配置”的极其简化的版本,如以下内容所述: 在此实现中,日志记录和复制步骤同时发生,并且都必须成功才能执行发布...

    spring与disruptor集成的简单示例

    本篇文章主要介绍了spring与disruptor集成的简单示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

    disruptor-starter:干扰器的使用示例

    破坏者启动者干扰器的使用示例

    disruptor_cpp:单个生产者的 LMAX 破坏者的 CPP 实施

    212,598 ops/secRun 1, Disruptor=51,921,079 ops/secRun 2, Disruptor=50,864,699 ops/secRun 3, Disruptor=59,630,292 ops/secRun 4, Disruptor=62,227,753 ops/secRun 5, Disruptor=59,988,002 ops/secRun 6, ...

    dispatcher:带有嵌入式 LMAX Disruptor 的快速事件路由器

    Dispatcher 是一个以 LMAX Disruptor 为核心的事件处理器。 使用批量写入,可以轻松获得超过每秒 100 万个事件的速率,在 MacBook Pro 上的上限仅为每秒 2000 万个事件(每个事件约 50ns)。 发出单个事件的性能仍然...

    Disruptor学习.7z

    java网络技术示例代码,用于初学者学习 多线程,循环buffer,参考博客上的代码,实现内存的读写功能

    javaforkjoin源码-filespilt-demo:初始化项目

    单线程读多线程写方案,该方案使用了两种不同的线程池实现:ThreadPoolExcutor和ForkJoinPool,分别对应NORMAL和FORKJOIN两种执行模式; 生产者-消费者模式下的多线程读/写方案,对应PRODUCERCONSUMER执行模式。 ...

    打扰者计费示例:使用打扰者弹簧管理器框架的示例LMAX打扰者弹簧启动项目

    Spring Boot管理的LMAX Disruptor示例项目 该项目使用来创建interruptor spring bean和执行消息事务。 该项目使用spring boot来将应用程序加载为JMS侦听器。 要运行此项目,必须与IBM Websphere MQ集成。 当然,您...

    flow-disruptor:确定性的按流网络条件故障模拟器

    介绍flow-disruptor是确定性的每流网络状况模拟器。 为了稍微简化一下描述,每个流意味着针对每个TCP连接(实际上是连接的每个流)分别维护和模拟... 示例:以下配置设置了到10.0.1.56:45004连接的流量配置文件,其中

    基于Vert.x和RxJava 2构建通用的爬虫框架的示例

    最近由于业务需要监控一些数据,虽然市面上有很多优秀的爬虫框架,但是我仍然打算从头开始实现一套完整的爬虫框架。 在技术选型上,我没有选择Spring来搭建项目,而是选择了更轻量级的Vert.x。一方面感觉Spring太重...

    chronicle-queue-devoxx-fr-2016:编年史队列代码示例

    Chronique队列代码示例 这是在Devoxx France 2016上有关Java库的演讲(“工具在... 原始代码演示了如何使用Disruptor。 作者: 非常感谢: ( 的初始申请 ( 为我们的《编年史》(Chronicle Queue)版本做出的贡献。

Global site tag (gtag.js) - Google Analytics