前一篇博客我们介绍了如何对原始事件源进行过滤,这一篇我们将给大家介绍如何对事件源进行转换。什么是事件源转换?打个简单的比喻,我们原始事件源发出某个网站的站名,但是我们订阅者需要知道的是网站的地址,
这时候就可以通过事件源的转换来实现。

  • Buffer
    顾名思义,就是对事件源进行缓存,这里的缓存可以按照时间进行缓存,也可以按事件源个数进行缓存,缓存后将数据以数组集合的方式发送出去。

    方式一:
    只是单纯的缓存,这里buffer方法只有一个参数,那就是缓存大小,下面例子中,该转换器集满5个事件后就以列表的形式发出。

    Observable.range(1, 100).buffer(5).subscribe(new Subscriber<List<Integer>>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(List<Integer> integers) {
    Log.i("xiaohai.lin", integers.toString());
    }
    });

    整个运行结果如下所示:

    05-22 09:12:48.191 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [1, 2, 3, 4, 5]
    05-22 09:12:48.191 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [6, 7, 8, 9, 10]
    05-22 09:12:48.191 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [11, 12, 13, 14, 15]
    05-22 09:12:48.191 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [16, 17, 18, 19, 20]
    05-22 09:12:48.191 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [21, 22, 23, 24, 25]
    05-22 09:12:48.191 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [26, 27, 28, 29, 30]
    05-22 09:12:48.191 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [31, 32, 33, 34, 35]
    05-22 09:12:48.191 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [36, 37, 38, 39, 40]
    05-22 09:12:48.191 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [41, 42, 43, 44, 45]
    05-22 09:12:48.191 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [46, 47, 48, 49, 50]
    05-22 09:12:48.191 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [51, 52, 53, 54, 55]
    05-22 09:12:48.191 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [56, 57, 58, 59, 60]
    05-22 09:12:48.192 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [61, 62, 63, 64, 65]
    05-22 09:12:48.192 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [66, 67, 68, 69, 70]
    05-22 09:12:48.192 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [71, 72, 73, 74, 75]
    05-22 09:12:48.192 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [76, 77, 78, 79, 80]
    05-22 09:12:48.192 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [81, 82, 83, 84, 85]
    05-22 09:12:48.192 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [86, 87, 88, 89, 90]
    05-22 09:12:48.192 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [91, 92, 93, 94, 95]
    05-22 09:12:48.192 25401-25401/com.idealist.rxandroiddemo I/xiaohai.lin: [96, 97, 98, 99, 100]

    方式二:
    每M个事件中抽取N个事件源,以列表的形式发出,比如下面的例子,从1到100,每隔10个事件源,在这10个事件源中发送前5个。

    Observable.range(1, 100).buffer(5, 10).subscribe(new Subscriber<List<Integer>>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(List<Integer> integers) {
    Log.i("xiaohai.lin", integers.toString());
    }
    });

    输出结果:

    05-22 09:11:03.886 23946-23946/com.idealist.rxandroiddemo I/xiaohai.lin: [1, 2, 3, 4, 5]
    05-22 09:11:03.886 23946-23946/com.idealist.rxandroiddemo I/xiaohai.lin: [11, 12, 13, 14, 15]
    05-22 09:11:03.886 23946-23946/com.idealist.rxandroiddemo I/xiaohai.lin: [21, 22, 23, 24, 25]
    05-22 09:11:03.886 23946-23946/com.idealist.rxandroiddemo I/xiaohai.lin: [31, 32, 33, 34, 35]
    05-22 09:11:03.886 23946-23946/com.idealist.rxandroiddemo I/xiaohai.lin: [41, 42, 43, 44, 45]
    05-22 09:11:03.886 23946-23946/com.idealist.rxandroiddemo I/xiaohai.lin: [51, 52, 53, 54, 55]
    05-22 09:11:03.886 23946-23946/com.idealist.rxandroiddemo I/xiaohai.lin: [61, 62, 63, 64, 65]
    05-22 09:11:03.886 23946-23946/com.idealist.rxandroiddemo I/xiaohai.lin: [71, 72, 73, 74, 75]
    05-22 09:11:03.886 23946-23946/com.idealist.rxandroiddemo I/xiaohai.lin: [81, 82, 83, 84, 85]
    05-22 09:11:03.886 23946-23946/com.idealist.rxandroiddemo I/xiaohai.lin: [91, 92, 93, 94, 95]

    方式三:
    在某个时间间隔内,缓存数据,时间一到就把缓存的数据以列表的形式发送出去:
    下面事件源是一个每隔1秒的事件源,缓冲器是一个每隔4秒缓存一次的缓存器,到达4秒后将缓存数据发送出去:

    Observable.interval(1, TimeUnit.SECONDS).buffer(4, TimeUnit.SECONDS).subscribe(new Subscriber<List<Long>>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(List<Long> longs) {
    Log.i("xiaohai.lin", longs.toString());
    }
    });

    下面是输出结果:

    05-22 09:19:01.614 31383-31451/com.idealist.rxandroiddemo I/xiaohai.lin: [0, 1, 2]
    05-22 09:19:05.611 31383-31451/com.idealist.rxandroiddemo I/xiaohai.lin: [3, 4, 5, 6]
    05-22 09:19:09.612 31383-31451/com.idealist.rxandroiddemo I/xiaohai.lin: [7, 8, 9, 10]
    05-22 09:19:13.611 31383-31451/com.idealist.rxandroiddemo I/xiaohai.lin: [11, 12, 13, 14]
    05-22 09:19:17.612 31383-31451/com.idealist.rxandroiddemo I/xiaohai.lin: [15, 16, 17, 18]
    05-22 09:19:21.611 31383-31451/com.idealist.rxandroiddemo I/xiaohai.lin: [19, 20, 21, 22]
    05-22 09:19:25.611 31383-31451/com.idealist.rxandroiddemo I/xiaohai.lin: [23, 24, 25, 26]
    05-22 09:19:29.612 31383-31451/com.idealist.rxandroiddemo I/xiaohai.lin: [27, 28, 29, 30]
    05-22 09:19:33.611 31383-31451/com.idealist.rxandroiddemo I/xiaohai.lin: [31, 32, 33, 34]
    05-22 09:19:37.612 31383-31451/com.idealist.rxandroiddemo I/xiaohai.lin: [35, 36, 37, 38]
  • Window
    Window和上述的buffer类似,不同之处在于window是将数据源以一些小的Observable对象发送出去,这些小的Observable对象封装了每次缓冲的数据。

    方式一:

    Observable.range(1, 10).window(3).subscribe(new Subscriber<Observable<Integer>>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(final Observable<Integer> integerObservable) {
    integerObservable.subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
    Log.i("xiaohai.lin", "-------------------------------------------------------------");
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(Integer integer) {
    Log.i("xiaohai.lin", "integer " + integer.toString() +" : "+ integerObservable.toString());
    }
    });
    }
    });

    下面是运行的结果:

    05-22 10:50:28.687 12664-12664/com.idealist.rxandroiddemo I/xiaohai.lin: integer 1 : rx.internal.operators.UnicastSubject@c9c87a
    05-22 10:50:28.688 12664-12664/com.idealist.rxandroiddemo I/xiaohai.lin: integer 2 : rx.internal.operators.UnicastSubject@c9c87a
    05-22 10:50:28.688 12664-12664/com.idealist.rxandroiddemo I/xiaohai.lin: integer 3 : rx.internal.operators.UnicastSubject@c9c87a
    05-22 10:50:28.688 12664-12664/com.idealist.rxandroiddemo I/xiaohai.lin: -------------------------------------------------------------
    05-22 10:50:28.689 12664-12664/com.idealist.rxandroiddemo I/xiaohai.lin: integer 4 : rx.internal.operators.UnicastSubject@95bb82b
    05-22 10:50:28.689 12664-12664/com.idealist.rxandroiddemo I/xiaohai.lin: integer 5 : rx.internal.operators.UnicastSubject@95bb82b
    05-22 10:50:28.689 12664-12664/com.idealist.rxandroiddemo I/xiaohai.lin: integer 6 : rx.internal.operators.UnicastSubject@95bb82b
    05-22 10:50:28.689 12664-12664/com.idealist.rxandroiddemo I/xiaohai.lin: -------------------------------------------------------------
    05-22 10:50:28.689 12664-12664/com.idealist.rxandroiddemo I/xiaohai.lin: integer 7 : rx.internal.operators.UnicastSubject@a6cb488
    05-22 10:50:28.689 12664-12664/com.idealist.rxandroiddemo I/xiaohai.lin: integer 8 : rx.internal.operators.UnicastSubject@a6cb488
    05-22 10:50:28.689 12664-12664/com.idealist.rxandroiddemo I/xiaohai.lin: integer 9 : rx.internal.operators.UnicastSubject@a6cb488
    05-22 10:50:28.689 12664-12664/com.idealist.rxandroiddemo I/xiaohai.lin: -------------------------------------------------------------
    05-22 10:50:28.689 12664-12664/com.idealist.rxandroiddemo I/xiaohai.lin: integer 10 : rx.internal.operators.UnicastSubject@87a7e21
    05-22 10:50:28.701 12664-12664/com.idealist.rxandroiddemo I/xiaohai.lin: -------------------------------------------------------------

    方式二:

    Observable.range(1, 50).window(5,10).subscribe(new Subscriber<Observable<Integer>>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(final Observable<Integer> integerObservable) {
    integerObservable.subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
    Log.i("xiaohai.lin", "-------------------------------------------------------------");
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(Integer integer) {
    Log.i("xiaohai.lin", "integer " + integer.toString() +" : "+ integerObservable.toString());
    }
    });
    }
    });

    运行结果:

    05-22 10:55:17.413 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 1 : rx.internal.operators.UnicastSubject@c9c87a
    05-22 10:55:17.413 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 2 : rx.internal.operators.UnicastSubject@c9c87a
    05-22 10:55:17.413 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 3 : rx.internal.operators.UnicastSubject@c9c87a
    05-22 10:55:17.413 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 4 : rx.internal.operators.UnicastSubject@c9c87a
    05-22 10:55:17.413 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 5 : rx.internal.operators.UnicastSubject@c9c87a
    05-22 10:55:17.413 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: -------------------------------------------------------------
    05-22 10:55:17.414 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 11 : rx.internal.operators.UnicastSubject@95bb82b
    05-22 10:55:17.414 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 12 : rx.internal.operators.UnicastSubject@95bb82b
    05-22 10:55:17.414 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 13 : rx.internal.operators.UnicastSubject@95bb82b
    05-22 10:55:17.414 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 14 : rx.internal.operators.UnicastSubject@95bb82b
    05-22 10:55:17.414 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 15 : rx.internal.operators.UnicastSubject@95bb82b
    05-22 10:55:17.415 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: -------------------------------------------------------------
    05-22 10:55:17.415 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 21 : rx.internal.operators.UnicastSubject@a6cb488
    05-22 10:55:17.415 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 22 : rx.internal.operators.UnicastSubject@a6cb488
    05-22 10:55:17.415 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 23 : rx.internal.operators.UnicastSubject@a6cb488
    05-22 10:55:17.415 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 24 : rx.internal.operators.UnicastSubject@a6cb488
    05-22 10:55:17.415 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 25 : rx.internal.operators.UnicastSubject@a6cb488
    05-22 10:55:17.415 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: -------------------------------------------------------------
    05-22 10:55:17.415 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 31 : rx.internal.operators.UnicastSubject@87a7e21
    05-22 10:55:17.415 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 32 : rx.internal.operators.UnicastSubject@87a7e21
    05-22 10:55:17.415 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 33 : rx.internal.operators.UnicastSubject@87a7e21
    05-22 10:55:17.415 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 34 : rx.internal.operators.UnicastSubject@87a7e21
    05-22 10:55:17.416 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 35 : rx.internal.operators.UnicastSubject@87a7e21
    05-22 10:55:17.416 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: -------------------------------------------------------------
    05-22 10:55:17.416 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 41 : rx.internal.operators.UnicastSubject@d1bf246
    05-22 10:55:17.416 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 42 : rx.internal.operators.UnicastSubject@d1bf246
    05-22 10:55:17.416 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 43 : rx.internal.operators.UnicastSubject@d1bf246
    05-22 10:55:17.416 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 44 : rx.internal.operators.UnicastSubject@d1bf246
    05-22 10:55:17.416 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: integer 45 : rx.internal.operators.UnicastSubject@d1bf246
    05-22 10:55:17.416 19057-19057/com.idealist.rxandroiddemo I/xiaohai.lin: -------------------------------------------------------------

    方式三:

    Observable.interval(1, TimeUnit.SECONDS).window(3, TimeUnit.SECONDS).subscribe(new Subscriber<Observable<Long>>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Observable<Long> longObservable) {
    longObservable.subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
    Log.i("xiaohai.lin","=====================================");
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Long aLong) {
    Log.i("xiaohai.lin","times = "+aLong);
    }
    });
    }
    });

    运行结果:

    05-22 10:58:40.818 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 0
    05-22 10:58:41.818 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 1
    05-22 10:58:42.815 22345-22430/com.idealist.rxandroiddemo I/xiaohai.lin: =====================================
    05-22 10:58:42.819 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 2
    05-22 10:58:43.818 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 3
    05-22 10:58:44.819 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 4
    05-22 10:58:45.815 22345-22430/com.idealist.rxandroiddemo I/xiaohai.lin: =====================================
    05-22 10:58:45.818 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 5
    05-22 10:58:46.819 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 6
    05-22 10:58:47.818 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 7
    05-22 10:58:48.816 22345-22430/com.idealist.rxandroiddemo I/xiaohai.lin: =====================================
    05-22 10:58:48.819 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 8
    05-22 10:58:49.818 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 9
    05-22 10:58:50.819 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 10
    05-22 10:58:51.815 22345-22430/com.idealist.rxandroiddemo I/xiaohai.lin: =====================================
    05-22 10:58:51.819 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 11
    05-22 10:58:52.818 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 12
    05-22 10:58:53.818 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 13
    05-22 10:58:54.816 22345-22430/com.idealist.rxandroiddemo I/xiaohai.lin: =====================================
    05-22 10:58:54.819 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 14
    05-22 10:58:55.818 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 15
    05-22 10:58:56.818 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 16
    05-22 10:58:57.815 22345-22430/com.idealist.rxandroiddemo I/xiaohai.lin: =====================================
    05-22 10:58:57.818 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 17
    05-22 10:58:58.818 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 18
    05-22 10:58:59.818 22345-22431/com.idealist.rxandroiddemo I/xiaohai.lin: times = 19
    05-22 10:59:00.815 22345-22430/com.idealist.rxandroiddemo I/xiaohai.lin: =====================================
  • Map
    Map 转换一般用在将原始数据源的每个事件,一个个进行处理后,发送出去,它对数据的转化是直接进行的和后面将要介绍的FlatMap是有一定的区别的,FlatMap 是通过Observables过渡的会产生中间Observables:
    下面的例子是一个获取手机中的Apk包名然后将其转换为大写的例子,它是一个个直接进行转换的。

    final Intent mainIntent = new Intent(Intent.ACTION_MAIN);
    mainIntent.addCategory(Intent.CATEGORY_LAUNCHER);
    List<ResolveInfo> activityInfoList = getPackageManager().queryIntentActivities(mainIntent, 0);
    Observable.from(activityInfoList).map(new Func1<ResolveInfo, ResolveInfo>() {
    @Override
    public ResolveInfo call(ResolveInfo resolveInfo) {
    resolveInfo.activityInfo.packageName = resolveInfo.activityInfo.packageName.toUpperCase();
    return resolveInfo;
    }
    }).subscribe(new Subscriber<ResolveInfo>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable throwable) {
    }
    @Override
    public void onNext(ResolveInfo resolveInfo) {
    Log.i("xiaohai.lin", resolveInfo.activityInfo.packageName);
    }
    });

    下面是输出的结果:

    05-22 09:39:14.200 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.ANDROID.CONTACTS
    05-22 09:39:14.200 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.ANDROID.DIALER
    05-22 09:39:14.200 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.ANDROID.MMS
    05-22 09:39:14.200 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.ANDROID.VENDING
    05-22 09:39:14.200 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.ANDROID.SETTINGS
    05-22 09:39:14.200 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.GOOGLE.ANDROID.CALENDAR
    05-22 09:39:14.200 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.MEDIATEK.CAMERA
    05-22 09:39:14.201 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.ANDROID.CHROME
    05-22 09:39:14.201 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.ANDROID.DESKCLOCK
    05-22 09:39:14.201 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.GOOGLE.ANDROID.APPS.DOCS
    05-22 09:39:14.201 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.GOOGLE.ANDROID.GM
    05-22 09:39:14.201 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.GOOGLE.ANDROID.TALK
    05-22 09:39:14.201 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.GOOGLE.ANDROID.MUSIC
    05-22 09:39:14.201 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.GOOGLE.ANDROID.APPS.PHOTOS
    05-22 09:39:14.201 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.GOOGLE.ANDROID.YOUTUBE
  • FlatMap

和Map一样都是能够将原始数据转换后将其发送出去,但是它是将原始的Observable转换成另外一个Observable后将其以Observable形式发送出去。

下面是Map和Flatmap的区别:[来自网上]
Map是在一个item被发射之后,到达Map处经过转换变成另一个item,然后继续往下走;
flapMap是item被发射之后,到达flatMap 处经过转换变成一个Observable,而这个Observable并不会直接被发射出去,而是会立即被激活,然后把它发射出的每个 item 都传入流中,再继续走下去。
所以 flatMap 和 map 有两个区别:
经过 Observable 的转换,相当于重新开了一个异步的流;
item 被分散了,个数发生了变化。

final Intent mainIntent = new Intent(Intent.ACTION_MAIN);
mainIntent.addCategory(Intent.CATEGORY_LAUNCHER);
List<ResolveInfo> activityInfoList = getPackageManager().queryIntentActivities(mainIntent, 0);
Observable.from(activityInfoList).flatMap(new Func1<ResolveInfo, Observable<ResolveInfo>>() {
@Override
public Observable<ResolveInfo> call(ResolveInfo resolveInfo) {
resolveInfo.activityInfo.packageName = resolveInfo.activityInfo.packageName.toUpperCase();
return Observable.just(resolveInfo);
}
}).subscribe(new Subscriber<ResolveInfo>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(ResolveInfo resolveInfo) {
Log.i("xiaohai.lin", resolveInfo.activityInfo.packageName);
}
});

下面是输出的结果:

05-22 09:39:14.200 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.ANDROID.CONTACTS
05-22 09:39:14.200 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.ANDROID.DIALER
05-22 09:39:14.200 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.ANDROID.MMS
05-22 09:39:14.200 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.ANDROID.VENDING
05-22 09:39:14.200 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.ANDROID.SETTINGS
05-22 09:39:14.200 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.GOOGLE.ANDROID.CALENDAR
05-22 09:39:14.200 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.MEDIATEK.CAMERA
05-22 09:39:14.201 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.ANDROID.CHROME
05-22 09:39:14.201 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.ANDROID.DESKCLOCK
05-22 09:39:14.201 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.GOOGLE.ANDROID.APPS.DOCS
05-22 09:39:14.201 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.GOOGLE.ANDROID.GM
05-22 09:39:14.201 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.GOOGLE.ANDROID.TALK
05-22 09:39:14.201 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.GOOGLE.ANDROID.MUSIC
05-22 09:39:14.201 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.GOOGLE.ANDROID.APPS.PHOTOS
05-22 09:39:14.201 10071-10071/com.idealist.rxandroiddemo I/xiaohai.lin: COM.GOOGLE.ANDROID.YOUTUBE
  • concatMap
    flatMap()与concatMap()大体的行为是一致的,但是flatMap()可能交错的发送事件,最终结果的顺序可能并是不原始Observable发送时的顺序。这有可能会带来严重的影响,比如我们要计算1到100的每10个元素分成一组,计算每组的和,但是如果有交错发生,
    这样每组计算的和和我们期望的将会差别很大,这时候就可以使用与之类似的concatMap()操作符来避免数据的交错发生。

  • SwitchMap
    switchMap()和flatMap()大体的行为是一致的,除了一点:当原始Observable发射一个新的数据(Observable)时,它将取消订阅并停止监视之前那个数据的Observable产生的Observable,并开始监视当前这一个。

  • Cast
    Cast将Observable发射的数据强制转化为另外一种类型,这个用得比较少,在这里先不做介绍:

  • GroupBy

GroupBy操作符将原始事件源的数据按照key来拆分成一些小的Observable,拆封规则在groupBy中实现,然后将这些同组的事件源封装成一个Observable,再将这些事件源一组一组发送出去:
下面的例子将会扫描手机中的apk包名,将其按照报名的前两个前缀分组,并统计每个类别的应用的数量:

final Intent mainIntent = new Intent(Intent.ACTION_MAIN);
mainIntent.addCategory(Intent.CATEGORY_LAUNCHER);
final List<ResolveInfo> activityInfoList = getPackageManager().queryIntentActivities(mainIntent, 0);
Observable.from(activityInfoList).groupBy(new Func1<ResolveInfo, String>() {
@Override
public String call(ResolveInfo resolveInfo) {
String packageName = resolveInfo.activityInfo.packageName;
return packageName.substring(0,packageName.lastIndexOf("."));
}
}).subscribe(new Subscriber<GroupedObservable<String, ResolveInfo>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(final GroupedObservable<String, ResolveInfo> stringResolveInfoGroupedObservable) {
stringResolveInfoGroupedObservable.count().subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.i("xiaohai.lin", stringResolveInfoGroupedObservable.getKey() + " Contain :" + integer.toString() + " elemnts");
}
});
}
});

运行结果如下:

05-22 10:23:20.672 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com  Contain :4  elemnts
05-22 10:23:20.674 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.flysnow Contain :1 elemnts
05-22 10:23:20.674 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.qihoo360 Contain :1 elemnts
05-22 10:23:20.674 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.everimaging Contain :1 elemnts
05-22 10:23:20.674 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.android Contain :9 elemnts
05-22 10:23:20.674 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.jianshu Contain :1 elemnts
05-22 10:23:20.674 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.sohu.inputmethod Contain :1 elemnts
05-22 10:23:20.674 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: io.manong Contain :1 elemnts
05-22 10:23:20.674 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.google.android Contain :7 elemnts
05-22 10:23:20.674 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.foxit.mobile.pdf Contain :1 elemnts
05-22 10:23:20.674 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.alcatel Contain :1 elemnts
05-22 10:23:20.675 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.liangfeizc Contain :1 elemnts
05-22 10:23:20.675 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.emoji.keyboard Contain :1 elemnts
05-22 10:23:20.675 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.tencent Contain :3 elemnts
05-22 10:23:20.675 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.baozou.baozou Contain :1 elemnts
05-22 10:23:20.675 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.antonioleiva Contain :1 elemnts
05-22 10:23:20.675 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.alpha Contain :1 elemnts
05-22 10:23:20.675 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.jrdcom Contain :3 elemnts
05-22 10:23:20.675 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.douban Contain :1 elemnts
05-22 10:23:20.675 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.zhihu Contain :1 elemnts
05-22 10:23:20.675 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.baidu Contain :1 elemnts
05-22 10:23:20.675 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.facebook Contain :1 elemnts
05-22 10:23:20.675 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: vStudio.Android Contain :1 elemnts
05-22 10:23:20.675 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.ss.android.article Contain :1 elemnts
05-22 10:23:20.676 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.android.providers.downloads Contain :1 elemnts
05-22 10:23:20.676 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: cn.wps Contain :1 elemnts
05-22 10:23:20.676 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.netease Contain :1 elemnts
05-22 10:23:20.676 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.ticktick Contain :1 elemnts
05-22 10:23:20.676 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.codoon Contain :1 elemnts
05-22 10:23:20.676 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.tct Contain :6 elemnts
05-22 10:23:20.676 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.google.android.apps Contain :3 elemnts
05-22 10:23:20.676 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.dajie Contain :1 elemnts
05-22 10:23:20.676 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.tcl Contain :1 elemnts
05-22 10:23:20.676 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.idealist Contain :3 elemnts
05-22 10:23:20.676 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.baozoumanhua Contain :1 elemnts
05-22 10:23:20.677 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.hitherejoe Contain :1 elemnts
05-22 10:23:20.677 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.mediatek Contain :1 elemnts
05-22 10:23:20.677 15638-15638/com.idealist.rxandroiddemo I/xiaohai.lin: com.chinamworld Contain :1 elemnts
  • Scan
    Scan操作符用于对一个序列的数据应用一个函数,将函数返回值作为下一个引用这个函数时候的第一个参数使用。
    Observable.range(1, 10).scan(new Func2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) {
    return integer.intValue() + integer2.intValue();
    }
    }).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Integer integer) {
    Log.i("xiaohai.lin", "====" + integer.toString());
    }
    });
    下面是运行的结果,这个例子实际上是求1到10的整数和:
    05-22 10:40:26.484 1639-1639/com.idealist.rxandroiddemo I/xiaohai.lin: ====1
    05-22 10:40:26.484 1639-1639/com.idealist.rxandroiddemo I/xiaohai.lin: ====3
    05-22 10:40:26.484 1639-1639/com.idealist.rxandroiddemo I/xiaohai.lin: ====6
    05-22 10:40:26.484 1639-1639/com.idealist.rxandroiddemo I/xiaohai.lin: ====10
    05-22 10:40:26.485 1639-1639/com.idealist.rxandroiddemo I/xiaohai.lin: ====15
    05-22 10:40:26.485 1639-1639/com.idealist.rxandroiddemo I/xiaohai.lin: ====21
    05-22 10:40:26.485 1639-1639/com.idealist.rxandroiddemo I/xiaohai.lin: ====28
    05-22 10:40:26.485 1639-1639/com.idealist.rxandroiddemo I/xiaohai.lin: ====36
    05-22 10:40:26.485 1639-1639/com.idealist.rxandroiddemo I/xiaohai.lin: ====45
    05-22 10:40:26.485 1639-1639/com.idealist.rxandroiddemo I/xiaohai.lin: ====55

Contents