RxJava中有四个概念:

  • Observable:被观察者
  • Observer :观察者
  • Subscribe:
    • 订阅:
      • Observable和Observer通过subscribe()方法实现订阅关系,从而Observable可以在需要的时候发出事件来通知Observer(即被观察者通知观察者)
  • 事件:由被观察者发出的东西。通常都是一个对象。

RxJava与传统观察者模式异同:

  • 多了以下方法:

    • 事件完成

      onCompleted():

      • 事件队列完结后执行的方法,也就是观察者与被观察者的联系结束以后回调的方法。(可以想象成,被观察者发送完事件,观察者处理完了,就该回调这个方法了)
    • 事件错误

      onError(Throwable e):

    - 事件队列异常的时候执行的方法,当联系发生问题是,进行回调的方法,通常会带上出错的异常信息(在被观察者发送、观察者处理过程中出现问题时,该回调这个方法)

    • 上述两个方法中总有一个会被回调,并且两者是互斥的(也就是有且只有其中一个能够被回调)。

Rx事件模型:

  • Rx事件模型图:
    img

首先了解几个概念:

RX事件流:

  • 首先Rx的整个流程,我们可以看作是一条小河,河水从上游一直不断地往下游流去。河中的水,就是我们发射的事件,从上游发出,流经下游处理。在这个抽象过程中,上游就是被观察者,下游就是观察者,喝水就是事件。

上游:

  • 被观察者(Observable),被观察者在上游,将数据通过河流传递出去

下游:

  • 观察者(Observer),观察者在下游,接收上游流下来的数据

联系:

  • 是什么让上下游之间产生联系呢?

    • 他们通过下述方法建立联系.

      Observable.subscribe(Observer) //被观察者.subscribe(观察者)

      • 从上述代码,可以简单理解成观察者模式中的基本思想:让被观察者持有观察者的引用。

事件:

  • 由被观察者发出的数据
  • 事件其实就是对象,我们可以把要传递的数据封装成对象,就能传递数据了
  • 想象一下,联网后获取到的JSON数据,转换成Bean对象后,就可以当做事件,发送给下游的观察者,然后开始搞事情了。

事件的传递:

  • 事件就是从上游(被观察者)发出,流到下游(观察者)处,下游(观察者)接收到事件后做出相应的响应的过程。

基本使用:

创建上游(被观察者:Observable)

  • 创建上游有多种方式:

单个创建:

  • 先看代码:
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e/* 事件发射器 */) throws Exception {
                    // 调用onNext()函数发射事件
                    e.onNext("事件1");
                    e.onNext("事件2");
                    e.onNext("事件3");
                    e.onComplete();
                }
            });
-   使用create()方法的创建被观察者方式,需要在上述subscribe()中手动调用onNext()发送事件,发送完后,需要手动调用onComplete()。
-   较为麻烦,因此不推荐使用

  1. 概念:

    • 事件发射器:

      ObservableEmitter //事件发射器

    • 顾名思义,事件发射器用于发射事件,可以发射以下三种类型的事件:

      onNext(T value) //正常地将某个事件发射出去(T:事件的类型支持泛型)

    > onComplete() //事件流结束后发射一个完成的消息 > onError(Throwable e) //事件流过程中出现错误发射异常信息 > //显然后面两个方法是上游专门用来通知下游事件流是否成功的

  2. 事件发送规则:

    • 发射事件动作:

      onNext()

      • 可以被上游无限发送、下游也可以无限接收
    • 事件发射完成

      onComplete():

      • 此动作之后,上游可以继续发送事件,但是下游在此之后,将不再继续接收事件
      • 此动作进行多次不造成错误,下游只在第一次响应后不再响应。
      • 上游可以不做此动作(也就是说上游完全可以不告诉下游事件流是否发送完毕),哈哈,你们愚蠢的人类就干等着吧!!!
    • 事件发射过程中出错

      onError():

      • 此动作之后,上游可以继续发送事件,但是下游在此之后,讲不再鸡血接收事件。
      • 此动作进行多次将造成错误,即下游接收第二个onError会导致程序崩溃
      • 上游可以不做此动作(也就是说上游完全可以不告诉下游事件流是否发送完毕),和Java异常一样的,如果上游不会掉onError,异常就会抛出去,一直往上抛到栈顶崩溃。
  3. Github地址

逐一发射:

  • 先看代码:
        Observable.just("事件0", "事件1", "事件2", "事件3",
                        "事件4", "事件5", "事件6", "事件7",
                        "事件8", "事件9").subscribe(Observer对象);
-   just()方法看上去就比create()要方便多了,它接收最高10个参数,然后将传入的参数逐一发射到河流里去

  1. Github地址

迭代发射:

  • 先看代码:
        List<String> strings = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            strings.add("事件" + i);
        }
        Observable.fromIterable(strings)
        .subscribe(Observer对象);
-   fromIterable()方法可以接收一个可迭代的List对象等,将List中的元素当做事件,逐一发射到河流里去

  1. Github地址

创建下游(观察者:Observer):

  • 先看代码:
        Observer<String> observer = new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe: ");
                }
                @Override
                public void onNext(String string) {
                    Log.d(TAG, "onNext: " + string);
                }
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: " + e.getMessage());
                }
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete: ");
                }
            };
  1. 概念:

    • 订阅回调

      • 订阅回调指的是下游的on开头的方法都是回调方法,分别对应上游的方法进行回调。
      • 如:
        • 上游的单个创建方式中,调用的onNext()会回调下游的onNext()
        • 上游的单个创建方式中,调用的onComplete()会对应回调下游的onCompete(),诸如此类
    • Disposable:

      • 从这个单词的字面意思是单次使用,用完就销毁了。

      • 如下图,我们可以将Disposable理解成拦河大坝。

      • 当调用这个类的dispose()方式时,相当于将上下游的连接切断(相当于在小河上下游之间建起了拦河大坝,所有事件都无法流通到下游了),此时会导致下游不再接收事件

      • 拦河大坝只是简单的拦截事件在河流中的传递,上游还将持续发送事件,只是下游收不到了

      • 请看代码:

        
        				Disposable mDisposable;
        				Observable.create(new ObservableOnSubscribe<String>() {
        							@Override
        							public void subscribe(ObservableEmitter<String> e) throws Exception {
        								Log.d(TAG, "上游 调用onNext发射:事件1");
        								LogUtils.log2ConsoleOuputView(mConsoleOutput, "上游 调用onNext发射:事件1");
        								// 发射事件
        								e.onNext("事件1");
        
        								Log.d(TAG, "上游 调用onNext发射:事件2");
        								LogUtils.log2ConsoleOuputView(mConsoleOutput, "上游 调用onNext发射:事件2");
        								// 发射事件
        								e.onNext("事件2");
        
        								Log.d(TAG, "上游 调用onNext发射:事件3");
        								LogUtils.log2ConsoleOuputView(mConsoleOutput, "上游 调用onNext发射:事件3");
        								// 发射事件
        								e.onNext("事件3");
        
        								Log.d(TAG, "上游 调用onNext发射:事件4");
        								LogUtils.log2ConsoleOuputView(mConsoleOutput, "上游 调用onNext发射:事件4");
        								// 发射事件
        								e.onNext("事件4");
        
        								e.onComplete();
        
        							}
        						}).subscribe(new Observer<String>() {
        							Disposable mDisposable;
        
        							@Override
        							public void onSubscribe(Disposable d) {
        								Log.d(TAG, "上下游已连接: ");
        								LogUtils.log2ConsoleOuputView(mConsoleOutput, "上下游已连接");
        								mDisposable = d;//将Disposable提升为成员变量,便于在其他地方调用
        								CompositeDisposable compositeDisposable = new CompositeDisposable();
        								compositeDisposable.add(d);
        								LogUtils.log2ConsoleOuputView(mConsoleOutput, "拦河大坝已建立");
        							}
        
        							@Override
        							public void onNext(String string) {
        								Log.d(TAG, "下游 onNext: " + string);
        								LogUtils.log2ConsoleOuputView(mConsoleOutput, "下游 onNext 接收到: " + string);
        								if (string.equals("事件2")) {
        									mDisposable.dispose();
        									Log.d(TAG, "拦河大坝关上水闸");
        								}
        							}
        
        							@Override
        							public void onError(Throwable e) {
        								Log.d(TAG, "下游 onError: " + e.getMessage());
        								LogUtils.log2ConsoleOuputView(mConsoleOutput, "下游 onError 接收到: " + e.getMessage());
        							}
        
        							@Override
        							public void onComplete() {
        								Log.d(TAG, "下游 onComplete: ");
        								LogUtils.log2ConsoleOuputView(mConsoleOutput, "下游 onComplete 订阅完成: ");
        							}
        						});
        
    • Disposable容器:

      • 所谓容器就是可以存放Disposable对象的类。我们将Disposable对象存放到容器中就方便对这些拦河大坝统一管理,什么时候开闸放水、蓄水都能够统一到一起。

      • CompositeDisposable:

        • 如果有多个Disposable的时候,RxJava内置了容器,当我们得到一个Disposable的时候,加入到容器中:

          CompositeDisposable.add()
          
        • 随后,退出的时候,直接使用下述代码就能切断所有联系:

          CompositeDisposable.clear()
          
          
          • 清除容器中所有的Disposable随想,同时截断所有河流。
          • clear()的源码中,我们可以看到,它便利了整个容器,将容器中的每一个Disposable对象,都调了一次dispose(),掐断了联系。
  2. Github地址

实现订阅(Subscribe)

  • 创建了Observable和Observer之后,使用一个方法将两者关联起来就是实现订阅了:

    • 实现订阅:

      subscribe(): //实现订阅的API observable.subscribe(observer);

      • 实现订阅,就是让上下游之间建立联系,上游发出的数据通过订阅的实现,才能传送到下游的手中

概念:

  • 完整订阅:

    • 完整订阅表示的是上游发送的事件,下游都接收并且处理。同时,在处理结束后,onComplete()方法被回调。或者发生了错误,回调了onError()方法。
  • 不完整订阅:

    • 不完整订阅表示的是,上游发送的事件,下游接收处理。但是,也许是开发者比较自信,也许是别的原因。开发者不需要onComplete()或者onError()的回调,这也是可以的。
  • subscribe()方法的多个重载帮助我们实现了完整、不完整订阅:

    ```java
    public final Disposable subscribe() {}
    	//不论上游发射什么事件,我都不处理,即上游只发送数据,下游就是不管
    public final Disposable subscribe(Consumer<? super T> onNext)
    	//只处理上游发射过来的onNext()事件
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
    	//只处理上游发射过来的onNext()、onError()事件
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)
    	//只处理上游发射过来的onNext()、onError()、onComplete()事件
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe)
    	//只处理上游发射过来的onNext()、onError()、onComplete()、onSubscribe事件
    public final void subscribe(Observer<? super T> observer)
    	//完整订阅
    ```
    
    
    • 上述代码可以看出,RxJava给我们的自由度还是很高的,不论下游是否处理事件、是否需要完成提示、是否需要异常处理,都可以进行定制。

链式调用:

  • 现在,上下游以及河流都已经备齐了,接下来需要介绍一下,RxJava第一个让人觉得使用比较方便的地方
  • 链式调用比较类似Java设计模式中的Builder设计模式,其允许我们一气呵成地书写API,既方便阅读,也显得雅观。现在,让我们开始优雅地编程吧!
        // 单一创建
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e/* 事件发射器 */) throws Exception {
                Log.d(TAG, "上游调用onNext1");
                e.onNext("事件1");
                Log.d(TAG, "上游调用onNext2");
                e.onNext("事件2");
                Log.d(TAG, "上游调用onNext3");
                e.onNext("事件3");
                Log.d(TAG, "上游调用onNext4");
                e.onNext("事件4");
                Log.d(TAG, "上游调用onComplete");
                e.onComplete();

            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(String string) {
                Log.d(TAG, "下游onNext: " + string);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "下游onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "下游onComplete: ");
            }
        });

        // 逐一发射
        Observable.just("事件1", "事件2")
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: ");
                    }

                    @Override
                    public void onNext(String string) {
                        Log.d(TAG, "下游onNext: " + string);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "下游onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "下游onComplete: ");
                    }
                });
  1. Github地址

Demo代码:

  • Demo代码已经上传到Github,请自行获取

 评论