projectreactor
project reactor (1)
核⼼
flux
an Asynchronous Sequence of 0-N Items
⼀个表⽰ 包含0-n个元素的 异步序列。
mono
an Asynchronous 0-1 Result
会动的房子教案
⼀个表⽰ 包含1个元素或者没有元素的 异步结果。
创建并订阅flux和mono
创建
广西阳朔
Flux<String> q1 = Flux.just("foo","bar","foobar");
List<String> iterable = Arrays.asList("foo","bar","foobar");
Flux<String> q2 = Flux.fromIterable(iterable);
Mono<String> noData = pty();
Mono<String> data = Mono.just("foo");
// 第⼀个参数是范围的开始,⽽第⼆个参数是要⽣成的itmes数。
Flux<Integer> numbersFromFiveToSeven = Flux.range(5,3);
狗咬尾巴订阅
subscribe 使⽤了java8的lambda函数表达式,若不熟悉java8函数表达式以及stream流操作api的童鞋,强烈建议学习⼀下!不光是为了下⾯的内容铺垫,它真的很厉害!
包含以下变种:
// 订阅并触发序列。
subscribe();
// 为每个产⽣的value做⼀些逻辑。
subscribe(Consumer<?super T> consumer);
// 处理值,但也会对错误作出反应。
subscribe(Consumer<?super T> consumer,
Consumer<?super Throwable> errorConsumer);
// 处理值和错误,但在序列成功完成后还要运⾏⼀些代码。
subscribe(Consumer<?super T> consumer,
Consumer<?super Throwable> errorConsumer,
Runnable completeConsumer);
// 处理值和错误并成功完成,但也可以使⽤此 subscribe 调⽤产⽣的Subscription 执⾏某些操作。
subscribe(Consumer<?super T> consumer,
Consumer<?super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<?super Subscription> subscriptionConsumer);
这些变体返回对subscribe 的引⽤,我们可以在不需要更多数据时使⽤该引⽤来取消订阅。 取消后,源应停⽌产⽣值并清除其创建的任何资源。
这种取消和清理⾏为在Reactor中由通⽤的Disposable接⼝表⽰。
subscribe ⽰例(五个变种)
1 简单操作
// 设置⼀个Flux,当添加⼀个订阅者之后,该Flux会产⽣三个值。
Flux<Integer> ints = Flux.range(1,3);
// 以最简单的⽅式订阅。
ints.subscribe();
2 消费操作
Flux<Integer> ints = Flux.range(1,3);
// 订阅者订阅并打印对应的value
ints.subscribe(System.out::println);
3 2+error回调
Flux<Integer> ints = Flux.range(1,4)
.map(i ->{
if(i <=3){
return i;
}
throw new RuntimeException("Got to 4");
});
ints.subscribe(
System.out::println,
error -> println("Error: "+ error)
);
输出:
1
2
3
Error:RuntimeException: Got to4
4 3+completion events 回调
Flux<Integer> ints = Flux.range(1,4);
ints.subscribe(
System.out::println,
error -> println("Error "+ error),
// 添加 completion events 处理器
()-> System.out.println("complete Done")
);
输出:
1
2
3
4cmd关机命令
complete Done
注意:error 信号和complete 信号都是 终端事件(stream中的概念),并且彼此互斥。 为了使完成completion events 处理器⼯作,因此⽰例代码没有触发error 信号。
complete 回调没有输⼊,由⼀对空括号表⽰:它与Runnable接⼝中的run⽅法匹配。
5 4+Subscription 操作
该变体需要您对Subscription进⾏操作(对它执⾏⼀个request(long)或cancel())。 否则,flux会挂起。
Flux<Integer> ints = Flux.range(1,4);
ints.subscribe(System.out::println,
error -> println("Error "+ error),
()-> System.out.println("Done"),
// 当我们订阅时,我们会收到⼀个 Subscription。这⾥表⽰我们要从源中获取最多10个元素(实际上将发出4个元素并完成)。
sub -> quest(10));
注意,新版3.5 的api已经弃⽤此变种,官⽅的解释是使⽤者经常会忘记 subsciption,因此如果确实需要的话官⽅建议使⽤subscribeWith(Subscriber) 代替。
就像这样就不会忘记啦/捂脸,怎么会有种奇葩的感觉:
ints.subscribeWith(new Subscriber<Integer>(){
@Override
public void onNext(Integer integer){
System.out.println(integer);
}
@Override
public void onError(Throwable throwable){
}
@Override
public void onComplete(){
System.out.println("Done");
}
@Override
public void onSubscribe(Subscription subscription){
}
});
使⽤返回的Disposable取消⼀个 subscribe()
以上那些基于lambda的 subscribe()变体都具有 Disposable 返回类型。因此在这种情况下,可以通过调⽤其dispo()⽅法来取消订阅。
对于flux和mono来说,cancel 是让信号源应停⽌产⽣ 元素 的信号。 但是这个操作并不能保证⽴即执⾏:某些源可能以很快的速度产⽣元素,以⾄于在接收到取消指令之前已经完成了所有的⽣产动作。
Disposables类中提供了⼀些有关Disposable的实⽤程序。其中,Disposables.swap()创建⼀个Disposable包装器,使我们可以原⼦地取消和替换⼀个具体的Disposable。
另⼀个有趣的实⽤程序是posite(…)。 通过此组合,可以收集多个Disposable(例如,与服务调⽤关联的多个进⾏中的请求),并在以后⼀次性处理所有这些请求。 调⽤组合的dispo()⽅法后,任何尝试添加另⼀个Disposable都会⽴即被dispo。
lambda的替代⽅案 BaSubscriber
还有⼀种附加的订阅⽅法,该⽅法更通⽤并且采⽤成熟的订阅服务器,⽽不是从⼀个lambda表达式中⽣成。
BaSubscriber(或其⼦类)的实例是⼀次性的,这意味着,如果BaSubscriber订阅了第⼆个发布者,则取消其对第⼀个发布者的订阅。 这是因为使⽤实例两次将违反“反应式流”规则,即不得并⾏调⽤订阅服务器的onNext⽅法。
⽰例:
package;
import Subscription;
import BaSubscriber;
/**
默认情况下,BaSubscriber会触发⼀个⽆限制的请求,其⾏为与Subscribe()完全相同。
但是,当需要⾃定义请求量时,扩展BaSubscriber会更加有⽤。
**/
public class SampleSubscriber<T>extends BaSubscriber<T>{
@Override
public void hookOnSubscribe(Subscription subscription){
System.out.println("Subscribed");
// ⾃定义请求量⼀次⼀个请求
request(1);
}
@Override
public void hookOnNext(T value){
System.out.println(value);
/
/ ⾃定义请求量⼀次⼀个请求
request(1);
}
}
测试:
SampleSubscriber<Integer> ss =new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1,4);
ints.subscribe(ss);
输出:
Subscribed
1
山西网上税务局
2
3
4
肉怎么画BaSubscriber还提供了requestUnbounded()⽅法以切换到⽆界模式(等效于request(Long.MAX_VALUE)),当然还提供了cancel()⽅法。
另外和lambda中的变种类似,它还具有其他钩⼦:hookOnComplete,hookOnError,hookOnCancel和hookFinally(在序列终⽌时始终调⽤,终⽌类型作为SignalType参数传递)。对于这些钩⼦⽅法童鞋可⾃⾏实现,⽰例中的 SampleSubscriber 是执⾏绑定请求的订阅服务器的绝对最⼩实现。
Backpressure 和重塑 requests
在Reactor中实现Backpressure 时,通过向上游 operator发送 request ,将consumer pressure 传播回source 。 当前请求的总和有时被称为当前“需求”或“待处理请求”。 需求的上限为Long.MAX_VALUE,表⽰⽆限制的请求(意味着“尽快⽣成”,这样基本上是禁⽤Backpressure )。
第⼀个请求在订阅时来⾃ final 订阅者,但是下⾯这些最直接的订阅⽅式⽴即触发了Long.MAX_VALU
E的⽆限制请求:subscrbe()及其⼤多数基于lambda的变体(具有Consumer 的变量除外)冰岛的首都
block(), blockFirst() 和 blockLast()
通过toIterable() 或者 toStream() 进⾏遍历
定制原始请求的最简单⽅法是使⽤BaSubscriber进⾏subscribe,并覆盖hookOnSubscribe⽅法,如以下⽰例所⽰:
Flux.range(1,10)
.doOnRequest(r -> System.out.println("第 "+ r +" 个请求"))
.subscribe(new BaSubscriber<Integer>(){
@Override
public void hookOnSubscribe(Subscription subscription){
request(1);
}
@Override
public void hookOnNext(Integer integer){
System.out.println("收到 "+ integer +" 之后cancel ");
cancel();
}
});
输出:
第1个请求
收到1之后cancel
在处理请求时,必须⼩⼼以产⽣⾜够的需求来推进序列,否则您的Flux可能会“卡住”。 这就是为什么BaSubscriber在
hookOnSubscribe中默认为⽆界请求的原因。 覆盖此挂钩时,通常应⾄少调⽤⼀次请求。
改变下游需求的Operators
音乐书
要记住的⼀件事是,上游链中的每个Operator 都可以调整在订阅级别表达的需求。 ⼀个教科书式的例⼦是 buffer(N)运算符:如果它接收到request(2),则将其解释为对两个完整缓冲区的需求。 结果,由于缓冲区需要将N个元素视为已满,因此缓冲区运算符会将请求重塑为2 xN。