RxJava中有四个概念:
- Observable:被观察者
- Observer :观察者
- Subscribe:
- 订阅:
- Observable和Observer通过subscribe()方法实现订阅关系,从而Observable可以在需要的时候发出事件来通知Observer(即被观察者通知观察者)
- 订阅:
- 事件:由被观察者发出的东西。通常都是一个对象。
RxJava与传统观察者模式异同:
多了以下方法:
事件完成
onCompleted():
- 事件队列完结后执行的方法,也就是观察者与被观察者的联系结束以后回调的方法。(可以想象成,被观察者发送完事件,观察者处理完了,就该回调这个方法了)
事件错误
onError(Throwable e):
- 事件队列异常的时候执行的方法,当联系发生问题是,进行回调的方法,通常会带上出错的异常信息(在被观察者发送、观察者处理过程中出现问题时,该回调这个方法)
- 上述两个方法中总有一个会被回调,并且两者是互斥的(也就是有且只有其中一个能够被回调)。
Rx事件模型:
- Rx事件模型图:
首先了解几个概念:
RX事件流:
- 首先Rx的整个流程,我们可以看作是一条小河,河水从上游一直不断地往下游流去。河中的水,就是我们发射的事件,从上游发出,流经下游处理。在这个抽象过程中,上游就是被观察者,下游就是观察者,喝水就是事件。
上游:
- 被观察者(Observable),被观察者在上游,将数据通过河流传递出去
下游:
- 观察者(Observer),观察者在下游,接收上游流下来的数据
联系:
是什么让上下游之间产生联系呢?
他们通过下述方法建立联系.
Observable.subscribe(Observer) //被观察者.subscribe(观察者)
- 从上述代码,可以简单理解成观察者模式中的基本思想:让被观察者持有观察者的引用。
事件:
- 由被观察者发出的数据
- 事件其实就是对象,我们可以把要传递的数据封装成对象,就能传递数据了
- 想象一下,联网后获取到的JSON数据,转换成Bean对象后,就可以当做事件,发送给下游的观察者,然后开始搞事情了。
事件的传递:
- 事件就是从上游(被观察者)发出,流到下游(观察者)处,下游(观察者)接收到事件后做出相应的响应的过程。
基本使用:
创建上游(被观察者:Observable)
- 创建上游有多种方式:
单个创建:
- 先看代码:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e/* 事件发射器 */) throws Exception {
// 调用onNext()函数发射事件
e.onNext("事件1");
e.onNext("事件2");
e.onNext("事件3");
e.onComplete();
}
});
- 使用create()方法的创建被观察者方式,需要在上述subscribe()中手动调用onNext()发送事件,发送完后,需要手动调用onComplete()。
- 较为麻烦,因此不推荐使用
概念:
事件发射器:
ObservableEmitter //事件发射器
顾名思义,事件发射器用于发射事件,可以发射以下三种类型的事件:
onNext(T value) //正常地将某个事件发射出去(T:事件的类型支持泛型)
> onComplete() //事件流结束后发射一个完成的消息 > onError(Throwable e) //事件流过程中出现错误发射异常信息 > //显然后面两个方法是上游专门用来通知下游事件流是否成功的
事件发送规则:
发射事件动作:
onNext()
- 可以被上游无限发送、下游也可以无限接收
事件发射完成
onComplete():
- 此动作之后,上游可以继续发送事件,但是下游在此之后,将不再继续接收事件
- 此动作进行多次不造成错误,下游只在第一次响应后不再响应。
- 上游可以不做此动作(也就是说上游完全可以不告诉下游事件流是否发送完毕),哈哈,你们愚蠢的人类就干等着吧!!!
事件发射过程中出错
onError():
- 此动作之后,上游可以继续发送事件,但是下游在此之后,讲不再鸡血接收事件。
- 此动作进行多次将造成错误,即下游接收第二个onError会导致程序崩溃
- 上游可以不做此动作(也就是说上游完全可以不告诉下游事件流是否发送完毕),和Java异常一样的,如果上游不会掉onError,异常就会抛出去,一直往上抛到栈顶崩溃。
逐一发射:
- 先看代码:
Observable.just("事件0", "事件1", "事件2", "事件3",
"事件4", "事件5", "事件6", "事件7",
"事件8", "事件9").subscribe(Observer对象);
- just()方法看上去就比create()要方便多了,它接收最高10个参数,然后将传入的参数逐一发射到河流里去
迭代发射:
- 先看代码:
List<String> strings = new ArrayList<>();
for (int i = 0; i < 10; i++) {
strings.add("事件" + i);
}
Observable.fromIterable(strings)
.subscribe(Observer对象);
- fromIterable()方法可以接收一个可迭代的List对象等,将List中的元素当做事件,逐一发射到河流里去
创建下游(观察者:Observer):
- 先看代码:
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String string) {
Log.d(TAG, "onNext: " + string);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
概念:
订阅回调
- 订阅回调指的是下游的on开头的方法都是回调方法,分别对应上游的方法进行回调。
- 如:
- 上游的单个创建方式中,调用的onNext()会回调下游的onNext()
- 上游的单个创建方式中,调用的onComplete()会对应回调下游的onCompete(),诸如此类
Disposable:
从这个单词的字面意思是单次使用,用完就销毁了。
如下图,我们可以将Disposable理解成拦河大坝。
当调用这个类的dispose()方式时,相当于将上下游的连接切断(相当于在小河上下游之间建起了拦河大坝,所有事件都无法流通到下游了),此时会导致下游不再接收事件
拦河大坝只是简单的拦截事件在河流中的传递,上游还将持续发送事件,只是下游收不到了
请看代码:
Disposable mDisposable; Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { Log.d(TAG, "上游 调用onNext发射:事件1"); LogUtils.log2ConsoleOuputView(mConsoleOutput, "上游 调用onNext发射:事件1"); // 发射事件 e.onNext("事件1"); Log.d(TAG, "上游 调用onNext发射:事件2"); LogUtils.log2ConsoleOuputView(mConsoleOutput, "上游 调用onNext发射:事件2"); // 发射事件 e.onNext("事件2"); Log.d(TAG, "上游 调用onNext发射:事件3"); LogUtils.log2ConsoleOuputView(mConsoleOutput, "上游 调用onNext发射:事件3"); // 发射事件 e.onNext("事件3"); Log.d(TAG, "上游 调用onNext发射:事件4"); LogUtils.log2ConsoleOuputView(mConsoleOutput, "上游 调用onNext发射:事件4"); // 发射事件 e.onNext("事件4"); e.onComplete(); } }).subscribe(new Observer<String>() { Disposable mDisposable; @Override public void onSubscribe(Disposable d) { Log.d(TAG, "上下游已连接: "); LogUtils.log2ConsoleOuputView(mConsoleOutput, "上下游已连接"); mDisposable = d;//将Disposable提升为成员变量,便于在其他地方调用 CompositeDisposable compositeDisposable = new CompositeDisposable(); compositeDisposable.add(d); LogUtils.log2ConsoleOuputView(mConsoleOutput, "拦河大坝已建立"); } @Override public void onNext(String string) { Log.d(TAG, "下游 onNext: " + string); LogUtils.log2ConsoleOuputView(mConsoleOutput, "下游 onNext 接收到: " + string); if (string.equals("事件2")) { mDisposable.dispose(); Log.d(TAG, "拦河大坝关上水闸"); } } @Override public void onError(Throwable e) { Log.d(TAG, "下游 onError: " + e.getMessage()); LogUtils.log2ConsoleOuputView(mConsoleOutput, "下游 onError 接收到: " + e.getMessage()); } @Override public void onComplete() { Log.d(TAG, "下游 onComplete: "); LogUtils.log2ConsoleOuputView(mConsoleOutput, "下游 onComplete 订阅完成: "); } });
Disposable容器:
所谓容器就是可以存放Disposable对象的类。我们将Disposable对象存放到容器中就方便对这些拦河大坝统一管理,什么时候开闸放水、蓄水都能够统一到一起。
CompositeDisposable:
如果有多个Disposable的时候,RxJava内置了容器,当我们得到一个Disposable的时候,加入到容器中:
CompositeDisposable.add()
随后,退出的时候,直接使用下述代码就能切断所有联系:
CompositeDisposable.clear()
- 清除容器中所有的Disposable随想,同时截断所有河流。
- clear()的源码中,我们可以看到,它便利了整个容器,将容器中的每一个Disposable对象,都调了一次dispose(),掐断了联系。
实现订阅(Subscribe)
创建了Observable和Observer之后,使用一个方法将两者关联起来就是实现订阅了:
实现订阅:
subscribe(): //实现订阅的API observable.subscribe(observer);
- 实现订阅,就是让上下游之间建立联系,上游发出的数据通过订阅的实现,才能传送到下游的手中
概念:
完整订阅:
- 完整订阅表示的是上游发送的事件,下游都接收并且处理。同时,在处理结束后,onComplete()方法被回调。或者发生了错误,回调了onError()方法。
不完整订阅:
- 不完整订阅表示的是,上游发送的事件,下游接收处理。但是,也许是开发者比较自信,也许是别的原因。开发者不需要onComplete()或者onError()的回调,这也是可以的。
subscribe()方法的多个重载帮助我们实现了完整、不完整订阅:
```java public final Disposable subscribe() {} //不论上游发射什么事件,我都不处理,即上游只发送数据,下游就是不管 public final Disposable subscribe(Consumer<? super T> onNext) //只处理上游发射过来的onNext()事件 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) //只处理上游发射过来的onNext()、onError()事件 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) //只处理上游发射过来的onNext()、onError()、onComplete()事件 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) //只处理上游发射过来的onNext()、onError()、onComplete()、onSubscribe事件 public final void subscribe(Observer<? super T> observer) //完整订阅 ```
- 上述代码可以看出,RxJava给我们的自由度还是很高的,不论下游是否处理事件、是否需要完成提示、是否需要异常处理,都可以进行定制。
链式调用:
- 现在,上下游以及河流都已经备齐了,接下来需要介绍一下,RxJava第一个让人觉得使用比较方便的地方
- 链式调用比较类似Java设计模式中的Builder设计模式,其允许我们一气呵成地书写API,既方便阅读,也显得雅观。现在,让我们开始优雅地编程吧!
// 单一创建
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e/* 事件发射器 */) throws Exception {
Log.d(TAG, "上游调用onNext1");
e.onNext("事件1");
Log.d(TAG, "上游调用onNext2");
e.onNext("事件2");
Log.d(TAG, "上游调用onNext3");
e.onNext("事件3");
Log.d(TAG, "上游调用onNext4");
e.onNext("事件4");
Log.d(TAG, "上游调用onComplete");
e.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String string) {
Log.d(TAG, "下游onNext: " + string);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "下游onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "下游onComplete: ");
}
});
// 逐一发射
Observable.just("事件1", "事件2")
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String string) {
Log.d(TAG, "下游onNext: " + string);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "下游onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "下游onComplete: ");
}
});
Demo代码:
- Demo代码已经上传到Github,请自行获取