javafluxapi,JAVAReactorAPI简单使⽤(Flux和Mono)及
Web。。。
⼀. Reactor API 简单使⽤(Flux和Mono)
1. 常⽤创建Flux及Mono的⽅式
1.1. 使⽤just从现有的已知内容和⼤⼩的数据创建Flux或Mono
//使⽤数组创建⼀个被观察者(⽣产者,Flux)
Flux.just(new String[]{"hello",",","nihao","!"})
//观察者监听被观察者(消费者)
迈克尔杰克逊壁纸.subscribe(System.out::print);
//使⽤可变参数创建Flux
Flux.just("你","好","啊","!")
.
subscribe(System.out::print);
//使⽤just创建Mo
Mono.just("asd").subscribe(System.out::println);
1.2. 使⽤fromIterable从可迭代对象中创建Flux
本草纲目
//从可迭代的对象中创建Flux
Flux.fromIterable(Arrays.asList("你好",",","fromIter","!"))
.subscribe(System.out::print);
var list = new ArrayList(List.of("你","好"));
Flux flux = Flux.fromIterable(list);
list.add("啊");//在创建Flux后追加元素
flux.subscribe(System.out::print);//这⾥输出: 你好啊
王者电脑版
1.3. 使⽤fromStream从集合流中创建Flux
//流也可以是Arrays.asList("a", "b").stream()等⽅式返回的流
Flux.fromStream(Stream.of("从","流","中创建","Flux!"))
.subscribe(System.out::println);
1.4. 使⽤range中创建⼀个范围内迭代的Flux
Flux.range(0,10).subscribe(System.out::print);
1.5. 使⽤interval创建间隔某⼀时间异步执⾏的Flux
Flux.interval(Duration.ofMillis(100))
//限制执⾏10次
.take(10)
//避免主线程提前结束
Thread.sleep(1100);
吉尔格1.6. 从Mono转化⽽来的Flux
Mono.just("asd").flux().subscribe(System.out::print);
1.7. 从多个Mono组合⽽来的Flux
Mono.just("Mono1").concatWith(Mono.just("---Mono2"))
.subscribe(System.out::println);
1.8. 使⽤generate动态创建Flux只能执⾏⼀次的Flux
// 同步动态创建,next 只能被调⽤⼀次
<("第⼀次");
//第⼆次会报错:
/
/java.lang.IllegalStateException: More than one call to onNext //("第⼆次");
}).subscribe(System.out::print);
1.9. 使⽤create动态创建Flux可以执⾏多次的Flux,及Mono
// 同步动态创建,next 能被调⽤多次
for (int i = 0; i < 10; i++) {
<("现在的次数:" + i);
}
}).subscribe(System.out::println);
/
/ 同步动态创建Mono
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
sink.success("by create");
}).subscribe(System.out::println);
Mono.fromCallable(() -> {
Thread.sleep(1000);
return "asd";
}).subscribe(System.out::println);
2. 异常处理
//直接创建⼀个包含异常的Flux
<(new Exception());
//直接创建⼀个包含异常的Mono
<(new Exception());
Mono.just("mono")
//连接⼀个包含异常的Mono
.(new Exception("myExc")))
//异常监听
.
doOnError(error -> System.out.println("错误: "+ error)) //在发⽣异常时将其⼊参传递给订阅者
.onErrorReturn("-excRet")
.subscribe(System.out::println);
/*最终输出:
mono
错误: java.lang.Exception: myExc
-excRet
*/
3. 常⽤⽅法
3.1. 使⽤concatWith合并及concatWithValues追加
//合并多个Mono为⼀个Flux
王子山森林公园
Mono.just("Mono1").concatWith(Mono.just("---Mono2")) .subscribe(System.out::print);
//连接多个Flux
Flux.just("连接")
//连接两个Flux
.concatWith(Flux.just("两个"))
//将元素追加到Flux
.concatWithValues("或追加")
3.2. 使⽤zipWith组合为元素
// 结合为元祖,两个取其端的那个,长的那个多余的被舍弃
Flux s1 = Flux.just("s1-0", "s1-1","s1-2");
Flux s2 = Flux.just("s2-0", "s2-1");
s1.zipWith(s2)
.subscribe(tuple -> System.out.T1() + " -> " + T2()));
3.3. 使⽤skip跳过元素
Flux.just(1,2,3,4,5)
//跳过前2两个
.skip(2)
满江红故事//输出: 345
.subscribe(System.out::print);
3.4. 使⽤take截取元素
Flux just = Flux.just("截取", "前⼏个", "元素");
//截取前两个元素组成新的flux,不改变原flux
Flux take = just.take(2);
//输出: 截取前⼏个
take.subscribe(System.out::print);
System.out.println("\n=====");
//输出: 截取前⼏个元素
just.subscribe(System.out::print);
3.5. 使⽤filter过滤元素
Flux.just(1,2,3,4,5,6,7,8,9)
//过滤偶数
.filter(i->i%2==0)
//输出: 2468
.
subscribe(System.out::print);
3.6. 使⽤distinct去重元素
//默认去重
Flux.just(1,1,2,2,3,3)
//去重
.distinct()
//输出: 123
//将要去重的⾃定义的类
class MyClass{
public int key;
public String val;
MyClass(int k, String v){
key=k;val=v;
}
public String toString(){
return String.format("{%d, %s} ",key,val);
}
}
数学日记Flux.just(new MyClass(1,"asd"),new MyClass(1,"asdf"),new MyClass(2,"asd")) //⾃定义对象的⽐较键(参与⽐较的字段)
.distinct(s->s.key)
//输出: {1, asd} {2, asd}
.subscribe(System.out::print);
3.7. 延迟执⾏(异步)
Flux.just("这是","延迟","执⾏")
//在⼀秒后输出: 这是延迟执⾏
.delayElements(Duration.ofSeconds(1)).subscribe(System.out::print); Thread.sleep(1100);
3.8. 从Flux获取⾸个元素
Flux just = Flux.just("这是", "next", "执⾏");
//获取第⼀个元素为Mono,原Flux中的元素不变
Mono next = ();
//输出: 这是
next.subscribe(System.out::println);
System.out.println("=========");
/
/输出: 这是next执⾏
just.subscribe(System.out::print);
3.9. 从Flux阻塞式取⼀个元素
Flux flux = ate(skin -> {
for(int i=0;i<2;++i){