ConsumerRepository是消费者的入口
ConsumerInfo中保存了一个消费者的所有的信息.
ConsumerRepository的consumerInfos字段是个ConsumerInfo数组,保存了全部消费者的信息.
ConsumerInfo中有个SequenceBarrier字段,这个是用来获取生产者的位置信息的.
ConsumerInfo中的EventProcessor是真正的消费者.
有人会说我自己写的是EventHandler作为消费者啊,其实EventProcessor是做了封装
EventProcessor中有个字段就是eventHandler.
如果对Disruptor只传入EventHandler的话:
Disruptor类中:
public EventHandlerGroup<T> handleEventsWith(final EventHandler<T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers);
}
Disruptor类中:
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
final EventHandler<T>[] eventHandlers)
{
final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
可以看到默认的EventProcessor是BatchEventProcessor.
BatchEventProcessor类:
private final DataProvider<T> dataProvider;
private final SequenceBarrier sequenceBarrier;
private final EventHandler<T> eventHandler;
dataProvider其实就是ringBuffer,用来获取ringBuffer那个位置的Event
sequenceBarrier用来获取生产者的位置信息
eventHandler是具体处理Event的策略,这个是在Disruptor的handleEventsWith方法中传进来的.
消费者的实现.
BatchEventProcessor类是实现了Runnable接口的.所以每一个消费者都是可以作为一个线程来跑的.
这也是 Disruptor的构造类传入一个线程池的原因,线程池就是用来跑消费者的.
@Override
public void run()
{
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
T event = null;
long nextSequence = sequence.get() + 1L;
try
{
while (true)
{
try
{
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (nextSequence > availableSequence)
{
Thread.yield();
}
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
finally
{
notifyShutdown();
running.set(false);
}
}
run方法看起来很长,其实主要就是这个:
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
这个是获取生产者的位置的
if (nextSequence > availableSequence) 表示消费者超过生产者,这个是不允许的.
这里的操作是Thread.yield();不同的EventProcessor这里采取的策略也会不一样.
while (true) 可以看到在不成功的状态下会不断去检查生产者的信息.
waitFor的实现是waitStrategy.waitFor来实现的,waitStrategy有很多中不同的策略,默认是BlockingWaitStrategy.
BlockingWaitStrategy的实现中用到了lock.lock();意思就是同时只允许一个个消费者排队去抢,下一个消费者要等待上一个消费者处理完一个之后才能抢.
在SequenceBarrie的waitFor的最后一步,会通过getHighestPublishedSequence来检查生产者的位置信息的标志是否正常.这个是和生产者的publish方法联系起来的.
分享到:
相关推荐
disruptor 多个消费者 但是只消费一次 有时候会有这样的需求
最快的java并发框架disruptor已经将编译好的jar包集成在里面过阵子写个完整版本的示例再上传,本人其他资源也不错请大家多多下载
赠送jar包:disruptor-3.3.7.jar 赠送原API文档:disruptor-3.3.7-javadoc.jar 赠送源代码:disruptor-3.3.7-sources.jar 包含翻译后的API文档:disruptor-3.3.7-javadoc-API文档-中文(简体)-英语-对照版.zip ...
disruptor 代码分析,分析主要api 使用方法
SourceAnalysis_Disruptor Disruptor原始码解析
那么它的应用场景自然就是“生产者-消费者”模型的应用场合了。 可以拿 JDK 的 BlockingQueue 做一个简单对比,以便更好地认识 Disruptor 是什么。 我们知道 BlockingQueue 是一个 FIFO 队列,生产者(Producer)往...
LMAX Disruptor 最新版本 源码+API+驱动包
Disruptor3.x Disruptor使用方式 EventHandler[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler()}; DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers); dp.start(); ...
Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用...
disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)
Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.<init>(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...
赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...
Netty整合并发编程框架Disruptor实战百万长链接服务构建源码.zip Netty整合并发编程框架Disruptor实战百万长链接服务构建源码.zip Netty整合并发编程框架Disruptor实战百万长链接服务构建源码.zip
赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...
springboot集成disruptor,自动配置,便于快速开发,隐藏disruptor复杂的使用,线程池的频繁创建,优化开发。
Disruptor C++版,本人已在windows下成功使用,参照例子使用即可。
卡夫卡消费者的破坏者演示如何在Kafka 0.9 Consumer上使用LMAX Disruptor 好处->一旦先前的使用者完全处理完消息,便可以使用序列屏障来提交消息。想象力是极限。如果环形缓冲区可以容纳在L3缓存中,则处理速度会更...
简单讲解disruptor并附上demo
Disruptor框架是由LMAX公司开发的一款高效的无锁内存队列。使用无锁的方式实现了一个环形队列。据官方描述,其性能要比BlockingQueue至少高一个数量级。根据GitHub上的最新版本源码打出的包,希望对大家有帮助。
Java并发框架Disruptor,里面采取环形缓存结构,速度更快,适用于生产者消费者模式