ReactiveStreams
1.概述
在本⽂中,我们将介绍Java 9 Reactive Streams。简⽽⾔之,我们将能够使⽤Flow类,它包含⽤于构建反应流处理逻辑的主要构建块。Reactive Streams是具有⽆阻塞背压的异步流处理的标准。此规范在Reactive Manifesto中定义,并且有各种实现,例如,RxJava或Akka-Streams。
2.Reactive API概述
要构建Flow, 我们可以使⽤三个主要抽象并将它们组合成异步处理逻辑。
每个Flow都需要处理Publisher实例发布给它的事件 ; he Publisher 有⼀个⽅法 – subscribe().
如果任何订阅者想要接收由其发布的事件,则他们需要订阅给定的发布者。
消息接收者需要实现订阅者接⼝。通常,这是每个Flow处理的结束,因为它的实例不会进⼀步发送消息。我们可以将Subscriber视为⼀个接收器。这有四个需要重写的⽅法 - onSubscribe(),onNext(),onError()和onComplete()。
如果我们想要转换传⼊消息并将其进⼀步传递给下⼀个订阅服务器,我们需要实现处理器接⼝。
这既可以作为订阅者,也可以作为发布者,因为它处理这些消息并将其发送以进⾏进⼀步处理。
3.发布和使⽤消息
假设我们要创建⼀个简单的Flow,其中我们有⼀个Publisher发布消息,⼀个简单的订阅者在消息到达时消费⼀个消息。让我们创建⼀个EndSubscriber类。我们需要实现Subscriber接⼝。接下来,我们将覆盖所需的⽅法。所述onSubscribe()⽅法在处理开始之前调⽤。订阅的实例作为参数传递。它是⼀个⽤于控制订阅服务器和发布服务器之间的消息流的类:
public class EndSubscriber<T> implements Subscriber<T> {
private Subscription subscription;
public List<T> consumedElements = new LinkedList<>();
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
}
}
夏林果我们还初始化⼀个空列表的consumedElements那将在测试中使⽤。
现在,我们需要从Subscriber接⼝实现其余⽅法。这⾥的主要⽅法是onNext() - 每当Publisher发布⼀条新消息时都会调⽤它:
@Override
public void onNext(T item) {
干菜肉System.out.println("Got : " + item);
}
冲三小
请注意,当我们在onSubscribe()⽅法中启动订阅时,当我们处理消息时,我们需要在Subscription上调⽤request()⽅法,以表⽰当前订阅服务器已准备好使⽤更多消息。
最后,我们需要实现onError() - 只要在处理中抛出⼀些异常就会调⽤它,以及onComplete() -当Publisher关闭时调⽤:
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
让我们为Processing Flow编写⼀个测试。我们将使⽤SubmissionPublisher类 - 来⾃urrent的构造- 它实现了Publisher接⼝。
@Test
public void whenSubscribeToIt_thenShouldConsumeAll()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>();
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
// when
NumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.clo();
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(
() -> sumedElements)
.containsExactlyElementsOf(items)
);
}
注意,我们在EndSubscriber的实例上调⽤clo()⽅法。它将在给定Publisher的每个订阅服务器下⾯调⽤onComplete()回调。运⾏该程序将产⽣以下输出:
Got : 1
Got : x
Got : 2
Got : x
Got : 3
Got : x
Done
4.消息的转换
假设我们想要在发布服务器和订阅服务器之间构建类似的逻辑。我们将创建实现Processor并扩展SubmissionPublisher的TransformProcessor类。
我们将传⼊⼀个将输⼊转换为输出的函数:
public class TransformProcessor<T, R>
extends SubmissionPublisher<R>
滑轮鞋自学视频implements Flow.Processor<T, R> {
private Function<T, R> function;
private Flow.Subscription subscription;
public TransformProcessor(Function<T, R> function) {
super();
this.function = function;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
}
@Override
public void onNext(T item) {
submit(function.apply(item));
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
clo();
}
}
现在让我们编写⼀个快速测试,其中包含Publisher发布String元素的处理流程。
我们的TransformProcessor将String解析为Integer - 这意味着需要在此处进⾏转换:
@Test
public void whenSubscribeAndTransformElements_thenShouldConsumeAll()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
TransformProcessor<String, Integer> transformProcessor可爱的画
= new TransformProcessor<>(Integer::parInt);
EndSubscriber<Integer> subscriber = new EndSubscriber<>();
List<String> items = List.of("1", "2", "3");
List<Integer> expectedResult = List.of(1, 2, 3);
// when
publisher.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);
items.forEach(publisher::submit);
publisher.clo();
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(() ->
sumedElements)
.containsExactlyElementsOf(expectedResult)
);
}李家旭
注意,在基本Publisher上调⽤clo()⽅法将导致调⽤TransformProcessor上的onComplete()⽅法。
请注意,处理链中的所有Publisher都需要以这种⽅式关闭。
5.使⽤订阅控制消息需求
让我们修改我们的EndSubscriber只消耗N个消息。我们将传递该数字作为howMuchMessagesConsume构造函数参数:public class EndSubscriber<T> implements Subscriber
<T> {
private AtomicInteger howMuchMessagesConsume;
private Subscription subscription;
public List<T> consumedElements = new LinkedList<>();
public EndSubscriber(Integer howMuchMessagesConsume) {
this.howMuchMessagesConsume
= new AtomicInteger(howMuchMessagesConsume);
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
生日游戏}
@Override
public void onNext(T item) {
howMuchMessagesConsume.decrementAndGet();
System.out.println("Got : " + item);
consumedElements.add(item);
if (() > 0) {
}
}
/
/...
}
让我们编写⼀个测试,我们只想从给定的Subscription中使⽤⼀个元素:
@Test
public void whenRequestForOnlyOneElement_thenShouldConsumeOne()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>(1);
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
List<String> expected = List.of("1");
// when
NumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.clo();
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(() ->
sumedElements)
.containsExactlyElementsOf(expected)
);
}
虽然发布者发布了六个元素,但我们的EndSubscriber只消耗⼀个元素,因为它表⽰只处理那个元素的需求。
六,结论
我们了解了如何创建由发布服务器和订阅服务器组成的处理流程。我们使⽤处理器转换元素创建了⼀个更复杂的处理流程。最后,我们使⽤Subscription来控制订阅者对元素的需求。兔妈妈