`

并发框架之Disruptor

阅读更多
Disruptor 是一个开源的并发框架。
官方地址：https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started

https://github.com/LMAX-Exchange/disruptor


/**
 * Mutable event object holding a single {@code Long} payload.
 *
 * <p>Instances are printed by the event handlers via string concatenation, so
 * {@link #toString()} is overridden to show the payload rather than the
 * default identity hash.
 */
public class LongEvent {

	/** Current payload; {@code null} until {@link #setValue(Long)} is called. */
	private Long value;

	/**
	 * Returns the current payload.
	 *
	 * @return the stored value, or {@code null} if none has been set
	 */
	public Long getValue() {
		return value;
	}

	/**
	 * Replaces the current payload.
	 *
	 * @param value the new value to store (may be {@code null})
	 */
	public void setValue(Long value) {
		this.value = value;
	}

	/**
	 * Returns a readable representation that includes the payload.
	 *
	 * @return a string of the form {@code LongEvent{value=<value>}}
	 */
	@Override
	public String toString() {
		return "LongEvent{value=" + value + '}';
	}
}


import com.lmax.disruptor.EventFactory;
/**
 * Event factory: supplies new, empty {@link LongEvent} instances through the
 * {@link EventFactory} contract.
 */
public class LongEventFactory implements EventFactory<LongEvent> {

	/**
	 * Creates one event instance.
	 *
	 * @return a newly constructed {@link LongEvent} with no value set
	 */
	@Override
	public LongEvent newInstance() {
		LongEvent fresh = new LongEvent();
		return fresh;
	}
}

import com.lmax.disruptor.EventHandler;
/**
 * Consumer / event handler that prints every {@link LongEvent} it receives.
 *
 * <p>Implements the parameterized {@code EventHandler<LongEvent>} instead of
 * the raw type, so the compiler checks that this handler is only attached to
 * a matching event type.
 */
public class LongEventHandler implements EventHandler<LongEvent> {

	/**
	 * Prints the event together with its sequence number and batch flag to
	 * standard output.
	 *
	 * @param event      the event being handled
	 * @param sequence   the sequence number passed in for this event
	 * @param endOfBatch the end-of-batch flag passed in for this event
	 * @throws Exception declared by the {@link EventHandler} contract; this
	 *                   implementation does not throw it explicitly
	 */
	@Override
	public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
			throws Exception {
		System.out.println("Event:"+event+"   sequence:"+sequence+"   endOfBatch:"+endOfBatch);
	}
}



import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;

/**
 * Producer that copies a {@code long} out of a {@link ByteBuffer} into the
 * next {@link LongEvent} slot of a {@link RingBuffer} and publishes it.
 */
public class LongEventProducer {
	private final RingBuffer<LongEvent> ringBuffer;

	/**
	 * @param ringBuffer the ring buffer this producer publishes into
	 */
	public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
		this.ringBuffer = ringBuffer;
	}

	/**
	 * Publishes one event whose value is the {@code long} stored at absolute
	 * index 0 of {@code bb}.
	 *
	 * @param bb buffer to read the value from; read at index 0 via
	 *           {@link ByteBuffer#getLong(int)}
	 */
	public void onData(ByteBuffer bb) {
		// Read the payload BEFORE claiming a slot. If the read fails (null or
		// too-short buffer), no sequence has been claimed yet, so no slot
		// carrying a stale value gets published to consumers.
		long value = bb.getLong(0);

		// Claim the next sequence.
		long sequence = ringBuffer.next();
		try {
			// Look up the entry for the claimed sequence and fill it in.
			LongEvent event = ringBuffer.get(sequence);
			event.setValue(value);
		} finally {
			// A claimed sequence is always published, even if filling failed.
			ringBuffer.publish(sequence);
		}
	}
}


import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

/**
 * Producer that publishes {@link LongEvent}s through a shared
 * {@link EventTranslatorOneArg} passed to {@code RingBuffer.publishEvent}.
 */
public class LongEventProducerWithTranslator {

	/**
	 * Stateless translator: copies the {@code long} at absolute index 0 of the
	 * supplied buffer into the event. The sequence argument is not used.
	 */
	private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
			(event, sequence, bb) -> event.setValue(bb.getLong(0));

	private final RingBuffer<LongEvent> ringBuffer;

	/**
	 * @param ringBuffer the ring buffer this producer publishes into
	 */
	public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
		this.ringBuffer = ringBuffer;
	}

	/**
	 * Publishes one event built from {@code bb} using {@link #TRANSLATOR}.
	 *
	 * @param bb buffer handed to the translator; its {@code long} at index 0
	 *           becomes the event value
	 */
	public void onData(ByteBuffer bb) {
		ringBuffer.publishEvent(TRANSLATOR, bb);
	}

}


import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

/**
 * Demo entry point: builds a {@link Disruptor} of {@link LongEvent}s, attaches
 * a printing handler, then publishes an incrementing counter once per second
 * in a loop that never terminates on its own.
 */
public class LongEventMain {

	/**
	 * Starts the disruptor and publishes events forever.
	 *
	 * @param args unused
	 * @throws Exception if {@link Thread#sleep(long)} is interrupted or any
	 *                   called API throws
	 */
	public static void main(String[] args) throws Exception {
		Executor executor = Executors.newCachedThreadPool();

		// The buffer size must be a power of two.
		int bufferSize = 1024;

		// Build the disruptor, using the LongEvent constructor as event factory.
		Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);

		// Attach the handler: print every event that arrives.
		disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
			System.out.println("Event: " + event+"  sequence:"+sequence+"  endOfBatch:"+endOfBatch);
		});

		// Start the disruptor.
		disruptor.start();

		// Obtain the ring buffer to publish into.
		RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

		// One 8-byte scratch buffer, overwritten on every iteration.
		ByteBuffer payload = ByteBuffer.allocate(8);
		long counter = 0;
		while (true) {
			payload.putLong(0, counter);
			ringBuffer.publishEvent(
					(event, sequence, buffer) -> event.setValue(buffer.getLong(0)),
					payload);
			Thread.sleep(1000);
			counter++;
		}
	}

}


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics