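  • Merge
    Merge combines several event sources into a single one; in other words, it is a multiple-input, single-output system. To the subscriber, the result looks the same as events emitted by one source. Two points need attention when using the merge operator:
    First, as with flatMap, items from the different sources may interleave. Second, when any one of the Observables fails and emits onError, the merge terminates immediately and the error is passed to the subscriber. If an error should not interrupt the merge, use mergeDelayError, which holds the error back until the remaining sources have finished emitting and only then delivers it.
  1. merge
  2. mergeDelayError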
    // Requires RxJava 1.x (rx.Observable, rx.Subscriber).
    // someScheduler is a placeholder for any rx.Scheduler, e.g. Schedulers.io().
    Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
    Observable<Integer> evens = Observable.just(2, 4, 6);

    Observable.merge(odds, evens)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }

            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });

Output (depending on someScheduler, the odd and even items may arrive in a different order):

Next: 1
Next: 3
Next: 5
Next: 2
Next: 4
Next: 6
Sequence complete.
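
To make the difference between merge and mergeDelayError concrete, here is a small plain-Java model of the two error policies. It is only a sketch and does not use RxJava: the class MergeErrorSketch, the ERROR marker, and the round-robin interleaving are all assumptions made for illustration, not how the library schedules items.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of merge vs. mergeDelayError; hypothetical helper, not RxJava code. */
final class MergeErrorSketch {

    /** Marker for an error signal inside a source (assumption for this sketch). */
    static final String ERROR = "ERROR";

    /**
     * Takes one signal from each source in turn and records what a subscriber
     * would see. With delayError == false the first error ends the sequence at
     * once. With delayError == true the failed source is dropped, the other
     * sources keep going, and the error is reported last.
     */
    static List<String> merge(boolean delayError, List<List<String>> sources) {
        List<String> seen = new ArrayList<>();
        boolean[] finished = new boolean[sources.size()];
        boolean pendingError = false;
        boolean progressed = true;
        for (int step = 0; progressed; step++) {
            progressed = false;
            for (int i = 0; i < sources.size(); i++) {
                List<String> source = sources.get(i);
                if (finished[i] || step >= source.size()) {
                    continue;
                }
                progressed = true;
                String signal = source.get(step);
                if (!ERROR.equals(signal)) {
                    seen.add("Next: " + signal);
                } else if (delayError) {
                    finished[i] = true;   // this source is done, the others continue
                    pendingError = true;  // remember the error for the end
                } else {
                    seen.add("Error");    // plain merge: terminate immediately
                    return seen;
                }
            }
        }
        seen.add(pendingError ? "Error" : "Completed");
        return seen;
    }

    public static void main(String[] args) {
        List<List<String>> sources = Arrays.asList(
                Arrays.asList("1", ERROR, "5"),   // fails after its first item
                Arrays.asList("2", "4", "6"));
        System.out.println(merge(false, sources));
        System.out.println(merge(true, sources));
    }
}
```

Running main prints [Next: 1, Next: 2, Error] for the plain merge policy and [Next: 1, Next: 2, Next: 4, Next: 6, Error] for the delayed one: the healthy source still delivers all of its items, and the error arrives only at the end.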
  • Zip

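Zip accepts several event sources, passes the corresponding events (those at the same position in each source) through a function, and emits the results from a single Observable. Note that the number of events finally emitted is determined by the source that emits the fewest. Consider the following example:
observable1 emits 20 events and observable2 emits 10. Zipping them yields 10 result events, the count of the shorter source: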

// Requires RxJava 1.x (rx.Observable, rx.functions.Func2, rx.functions.Action1) and android.util.Log.
Observable<Integer> observable1 = Observable.range(1, 20); // emits 1..20
Observable<Integer> observable2 = Observable.range(1, 10); // emits 1..10
Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>() {
    // Called once for each pair of items at the same position in both sources.
    @Override
    public Integer call(Integer integer, Integer integer2) {
        Log.i("xiaohai.lin", integer.toString() + " + " + integer2.toString() + " = " + (integer.intValue() + integer2.intValue()));
        return integer.intValue() + integer2.intValue();
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i("xiaohai.lin", integer.toString() + " =================== ");
    }
});

Output:

05-22 14:46:16.246 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 1 + 1 = 2
05-22 14:46:16.246 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 2 ===================
05-22 14:46:16.246 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 2 + 2 = 4
05-22 14:46:16.246 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 4 ===================
05-22 14:46:16.246 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 3 + 3 = 6
05-22 14:46:16.246 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 6 ===================
05-22 14:46:16.246 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 4 + 4 = 8
05-22 14:46:16.247 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 8 ===================
05-22 14:46:16.247 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 5 + 5 = 10
05-22 14:46:16.247 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 10 ===================
05-22 14:46:16.247 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 6 + 6 = 12
05-22 14:46:16.247 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 12 ===================
05-22 14:46:16.247 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 7 + 7 = 14
05-22 14:46:16.247 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 14 ===================
05-22 14:46:16.247 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 8 + 8 = 16
05-22 14:46:16.247 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 16 ===================
05-22 14:46:16.247 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 9 + 9 = 18
05-22 14:46:16.247 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 18 ===================
05-22 14:46:16.247 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 10 + 10 = 20
05-22 14:46:16.247 16854-16854/com.idealist.rxandroiddemo I/xiaohai.lin: 20 ===================
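
The "shortest source wins" rule can be modelled without RxJava at all. The sketch below pairs two plain lists by index; ZipSketch and its range helper are hypothetical names for this illustration, and the code only mirrors the pairing rule, not the library's asynchronous behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Toy model of zip's pairing rule; hypothetical helper, not RxJava code. */
final class ZipSketch {

    /** Combines items at the same index and stops when the shorter list runs out. */
    static <A, B, R> List<R> zip(List<A> first, List<B> second, BiFunction<A, B, R> combiner) {
        int pairs = Math.min(first.size(), second.size());
        List<R> result = new ArrayList<>(pairs);
        for (int i = 0; i < pairs; i++) {
            result.add(combiner.apply(first.get(i), second.get(i)));
        }
        return result;
    }

    /** Like Observable.range(start, count): count consecutive integers from start. */
    static List<Integer> range(int start, int count) {
        return IntStream.range(start, start + count).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> sums = zip(range(1, 20), range(1, 10), Integer::sum);
        System.out.println(sums.size() + " results: " + sums);
    }
}
```

This prints 10 results: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20], the same sums as in the log above. Items 11 to 20 of the longer source never find a partner and are dropped.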
  • CombineLatest

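    CombineLatest takes the latest (most recently emitted) item from each of several sources, combines them, and emits the result. Nothing is emitted until every source has produced at least one item.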

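    // Requires RxJava 1.x (rx.Observable, rx.Subscriber, rx.functions.Func2) and android.util.Log.
    // SECONDS is java.util.concurrent.TimeUnit.SECONDS (static import).
    Observable<Long> observable1 = Observable.interval(3, SECONDS); // ticks every 3 seconds
    Observable<Long> observable2 = Observable.interval(1, SECONDS); // ticks every second
    Observable.combineLatest(observable1, observable2, new Func2<Long, Long, String>() {
        // Called whenever either source emits, with the latest value of each.
        @Override
        public String call(Long aLong, Long aLong2) {
            Log.i("xiaohai.lin", "aLong = " + aLong + " : aLong2 = " + aLong2);
            return (aLong + "=====" + aLong2);
        }
    }).subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onNext(String s) {
            Log.i("xiaohai.lin", s);
        }
    });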

    The output is shown below. Each time either source emits an event, it is combined with the latest event from the other source:

    05-22 15:37:23.431 4781-4839/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 0 : aLong2 = 1
    05-22 15:37:23.431 4781-4839/com.idealist.rxandroiddemo I/xiaohai.lin: 0=====1
    05-22 15:37:23.432 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 0 : aLong2 = 2
    05-22 15:37:23.432 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 0=====2
    05-22 15:37:24.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 0 : aLong2 = 3
    05-22 15:37:24.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 0=====3
    05-22 15:37:25.431 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 0 : aLong2 = 4
    05-22 15:37:25.431 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 0=====4
    05-22 15:37:26.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 0 : aLong2 = 5
    05-22 15:37:26.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 0=====5
    05-22 15:37:26.430 4781-4839/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 1 : aLong2 = 5
    05-22 15:37:26.431 4781-4839/com.idealist.rxandroiddemo I/xiaohai.lin: 1=====5
    05-22 15:37:27.431 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 1 : aLong2 = 6
    05-22 15:37:27.431 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 1=====6
    05-22 15:37:28.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 1 : aLong2 = 7
    05-22 15:37:28.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 1=====7
    05-22 15:37:29.430 4781-4839/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 2 : aLong2 = 7
    05-22 15:37:29.430 4781-4839/com.idealist.rxandroiddemo I/xiaohai.lin: 2=====7
    05-22 15:37:29.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 2 : aLong2 = 8
    05-22 15:37:29.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 2=====8
    05-22 15:37:30.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 2 : aLong2 = 9
    05-22 15:37:30.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 2=====9
    05-22 15:37:31.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 2 : aLong2 = 10
    05-22 15:37:31.430 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 2=====10
    05-22 15:37:32.431 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: aLong = 2 : aLong2 = 11
    05-22 15:37:32.431 4781-4841/com.idealist.rxandroiddemo I/xiaohai.lin: 2=====11
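
The bookkeeping behind this log can be sketched in plain Java: remember the latest value per source, and emit a combination on every event once both sources have a value. The class CombineLatestSketch, its Event type, and the hand-written timeline are assumptions for illustration, and the order of events that fall on the same second is chosen arbitrarily here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of combineLatest for two sources; hypothetical helper, not RxJava code. */
final class CombineLatestSketch {

    /** One emission: which source fired (0 or 1) and the value it carried. */
    static final class Event {
        final int source;
        final long value;

        Event(int source, long value) {
            this.source = source;
            this.value = value;
        }
    }

    /** Replays the events in order and records every combined item. */
    static List<String> combineLatest(List<Event> timeline) {
        Long[] latest = new Long[2]; // null means "this source has not emitted yet"
        List<String> combined = new ArrayList<>();
        for (Event event : timeline) {
            latest[event.source] = event.value;
            if (latest[0] != null && latest[1] != null) {
                combined.add(latest[0] + "=====" + latest[1]);
            }
        }
        return combined;
    }

    public static void main(String[] args) {
        // Source 1 ticks every second, source 0 every three seconds.
        List<Event> timeline = Arrays.asList(
                new Event(1, 0), new Event(1, 1),   // seconds 1 and 2: only source 1 so far
                new Event(0, 0), new Event(1, 2),   // second 3: both sources tick
                new Event(1, 3), new Event(1, 4),   // seconds 4 and 5
                new Event(1, 5), new Event(0, 1));  // second 6: both sources tick
        for (String item : combineLatest(timeline)) {
            System.out.println(item);
        }
    }
}
```

The first two ticks of the faster source produce no output because the slower source has not emitted yet. After that the sketch prints 0=====1, 0=====2, 0=====3, 0=====4, 0=====5 and 1=====5, matching the first six combined items in the log.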

  • StartWith

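The startWith operator emits the given events first and then the events of the source Observable. Note that the second argument of Observable.range is a count, not an end value, so range(11, 20) emits the 20 numbers from 11 to 30: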

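// Requires RxJava 1.x (rx.Observable, rx.Subscriber) and android.util.Log.
Observable<Integer> observable1 = Observable.range(1, 10);  // range(start, count): 1..10
Observable<Integer> observable2 = Observable.range(11, 20); // 20 items: 11..30
// observable2 is emitted completely before observable1 starts.
observable1.startWith(observable2).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(Integer o) {
        Log.i("xiaohai.lin", o.toString());
    }
});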

Output:

05-22 15:48:12.555 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 11
05-22 15:48:12.555 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 12
05-22 15:48:12.555 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 13
05-22 15:48:12.555 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 14
05-22 15:48:12.555 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 15
05-22 15:48:12.555 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 16
05-22 15:48:12.555 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 17
05-22 15:48:12.555 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 18
05-22 15:48:12.555 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 19
05-22 15:48:12.555 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 20
05-22 15:48:12.555 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 21
05-22 15:48:12.555 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 22
05-22 15:48:12.556 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 23
05-22 15:48:12.556 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 24
05-22 15:48:12.556 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 25
05-22 15:48:12.556 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 26
05-22 15:48:12.556 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 27
05-22 15:48:12.556 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 28
05-22 15:48:12.556 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 29
05-22 15:48:12.556 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 30
05-22 15:48:12.556 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 1
05-22 15:48:12.556 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 2
05-22 15:48:12.556 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 3
05-22 15:48:12.556 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 4
05-22 15:48:12.556 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 5
05-22 15:48:12.557 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 6
05-22 15:48:12.557 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 7
05-22 15:48:12.557 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 8
05-22 15:48:12.557 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 9
05-22 15:48:12.557 15979-15979/com.idealist.rxandroiddemo I/xiaohai.lin: 10
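
The ordering in this log amounts to "prefix first, then source". A minimal plain-Java model, assuming lists in place of Observables (StartWithSketch and its helpers are hypothetical names, not RxJava API), looks like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Toy model of startWith's ordering; hypothetical helper, not RxJava code. */
final class StartWithSketch {

    /** Like Observable.range(start, count): count consecutive integers from start. */
    static List<Integer> range(int start, int count) {
        return IntStream.range(start, start + count).boxed().collect(Collectors.toList());
    }

    /** Returns all items of prefix followed by all items of source. */
    static <T> List<T> startWith(List<T> source, List<T> prefix) {
        List<T> result = new ArrayList<>(prefix);
        result.addAll(source);
        return result;
    }

    public static void main(String[] args) {
        List<Integer> items = startWith(range(1, 10), range(11, 20));
        System.out.println(items.size() + " items, first " + items.get(0)
                + ", last " + items.get(items.size() - 1));
    }
}
```

This prints 30 items, first 11, last 10: the 20 prefixed items (11 to 30) come first, followed by the 10 source items (1 to 10), as in the log above.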