首先要事先声明下这篇博文是转载自如下两篇博客,后续部分也会另外创建一篇博文用于记录自己在RxJava实践中的使用场景:
http://blog.csdn.net/lzyzsd/article/details/50120801
http://blog.csdn.net/theone10211024/article/details/50435325

取数据先检查缓存的场景

取数据,首先检查内存是否有缓存
然后检查文件缓存中是否有
最后才从网络中取
前面任何一个条件满足,就不会执行后面的

final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (memoryCache != null) {
subscriber.onNext(memoryCache);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String cachePref = rxPreferences.getString("cache").get();
if (!TextUtils.isEmpty(cachePref)) {
subscriber.onNext(cachePref);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> network = Observable.just("network");
//主要就是靠concat operator来实现
Observable.concat(memory, disk, network)
.first()
.subscribeOn(Schedulers.newThread())
.subscribe(s -> {
memoryCache = "memory";
System.out.println("--------------subscribe: " + s);
});

界面需要等到多个接口并发取完数据,再更新

//拼接两个Observable的输出,不保证顺序,按照事件产生的顺序发送给订阅者
private void testMerge() {
Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread());
Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread());

Observable.merge(observable1, observable2)
.subscribeOn(Schedulers.newThread())
.subscribe(System.out::println);
}

一个接口的请求依赖另一个API请求返回的数据

举个例子,我们经常在需要登陆之后,根据拿到的token去获取消息列表。
这里用RxJava主要解决嵌套回调的问题,有一个专有名词叫Callback hell

NetworkService.getToken("username", "password")
.flatMap(s -> NetworkService.getMessage(s))
.subscribe(s -> {
System.out.println("message: " + s);
});

界面按钮需要防止连续点击的情况

RxView.clicks(findViewById(R.id.btn_throttle))
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(aVoid -> {
System.out.println("click");
});

响应式的界面

比如勾选了某个checkbox,自动更新对应的preference

SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);

Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);

CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.checkedChanges(checkBox)
.subscribe(checked.asAction());

复杂的数据变换

Observable.just("1", "2", "2", "3", "4", "5")
.map(Integer::parseInt)
.filter(s -> s > 1)
.distinct()
.take(3)
.reduce((integer, integer2) -> integer.intValue() + integer2.intValue())
.subscribe(System.out::println);//9

Scheduler线程切换

这种场景经常会在“后台线程取数据,主线程展示”的模式中看见

Observable.just(1, 2, 3, 4)  
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});

使用debounce做textSearch

用简单的话讲就是当N个结点发生的时间太靠近(即发生的时间差小于设定的值T),debounce就会自动过滤掉前N-1个结点。比如在做百度地址联想的时候,可以使用debounce减少频繁的网络请求。避免每输入(删除)一个字就做一次联想

RxTextView.textChangeEvents(inputEditText)  
.debounce(400, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<TextViewTextChangeEvent>() {
@Override
public void onCompleted() {
log.d("onComplete");
}

@Override
public void onError(Throwable e) {
log.d("Error");
}

@Override
public void onNext(TextViewTextChangeEvent onTextChangeEvent) {
log.d(format("Searching for %s", onTextChangeEvent.text().toString()));
}
});

Retrofit结合RxJava做网络请求框架

这里不作详解,具体的介绍可以看扔物线的这篇文章,对RxJava的入门者有很大的启发。其中也讲到了RxJava和Retrofit如何结合来实现更简洁的代码

RxJava代替EventBus进行数据传递:RxBus

注意:RxBus并不是一个库,而是一种模式,是使用了RxJava的思想来达到EventBus的数据传递效果。这篇文章把RxBus讲的比较详细。

使用combineLatest合并最近N个结点

例如:注册的时候所有输入信息(邮箱、密码、电话号码等)合法才点亮注册按钮。

Observable<CharSequence> _emailChangeObservable = RxTextView.textChanges(_email).skip(1);  
Observable<CharSequence> _passwordChangeObservable = RxTextView.textChanges(_password).skip(1);
Observable<CharSequence> _numberChangeObservable = RxTextView.textChanges(_number).skip(1);

Observable.combineLatest(_emailChangeObservable,
_passwordChangeObservable,
_numberChangeObservable,
new Func3<CharSequence, CharSequence, CharSequence, Boolean>() {
@Override
public Boolean call(CharSequence newEmail,
CharSequence newPassword,
CharSequence newNumber) {

Log.d("xiayong",newEmail+" "+newPassword+" "+newNumber);
boolean emailValid = !isEmpty(newEmail) &&
EMAIL_ADDRESS.matcher(newEmail).matches();
if (!emailValid) {
_email.setError("Invalid Email!");
}

boolean passValid = !isEmpty(newPassword) && newPassword.length() > 8;
if (!passValid) {
_password.setError("Invalid Password!");
}

boolean numValid = !isEmpty(newNumber);
if (numValid) {
int num = Integer.parseInt(newNumber.toString());
numValid = num > 0 && num <= 100;
}
if (!numValid) {
_number.setError("Invalid Number!");
}

return emailValid && passValid && numValid;

}
})//
.subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
log.d("completed");
}

@Override
public void onError(Throwable e) {
log.d("Error");
}

@Override
public void onNext(Boolean formValid) {
_btnValidIndicator.setEnabled(formValid);
}
});

使用merge合并两个数据源。

例如一组数据来自网络,一组数据来自文件,需要合并两组数据一起展示。

Observable.merge(getDataFromFile(), getDataFromNet())  
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log.d("done loading all data");
}

@Override
public void onError(Throwable e) {
log.d("error");
}

@Override
public void onNext(String data) {
log.d("all merged data will pass here one by one!")
});

使用timer做定时操作。当有“x秒后执行y操作”类似的需求的时候,想到使用timer

例如:2秒后输出日志“hello world”,然后结束。

Observable.timer(2, TimeUnit.SECONDS)  
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
log.d ("completed");
}

@Override
public void onError(Throwable e) {
log.e("error");
}

@Override
public void onNext(Long number) {
log.d ("hello world");
}
});

使用interval做周期性操作。当有“每隔xx秒后执行yy操作”类似的需求的时候,想到使用interval

例如:每隔2秒输出日志“helloworld”。

Observable.interval(2, TimeUnit.SECONDS)  
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
log.d ("completed");
}

@Override
public void onError(Throwable e) {
log.e("error");
}

@Override
public void onNext(Long number) {
log.d ("hello world");
}
});

使用schedulePeriodically做轮询请求

Observable.create(new Observable.OnSubscribe<String>() {  
@Override
public void call(final Subscriber<? super String> observer) {

Schedulers.newThread().createWorker()
.schedulePeriodically(new Action0() {
@Override
public void call() {
observer.onNext(doNetworkCallAndGetStringResult());
}
}, INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS);
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
log.d("polling….”));
}
})

RxJava进行数组、list的遍历

String[] names = {"Tom", "Lily", "Alisa", "Sheldon", "Bill"};  
Observable
.from(names)
.subscribe(new Action1<String>() {
@Override
public void call(String name) {
log.d(name);
}
});
Contents
  1. 1. 取数据先检查缓存的场景
  2. 2. 界面需要等到多个接口并发取完数据,再更新
  3. 3. 一个接口的请求依赖另一个API请求返回的数据
  4. 4. 界面按钮需要防止连续点击的情况
  5. 5. 响应式的界面
  6. 6. 复杂的数据变换
  7. 7. Scheduler线程切换
  8. 8. 使用debounce做textSearch
  9. 9. Retrofit结合RxJava做网络请求框架
  10. 10. RxJava代替EventBus进行数据传递:RxBus
  11. 11. 使用combineLatest合并最近N个结点
  12. 12. 使用merge合并两个数据源。
  13. 13. 使用timer做定时操作。当有“x秒后执行y操作”类似的需求的时候,想到使用timer
  14. 14. 使用interval做周期性操作。当有“每隔xx秒后执行yy操作”类似的需求的时候,想到使用interval
  15. 15. 使用schedulePeriodically做轮询请求
  16. 16. RxJava进行数组、list的遍历