Disruptor体验和对⽐
Disruptor体验和对⽐
已经不记得最早接触到Disruptor 是什么时候了,只记得发现它的时候它是以具有闪电般的速度被介绍的。于是在脑⼦
⾥,Disruptor 和“闪电”⼀词关联了起来,然⽽却⼀直没有时间去探究⼀下。最近正在进⾏⼀项对性能有很⾼要求的产品项⽬的研究,⾃然想起了闪电般的Disruptor ,这必有它的⽤武之地,于是进⾏了⼀番探查,将成果和体会记录在案。⼀、什么是Disruptor 从功能上来看,Disruptor 是实现了“队列”的功能,⽽且是⼀个有界队列。那么它的应⽤场景⾃然就是“⽣产者-消费者”模型的应⽤场合了。可以拿JDK 的BlockingQueue 做⼀个简单对⽐,以便更好地认识Disruptor 是什么。我们知道BlockingQueue 是⼀个FIFO 队列,⽣产者(Producer)往队列⾥发布(publish)⼀项事件(或称之为“消息”也可以)时,消费者(Consumer)能获得通知;如果没有事件时,消费者被堵塞,直到⽣产者发布了新的事件。这些都是Disruptor 能做到的,与之不同的是,Disruptor 能做更多:同⼀个“事件”可以有多个消费者,消费者之间既可以并⾏处理,也可以相互依赖形成处理的先后次序(形成⼀个依赖图);预分配⽤于存储事件内容的内存空间;针对极⾼的性能⽬标⽽实现的极度优化和⽆锁的设计;以上的描述虽然简单地指出了Disruptor 是什么,但
对于它“能做什么”还不是那么直截了当。⼀般性地来说,当你需要在两个独⽴的处理过程(两个线程)之间交换数据时,就可以使⽤Disruptor 。当然使⽤队列(如上⾯提到的BlockingQueue)也可以,只不过Disruptor 做得更好。拿队列来作⽐较的做法弱化了对Disruptor 有多强⼤的认识,如果想要对此有更多的了解,可以仔细看看Disruptor 在其东家LMAX 交易平台(也是实现者) 是如何作为核⼼架构来
使⽤的,这⽅⾯就不做详述了,问度娘或⾕哥都能找到。⼆、Disruptor 的核⼼概念先从了解Disruptor 的核⼼概念开始,来了解它是如何运作的。下⾯介绍的概念模型,既是领域对象,也是映射到代码实现上的核⼼对象。Ring Buffer如其名,环形的缓冲区。曾经RingBuffer 是Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过Disruptor 进⾏交换的数据(事件)进⾏存储和更新。在⼀些更⾼级的应⽤场景中,Ring Buffer 可以由⽤户的⾃定义实现来完全替代。Sequence Disruptor通过顺序递增的序号来编号管理通过其进⾏交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。⼀个Sequence ⽤于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理
增长幅度
进度。虽然⼀个AtomicLong 也可以⽤于标识进度,但定义Sequence 来负责该问题还有另⼀个⽬的,那就是防⽌不同的Sequence 之间的CPU缓存伪共享(Fla Sharing)问题。(注:
这是Disruptor 实现⾼性能的关键点之⼀,⽹上关于伪共享问题的介绍已经汗⽜充栋,在此不再赘述)。
Sequencer Sequencer 是Disruptor 的真正核⼼。此接⼝有两个实现类SingleProducerSequencer、MultiProducerSequencer ,它们定义在⽣产者和消费者之间快速、正确地传递数据的并发算法。Sequence Barrier⽤于保持对RingBuffer的main published Sequence 和Consumer依赖的其它Consumer的Sequence 的引⽤。Sequence Barrier 还定义了决定Consumer 是否还有可处理的事件的逻辑。Wait Strategy定义Consumer 如何进⾏等待下⼀个事件的策略。(注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不⼀样的性能表现)Event在Disruptor 的语义中,⽣产者和消费者之间进⾏交换的数据被称为事件(Event)。它不是⼀个被Disruptor 定义的特定类型,⽽是由Disruptor 的使⽤者定义并指定。EventProcessorEventProcessor 持有特定消费者(Consumer)的Sequence,并提供⽤于调⽤事件处理实现的事件循环(Event Loop)。EventHandlerDisruptor 定义的事件处理接⼝,由⽤户实现,⽤于处理事件,是Consumer 的真正实现。Producer即⽣产者,只是泛指调⽤Disruptor 发布事件的⽤户代码,Disruptor 没有定义特定接⼝或类型。三、如何使⽤Disruptor Disruptor 的API ⼗分简单,主要有以下⼏个步骤:定义事件事件(Event)就是通过Disruptor 进⾏交换
的数据类型。public class LongEvent
{
private long value; public void t(long value)
{
this.value = value;
}
} 定义事件⼯⼚事件⼯⼚(Event Factory)定义了如何实例化前⾯第1步中定义的事件(Event),需要实现接⼝
com.lmax.disruptor.EventFactory<T>。Disruptor 通过EventFactory 在RingBuffer 中预创建Event 的实例。⼀个Event 实例实际上被⽤作⼀个“数据槽”,发布者发布前,先从RingBuffer 获得⼀个Event 的实例,然后往Event 实例中填充数据,之后再发布到RingBuffer 中,之后由Consumer 获得该Event 实例并从中读取数据。import com.lmax.disruptor.EventFactory;public class LongEventFactory implements EventFactory<LongEvent> {
public LongEvent newInstance()
{
return new LongEvent();
}
} 定义事件处理的具体实现通过实现接⼝
com.lmax.disruptor.EventHandler<T> 定义事件处理的
具体实现。import com.lmax.disruptor.EventHandler;public class LongEventHandler implements
EventHandler<LongEvent>
{
public void onEvent(LongEvent event, long quence, boolean endOfBatch)
{
头皮痒头皮屑多System.out.println("Event: " + event);
}
} 定义⽤于事件处理的线程池Disruptor 通过
urrent.ExecutorService 提供的线程来触发Consumer 的事件处理。例如:ExecutorService executor = wCachedThreadPool(); 指定等待策略Disruptor 定义了com.lmax.disruptor.WaitStrategy 接⼝⽤于抽象Consumer 如何等待新事件,这是策略模式的应⽤。Disruptor 提供了多个WaitStrategy 的实现,每种策略都具有不同性能和优缺点,根据实际运⾏环境的CPU 的硬件特点选择恰当的策略,并配合特定的JVM 的配置参数,能够实现不同的性能提升。例
如,BlockingWaitStrategy、SleepingWaitStrategy、YieldingWaitStrategy 等,其中,BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最⼩并且在各种不同部署环
境中能提供更加⼀致的性能表现;SleepingWaitStrategy 的性能表现跟BlockingWaitStrategy 差不多,对CPU 的消耗也类似,但其对⽣产者线程的影响最⼩,适合⽤于异步⽇志类似的场景;YieldingWaitStrategy 的性能是最好的,适合⽤于低延迟的系统。在要求极⾼性能且事件处理线数⼩于CPU 逻辑核⼼数的场景中,推荐使⽤此策略;例如,CPU开启超线程的特性。WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy(); WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy(); 启动DisruptorEventFactory<LongEvent> eventFactory = new LongEventFactory(); ExecutorService executor =
int ringBufferSize = 1024 * 1024; // RingBuffer ⼤⼩,必须是2 的N 次⽅;
Disruptor<LongEvent> disruptor = new
Disruptor<LongEvent>(eventFactory,
ringBufferSize, executor, ProducerType.SINGLE,
new YieldingWaitStrategy());
EventHandler<LongEvent> eventHandler = new LongEventHandler();
disruptor.handleEventsWith(eventHandler);
disruptor.start(); 发布事件Disruptor 的事件发布过程是⼀个两阶段提交的过程:第⼀步:先从RingBuffer 获取下⼀个可以写⼊的事件的序号;第⼆步:获取对应的事件对象,将数据写⼊事件对象;第三部:将事件提交到RingBuffer;事件只有在提交之后才会通知EventProcessor
进⾏处理;// 发布事件;
RingBuffer<LongEvent> ringBuffer =
long quence = ();//请求下⼀个事件序号;
try {
LongEvent event = (quence);//获取该序号对应的事件对象;
long data = getEventData();//获取要通过事件传递的业务数据;
event.t(data);
} finally{
ringBuffer.publish(quence);//发布事件;
} 注意,最后的ringBuffer.publish ⽅法必须包含在finally 中以确保必须得到调⽤;如果某个请求的quence 未被提交,将会堵塞后续的发布操作或者其它的producer。Disruptor 还提供另外⼀种形式的调⽤来简化以上操作,并确保publish 总是得到调⽤。static class Translator implements EventTran
slatorOneArg<LongEvent, Long>{
@Override
韩语学习班public void translateTo(LongEvent event, long quence, Long data) {
event.t(data);
}
}
public static Translator TRANSLATOR = new Translator();
public static void publishEvent2(Disruptor<LongEvent> disruptor) {
// 发布事件;
RingBuffer<LongEvent> ringBuffer =
long data = getEventData();//获取要通过事件传递的业务
数据;
ringBuffer.publishEvent(TRANSLATOR, data);
}此外,Disruptor 要求RingBuffer.publish 必须得到调⽤的潜台词就是,如果发⽣异常也⼀样要调⽤publish ,那么,很显然这个时候需要调⽤者在事件处理的实现上来判断事
件携带的数据是否是正确的或者完整的,这是实现者应该要注意的事情。关闭Disruptordisruptor.shutdown();//关闭disruptor,⽅法会堵塞,直⾄所有的事件都得到处理;executor.shutdown();//关闭disruptor 使⽤的线程池;如果需要的话,必须⼿动关闭,disruptor 在shutdown 时不会⾃动关闭;四、性能对⽐测试为了直观地感受Disruptor 有多快,设计了⼀个性能对⽐测试:Producer 发布100 万次事件,从发布第⼀个事件开始计时,捕捉Consumer 处理完所有事件的耗时。测试⽤例在Producer 如何将事件通知到Consumer 的实现⽅式上,设计了三种不同的实现:Producer 的事件发布和Consumer 的事件处理都在同⼀个线程,Producer 发布事件后⽴即触发Consumer 的事件处理;Producer 的事件发布和Consumer 的事件处理在不同的线程,通过ArrayBlockingQueue 传递给Consumer 进⾏处理;Producer 的事件发布和Consumer 的事件处理在不同的线程,通过Disruptor 传递给Consumer 进⾏处理;此次测试⽤例仅做了只有⼀个Producer 和
⼀个Consumer 的情
形,测试⽤例的代码如下:CounterTracer tracer = wInstance(DATA_COUNT);//计数跟踪到达指定的数值;
TestHandler handler = new TestHandler(tracer);//Consumer 的事件处理;
EventPublisher publisher = wInstance(new PublisherCreationArgs(DATA_COUNT, handler));//通过⼯⼚对象创建不同的Producer 的实现;
publisher.start();
tracer.start();
//发布事件;
for (int i = 0; i < DATA_COUNT; i++) {
publisher.publish(i);
}
//等待事件处理完成;
tracer.waitForReached();
publisher.stop();
//输出结果;
printResult(tracer); 事件处理的实现只是调⽤⼀个计数器(CounterTracer)加1,该计数器跟踪从开始到达到总的事件次数时所耗的时间。public class TestHandler {
private CounterTracer tracer;
public TestHandler(CounterTracer tracer) {
}
/**
* 如果返回true,则表⽰处理已经全部完成,不再处理后续事件;
*
* @param event
* @return
*/
public boolean process(TestEvent event){
unt();
}
} 针对单⼀Producer 和单⼀Consumer 的测试场景,
wwdc是什么CounterTracer 的实现如下:/**
* 测试结果跟踪器,计数器不是线程安全的,仅在单线程的consumer 测试中使⽤;
info是什么意思*
* @author haiq
*
*/
public class SimpleTracer implements CounterTracer
{ private long startTicks;
private long endTicks;
private long count = 0;
private boolean end = fal;
private final long expectedCount;
private CountDownLatch latch = new CountDownLatch(1); public SimpleTracer(long expectedCount) {
} @Override
法语考试public void start() {
neithernor
startTicks = System.currentTimeMillis();
end = fal;
} @Override
public long getMilliTimeSpan() {
return endTicks - startTicks;
} @Override
public boolean count() {
if (end) {
e2e
return end;
}
count++;
end = count >= expectedCount;
if (end) {
endTicks = System.currentTimeMillis();
}
return end;
} @Override
public void waitForReached() throws InterruptedException {
latch.await();
}
} 第⼀种Producer 的实现:直接触发事件处理;public class DirectingPublisher implements EventPublisher { private TestHandler handler;
private TestEvent event = new TestEvent();
public DirectingPublisher(TestHandler handler) {
this.handler = handler;
} @Override
public void publish(int data) throws Exception {
event.tValue(data);
handler.process(event);
freshfresh
} //省略其它代码;
} 第⼆种Producer 的实现:通过ArrayBlockinigQueue 实现;public class BlockingQueuePublisher implements EventPublisher {
private ArrayBlockingQueue<TestEvent> queue ;