`
luoshi0801
  • 浏览: 145872 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Disruptor封装

 
阅读更多

在数据交换场景,disruptor受到越来越多的欢迎。下面是将原生disruptor封装成queue模型的代码,供参考

 

抽象类Disruptor,提供pull、take等接口

 

 

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;


public abstract class DisruptorQueue {
    
    public static void setUseSleep(boolean useSleep) {
        DisruptorQueueImpl.setUseSleep(useSleep);
    }

    public static DisruptorQueue mkInstance(String queueName,
            ProducerType producerType, int bufferSize, WaitStrategy wait) {
        return new DisruptorQueueImpl(queueName, producerType, bufferSize, wait);
    }

    public abstract String getName();

    public abstract void haltWithInterrupt();

    public abstract Object poll();

    public abstract Object take();

    public abstract void consumeBatch(EventHandler<Object> handler);
    
    public abstract void consumeBatchWhenAvailable(EventHandler<Object> handler);

    public abstract void publish(Object obj);

    public abstract void publish(Object obj, boolean block)
            throws InsufficientCapacityException;

    public abstract void consumerStarted();

    public abstract void clear();

    public abstract long population();

    public abstract long capacity();

    public abstract long writePos();

    public abstract long readPos();

    public abstract float pctFull();
    
}

 

 

具体实现

 

import java.util.HashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.lmax.disruptor.AlertException;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;


public class DisruptorQueueImpl extends DisruptorQueue {
    
    private static final Logger logger = LoggerFactory.getLogger(DisruptorQueueImpl.class);
    
    static boolean useSleep = true;
    public static void setUseSleep(boolean useSleep) {
        AbstractSequencerExt.setWaitSleep(useSleep);
    }
    
    private static final Object FLUSH_CACHE = new Object();
    private static final Object INTERRUPT = new Object();
    private static final String PREFIX = "disruptor-";

    private final String _queueName;
    private final RingBuffer<MutableObject> _buffer;
    private final Sequence _consumer;
    private final SequenceBarrier _barrier;

    volatile boolean consumerStartedFlag = false;

    private final HashMap<String, Object> state = new HashMap<String, Object>(4);
    private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
    private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
    private final Lock readLock = cacheLock.readLock();
    private final Lock writeLock = cacheLock.writeLock();

    public DisruptorQueueImpl(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
        this._queueName = PREFIX + queueName;
        _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
        _consumer = new Sequence();
        _barrier = _buffer.newBarrier();
        _buffer.addGatingSequences(_consumer);
        if (producerType == ProducerType.SINGLE) {
            consumerStartedFlag = true;
        } else {
            if (bufferSize < 2) {
                throw new RuntimeException("QueueSize must >= 2");
            }
            try {
                publishDirect(FLUSH_CACHE, true);
            } catch (InsufficientCapacityException e) {
                throw new RuntimeException("This code should be unreachable!", e);
            }
        }
    }

    public String getName() {
        return _queueName;
    }

    public void consumeBatch(EventHandler<Object> handler) {
        consumeBatchToCursor(_barrier.getCursor(), handler);
    }

    public void haltWithInterrupt() {
        publish(INTERRUPT);
    }

    public Object poll() {
        if (consumerStartedFlag == false) {
            return _cache.poll();
        }

        final long nextSequence = _consumer.get() + 1;
        if (nextSequence <= _barrier.getCursor()) {
            MutableObject mo = _buffer.get(nextSequence);
            _consumer.set(nextSequence);
            Object ret = mo.o;
            mo.setObject(null);
            return ret;
        }
        return null;
    }

    public Object take() {
        if (consumerStartedFlag == false) {
            return _cache.poll();
        }

        final long nextSequence = _consumer.get() + 1;
        try {
            _barrier.waitFor(nextSequence);
        } catch (AlertException e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            logger.error("InterruptedException " + e.getCause());
            return null;
        } catch (TimeoutException e) {
            logger.error(e.getMessage(), e);
            return null;
        }
        MutableObject mo = _buffer.get(nextSequence);
        _consumer.set(nextSequence);
        Object ret = mo.o;
        mo.setObject(null);
        return ret;
    }

    public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
        try {
            final long nextSequence = _consumer.get() + 1;
            final long availableSequence = _barrier.waitFor(nextSequence);
            if (availableSequence >= nextSequence) {
                consumeBatchToCursor(availableSequence, handler);
            }
        } catch (AlertException e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            logger.error("InterruptedException " + e.getCause());
            return;
        }catch (TimeoutException e) {
            logger.error(e.getMessage(), e);
            return ;
        }
    }

    public void consumeBatchToCursor(long cursor, EventHandler<Object> handler){
        for (long curr = _consumer.get() + 1; curr <= cursor; curr++) {
            try {
                MutableObject mo = _buffer.get(curr);
                Object o = mo.o;
                mo.setObject(null);
                if (o == FLUSH_CACHE) {
                    Object c = null;
                    while (true) {
                        c = _cache.poll();
                        if (c == null)
                            break;
                        else
                            handler.onEvent(c, curr, true);
                    }
                } else if (o == INTERRUPT) {
                    throw new InterruptedException(
                            "Disruptor processing interrupted");
                } else {
                    handler.onEvent(o, curr, curr == cursor);
                }
            } catch (InterruptedException e) {
                logger.error(e.getMessage());
                return;
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                throw new RuntimeException(e);
            }
        }
        _consumer.set(cursor);
    }

    public void publish(Object obj) {
        try {
            publish(obj, true);
        } catch (InsufficientCapacityException ex) {
            throw new RuntimeException("This code should be unreachable!");
        }
    }

    public void tryPublish(Object obj) throws InsufficientCapacityException {
        publish(obj, false);
    }

    public void publish(Object obj, boolean block)
            throws InsufficientCapacityException {

        boolean publishNow = consumerStartedFlag;

        if (!publishNow) {
            readLock.lock();
            try {
                publishNow = consumerStartedFlag;
                if (!publishNow) {
                    _cache.add(obj);
                }
            } finally {
                readLock.unlock();
            }
        }

        if (publishNow) {
            publishDirect(obj, block);
        }
    }

    protected void publishDirect(Object obj, boolean block)
            throws InsufficientCapacityException {
        final long id;
        if (block) {
            id = _buffer.next();
        } else {
            id = _buffer.tryNext(1);
        }
        final MutableObject m = _buffer.get(id);
        m.setObject(obj);
        _buffer.publish(id);
    }

    public void consumerStarted() {

        writeLock.lock();
        consumerStartedFlag = true;
        
        writeLock.unlock();
    }

    public void clear() {
        while (population() != 0L) {
            poll();
        }
    }

    public long population() {
        return (writePos() - readPos());
    }

    public long capacity() {
        return _buffer.getBufferSize();
    }

    public long writePos() {
        return _buffer.getCursor();
    }

    public long readPos() {
        return _consumer.get();
    }

    public float pctFull() {
        return (1.0F * population() / capacity());
    }

    public Object getState() {
        long rp = readPos();
        long wp = writePos();
        state.put("capacity", capacity());
        state.put("population", wp - rp);
        state.put("write_pos", wp);
        state.put("read_pos", rp);
        return state;
    }

    public static class ObjectEventFactory implements EventFactory<MutableObject> {
        @Override
        public MutableObject newInstance() {
            return new MutableObject();
        }
    }
}

 

  

public class MutableObject {

    Object o = null;

    public MutableObject() {

    }

    public MutableObject(Object o) {
        this.o = o;
    }

    public void setObject(Object o) {
        this.o = o;
    }

    public Object getObject() {
        return o;
    }

}

 

 

代码依赖

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.2.1</version>
</dependency>

 

分享到:
评论

相关推荐

    async-framework:基于Disruptor的异步并行框架

    async-framework提供了流程和队列的概念,流程 Flow 代表步骤,队列 Queue 代表处理节点,队列由Disruptor封装而成,每个事件都有Flow发起,并且在每个Queue存在一个状态,每个队列处理特定的事件,简单的来说 Flow ...

    Disruptor3.x Disruptor使用方式

    Disruptor3.x Disruptor使用方式 EventHandler[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler()}; DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers); dp.start(); ...

    disruptor-3.3.8.jar

    Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.&lt;init&gt;(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...

    disruptor-3.4.4.jar disruptor 3.4.4 jar 官方github下载

    disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)

    disruptor-3.3.0-API文档-中文版.zip

    赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...

    disruptor-3.3.0-API文档-中英对照版.zip

    赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...

    Disruptor 入门 - v1.0

    从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。 可以拿 JDK 的 BlockingQueue 做一个简单对比,以便更好地认识 Disruptor 是...

    disruptor案例加简单说明

    简单讲解disruptor并附上demo

    Disruptor资料合集

    Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用...

    disruptor-3.3.7-API文档-中英对照版.zip

    赠送jar包:disruptor-3.3.7.jar 赠送原API文档:disruptor-3.3.7-javadoc.jar 赠送源代码:disruptor-3.3.7-sources.jar 包含翻译后的API文档:disruptor-3.3.7-javadoc-API文档-中文(简体)-英语-对照版.zip ...

    disruptor-3.2.0.jar

    disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载

    disruptor框架案例.rar

    Disruptor它是一个开源的并发框架能够在无锁的情况下实现网络的Queue并发操作。同时,Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者...

    LMAX-Disruptor框架jar包

    Disruptor框架是由LMAX公司开发的一款高效的无锁内存队列。使用无锁的方式实现了一个环形队列。据官方描述,其性能要比BlockingQueue至少高一个数量级。根据GitHub上的最新版本源码打出的包,希望对大家有帮助。

    Disruptor示例

    业务逻辑处理器的核心是Disruptor。 Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。 Disruptor是一个高性能的异步处理框架,或者可以认为是最...

    Disruptor demo

    Disruptor简单使用。完成多线程间并行、等待、先后执行等功能。

    disruptor-3.4.2.jar 及 disruptor-3.4.2-sources.jar

    disruptor-3.4.2.jar 工具jar包 及 disruptor-3.4.2-sources.jar, Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作,是 log4j2 引用的 jar 包

    disruptor 多个消费者

    disruptor 多个消费者 但是只消费一次 有时候会有这样的需求

    disruptor.jar 3.4 2018最新版本

    disruptor.jar 2018最新版本(包含disruptor-3.4.1.jar、disruptor-3.4.1-sources.jar、disruptor-3.4.1-javadoc.jar)

    spring集成disruptor

    网上关于Disruptor的例子大部份是旧版本的, 其中集成spring的更少, 只好自已写个新版本简单的demo了。 该demo利用spring的定时器往Disruptor添加数据, 希望该demo能帮助到大家。

    spring-boot-starter-disruptor.zip

    springboot集成disruptor,自动配置,便于快速开发,隐藏disruptor复杂的使用,线程池的频繁创建,优化开发。

Global site tag (gtag.js) - Google Analytics