reactor官⽅⽂档译⽂(1)Reactor简介
原⽂地址:projectreactor.io/docs/reference/
Reactor简介
Reactor是⼀个基础库,⽤在构建实时数据流应⽤、要求有容错和低延迟⾄毫秒、纳秒、⽪秒的服务。
— Preface TL;DR
什么是Reactor?
让我们⼤致了解⼀下Reactor。在你使⽤喜欢的搜索敲⼊⼀些关键词如Reactive、spring Reactive、Asynchronous java或者仅仅是"What the heck is Reactor?".简⽽⾔
之,Reactor是⼀个轻量级的JVM基础库,它可以帮助我们构建的服务和应⽤⾼效⽽异步的传递消息。
⾼效的含义是什么呢?
word加水印传递⼀个消息从A到B时GC产⽣的内存很⼩或者完全没有。
当消费者处理消息的速度低于⽣产者产⽣消息的速度时产⽣了溢出时,必须尽快处理。
尽可能的提供⽆锁的异步流。
据以往的经验来看,我们知道异步编程是困难的,特别是当⼀个平台提供了很多选项如JVM。
Reactor瞄准绝⼤部分场景中真正的⽆阻塞,并且提供了⼀组⽐原⽣Jdk的urrent库更⾼效的API。Reactor也提供了⼀个可选性(不建议使⽤):
阻塞等待:如()。
Unsafe数据获取:如ReentrantLock.lock()。
异常抛出:如try ..catch ...finally
同步阻塞:如 syschronized
Wrapper配置(GC压⼒):例如 new Wrapper<T>(event)
让我们先使⽤⼀个纯正的Executor⽅法:word怎么换行
private ExecutorService threadPool = wFixedThreadPool(8);
final List<T> batches = new ArrayList<T>();
Callable<T> t = new Callable<T>() { //1
public T run() {
synchronized(batches) { //2
T result = callDataba(msg); //3
batches.add(result);
return result;
}
}
};
Future<T> f = threadPool.submit(t); //4
T result = f.get() //5
1.分配回调⽅法---可能会导致gc压⼒。
2.Synchronization将强制对每个线程停⽌检查。
3. 存在消费者的消费能⼒低于⽣产者⽣产能⼒的隐患。
4. 使⽤线程池将task传递到⽬标线程--肯定通过FutureTask给gc造成压⼒。
5. 阻塞直⾄callDataba()响应。
从上述的简单⽰例中,容易看出扩展性会受到严重的影响。
不断分配的对象将导致gc停⽌⼯作,特别是耗时⽐较多的⼤任务时。当⼀个gc停⽌⼯作时将会从降低全局的性能。
队列默认情况下长度是不受限制的。任务会堆积到数据库中。
后台⽇志不是⼀个内存泄露的地⽅,但是副作⽤就⽐较烦⼈了:在gc暂停⼯作时需要扫描更多对象;损失数据重要bit的风险;等等。
经典链接Queue分配节点时产⽣的内存压⼒。
使⽤阻塞⽅式应答请求时发⽣恶性循环。
阻塞⽅式应答导致⽣产者效率慢下来。实际上,因为需要提交更多任务时等待响应,流程变成了基本的同步⽅式。
同数据存储的通信异常将以不友好的形式传递到⽣产者,通过线程边界来分离⼯作,这使容错的协商变的⽐较容易。
完全的、真正的⾮阻塞⽐较难以实现---特别是有⽐较时髦名称的分布式系统中如微服务架构。然⽽,Reactor却没有妥协,它试图利⽤可⽤的最佳模式来使开发者不必觉得像是在写⼀个数学论⽂⽽仅仅是⼀个微服务(nanrvice)。
没有什么⽐较光更快的了(除了流⾔蜚语和病毒猫视频),在某些⽅⾯,延迟是每个真实世界的系统必须关注的。为此:
反应器提供了⼀个框架,可以帮助你减轻恶⼼的延迟引起的副作⽤,在应⽤程序中使⽤最⼩的开销:使⽤⼀些灵活的结构,通过在启动时预先分配在运⾏时的分配数据结构来避免分配问题。
限制主消息传送结构,因⽽不会导致任务⽆限的累积。
利⽤流⾏的模式例如Reactive和事件驱动架构来提供⼀个包含应答的⾮阻塞的、端对端流。
实现了最新的Reactive流标准,通过不发送多于当前容量的请求来使受限的结构更有效率。
使⽤这些概念到进程间通信,提供了理解控制流的⾮阻塞IO驱动。
对开发者暴露功能API,帮助开发者使⽤⼀个⽆副作⽤的⽅式组织代码,也帮助你确定在什么场景下你是线程安全和具有容错性的。
项⽬简介:
该项⽬始于2012年,孕育时间较长。2013年出现Reactor1.x版本。该版本成功部署到不同的组织,不仅有开源组织如MeltDown、还有商业机构如Pivotal RTI。2014年我们实现了新的"Reactive流标准",并在2015年的4⽉开始了版本2.0的⼤规模重构⽬标。Reactive流标准拉近了分发机制的鸿沟:控制多少线程传递多少数据。
同时我们也决定重新调整我们的⼀些事件驱动和任务协调API的来应对⽇益流⾏、记录的reactive扩展。
Reactor由Pivotal赞助⽀持,有两个核⼼提交者。因为Pivotal同时也是spring框架的东家,我们的很多同事也是不同spring项⽬的核⼼贡献者,所以我们也提供从Reactor到spring的集成同时也⽀持spring框架的⼀些重要功能如spring消息模块的STOMP代理。也就是说,我们不会强迫仅仅想使⽤Re孕妇血糖
actor的⼈去适应spring。我们保留了⼀个⼤容量Reactive的内嵌⼯具。事实上,Reactor的⽬标之⼀是在你解决异步和功能性问题时保持公正的态度。
Reactor遵循,可以通过获取。
使⽤要求:
Reactor需要jdk7及以上版本。
但完整的功能组合表达式需要java8的lambdas⽀持。
作为后备,⽀持spring clojure和groovy的扩展。
Reactor需要jvm⽀持Unsafe⽅式获取(如:android不⽀持)时才能表现最全的功能。永久和平
当Unsafe获取不⽀持是所有基于RingBuffer的特定将不能⼯作。
Reactor打包成传统的jar形式存在于maven中央库中,可以使⽤你喜欢的⼯具来拉取这个依赖包。
架构总览:
Reactor基本代码划分为⼏个⼦模块,这样你可以单独使⽤某⼀模块⽽抛弃不需要的模块。
下述是⼀些使⽤Reactor模块和其它混合的Reactive技术⽰例,完成异步⽬标:
Spring XD + Reactor-Net (Core/Stream) : 使⽤Reactor 作为Sink/Source IO 驱动.
Grails | Spring + Reactor-Stream (Core) : 使⽤Stream和 Promi作为后台处理程序。
Spring Data + Reactor-Bus (Core) : ⽣产数据库事件(Save/Delete/…).
Spring集成Java DSL + Reactor Stream (Core) : Microbatch MessageChannel from Spring Integration.
RxJavaReactiveStreams + RxJava + Reactor-Core : Combine rich composition with efficient asynchronous IO Processor
RxJavaReactiveStreams + RxJava + Reactor-Net (Core/Stream) : Compo input data with RxJava and gate with Async IO drivers.
Reactive stream
Reactive stream是⼀个新的标准,不同的⼚商和技术组织包括Netflix,oracle,Pivotal,TypeSafe⽀持这个标准。该标准有望被java9或者以后的版本的标准收录进去。
该标准的⽬标是提供⼀种同步或者异步的具有控制流机制的数据序列。这个标准是轻量级的,第⼀个⽬标是JVM。它提供了4个java接⼝,⼀个Tck和⼀系列的⽰例。根据需
要,4个java接⼝的实现⾮常直接,这个项⽬的内涵是Tck对操作的校验。
图三 Reactive stream协约
Reactive Streams接⼝
个Subscription,subscription获取Subscriber将要处理多少数据。与其它数据时序信号交互的其它回调有: next (新消息)和可选的completion/activestreams.Subscription:在初始化时传给Subscriber的⼀个⼩的追踪器。.它控制着我们准备消费掉多少数据和什么时候停⽌消费(取消).
图四:Reactive Stream发布协约
通过传递Subscriber,⼀个请求数据从subscriber到publisher有两种⽅式:
⽆限制的: 在订阅时, 仅调⽤Subscription的request(Long.MAX_VALUE)⽅法.
有限制的: 在订阅时, 保留subscription的引⽤,并且当subscriber准备处理数据时调⽤request(long)⽅法。
通常, 在订阅时Subscribers将请求⼀组初始数据或者甚⾄1个数据。
然后,onNext认为执⾏成功(例如后⾯的Commit, Flush等等), 请求更多的数据。
建议使⽤线性组的请求。为避免请求重叠,例如每次下次请求时请求10个或者更多的数据。
表1 ⽬前为⽌,Reactor直接使⽤的Reactive stream接⼝及实现
Reactive
Streams
Reactor模块实现说明
Processor reactor-core, reactor-
stream
Publisher reactor-core, reactor-
bus, reactor-stream,
reactor-net
爱情逃兵
reactor.io.*
在core模块,处理器继承了Publisher.在bus模块,发布⼀个不限制的路由事件,在stream模
块,stream扩展直接继承Publisher. 在net模块,Chanel继承了Publisher来消费请求数据,同时也提工作经历模板
供了具有flush和clo的回调的providers.
Subscriber reactor-core, reactor-
bus, reactor-stream,
reactor-net
reactor.bus.EventBus.*,
reactor.io.impl.*
在core模块,处理器继承了Subscriber. 在bus模块,提供了⽆限制的Publisher/Subscriber能⼒.在
stream模块,Subscribers计算特定的回调⾏为.在Net模块,subscriber的IO层实现处理写、关闭和
flush.
Subscription reactor-stream,
reactor-net
reactor.io.impl.*
在stream模块, 提供了⼀个优化过的PushSubscriptions和 buffering-ready ReactiveSubscription. 在
Net模块, 使⽤⾃定义Subscription实现背压的⽅式实现异步IO读。
从reactor 2启动时我们就⼀直遵循这个标准,并且随着标准的改变⽽改变直到1.0.0正式版准备发布。现在可以通过maven中央库及其流⾏的镜像可以找到该标准,你将发现它作为过渡,依赖于reactor-core模块。
Reactive扩展
Reactive扩展或者通常称作Rx,是⼀种定义完备的功能api,这些api扩展了观察者模式到⼀个史诗的程度。
Rx模式⽀持实现了使⽤少数设计的关键字来处理Reactive 数据序列:
使⽤回调链来抽象实时及延迟:当可以获得到数据时调⽤。
抽象了⼀直使⽤的线程模式:同步或者异步仅仅是我们处理的Obrvable/Stream。
控制错误传递及停⽌:错误和完成信号及数据的有效负载信号传递到链中。
在多个预先定义的api中解决了多个扩展-聚合及其它组合问题。
Reactive扩展的标准Jvm实现是RxJava。它提供了⼀个功能丰富的Api。
Reactor 2 提供了⼀个特定模块实现了Reactive扩展的⼀部分功能。建议需要使⽤Reactive stream全部功能的⽤户使⽤RxJava。最后,当组合完整的RxJava系统时,⽤户可
以从Reactor提供的强⼤的异步和IO的中获益。
表2:Rx和Reactor stream的不同点:
rx reactor-stream说明
Stream Reactive Stream Publisher的实现
夕火念什么
action.Action Reactive Stream Processor的实现蓝鲸紧急出动
Obrvable with 1 data at Promi返回唯⼀结果的类型, Reactive Stream Processor实现并提供了可选的异步分发功
能。
Factory API (just, from, merge….)Streams和core模块的 data-focud ⼦类⼀样, 返回 Stream
Functional API (map, filter, take…
.)
Dispatcher,
Reactor Stream计算⽆限制的共享Dispatcher或者有限的Processor的操作。
Obrvable.obrveOn()Stream.dispatchOn()只是dispatcher参数的⼀个适配命名。