Java响应式编程基础-响应式流
什么是流
形象的⽐喻来说就是如同⽔⼀样绵绵不绝的数据形式。⽽抽象点来说,是有⼀个⽣产者(source)产⽣,由⼀个或者多个消费者(sink)消费的数据元素(item)序列。那从这个抽象的描述就可以看出,使⽤流来承担数据交互的模式就是咱们经常说的⽣产者/消费者模型,⽽这种模型也可以称之为发布者/订阅者模型(后⽂将使⽤这个名字,因为JDK中使⽤的是这个名字)。
对于流数据来说,⼀般有两种的数据流转⽅式:
拉(pull)数据模式:订阅者向发布者索要数据。
推(push)数据模式:发布者向订阅者推送数据(push)。
这两种模式都是描述的单次信息传递的⽅式。如果发布者产⽣信息的速度和订阅者消费信息的速度⼀致的话,那这两种⽅法都将是⼗分有效的数据流转⽅式。
流有什么问题
流的问题在于当两端的速度不匹配的时候(考虑⼀下各种mq主要处理的问题削峰平⾕)。⽽速度的不匹配⾃然存在以下两种情况:
订阅者消费速度快
这种情况的时候会出现订阅者有处理能⼒了,但是订阅者⽆信息可以处理的情况。如果这种时候是同步的调⽤模式,则订阅者将会阻塞,直到有新的信息可以进⾏处理。⽽如果这时候是异步的信息处理模式,则订阅者可以在⽆消息处理的时候挂起,直接切换到其他的任务处理中(对于多核CPU的多线程来说)。也就是说,对于这种情况,⽐较理想的是异步推模式。
发布者发布速度快
当发布者发布速度快的时候,会发⽣订阅者来不及处理数据的情况。如果是同步的情况下发布者会⼀直阻塞,⽽如果是异步模式则对于订阅者来说有两种处理⽅式(可以类⽐⼀下线程池设计)可以处理:
损失数据:丢弃数据(在有限的队列缓存已经满了的情况下)
不损失数据:加⼊队列缓存数据(订阅者需要有拥有⽆限的缓冲队列暂存数据,以确保不会溢出)
⽽还有另⼀种需要发布者加⼊的处理⽅式叫做背压(backpressure)。背压的实现⽅式是:由订阅者
发出信号,让发布者降低信息的发布速度,从⽽让信息速度之间匹配。背压的优点是同样可以处理信息流速不⼀致问题。⽽更有意思的是,这时候信息的处理策略可以由发布者来选择:
损失数据:丢弃数据(在有限的队列缓存已经满了的情况下)
不损失数据:加⼊队列缓存数据(发布者需要有拥有⽆限的缓冲队列暂存数据,以确保不会溢出)
没错,这两种情况是和订阅者⼀致的,不过选择权则由订阅者变成了发布者。
也就是说,在发布者发布速度快的时候,要么发布者直接同步阻塞,要么可以先根据消息的主要关⼼⽅(是发布者还是订阅者)来确定是否使⽤背压,然后再根据数据的类型判断是否接受数据丢弃(不丢弃可能会导致系统崩溃)。往往我们的发布者可以由上层的mq或者程序的应答机制保护消息的可⽤性。
那么结论是什么,我们需要异步⾮阻塞(订阅者消费快)、以及背压(发布者发布快)。
什么是响应式流
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.
响应式流(Reactive Streams)概念被提出是在2013年,旨在处理上⼀⼩节中由于流速问题⽽产⽣的⼏种问题:订阅者订阅者的阻塞、由订阅者(数据下游)来选择是依赖⽆限队列(数据不丢)或直接丢弃数据。
⽽对于⼀项标准⽽⾔,它的⽬是⾃然是⽤更少的协议来描述交互。⽽响应式流的模型也是⼗分简单:
订阅者异步的向发布者请求N个元素。
发布者⼀步的向订阅者发送M(0<M<=N)个元素。
社会主义价值观24字
基于这个模型,响应式流可以做到pull模型和push模型流处理机制之间动态切换。当订阅者较慢时,发送者没有新数量的请求则发布者进⼊等待请求信息的状态,相当于pull模型;当订阅者更快时,相当于发布者没有新的信息,订阅者进⼊到等待消息发送的状态相当于push模型。
五行什么生土Java中的响应式流
对于响应式流,在2015年的时候确定了关于其Java API,具体的详情也也可以参考上⾯的链接。其中定义了4个API,具体为:Publisher值周生
Subscriber
Subscription
Processor<T,R>
对他们的定义为:
Publisher(发布者)
是⼀个假定上游会产⽣⽆限数据的信息发布者。他们会向有发送请求的订阅者推送元素。
Subscriber(订阅者)
订阅者会从发布者那⾥领取令牌,然后根据令牌向发布者发送“获取请求”。同时当发布者部分准备好元素的时候,会通过令牌对订阅者进⾏调⽤,进⾏数据消费。
Subscription(令牌)
发布者和订阅者通过令牌来进⾏信息通信的约定。主要有:开始订阅、信息获取、信息推送、异常、结束、取消订阅。
Processor(处理器)
可以通过处理器连接发布者、订阅者以及其他处理器。Processor本⾝同时继承了Publisher与Subscriber接⼝,所以可以对元素进⾏处理转发。主要⽤于让数据从T转换为R。同时,由于Processor本⾝也可以接⼊Processor,所以Processor可以组成链来对数据进⾏处理。
行草硬笔
⼀次完整的调⽤流程⼤概可以描述为:
1. 订阅者向发布者发送订阅请求。
2. 发布者根据订阅请求⽣成令牌发送给订阅者。
3. 订阅者根据令牌向发布者发送请求N个数据。
4. 发送者根据订阅者的请求数量返回M(M<=N)个数据
5. 重复3,4
6. 数据发送完毕后由发布者发送给订阅者结束信号
⽽Java API中的接⼝如下所⽰,其中所有的⽅法都是void,因为所有的⽅法都是异步执⾏的。
1001夜
public interface Publisher<T>{
//⽤于1.中订阅请求
public void subscribe(Subscriber<?super T> s);
}
public interface Subscriber<T>{
//⽤于2.中回调发送令牌
public void onSubscribe(Subscription s);
//⽤于3.⽤于接受4中发送过来的数据
public void onNext(T t);
//⽤于3,4,5接收中间异常了之后的调⽤
public void onError(Throwable t);
//⽤于6.中结束信号的回调
public void onComplete();
}
public interface Subscription {
//⽤于3.的发送请求N个数据
public void request(long n);
//⽤于3,4,5订阅者异步的向
public void cancel();
}
public interface Processor<T,R>extends Subscriber<T>, Publisher<R>{
}
JDK中的响应式流
Java API中的流程使⽤⽅式看起来⽐较简单,但API背后的具体实现由于是全异步交互以及涉及具体背压处理⽽很困难。⽽JDK9中为⽤户提供了Publisher接⼝的简单实现,让开发⼈员可以基于此来扩展出⾃⼰的实际需求。
JDK 9中的响应式流功能提供在urrent包下,全响应式流的API接⼝被封装到 Flow接⼝中,其中包括需要使⽤的接⼝以及静态⽅法,关于上⼀⼩节中接⼝⽅法的详细描述也可以参见该接⼝上的⽅法描述。其中的静态接⼝为:
Flow.Processor<T,R>
Flow.Publisher<T>
Flow.Subscriber<T>
Flow.Subscription
除去上⼀⼩节说的4个接⼝外,Flow中还包含了⼀个默认⽅法defaultBufferSize(),⽤于返回默认的令牌中的缓冲区⼤⼩,⽽默认的值
为DEFAULT_BUFFER_SIZE = 256。
除去Flow外,其中还有⼀个刚刚说到的Publisher的简单实现类SubmissionPublisher<T>。该接⼝在实现了publisher<T>之外还实现
了AutoCloable接⼝,所以可以直接⽤try块来进⾏资源的管理。
尽管JDK 9中没有提供Subscriber<T>的简单实现,但是在SubmissionPublisher<T>中提供了⼀个consume(Consumer<? super T> consumer)⽅法,⽤于让开发⼈员可以直接消费消息发布者的所有元素。实际上是在内部实现了简单的Subscriber为ConsumerSubscriber,但是并不是public的,所以不能直接使⽤。
简单的例⼦
根据JDK 9中提供的SubmissionPublisher<T>咱们来写⼀个⼩例⼦。
减肥的食物
public static void main(String[] args){
// ⽤于承接返回值的任务
CompletableFuture<Void> task;
// try-with-resource来控制资源
try(SubmissionPublisher<Integer> publisher =new SubmissionPublisher<>()){
System.out.println("默认缓冲容量: "+ MaxBufferCapacity());
// 传⼊打印⽅法来处理元素
task = sume(System.out::println);
// 打印数字,调⽤发布者进⾏信息处理
IntStream.range(1,6)
.forEach(publisher::submit);
}
if(task !=null){
try{
// 当所有订阅者处理完毕后调⽤
<();
}catch(InterruptedException | ExecutionException e){
e.printStackTrace();
}
}
}
在这个例⼦⾥⾯进⾏了以下⼏件事。
1. 声明⼀个CompletableFuture⽤于捕获后续的处理事件。
2. 开启资源⽤于进⾏流消息订阅
3. 设置流的订阅⽅法(订阅者)塞波加大公国
4. 进⾏发布者的信息发送
婴儿服
5. 阻塞主⽅法等待处理完毕后结束
其中MaxBufferCapacity()会打印默认的缓存空间256。在调⽤sume的时候,是奖传⼊的Consumer在内部封装成⼀
个Subscribr的简单实现类,⽤于订阅信息的发送,实时上后续数据的订阅者就是在这步创建的。
当publisher进⾏调⽤的时候,调⽤submit发送数据,publisher有两个⽅法⽤于发送数据,⼀个是submit,⼀个是offer。两个⽅法下⾯实际都是调⽤的doOffer⽅法,所以,offer⽅法提供了置顶延迟时间后丢弃的策略,⽽submit是offer的简单实现,是⼀致阻塞不丢弃。
最后
不得不说响应式流是java中响应式编程的基础,⽽JDK 9中也提供了Reactive Streams的“简单”实现。之所⽰简单是打引号的是因为实际上还有点绕的,有兴趣的同学可以追⼀下SubmissionPublisher<T>的实现,有⼀些思想的经典实现,⽐如⽤整数中的7位来作为状态机。在下⼀篇中我们再聊⼀下JDK 9中的数据交互顺序。