上一篇博文我们已经介绍了事件源的生成方法,介绍了8种事件源,但是这些事件源只是一些基本的事件源,一般都是触发我们需要事件的触发器,比如我们需要定时刷新网络,那么我们需要使用到interval作为原始的事件源,
但是我们一般会在这个事件源的基础上作一定的处理,而对事件源的过滤也属于这些过滤中的一种,在本篇博文中我们就来学习下这些事件源的过滤方法:

  • Filter
    filter()方法根据我们传入的判决方法过滤我们不想要的值.

    Observable.timer(10,1, TimeUnit.SECONDS)
    .filter(new Func1<Long, Boolean>() {
    @Override
    public Boolean call(Long aLong) {
    return (aLong % 2 == 0);
    }
    })
    .subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable throwable) {
    }
    @Override
    public void onNext(Long aLong) {
    Log.i("xiaohai.lin", "aLong = " + aLong);
    }
    });

    这个方法返回true的事件将会被选中留下。

  • Take
    只接收从原始事件源发出的前N个元素。

    Observable.timer(10,1, TimeUnit.SECONDS).take(10)
    .subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable throwable) {
    }
    @Override
    public void onNext(Long aLong) {
    Log.i("xiaohai.lin", "aLong = " + aLong);
    }
    });
  • TakeLast
    只接收从原始事件源发出的最后N个元素

    Observable.range(10,100).takeLast(10)
    .subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable throwable) {
    }
    @Override
    public void onNext(Integer integer) {
    Log.i("xiaohai.lin",String.valueOf(integer));
    }
    });
  • Skip
    忽略从原始事件源发出的前N个元素。

    Observable.interval(1, TimeUnit.SECONDS).skip(10).subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable throwable) {
    }
    @Override
    public void onNext(Long aLong) {
    Log.i("xiaohai.lin", "aLong = " + aLong);
    }
    });
  • SkipLast
    忽略从原始事件源发出的最后N个元素

Observable.range(1,100).skipLast(10).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(Integer integer) {
Log.i("xiaohai.lin",String.valueOf(integer));
}
});
  • Fist , Last , firstOrDefault , lastOrDefault

first() : 只留下原始数据源的第一个元素

Observable.range(1, 10).first().subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(Integer integer) {
Log.i("xiaohai.lin",String.valueOf(integer));
}
});

last() : 只留下原始数据源的最后一个元素

Observable.range(1, 10).last().subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(Integer integer) {
Log.i("xiaohai.lin",String.valueOf(integer));
}
});

firstOrDefault():当事件源不存在的时候指定发射一个默认的值
lastOrDefault() : 当事件源不存在的时候指定发射一个默认的值

  • ElementAt ElementAtOrDefault

    ElementAt 返回第N个元素。
    ElementAtOrDefault 返回第N个元素如果没有指定的元素将用某个默认值来替代

    Observable.range(1, 100).elementAt(50).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable throwable) {
    }
    @Override
    public void onNext(Integer integer) {
    Log.i("xiaohai.lin",String.valueOf(integer));
    }
    });

    Observable.range(1, 100).elementAtOrDefault(1000,500).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable throwable) {
    }
    @Override
    public void onNext(Integer integer) {
    Log.i("xiaohai.lin",String.valueOf(integer));
    }
    });
  • Timeout
    这个更好理解了,它一般用于对时效性有要求的事件。在超过规定时间如果没有完成则会调用onError方法。

    Observable.interval(3,TimeUnit.SECONDS).timeout(1,TimeUnit.SECONDS).subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable throwable) {
    Log.i("xiaohai.lin", "Time out Error");
    }
    @Override
    public void onNext(Long aLong) {
    Log.i("xiaohai.lin", "aLong = " + aLong);
    }
    });
  • Distinct distinctUntilChanged

    distinct() 根据字面意思很好理解就是对原始事件源过滤处理只剩下不重复的元素
    distinctUntilChanged() 这个可以让我们忽略掉重复的值只有在事件源的值发生改变时才会通过过滤器

  • Sampling
    这个按照字面值也很好理解,就是采样的意思,比如一个事件源是按照每秒发布一次数据,但是我们实际只需要30秒采样一次样本数据,这种情况就可以使用Sampling进行处理。

  • Debounce
    过滤掉由Observable发射的速率过快的数据;如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个。

Contents