Flowable:

  • 前面一章,我们介绍了什么是BackPressure,随后在结尾的时候提到了Flowable。它是一种新的上下游订阅关系。通过它,我们能够结合BackPressure对事件的发射和处理进行同步。

  • 到这里我们要转换一下概念:

    • 被观察者(Observable)

      Flowable

    • 观察者(Observer)

      Subscriber

    • 订阅

      Subscribe

    • 事件:

      由被观察者发出的东西。通常都是一个对象。

    • 以上:

      • Flowable和Subscriber通过subscribe()方法实现订阅关系,从而Flowable可以在需要的时候发出事件来通知Subscriber
      • 到这里我们可以看到,使用Flowable的方式和Observable的方式是差不多的,只是API的名称做了修改。

基本实现:

创建Flowable(被观察者):

  • 先看代码:
        Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
                @Override
                public void subscribe(FlowableEmitter<String> e) throws Exception {
                    e.onNext("事件1");
                    e.onNext("事件2");
                    e.onNext("事件3");
                    e.onNext("事件4");
                    e.onComplete();
                }
            }, BackpressureStrategy.ERROR);
  • 新增参数:

    BackpressureStrategy

    • 这个参数表示的是BackPressure的属性。
    • 用于解决我们上下游的流速不均匀的时候的情况。
    • 在上一篇文章中,我们说到了BackPressure的概念,而控制BackPressure属性的参数,就是BackPressureStrategy

BackpressureStrategy:

  • 我们知道,解决上下游流速不均匀的问题,主要是让下游控制上游的流速。下游告诉上游,我的处理方式,以及我能处理的个数,随后上游将对应的数据发送到河流里。这样就解决了流速问题

  • 流速控制:

    • 使用默认的队列大小(128)缓存事件,缓存不下后抛出异常

      BackpressureStrategy.ERROR

    • 缓存事件的为unlimited,因此使用这个方式和Observable的方式是一致的,将会无限存储事件,直到OOM

      BackpressureStrategy.BUFFER

    • 当默认缓存队列大小存不下事件的时候,新发射的事件会被丢弃

      BackpressureStrategy.DROP

    • 当默认缓存队列大小存不下时间的时候,把最早进入队列的事件丢弃,缓存最新生成的。

      BackpressureStrategy.LATEST

创建Subscriber:

  • 先看代码:
        Subscriber<String> subscriber = new Subscriber<String>() {
                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe: ");
                    s.request(Long.MAX_VALUE);
                }
                @Override
                public void onNext(String string) {
                    Log.d(TAG, "onNext: " + string);
                }
                @Override
                public void onError(Throwable t) {
                    Log.d(TAG, "onError: " + t.toString());
                }
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete: ");
                }
            };
  • 变更参数:

    • Subscription:

      • 和Observer不同的地方是,Disposable被替换成了Subscription

        • 取消订阅的API修改成了以下方法:

          Subscription.cancel()

        • 并且,Subscription新增了一个方法:

          void request(long n)

      • 新增API原因:

        • 结合BackPressure,解决流速问题:
          • 这种设计,是为了适应上下游流速不同的问题。我们都知道上下游实现订阅以后,会回调onSubscribe()方法,此时,下游采取调用request()方法,告诉上游我能够处理多少事件。随后上游将事件进行派发。这就解决了前面BackPressure中“减缓流速会导致整个api流性能下降”、“减少进入队列的事件造成的事件丢失”的问题。
        • 传入的数值表示的就是从上游取得多少个事件进行处理
      • onSubscribe中如果不调用上述方法会有两种情况:

        • 抛异常MissingBackpressureException。
          • 这种情况出现在上下游都在同一个线程中。
          • 这是一个没有指定BackPressure参数时会报的异常。
        • 下游无法接收数据
          • 这种情况出现在上下游不同线程时
          • 没有抛异常是因为Flowable内部有一个size为128大小的缓存队列,只有发射事件超出Flowable的队列大小,并且下游一直没有进行处理的时候(没有进行request()),才会报错终止。因此,我们可以通过控制这个缓存队列的大小,以及使用request()进行控制下游处理事件,就能很有效的解决上下游流速不同的问题。

 评论