- 什么是RxJava
RxJava其实就是一个异步的解决方案,是Android中AsyncTask 以及Thread + Handle的替代方案。
- 为什么要采用RxJava而不用AsyncTask 以及Thread + Handle?
因为它随着程序逻辑变得越来越复杂,依然能够保持简洁。而这些正是Thread + Handler 以及AsyncTask 很难做到的,因为当逻辑一多的时候难保证不会出现嵌套。一嵌套整个代码结构就显得很乱。
而RxJava是基于序列的,最重要的是两个元素Observable被观察对象,Subscriber事件订阅者,Observable是整个事件的源头,产生原始信息,然后再使用各种操作符建立起链式关系,整个过程就和一条流水线一样把原始数据经过层层加工,过滤最后发送给Subscriber。
- 如何引入到Android studio中:
只需要在module的build.gradle 中添加如下两个依赖关系语句即可:compile 'io.reactivex:rxandroid:1.2.0' compile 'io.reactivex:rxjava:1.1.5'
|
Hello RxJava
我们首先先建立一个Hello RxJava的例子,让大家有个简单的认识:
Observable<String> stringObservable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello RxJava"); }
});
Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { Log.i("xiaohai.lin","onCompleted"); } @Override public void onError(Throwable throwable) { Log.i("xiaohai.lin","onError"); } @Override public void onNext(String s) { Log.i("xiaohai.lin","onNext :=" +s); } };
stringObservable.subscribe(subscriber);
|
输出结果就是:Hello RxJava。但是这里并没有涉及到事件源的处理,在接下来的章节中将陆续对事件源的创建,订阅者,以及事件处理这些部分展开介绍。
RxJava 事件源介绍:
* Create
这个是最基本的方法,它没有任何的特效,但是它也是最灵活的,调用传入的Subscriber的onComplete,onError,onNext 来实现自己想要的事件源:
下面是Create操作符号的时序图:
下面是一个实际的例子:
这些都来自官网:
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> observer) { try { if (!observer.isUnsubscribed()) { for (int i = 1; i < 5; i++) { observer.onNext(i); } observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); }
@Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); }
@Override public void onCompleted() { System.out.println("Sequence complete."); } });
|
我们来分析下上面的代码:
这里会调用Observable.create并传入OnSubscribe 对象,其中call方法的参数Subscriber其实就是它的订阅者,在有订阅者订阅这个被观察对象的时候,它就会被传入,Observerable首先判断下当前订阅者是否还在订阅自己,如果还继续订阅则向订阅者发送0-5的数字,然后调用onCompleted方法,宣告整个过程完毕,整个过程的核心是Subscriber的onNext/onComplete/onError方法。onNext就是发射处理好的数据给Subscriber, onComplete一般在结束事件源的时候告诉Subscriber所有的数据都已发射完毕;onError是在发生错误的时候发射一个Throwable对象给Subscriber。
一般Observable必须调用所有的Subscriber的onComplete方法,并且一般只能调用一次,onError方法也类似,一般发生出错的时候被调用,调用后一般就停止执行任何其他操作。
* Just
Just可以接受最多9个参数,它将这一系列的对象作为一个整体发送出去,这些对象可以是一个数字、一个字符串、数组、Iterate对象等。
Observable.just(1, 2, 3) .subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); }
@Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); }
@Override public void onCompleted() { System.out.println("Sequence complete."); } });
|
* From
From操作符用来将传入的参数内容拆封开来,依次将其内容发射出去。它和just的区别在于,但是just将对象整个发射出去。而from会将对象拆封开来一次一次发送,比如说一个含有100个数字的数组,使用from就会发射100次,而使用just则一次将整个数组发送出去。
Integer[] items = { 0, 1, 2, 3, 4, 5 }; Observable myObservable = Observable.from(items);
myObservable.subscribe( new Action1<Integer>() { @Override public void call(Integer item) { System.out.println(item); } }, new Action1<Throwable>() { @Override public void call(Throwable error) { System.out.println("Error encountered: " + error.getMessage()); } }, new Action0() { @Override public void call() { System.out.println("Sequence complete"); } } );
|
* Defer
Defer操作符每次订阅都会得到一个刚创建的最新的Observable对象,从而确保了每次订阅者订阅到的Observable对象里的数据都是最新的。
Observable.defer(new Func0<Observable<Long>>() { @Override public Observable<Long> call() { return Observable.just(System.currentTimeMillis()); } }).subscribe(new Subscriber<Long>() { @Override public void onCompleted() {} @Override public void onError(Throwable e) {}
@Override public void onNext(Long t) { Log.i("xiaohai.lin", ""+t); } });
|
* Interval
这个事件源会在一定的时间间隔后产生一个从0开始的数字。
Observable.interval(1, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(Long aLong) { Log.i("xiaohai.lin", "Current Time = " + aLong); } });
|
上面的例子会间隔1秒时间发射一个数字。这个数值会传递到订阅者的onNext方法中。
* Range
这个事件源可以产生一定范围的数值。需要传入两个表示范围的参数。
Observable.range(4, 20).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(Integer integer) { Log.i("xiaohai.lin",String.valueOf(integer)); } });
|
* Repeat
这个一般用在某个事件源中,使用repeat后会将原事件源重复一定的次数。
final Intent mainIntent = new Intent(Intent.ACTION_MAIN); mainIntent.addCategory(Intent.CATEGORY_LAUNCHER); List<ResolveInfo> activityInfoList = getPackageManager().queryIntentActivities(mainIntent, 0); Observable.from(activityInfoList).repeat(2).subscribe(new Action1<ResolveInfo>() { @Override public void call(ResolveInfo resolveInfo) { Log.i("xiaohai.lin",resolveInfo.activityInfo.name); } });
|
* Timmer
这个是在一定的时间间隔之后产生定时周期的事件源。
Observable.timer(10,1, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(Long aLong) { Log.i("xiaohai.lin", "aLong = " + aLong); }
});
|
RxAndroid && RxJava Github地址
https://github.com/ReactiveX/RxAndroid https://github.com/ReactiveX/RxJava
|