变换:

RX提供了对事件序列进行变换的支持,这是核心功能之一。就是将事件序列中的对象进行加工处理,转换成不同的事件或者事件序列(简单点说:发送出来的事件,我们可以理解成一个对象,事件序列就是对象序列。对事件进行改造加工,就是对对象进行额外处理)。

变换操作符:

Map:

  • 对上游发送的每一个事件,应用一个操作,让每一个经过这个操作的事件,都能按照指定的方式去发生变换。
  • 图例:
  • 通过Map我们可以将上游发射过来的任意事件转换为我们想要的任意类型:
    • Object、集合、自定义对象等等等等
  • 请看代码:
        Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e/* 事件发射器 */) throws Exception {
                    e.onNext("事件1");
                    e.onNext("事件2");
                    e.onNext("事件3");
                    e.onNext("事件4");
                    e.onComplete();
                }
            }).map(new Function<String, String>() {
                    @Override
                    public String apply(String string) throws Exception {
                        // 通过变换操作,拼接字符串,随后返回
                        return "this is resule:" + string;
                    }
                }).subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            Log.d(TAG, "accept: " + s);
                        }
                    });

FlatMap:

  • 将一个发送事件的上游Observable变换成多个发送事件的Observables,然后将他们发射的事件合并后,放入一个单独的Observable中,随后交给下游处理。(可以理解成,第一个上游,创建了多个上游发射事件,随后将这多个上游发射的事件汇总到一起,交给下游处理)
  • 图例:
  • 注意:
    • FlatMap并不保证事件的发送顺序
  • 请看代码:
        Observable.create(new ObservableOnSubscribe<String>() {
             @Override
             public void subscribe(ObservableEmitter<String> e/* 事件发射器 */) throws Exception {
                 e.onNext("事件1");
                 e.onNext("事件2");
                 e.onNext("事件3");
                 e.onNext("事件4");
                 e.onComplete();
             }
         }).flatMap(new Function<String, ObservableSource<String>>() {
             @Override
             public ObservableSource<String> apply(String string) throws Exception {
                 final List<String> list = new ArrayList<String>();
                 for (int i = 0; i < 3; i++) {
                     list.add("I am value:" + string);
                 }
                 return Observable.fromIterable(list)/*
                                                      * 后续代码加入了delay功能,
                                                      * 标示发送事件延迟10毫秒
                                                      */.delay(10, TimeUnit.MILLISECONDS);
             }
         }).subscribe(new Consumer<String>() {
             @Override
             public void accept(String s) throws Exception {
                 Log.d(TAG, "accept: " + s);
             }
         });

ConcatMap:

  • 与FlatMap的效果几乎一模一样
  • 图例:
  • 区别:
    • 它的结果是严格按照上游的发送顺序来进行发送的
  • 请看代码:
        Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e/* 事件发射器 */) throws Exception {
                    e.onNext("事件1");
                    e.onNext("事件2");
                    e.onNext("事件3");
                    e.onNext("事件4");
                    e.onComplete();
                }
            }).concatMap(new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(String string) throws Exception {
                    final List<String> list = new ArrayList<String>();
                    for (int i = 0; i < 3; i++) {
                        list.add("I am value:" + string);
                    }
                    return Observable.fromIterable(list)/*
                                                         * 后续代码加入了delay功能,
                                                         * 标示发送事件延迟10毫秒
                                                         */.delay(10, TimeUnit.MILLISECONDS);
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG, "accept: " + s);
                }
            });

Zip:

  • 通过一个函数将两个个Observable发送的事件整合到一起,将他们发送给下游。
  • 组合的顺序是严格按照事件发送的顺序来进行的,也就是说每个上游的事件顺序都是对应进行zip处理的。
  • zip整合后的事件发射个数:取两个个上游中,事件个数最少的上游的个数为准。因为zip是从每个上游取出一个事件进行处理,一旦有任何一个上游的事件取完,zip就结束了。
  • 具体流程:
  • 请看代码:
        Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() {
                   @Override
                   public void subscribe(ObservableEmitter<String> e/* 事件发射器 */) throws Exception {
                       e.onNext("01我是:");
                       e.onNext("02我是:");
                       e.onNext("03我是:");
                       e.onNext("04我是:");
                       e.onComplete();
                   }
               }).subscribeOn(Schedulers.io());
               Observable<Integer> observable2 = Observable.create(new ObservableOnSubscribe<Integer>() {
                   @Override
                   public void subscribe(ObservableEmitter<Integer> e/* 事件发射器 */) throws Exception {
                       e.onNext(1);
                       e.onNext(2);
                       e.onNext(3);
                       e.onComplete();
                   }
               }).subscribeOn(Schedulers.io());
               Observable.zip(observable1, observable2, new BiFunction<String, Integer, String>() {
                   @Override
                   public String apply(String integer, Integer integer2) throws Exception {
                       return integer + integer2;
                   }
               }).subscribe(new Observer<String>() {
                   @Override
                   public void onSubscribe(/**/Disposable d) {
                       Log.d(TAG, "onSubscribe: ");
                   }
                   @Override
                   public void onNext(String string) {
                       Log.d(TAG, "zip操作结果: " + string);
                   }
                   @Override
                   public void onError(Throwable e) {
                       Log.d(TAG, "onError: ");
                   }
                   @Override
                   public void onComplete() {
                       Log.d(TAG, "onComplete: ");
                   }
               });
  • 需要注意的几点内容:
    • 当两个上游处于同一线程时,他们的发射顺序将会先发送完第一个上游的事件,在发送第二个,而在发送第二个的过程中进行zip,因此,这种情况下的每一个上游的所有事件都会发射。
    • 当两个上游不在同一线程时,他们才是异步的。

 评论