当不指定线程的时候,RX遵循线程不变原则。
- 线程不变原则:
- 当我们在主线程中创建一个Observable发射事件,则上游默认在主线程发射时间
- 当我们在主线程中创建一个Observer来接收事件,则下游默认在主线程接收事件
- 如果需要切换线程,需要使用以下概念:
Scheduler:
- 相当于线程控制器,Rx通过它来指定每一段代码执行在什么样的线程中。
内置的几个Scheduler:
Scheduler.immediate():
- 直接在当前线程,这是默认情况( 仅在RxJava1.0中存在,RxJava2.0中已经删除此API )
Scheduler.newThread():
Scheduler.io():
- I/O操作(文件读写、数据库读写、网络信息交互等)所使用的Scheduler。
- 和newThread一样会启用RxJava准备的线程
- 区别在于:
- io()内部实现是用一个没有上限的线程池,可以重用空闲的线程,多数情况下,比newThread()要高效。
- 建议:不要把计算工作放在这个线程中
Scheduler.computation():
- 计算用的Scheduler。
- 指的是CPU密集型计算,也就是不会被I/O操作限制性能的操作,如图形计算。
- 使用固定的线程池,线程数量大小为CPU核数。
- 同理:I/O操作不要放到这个线程中
AndroidSchedulers.mainThread():
- android主线程运行
- android专用的线程,指明在android主线程运行
- 此API由RxAndroid库中提供,找不到的同学可以检查下是否已经依赖RxAndroid
线程控制
- 结合上述的Scheduler,就可以使用下列方法控制线程:
subscribeOn():
- 指定subscribe()所发生的线程,也就是上游事件发射的线程
- 只有第一次指定的时候生效
- 代码实例:
Observable.create(onSubscribe)
.subscribeOn(AndroidSchedulers.mainThread())//指定上游发射事件的操作,发生在UI线程
.subscribe(subscriber);
- 但是如果上游事件发射的时候我也想要切换线程怎么办?
- doOnsubscribe()
- Subscriber的onSubscribe()方法用于预处理初始化,但是由于onSubscribe()在subscribe()发生的时候就调用了,因此不能指定线程。
- 那么比如我想在预处理的时候就进行一次UI重绘,必须在主线程执行,这就会导致有线程非法的风险,因为我们不好控制我们当前所在线程
- 因此,引入doOnSubscribe()方法
- doOnsubscribe()可以指定线程。(我们将在文章后面讲解)
- 默认情况下,它运行在subscribe()发生的线程
- 而如果在doOnsubscribe()之后有subscribeOn()方法,它将执行在离它最近的subscribeOn()方法指定的线程中。
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread())// 指定主线程,因此doOnsubscribe()方法执行在主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
observeOn():
- 指定Subscriber所运行在的线程,也就是下游事件接收的线程
- 每进行一次指定,就会切换一次线程
- 从这可以看出来,我们在下游观察者处接收到被观察者发出来的数据时,可以多次切换线程,做不同的工作:
- 如:在io线程中异步联网,随后切换到UI线程中刷新联网获取到的数据
- observeOn()方法制定的是Subscriber之后的操作所在的线程,因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次observeOn()就行了
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定
标准线程代码:
// 创建一个上游的Observable(被观察者)
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e/* 事件发射器 */) throws Exception {
// 发射事件
Log.d(TAG, "Observable thread is :" + Thread.currentThread().getName());
e.onNext("事件1");
e.onNext("事件2");
e.onNext("事件3");
e.onComplete();
}
});
// 创建一个下游Observer(观察者)
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(/**/Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String string) {
Log.d(TAG, "onNext: " + string);
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
// 上下游建立连接(观察者被观察者建立联系)
observable
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()/* 指定下游接收事件的Thread */)
.subscribe(observer);
doOnSubscribe():
- Subscriber的onStart()方法用于预处理初始化,但是由于onStart()在subscribe()发生的时候就调用了,因此不能指定线程。
- 那么比如我想在预处理的时候就进行一次UI重绘,必须在主线程执行,这就会导致有线程非法的风险,因为我们不好控制我们当前所在线程
- 因此,引入doOnSubscribe()方法
- doOnsubscribe()可以指定线程。
- 实际上可以这么理解:doOnsubscribe()其实是抛出了一个额外的执行项
- 默认情况下,它运行在subscribe()发生的线程
- 而如果在doOnsubscribe()之后有subscribeOn()方法,它将执行在离它最近的subscribeOn()方法指定的线程中。
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
}
})
.subscribeOn(AndroidSchedulers.mainThread())// 指定主线程,因此doOnsubscribe()方法执行在主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);