Flowable:
前面一章,我们介绍了什么是BackPressure,随后在结尾的时候提到了Flowable。它是一种新的上下游订阅关系。通过它,我们能够结合BackPressure对事件的发射和处理进行同步。
到这里我们要转换一下概念:
被观察者(Observable)
Flowable
观察者(Observer)
Subscriber
订阅
Subscribe
事件:
由被观察者发出的东西。通常都是一个对象。
以上:
- Flowable和Subscriber通过subscribe()方法实现订阅关系,从而Flowable可以在需要的时候发出事件来通知Subscriber
- 到这里我们可以看到,使用Flowable的方式和Observable的方式是差不多的,只是API的名称做了修改。
基本实现:
创建Flowable(被观察者):
- 先看代码:
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
e.onNext("事件1");
e.onNext("事件2");
e.onNext("事件3");
e.onNext("事件4");
e.onComplete();
}
}, BackpressureStrategy.ERROR);
新增参数:
BackpressureStrategy
- 这个参数表示的是BackPressure的属性。
- 用于解决我们上下游的流速不均匀的时候的情况。
- 在上一篇文章中,我们说到了BackPressure的概念,而控制BackPressure属性的参数,就是BackPressureStrategy
BackpressureStrategy:
我们知道,解决上下游流速不均匀的问题,主要是让下游控制上游的流速。下游告诉上游,我的处理方式,以及我能处理的个数,随后上游将对应的数据发送到河流里。这样就解决了流速问题
流速控制:
使用默认的队列大小(128)缓存事件,缓存不下后抛出异常
BackpressureStrategy.ERROR
缓存事件的为unlimited,因此使用这个方式和Observable的方式是一致的,将会无限存储事件,直到OOM
BackpressureStrategy.BUFFER
当默认缓存队列大小存不下事件的时候,新发射的事件会被丢弃
BackpressureStrategy.DROP
当默认缓存队列大小存不下时间的时候,把最早进入队列的事件丢弃,缓存最新生成的。
BackpressureStrategy.LATEST
创建Subscriber:
- 先看代码:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe: ");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String string) {
Log.d(TAG, "onNext: " + string);
}
@Override
public void onError(Throwable t) {
Log.d(TAG, "onError: " + t.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
变更参数:
Subscription:
和Observer不同的地方是,Disposable被替换成了Subscription
取消订阅的API修改成了以下方法:
Subscription.cancel()
并且,Subscription新增了一个方法:
void request(long n)
新增API原因:
- 结合BackPressure,解决流速问题:
- 这种设计,是为了适应上下游流速不同的问题。我们都知道上下游实现订阅以后,会回调onSubscribe()方法,此时,下游采取调用request()方法,告诉上游我能够处理多少事件。随后上游将事件进行派发。这就解决了前面BackPressure中“减缓流速会导致整个api流性能下降”、“减少进入队列的事件造成的事件丢失”的问题。
- 传入的数值表示的就是从上游取得多少个事件进行处理
- 结合BackPressure,解决流速问题:
onSubscribe中如果不调用上述方法会有两种情况:
- 抛异常MissingBackpressureException。
- 这种情况出现在上下游都在同一个线程中。
- 这是一个没有指定BackPressure参数时会报的异常。
- 下游无法接收数据
- 这种情况出现在上下游不同线程时
- 没有抛异常是因为Flowable内部有一个size为128大小的缓存队列,只有发射事件超出Flowable的队列大小,并且下游一直没有进行处理的时候(没有进行request()),才会报错终止。因此,我们可以通过控制这个缓存队列的大小,以及使用request()进行控制下游处理事件,就能很有效的解决上下游流速不同的问题。
- 抛异常MissingBackpressureException。