变换:
RX提供了对事件序列进行变换的支持,这是核心功能之一。就是将事件序列中的对象进行加工处理,转换成不同的事件或者事件序列(简单点说:发送出来的事件,我们可以理解成一个对象,事件序列就是对象序列。对事件进行改造加工,就是对对象进行额外处理)。
变换操作符:
Map:
- 对上游发送的每一个事件,应用一个操作,让每一个经过这个操作的事件,都能按照指定的方式去发生变换。
- 图例:
- 通过Map我们可以将上游发射过来的任意事件转换为我们想要的任意类型:
- Object、集合、自定义对象等等等等
- 请看代码:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e/* 事件发射器 */) throws Exception {
e.onNext("事件1");
e.onNext("事件2");
e.onNext("事件3");
e.onNext("事件4");
e.onComplete();
}
}).map(new Function<String, String>() {
@Override
public String apply(String string) throws Exception {
// 通过变换操作,拼接字符串,随后返回
return "this is resule:" + string;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
FlatMap:
- 将一个发送事件的上游Observable变换成多个发送事件的Observables,然后将他们发射的事件合并后,放入一个单独的Observable中,随后交给下游处理。(可以理解成,第一个上游,创建了多个上游发射事件,随后将这多个上游发射的事件汇总到一起,交给下游处理)
- 图例:
- 注意:
- FlatMap并不保证事件的发送顺序
- 请看代码:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e/* 事件发射器 */) throws Exception {
e.onNext("事件1");
e.onNext("事件2");
e.onNext("事件3");
e.onNext("事件4");
e.onComplete();
}
}).flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String string) throws Exception {
final List<String> list = new ArrayList<String>();
for (int i = 0; i < 3; i++) {
list.add("I am value:" + string);
}
return Observable.fromIterable(list)/*
* 后续代码加入了delay功能,
* 标示发送事件延迟10毫秒
*/.delay(10, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
ConcatMap:
- 与FlatMap的效果几乎一模一样
- 图例:
- 区别:
- 它的结果是严格按照上游的发送顺序来进行发送的
- 请看代码:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e/* 事件发射器 */) throws Exception {
e.onNext("事件1");
e.onNext("事件2");
e.onNext("事件3");
e.onNext("事件4");
e.onComplete();
}
}).concatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String string) throws Exception {
final List<String> list = new ArrayList<String>();
for (int i = 0; i < 3; i++) {
list.add("I am value:" + string);
}
return Observable.fromIterable(list)/*
* 后续代码加入了delay功能,
* 标示发送事件延迟10毫秒
*/.delay(10, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
Zip:
- 通过一个函数将两个个Observable发送的事件整合到一起,将他们发送给下游。
- 组合的顺序是严格按照事件发送的顺序来进行的,也就是说每个上游的事件顺序都是对应进行zip处理的。
- zip整合后的事件发射个数:取两个个上游中,事件个数最少的上游的个数为准。因为zip是从每个上游取出一个事件进行处理,一旦有任何一个上游的事件取完,zip就结束了。
- 具体流程:
- 请看代码:
Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e/* 事件发射器 */) throws Exception {
e.onNext("01我是:");
e.onNext("02我是:");
e.onNext("03我是:");
e.onNext("04我是:");
e.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<Integer> observable2 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e/* 事件发射器 */) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<String, Integer, String>() {
@Override
public String apply(String integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(/**/Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String string) {
Log.d(TAG, "zip操作结果: " + string);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
- 需要注意的几点内容:
- 当两个上游处于同一线程时,他们的发射顺序将会先发送完第一个上游的事件,在发送第二个,而在发送第二个的过程中进行zip,因此,这种情况下的每一个上游的所有事件都会发射。
- 当两个上游不在同一线程时,他们才是异步的。