当不指定线程的时候,RX遵循线程不变原则。

  • 线程不变原则:
    • 当我们在主线程中创建一个Observable发射事件,则上游默认在主线程发射时间
    • 当我们在主线程中创建一个Observer来接收事件,则下游默认在主线程接收事件
  • 如果需要切换线程,需要使用以下概念:
    • Scheduler:调度器

Scheduler:

  • 相当于线程控制器,Rx通过它来指定每一段代码执行在什么样的线程中。

内置的几个Scheduler:

Scheduler.immediate():

  • 直接在当前线程,这是默认情况( 仅在RxJava1.0中存在,RxJava2.0中已经删除此API

Scheduler.newThread():

  • 总是启用新线程。

Scheduler.io():

  • I/O操作(文件读写、数据库读写、网络信息交互等)所使用的Scheduler。
  • 和newThread一样会启用RxJava准备的线程
  • 区别在于:
    • io()内部实现是用一个没有上限的线程池,可以重用空闲的线程,多数情况下,比newThread()要高效。
    • 建议:不要把计算工作放在这个线程中

Scheduler.computation():

  • 计算用的Scheduler。
  • 指的是CPU密集型计算,也就是不会被I/O操作限制性能的操作,如图形计算。
  • 使用固定的线程池,线程数量大小为CPU核数。
  • 同理:I/O操作不要放到这个线程中

AndroidSchedulers.mainThread():

  • android主线程运行
  • android专用的线程,指明在android主线程运行
  • 此API由RxAndroid库中提供,找不到的同学可以检查下是否已经依赖RxAndroid

线程控制

  • 结合上述的Scheduler,就可以使用下列方法控制线程:

subscribeOn():

  • 指定subscribe()所发生的线程,也就是上游事件发射的线程
  • 只有第一次指定的时候生效
  • 代码实例:
        Observable.create(onSubscribe)
        .subscribeOn(AndroidSchedulers.mainThread())//指定上游发射事件的操作,发生在UI线程
        .subscribe(subscriber);
  • 但是如果上游事件发射的时候我也想要切换线程怎么办?
    • doOnsubscribe()
      • Subscriber的onSubscribe()方法用于预处理初始化,但是由于onSubscribe()在subscribe()发生的时候就调用了,因此不能指定线程。
      • 那么比如我想在预处理的时候就进行一次UI重绘,必须在主线程执行,这就会导致有线程非法的风险,因为我们不好控制我们当前所在线程
      • 因此,引入doOnSubscribe()方法
      • doOnsubscribe()可以指定线程。(我们将在文章后面讲解)
        • 默认情况下,它运行在subscribe()发生的线程
        • 而如果在doOnsubscribe()之后有subscribeOn()方法,它将执行在离它最近的subscribeOn()方法指定的线程中。
          • 请查看代码如下:
                        Observable.create(onSubscribe)
                            .subscribeOn(Schedulers.io())
                            .doOnSubscribe(new Action0() {
                                    @Override
                                    public void call() {
                                        progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
                                    }
                                })
                            .subscribeOn(AndroidSchedulers.mainThread())// 指定主线程,因此doOnsubscribe()方法执行在主线程
                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe(subscriber);

observeOn():

  • 指定Subscriber所运行在的线程,也就是下游事件接收的线程
  • 每进行一次指定,就会切换一次线程
  • 从这可以看出来,我们在下游观察者处接收到被观察者发出来的数据时,可以多次切换线程,做不同的工作:
    • 如:在io线程中异步联网,随后切换到UI线程中刷新联网获取到的数据
  • observeOn()方法制定的是Subscriber之后的操作所在的线程,因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次observeOn()就行了
    • 伪代码如下:
            Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .map(mapOperator) // 新线程,由 observeOn() 指定
                .observeOn(Schedulers.io())
                .map(mapOperator2) // IO 线程,由 observeOn() 指定
                .observeOn(AndroidSchedulers.mainThread)
                .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定

标准线程代码:

  • 以下是一个标准的线程切换代码的实例:
        // 创建一个上游的Observable(被观察者)
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e/* 事件发射器 */) throws Exception {
                // 发射事件
                Log.d(TAG, "Observable thread is :" + Thread.currentThread().getName());
                e.onNext("事件1");
                e.onNext("事件2");
                e.onNext("事件3");
                e.onComplete();
            }
        });
        // 创建一个下游Observer(观察者)
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(/**/Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }
            @Override
            public void onNext(String string) {
                Log.d(TAG, "onNext: " + string);
                Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        };
        // 上下游建立连接(观察者被观察者建立联系)
        observable
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread()/* 指定下游接收事件的Thread */)
                .subscribe(observer);

doOnSubscribe():

  • Subscriber的onStart()方法用于预处理初始化,但是由于onStart()在subscribe()发生的时候就调用了,因此不能指定线程。
  • 那么比如我想在预处理的时候就进行一次UI重绘,必须在主线程执行,这就会导致有线程非法的风险,因为我们不好控制我们当前所在线程
  • 因此,引入doOnSubscribe()方法
  • doOnsubscribe()可以指定线程。
    • 实际上可以这么理解:doOnsubscribe()其实是抛出了一个额外的执行项
  • 默认情况下,它运行在subscribe()发生的线程
  • 而如果在doOnsubscribe()之后有subscribeOn()方法,它将执行在离它最近的subscribeOn()方法指定的线程中。
        Observable.create(onSubscribe)
            .subscribeOn(Schedulers.io())
            .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
                    }
                })
            .subscribeOn(AndroidSchedulers.mainThread())// 指定主线程,因此doOnsubscribe()方法执行在主线程
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(subscriber);

 评论