“在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。”
(来自给 Android 开发者的 RxJava 详解)
Scheduler 的作用是用于指定某个代码逻辑应该运行在哪个线程。

RxJava中的5种默认调度器

RxJava默认情况下提供了5种调度器:
.io()

用于读写文件、读写数据库、网络信息交互等IO操作
.io() is backed by an unbounded thread-pool and is the sort of thing you'd use for non-computationally intensive tasks, that is stuff that doesn't put much load on the CPU. So yep interaction with the file system, interaction with databases or services on a different host are good examples.
创建一个用于IO操作的线程的调度器,内部实现机制是通过一个自动增长的线程池来实现的,这种机制可以用于异步操作阻塞IO工作,不要使用这个调度器来完成计算类工作,要完成计算类工作请使用computation()

.computation()

.computation() is backed by a bounded thread-pool with size equal to the number of available processors. If you tried to schedule cpu intensive work in parallel across more than the available processors (say using newThread()) then you are up for thread creation overhead and context switching overhead as threads vie for a processor and it's potentially a big performance hit.
这个调度器一般用于运行CPU密集型的计算任务,这些任务不会被IO等阻塞操作给阻塞,这个 Scheduler 使用大小为 CPU 核数的线程池来完成工作。同上面介绍的IO调度器一样,不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

.immediate()

Creates and returns a Scheduler that executes work immediately on the current thread.
使用当前线程来完成任务,这是默认的 Scheduler。

.newThread()

Creates and returns a Scheduler that creates a new Thread for each unit of work.
创建新线程,并在新线程执行操作。

.trampoline()

Creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes.
这种方式拥有一个队列,当我们想在当前线程执行一个任务,而是立即,我们可以先将这个任务存入队列,这个调度器将会按顺序运行队列中每一个任务。

为被观察者及订阅者指定线程

要为某个任务指定运行在哪个线程可以使用subscribeOn() 和 observeOn()两个方法:
subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
这里大家可能会碰到一个 AndroidSchedulers.mainThread(),顾名思义它就是Android的主线程。

int drawableRes = R.drawable.testImage;
ImageView imageView = (ImageView)findviewById(R.id.imageView);
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
});

observeOn() 指定的是它之后的操作所运行的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次,和observeOn不同的是subscribeOn()只能调用一次。

Observable.from(testEventList) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.flatmap(......) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(.......) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // 回到主线程

在学习的期间我们会看到还有个比较有意思是方法-onBackpressureBuffer()
它将将告诉Observable发射的数据如果比观察者消费的数据要更快的话,它必须把它们存储在缓存中并提供一个合适的时间给它们。从而不会照成消费者的拥堵。

Contents
  1. 1. RxJava中的5种默认调度器
  2. 2. 为被观察者及订阅者指定线程