RxJava背景
RxJava是NetFlix出品的Java框架, 官方描述为 a library for composing asynchronous and event-based programs using observable sequences for the Java VM,翻译过来就是“使用可观察序列组成的一个异步地、基于事件的响应式编程框架”。
RxJava可以解决什么问题?
主要是对数据流的组合变换非常强大和方便。
- 各种偏函数式风格的操作符
- 冷热流
- 异步转同步
- 解决回调地狱
主要缺点是学习和理解成本过高。
RxJava线程切换的原理
RxJava链式调用,采用的都是装饰者模式,对所有的操作符都是对Observable和Observer对象的装饰,达到实现额外效果的目的。
线程切换有个两个方法,一个是subscribeOn,一个是observeOn。
subscribeOn用于设置Observable开始执行时所在的线程。
observeOn用于设置从该操作符调用处开始下游操作符所在的线程。
subscribeOn调用时需要传递一个Scheduler调度器对象,Observable开始执行时的逻辑是在Observable的subscribe()方法里的,subscribeOn的实现原理是把Observable的subscribe()这个动作封装到一个Runnbale的run方法中,然后把这个Runnable给Scheduler调度器进行线程调度。底层用的线程池调度。
Observable的subscribe()中会执行OnSubscribe对象的call方法,call方法里会执行创建操作符里的内容。
observeOn是把下游Observer的onNext、onError等逻辑封装在一个Runnable任务里面,再由传入的Scheduler调度器创建一个Worker对象进行调度。
链式调用是怎么实现的?
每个操作符都会创建一个新的Observable对象
RxJava 订阅执行的过程?
所有的创建操作符,比如Observable的create、just、from,都会创建一个onSubscribe类封装开始的逻辑。
在调用Observable的subscribe订阅的时候,会去执行Observable里的onSubscribe的call方法。
call方法会执行创建操作符里的内容。
创造操作符包括just、from、create、timer、range之类的。
使用案例
Cold Observable、Hot Observable
- Cold Observable
只有当有订阅者订阅的时候,它才开始发射数据。每个订阅者订阅的时候都独立的执行一遍数据流代码,如create、just、range、timer、from。 - Hot Observable
不管有没有订阅者订阅,它创建后就可以开始发射数据。
publish操作符
使用 publish 操作函数可以把 Cold Observable 转化为 Hot Observable,返回一个ConnectableObservable,ConnectableObservable 如果不调用 connect 函数则不会触发数据流的执行。
connect 函数返回的是一个 Subscription,如果调用这个 Subscription 的 unsubscribe 函数,可以停止把数据转发给 Observer,但是这些 Observer 并没有从 ConnectableObservable 上取消注册,只是停止接收数据了。再次调用 connect 来重新开始订阅,会创建一个新的订阅,如果源 Observable 为 Cold Observable 则数据流会重新执行一遍。
refCount操作符
ConnectableObservable.refCount 返回一个特殊的 Observable, 这个 Observable 只要有订阅者就会继续发射数据。
如果没有订阅者订阅到 refCount 返回的 Observable,则不会执行数据流的代码。如果所有的订阅者都取消订阅了,则数据流停止。重新订阅再回重新开始数据流。
share操作符
先执行publish后执行refCount
replay操作符
和 ReplaySubject 类似,不论订阅者在何时订阅,都会收到所有已发射的数据。它有重载函数,可以指定缓存多少个已发射的数据。和 publish 一样也返回一个 ConnectableObservable。
Subject
同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable(调用别的Observable的suscribe方法,参数传本Subject实例);又因为它是一个Observable,它可以转发它收到的数据(先订阅到别的Observable,别的Observable发射数据会调用本Subject实例的onNext方法,然后本Subject实例再转发这个数据给订阅本Subject实例的Observer),也可以手动调用onNext()发射新的数据。
由于一个Subject订阅一个Observable,它可以触发这个Observable开始发射数据(如果那个Observable是Cold Observable,只有当有订阅者订阅的时候, Cold Observable才开始发射数据)。因此有这样的效果,Subject可以把原来那个”冷”的Observable变成”热”的。就像广播电台一样,不会因为你关了收音机就停止广播,在收听的人就听到了,没在的人也无所谓。
Subject 解决的需求就是不确定什么时候可以发出事件。
假设我们有一些数据希望通过 RxJava 来发出,但我们并不确定这些数据什么时候到来、以及有多少数据。显然 just() 和 from() 不能满足需求,但我们又不想用 create() ,因为它会带来一些其他的问题。
最好的办法就是有一个对象既是 Observable ,这样我们就可以去订阅它并进行一系列操作,它又是一个 Observer ,这样我们就可以向它发出数据以及结束事件了,这种组合就是现在被称作 Subject 的类。
Subject种类
AsyncSubject
一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。(如果原始Observable没有发射任何值,AsyncObject也不发射任何值)它会把这最后一个值发射给任何后续的观察者。
然而,如果原始的Observable因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。
BehaviorSubject
当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。
然而,如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,只是简单的向前传递这个错误通知。
BehaviorSubject 收到 onError/onCompleted 之后,保存的数据就会被丢弃了,后来的 Subscriber 只会收到 onError/onCompleted 了。
PublishSubject
只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。
如果原始的Observable因为发生了一个错误而终止,PublishSubject将不会发射任何数据,只是简单的向前传递这个错误通知。
需要注意的是,PublishSubject可能会一创建完成就立刻开始发射数据(除非你可以阻止它发生,它属于Hot Observable),因此这里有一个风险:在Subject被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。如果要确保来自原始Observable的所有数据都被分发,你需要这样做:或者使用Create创建那个Observable以便手动给它引入”冷”Observable的行为(当所有观察者都已经订阅时才开始发射数据),或者改用ReplaySubject。
作用:当一个界面发生改变,通知另一个界面做出响应。
ReplaySubject
发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据(原始Observable发射的)。
假设你是一个电视内容发布商,你每周都会发布大量的内容。但是你的客户( 观众 )并不能一次性看完所有的内容,但他们也许想错过任何内容。所以智能电视和机顶盒自己就提供了一种缓存功能,它们会保存所有的内容,然后让用户按照自己的速度观看完所有的内容。
注意事项
- Subjcet 不是线程安全的,不要从多个线程中调用它的onNext方法,这可能导致同时(非顺序)调用,给Subject的结果增加了不确定性。要避免此类问题,你可以将 Subject 转换为一个 SerializedSubject。
- Subject 使事件的发送变得不可预知。无论是Observable.create, Observable.from 还是 Observable.just , 这些 Cold Observable 都有一个显著的优点就是数据的来源可预知,我知道将会发送哪些数据,这些数据是什么类型。但是Subject就不一样,我如果创建一个Subject,那么代码任何地方只要能 Get 到这个引用,就可以随意使用它发射元素,滥用的后果导致代码越来越难以维护。
使用Subject时要确保:
- 这是一个 Hot Observable ,你要有对应措施保证不会错过临界的事件。
- 有对应的线程安全措施。
- 确保事件的发送源在掌控中,事件的发送完全可预期。
Backpressure
Backpressure 是用来描述,生产者生产数据的速度比消费者消费数据的速度快的一种情况。如果没有处理这种情况,则会出现 MissingBackpressureException 。
BackpressureMode 有如下几种策略:
- BUFFER(缓存)
使用无限个数的内部缓存,一开始会创建一个 128 个元素的缓冲对象,然后动态的扩展直到 JVM 内存不足。 - LATEST(使用最新的)
只发射最新生成的数据,之前的旧的数据被丢弃。类似于使用缓冲个数为 1 的缓存。
cold Observable 通常可以使用这种策略。比如 Andorid 里面的电量变化、或者 最近的位置信息就可以使用这种策略。之前旧的数据已经为无效数据直接丢弃就好。 - DROP(直接丢弃)
如果消费者无法处理数据,就把该数据丢弃了。 - ERROR / NONE
默认的不指定任何策略,会出现 MissingBackpressureException