Rxjava3⽂档级教程⼀:介绍和基本使⽤关于兔子的谚语
商业转载请联系作者获得授权,⾮商业转载请注明出处。
⽬录
夏枯草的功效与作用及禁忌⼀ Rxjava3简介
ReactiveX的历史
ReactiveX是Reactive Extensions的缩写,⼀般简写为Rx,最初是LINQ的⼀个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11⽉开源,Rx是⼀个编程模型,⽬标是提供⼀致的编程接⼝,帮助开发者更⽅便的处理异步数据流,Rx近⼏年越来越流⾏了,现在已经⽀持⼏乎全部的流⾏编程语⾔了,Rx的⼤部分语⾔库由ReactiveX这个组织负责维护,⽐较流⾏的有RxJava/RxJS/Rx,社区⽹站是 。
什么是ReactiveX
微软给的定义是,Rx是⼀个函数库,让开发者可以利⽤可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使⽤Rx,开发者可以⽤Obrvables表⽰异步数据流,⽤LINQ操作符查询异步数据流,
⽤Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx = Obrvables + LINQ + Schedulers。
ReactiveX.io给的定义是,Rx是⼀个使⽤可观察数据流进⾏异步编程的编程接⼝,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。
Obrvable拥有它的近亲Iterable的全部优雅与灵活。任何对Iterable的操作,你都可以对Obrvable使⽤。
RxJava
RxJava是响应式编程(Reactive Extensions)的java实现,它基于观察者模式的实现了异步编程接⼝。
Rxjava 3.x 的;
RxJava2将被⽀持到2021年2⽉28⽇,错误的会同时在2.x和3.x修复,但新功能只会在3.x上添加;
Rxjava 3.0的⼀些改变:;
Rxjava 3.x ⽂档可以在中找到;
使⽤Rxjava3.x之前的准备⼯作:
添加依赖
//RxJava的依赖包
implementation 'java3:rxandroid:3.0.0'
//RxAndroid的依赖包
implementation 'java3:rxjava:3.0.0'
RxJava 在很长⼀段时间⾥以java6 为baline( Android 运⾏时⽀持的锅),但在即将到来的 Android Studio 4预览中,⼀个叫做desuging 的过程能够将许多 Java 7和8的特性,透明地转换成与 Java 6兼容的特性。因此我们可以将 RxJava 的基准提⾼到 java 8,并为许多 Java 8构造增加官⽅⽀持⽐如:Optional、Stream等,因此必须将项⽬的编译⽬标设置更改为 java8:
android {
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
⼆ Rx中的⼀些概念
2.1 字段含义
Reactive 直译为反应性的,有活性的,根据上下⽂⼀般翻译为反应式、响应式
Iterable 可迭代对象,⽀持以迭代器的形式遍历,许多语⾔中都存在这个概念
Obrvable 可观察对象,在Rx中定义为更强⼤的Iterable,在观察者模式中是被观察的对象,⼀旦数据产⽣或发⽣变化,会通过某种⽅式通知观察者或订阅者
Obrver 观察者对象,监听Obrvable发射的数据并做出响应,Subscriber是它的⼀个特殊实现
emit 直译为发射,发布,发出,含义是Obrvable在数据产⽣或变化时发送通知给Obrver,调⽤Obrver对应的⽅法,⽂章⾥⼀律译为发射
items 直译为项⽬,条⽬,在Rx⾥是指Obrvable发射的数据项,⽂章⾥⼀律译为数据,数据项。
2.2 上/下流
在RxJava中,数据以流的⽅式组织:Rxjava包括⼀个源数据流,源数据流后跟着若⼲个⽤于消费数据流的步骤。
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
在代码中,对于operator2来说,在它前⾯叫做上流,在它后⾯的叫做下流。
2.3 流对象
在RxJava的⽂档中,emission, emits, item, event, signal, data and message都被认为在数据流中被传递的数据对象。
2.4 背压(Backpressure)
当上下游在不同的线程中,通过Obrvable发射,处理,响应数据流时,如果上游发射数据的速度快于下游接收处理数据的速度,这样对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失,也不会被垃圾回收机制回收,⽽是存放在⼀个异步缓存池中,如果缓存池中的数据⼀直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压(backpressure)问题。
为此,RxJava带来了backpressure的概念。背压是⼀种流量的控制步骤,在不知道上流还有多少数据的情形下控制内存的使⽤,表⽰它们还能处理多少数据。背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,⼀种告诉上游的被观察者降低发送速度的策略
在Rxjava1.0中,有的Obrvable⽀持背压,有的不⽀持,为了解决这种问题,2.0把⽀持背压和不⽀持背压的Obrvable区分开来:⽀持背压的有Flowable类,不⽀持背压的有Obrvable,Single, Maybe and Completable类。
1. 在订阅的时候如果使⽤FlowableSubscriber,那么需要通过s.request(Long.MAX_VALUE)去主动请求上游的数据项。如果遇到背
压报错的时候,FlowableSubscriber默认已经将错误try-catch,并通过onError()进⾏回调,程序并不会崩溃;
2. 在订阅的时候如果使⽤Consumer,那么不需要主动去请求上游数据,默认已经调⽤了s.request(Long.MAX_VALUE)。如果遇到背
第四代战机
压报错、且对Throwable的Consumer没有new出来,则程序直接崩溃;
3. 背压策略的上游的默认缓存池是128。
背压策略:
1. error, 缓冲区⼤概在128
2. buffer, 缓冲区在1000左右
3. drop, 把存不下的事件丢弃
4. latest, 只保留最新的
5. missing, 缺省设置,不做任何操作
public enum BackpressureStrategy {
/**上海旅游景点推荐
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Uful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
MISSING,
/**
* Signals a MissingBackpressureException in ca the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
课程设计致谢
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}
2.5 线程调度器(Schedulers)
对于Android开发者⽽⾔,RxJava最简单的是通过调度器来⽅便地切换线程。在不同平台还有不同的调度器,例如我们Android的主线程:AndroidSchedulers.mainThread()。
调度器功能
AndroidSchedulers.mainThread()需要引⽤rxandroid, 切换到UI线程
杨老爷
Schedulers.io()⽤于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需求,它默认是⼀个wThread()为每⼀个任务创建⼀个新线程
Scheduler.from(executor)指定Executor作为调度器
2.6 事件调度器
RxJava事件发出去并不是置之不顾,要有合理的管理者来管理它们,在合适的时机要进⾏释放事件,这样才不会导致内存泄漏,这⾥的管理者我们称为事件调度器(或事件管理者)CompositeDisposable。
2.7 基类
RxJava 3 中的基类相⽐RxJava 2 没啥改变,主要有以下⼏个基类:
2.8 Obrvables的"热"和"冷"
Obrvable什么时候开始发射数据序列?这取决于Obrvable的实现,⼀个"热"的Obrvable可能⼀创建完就开始发射数据,因此所有后续订阅它的观察者可能从序列中间的某个位置开始接受数据(有⼀些数据错过了)。⼀个"冷"的Obrvable会⼀直等待,直到有观察者订阅它才开始发射数据,因此这个观察者可以确保会收到整个数据序列。
在⼀些ReactiveX实现⾥,还存在⼀种被称作Connectable的Obrvable,不管有没有观察者订阅它,这种Obrvable都不会开始发射数据,除⾮Connect⽅法被调⽤。
三 RxJava的简单使⽤
需要知道的是,RxJava以观察者模式为⾻架,有两种常见的观察者模式:
Obrvable(被观察者)/Obrver(观察者)
Flowable(被观察者)/Subscriber(观察者)
RxJava2/3中,Obrveable⽤于订阅Obrver,是不⽀持背压的,⽽Flowable⽤于订阅Subscriber,是⽀持背压(Backpressure)的。
3.1 Obrvable/Obrver
Obrvable正常⽤法:
Obrvable ate(new ObrvableOnSubscribe<Integer>() {梦见白老鼠
@Override
public void subscribe(ObrvableEmitter<Integer> e) throws Exception {
}
});
Obrver mObrver=new Obrver<Integer>() {
//这是新加⼊的⽅法,在订阅后发送数据之前,
/
/回⾸先调⽤这个⽅法,⽽Disposable可⽤于取消订阅
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.e("lucas", "onNext: "+value );
}
反义疑问句的用法归纳
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
mObrvable.subscribe(mObrver);
这种观察者模型不⽀持背压:当被观察者快速发送⼤量数据时,下游不会做其他处理,即使数据⼤量堆积,调⽤链也不会报MissingBackpressureException,消耗内存过⼤只会OOM。所以,当我们使⽤Obrvable/Obrver的时候,我们需要考虑的是,数据量是不是很⼤(官⽅给出以1000个事件为分界线作为参考)。
3.2 Flowable/Subscriber