• 什么是RxJava
    RxJava其实就是一个异步的解决方案,是Android中AsyncTask 以及Thread + Handle的替代方案。
  • 为什么要采用RxJava而不用AsyncTask 以及Thread + Handle?
    因为它随着程序逻辑变得越来越复杂,依然能够保持简洁。而这些正是Thread + Handler 以及AsyncTask 很难做到的,因为当逻辑一多的时候难保证不会出现嵌套。一嵌套整个代码结构就显得很乱。
    而RxJava是基于序列的,最重要的是两个元素Observable被观察对象,Subscriber事件订阅者,Observable是整个事件的源头,产生原始信息,然后再使用各种操作符建立起链式关系,整个过程就和一条流水线一样把原始数据经过层层加工,过滤最后发送给Subscriber。
  • 如何引入到Android studio中:
    只需要在module的build.gradle 中添加如下两个依赖关系语句即可:
    compile 'io.reactivex:rxandroid:1.2.0'
    compile 'io.reactivex:rxjava:1.1.5'
    Hello RxJava

我们首先先建立一个Hello RxJava的例子,让大家有个简单的认识:

 //创建一个事件源,在事件源中调用每个订阅者的对应响应
Observable<String> stringObservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello RxJava");
}

});
//创建一个订阅者,实现每个事件节点的响应
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i("xiaohai.lin","onCompleted");
}
@Override
public void onError(Throwable throwable) {
Log.i("xiaohai.lin","onError");
}
@Override
public void onNext(String s) {
Log.i("xiaohai.lin","onNext :=" +s);
}
};
//事件源接受订阅者的订阅
stringObservable.subscribe(subscriber);

输出结果就是:Hello RxJava。但是这里并没有涉及到事件源的处理,在接下来的章节中将陆续对事件源的创建,订阅者,以及事件处理这些部分展开介绍。

RxJava 事件源介绍:

* Create

这个是最基本的方法,它没有任何的特效,但是它也是最灵活的,调用传入的Subscriber的onComplete,onError,onNext 来实现自己想要的事件源:
下面是Create操作符号的时序图:

下面是一个实际的例子:
这些都来自官网:

Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
} ).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});

我们来分析下上面的代码:
这里会调用Observable.create并传入OnSubscribe 对象,其中call方法的参数Subscriber其实就是它的订阅者,在有订阅者订阅这个被观察对象的时候,它就会被传入,Observerable首先判断下当前订阅者是否还在订阅自己,如果还继续订阅则向订阅者发送0-5的数字,然后调用onCompleted方法,宣告整个过程完毕,整个过程的核心是Subscriber的onNext/onComplete/onError方法。onNext就是发射处理好的数据给Subscriber, onComplete一般在结束事件源的时候告诉Subscriber所有的数据都已发射完毕;onError是在发生错误的时候发射一个Throwable对象给Subscriber。
一般Observable必须调用所有的Subscriber的onComplete方法,并且一般只能调用一次,onError方法也类似,一般发生出错的时候被调用,调用后一般就停止执行任何其他操作。

* Just


Just可以接受最多9个参数,它将这一系列的对象作为一个整体发送出去,这些对象可以是一个数字、一个字符串、数组、Iterate对象等。

Observable.just(1, 2, 3)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}

@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
* From

From操作符用来将传入的参数内容拆封开来,依次将其内容发射出去。它和just的区别在于,但是just将对象整个发射出去。而from会将对象拆封开来一次一次发送,比如说一个含有100个数字的数组,使用from就会发射100次,而使用just则一次将整个数组发送出去。

Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable myObservable = Observable.from(items);

myObservable.subscribe(
new Action1<Integer>() {
@Override
public void call(Integer item) {
System.out.println(item);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable error) {
System.out.println("Error encountered: " + error.getMessage());
}
},
new Action0() {
@Override
public void call() {
System.out.println("Sequence complete");
}
}
);
* Defer


Defer操作符每次订阅都会得到一个刚创建的最新的Observable对象,从而确保了每次订阅者订阅到的Observable对象里的数据都是最新的。

Observable.defer(new Func0<Observable<Long>>() {
@Override
public Observable<Long> call() {
return Observable.just(System.currentTimeMillis());
}
}).subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}

@Override
public void onNext(Long t) {
Log.i("xiaohai.lin", ""+t);
}
});
* Interval

这个事件源会在一定的时间间隔后产生一个从0开始的数字。

Observable.interval(1, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(Long aLong) {
Log.i("xiaohai.lin", "Current Time = " + aLong);
}
});

上面的例子会间隔1秒时间发射一个数字。这个数值会传递到订阅者的onNext方法中。

* Range


这个事件源可以产生一定范围的数值。需要传入两个表示范围的参数。

Observable.range(4, 20).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(Integer integer) {
Log.i("xiaohai.lin",String.valueOf(integer));
}
});
* Repeat


这个一般用在某个事件源中,使用repeat后会将原事件源重复一定的次数。

final Intent mainIntent = new Intent(Intent.ACTION_MAIN);
mainIntent.addCategory(Intent.CATEGORY_LAUNCHER);
List<ResolveInfo> activityInfoList = getPackageManager().queryIntentActivities(mainIntent, 0);
Observable.from(activityInfoList).repeat(2).subscribe(new Action1<ResolveInfo>() {
@Override
public void call(ResolveInfo resolveInfo) {
Log.i("xiaohai.lin",resolveInfo.activityInfo.name);
}
});
* Timmer


这个是在一定的时间间隔之后产生定时周期的事件源。

Observable.timer(10,1, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(Long aLong) {
Log.i("xiaohai.lin", "aLong = " + aLong);
}

});

RxAndroid && RxJava Github地址

https://github.com/ReactiveX/RxAndroid  
https://github.com/ReactiveX/RxJava
Contents
  1. 1. Hello RxJava
  • RxJava 事件源介绍:
    1. 1. * Create
    2. 2. * Just
    3. 3. * From
    4. 4. * Defer
    5. 5. * Interval
    6. 6. * Range
    7. 7. * Repeat
    8. 8. * Timmer