Android 进阶之第三方库的介绍 RxJava && RxAndroid 四 [事件源的组合]
- Merege
Merge 操作能够将有多个事件源合并起来产生一个事件源,也就是多输入单输出系统。最后的结果就和一个事件源发出事件一样,Merge操作符,在使用的时候有两点需要注意:
和flatMap一样生成的数据源有可能会相互交错,并且当某一个Observable出错而导致发出onError的时候,merge的过程会被终止并将错误分发给订阅者,如果不想要在发生异常的时候中断merge,可以使用MeregeDelayError它在merge结束后再将数据分发出去。
- Merege
- mergeDelayError
Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
Observable<Integer> evens = Observable.just(2, 4, 6);
Observable.merge(odds, evens)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
运行结果:
Next: 1 |
- Zip
Zip 允许接收多个事件源,并且将对应的事件通过一个方法处理后产生的结果添加到Observable中统一发布出去,这里需要注意的是最终的事件源个数取决于原始数据源中个数最少的那个,看下下面的例子:
事件源observable1 有20个事件,而observable2有10个事件,两个合成起来后最终产生的结果事件为10个。也就是最少的那个:
Observable<Integer> observable1 = Observable.range(1,20); |
05-22 14:46:16.246 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 1 + 1 = 2 |
CombineLatest
CombinLatest可以将多个数据源的最新(最后产生的数据)给组装起来,然然后发送出去.Observable<Long> observable1 = Observable.interval(3, SECONDS);
Observable<Long> observable2 = Observable.interval(1, SECONDS);
Observable.combineLatest(observable1, observable2, new Func2<Long, Long, String>() {
public String call(Long aLong, Long aLong2) {
Log.i("xiaohai.lin", "aLong = " + aLong + " : aLong2 = " + aLong2);
return (aLong+"====="+aLong2);
}
}).subscribe(new Subscriber<String>() {
public void onCompleted() {
}
public void onError(Throwable e) {
}
public void onNext(String s) {
Log.i("xiaohai.lin", s);
}
});运行结果如下所示:每次某个事件源有事件产生的时候,都会取另一个事件源的最新事件进行组合:
05-22 15:37:23.431 4781-4839/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 0 : aLong2 = 1
05-22 15:37:23.431 4781-4839/com.idealist.rxandroiddemo I/xiaohai.lin: 0=====1
05-22 15:37:23.432 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 0 : aLong2 = 2
05-22 15:37:23.432 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 0=====2
05-22 15:37:24.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 0 : aLong2 = 3
05-22 15:37:24.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 0=====3
05-22 15:37:25.431 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 0 : aLong2 = 4
05-22 15:37:25.431 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 0=====4
05-22 15:37:26.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 0 : aLong2 = 5
05-22 15:37:26.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 0=====5
05-22 15:37:26.430 4781-4839/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 1 : aLong2 = 5
05-22 15:37:26.431 4781-4839/com.idealist.rxandroiddemo I/xiaohai.lin: 1=====5
05-22 15:37:27.431 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 1 : aLong2 = 6
05-22 15:37:27.431 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 1=====6
05-22 15:37:28.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 1 : aLong2 = 7
05-22 15:37:28.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 1=====7
05-22 15:37:29.430 4781-4839/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 2 : aLong2 = 7
05-22 15:37:29.430 4781-4839/com.idealist.rxandroiddemo I/xiaohai.lin: 2=====7
05-22 15:37:29.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 2 : aLong2 = 8
05-22 15:37:29.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 2=====8
05-22 15:37:30.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 2 : aLong2 = 9
05-22 15:37:30.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 2=====9
05-22 15:37:31.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 2 : aLong2 = 10
05-22 15:37:31.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 2=====10
05-22 15:37:32.431 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 2 : aLong2 = 11
05-22 15:37:32.431 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 2=====11StartWith
StartWith操作符会在源事件源的前面插上事件后发送:
Observable<Integer> observable1 = Observable.range(1, 10); |
运行结果:
05-22 15:48:12.555 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 11 |