原文是简书一位猿友写的。写的很实在,不错,值得收藏。http://www.jianshu.com/p/64aa976a46be
想写好一篇文章确实不容易,感谢那些大牛们能够分享自己的所见所得。关于rxjava的原理以及观察者模式等理论性的内容我就不在这里献丑了,网上有很多很优秀的博客。回头我添加几篇到这里,大家可以看一看,这篇文章只是简单的通过例子告诉大家rxjava的基本使用方法,如果有任何的批评、建议或者疑问可以评论,第一时间给予答复。
[TOC]
这一章中,我们将研究可观测序列的本质:过滤。我们将学到如何从发射的Observable中选取我们想要的值,如何获取有限个数的值,如何处理溢出的场景,以及更多的有用的技巧。

first()方法和last()方法很容易弄明白。它们从Observable中只发射第一个元素或者最后一个元素。


skip()和skipLast()函数与take()和takeLast()相对应。它们用整数N作参数,从本质上来说,它们不让Observable发射前N个或者后N个值。
skip(2)
举个例子
Observable.just("hello", "my", "world").skip(1).skipLast(1) .subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.i(tag, s); } });//跳过第一个hello和最后一个world,打印myelementAt()函数仅从一个序列中发射第n个元素然后就完成了。如果我们想查找第五个元素但是可观测序列只有三个元素可供发射时该怎么办?我们可以使用elementAtOrDefault()。
下图展示了如何通过使用elementAt(2)从一个序列中选择第三个元素以及如何创建一个只发射指定元素的新的Observable。

举个例子
Observable.just("hello", "my", "world").elementAt(1) .subscribe(); Observable.just("hello", "my", "world").elementAtOrDefault(10, "null") .subscribe();在Observable后面加一个sample(),我们将创建一个新的可观测序列,它将在一个指定的时间间隔里由Observable发射最近一次的数值:
List<String> list = new ArrayList<>(); for (int i = 0; i < 1000; i++) { list.add("i = " + i); } Observable<String> observable = Observable.from(list); observable.sample(50, TimeUnit.MILLISECONDS).subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.i(tag, s); } }); //50毫秒取一次最近的消息进行打印我们可以认为timeout()为一个Observable的限时的副本。如果在指定的时间间隔内Observable不发射值的话,它监听的原始的Observable时就会触发onError()函数。我们可以使用timeout()函数来监听源可观测序列,就是在我们设定的时间间隔内如果没有得到一个值则发射一个错误。

RxJava提供了几个mapping函数:map(),flatMap(),concatMap(),flatMapIterable()以及switchMap().所有这些函数都作用于一个可观测序列,然后变换它发射的值,最后用一种新的形式返回它们。让我们用合适的“真实世界”的例子一个个的学习下。
RxJava的map函数接收一个指定的Func对象然后将它应用到每一个由Observable发射的值上。下图展示了如何将一个乘法函数应用到每个发出的值上以此创建一个新的Observable来发射转换的数据。

举个例子:接收一系列的数字,经过map转成string类型,然后打印出来。
Observable.just(1, 2, 3, 4, 5) .map(new Func1<Integer, String>() { @Override public String call(Integer integer) { return "the position is" + integer; } }).subscribe(new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.i(tag, s); } });flatMap是map中比较常见但也是比较难理解内。
在复杂的场景中,我们有一个这样的Observable:它发射一个数据序列,这些数据本身也可以发射Observable。RxJava的flatMap()函数提供一种铺平序列的方式,然后合并这些Observables发射的数据,最后将合并后的结果作为最终的Observable。

举个例子:假设我有个天气客户端,某个页面需要获取四个城市(南昌、北京、天津和深圳)的天气。那么首先我们需要传入四个城市名,然后分别根据城市名获取天气信息,然后更行ui更新。
public void flatMap() { Observable.just("南昌", "深圳", "天津", "北京").flatMap(new Func1<String, Observable<WeatherInfo>>() { @Override public Observable<WeatherInfo> call(String s) { return getWeather(s); } }).observeOn(AndroidSchedulers.mainThread()).//更新ui一定要加这句 subscribe(new Subscriber<WeatherInfo>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(WeatherInfo weatherInfo) { //更新ui } }); } //新建天气信息类 class WeatherInfo { } PRivate Observable<WeatherInfo> getWeather(final String city) { Observable<WeatherInfo> observable = Observable.create(new Observable.OnSubscribe<WeatherInfo>() { @Override public void call(Subscriber<? super WeatherInfo> subscriber) { subscriber.onNext(getWeatherInfo(city)); subscriber.onCompleted(); } }).subscribeOn(Schedulers.io());//网络请求一定要加这句 return observable; } private WeatherInfo getWeatherInfo(String city) { //模拟网络请求返回weatherinfo return new WeatherInfo(); } //南昌->能够获取南昌天气的observable->更新uiRxJava的concatMap()函数解决了flatMap()的交叉问题,提供了一种能够把发射的值连续在一起的铺平函数,而不是合并它们,如下图所示:

注:拿上面的例子来说我们传入的是"南昌", "深圳", "天津", "北京",flatmap转换之后得到的顺序并不是这样,而是被打乱了的;用concaatmap的话就是按顺序的转换,只有这种区别。
如下图所示,switchMap()和flatMap()很像,除了一点:每当源Observable发射一个新的数据项(Observable)时,它将取消订阅并停止监视之前那个数据项产生的Observable,并开始监视当前发射的这一个。

例子参考
SwitchMap
scan操作符对一个序列的数据应用一个函数,并将这个函数的结果发射出去作为下个数据应用这个函数时候的第一个参数使用,有点类似于递归操作

举个例子
private Observable<Integer> scanObserver() { return Observable.from(list).scan((x, y) -> x * y).observeOn(AndroidSchedulers.mainThread()); }结果为:

groupBy操作符是对源Observable产生的结果进行分组,形成一个类型为GroupedObservable的结果集,GroupedObservable中存在一个方法为getKey(),可以通过该方法获取结果集的Key值(类似于HashMap的key)。
值得注意的是,由于结果集中的GroupedObservable是把分组结果缓存起来,如果对每一个GroupedObservable不进行处理(既不订阅执行也不对其进行别的操作符运算),就有可能出现内存泄露。因此,如果你对某个GroupedObservable不进行处理,最好是对其使用操作符take(0)处理。
groupBy操作符的流程图如下:
调用例子如下:Observable.interval(1, TimeUnit.SECONDS).take(10).groupBy(new Func1<Long, Long>() { @Override public Long call(Long value) { //按照key为0,1,2分为3组 return value % 3; } }).subscribe(new Action1<GroupedObservable<Long, Long>>() { @Override public void call(GroupedObservable<Long, Long> result) { result.subscribe(new Action1<Long>() { @Override public void call(Long value) { System.out.println("key:" + result.getKey() +", value:" + value); } }); } });运行结果如下: key:0, value:0 key:1, value:1 key:2, value:2 key:0, value:3 key:1, value:4 key:2, value:5 key:0, value:6 key:1, value:7 key:2, value:8 key:0, value:9RxJava中的buffer()函数将源Observable变换一个新的Observable,这个新的Observable每次发射一组列表值而不是一个一个发射。

上图中展示了buffer()如何将count作为一个参数来指定有多少数据项被包在发射的列表中。实际上,buffer()函数有几种变体。其中有一个是允许你指定一个skip值:此后每skip项数据,然后又用count项数据填充缓冲区。如下图所示:

举个例子
List<String> list = new ArrayList<>();for (int i = 0; i < 30; i++){ list.add("hello i:" + i);}Observable.from(list).buffer(4).subscribe(new Subscriber<List<String>>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(List<String> strings) { for (String s : strings) { Log.i(tag, s); } Log.i(tag, "/n next group"); }});RxJava的window()函数和buffer()很像,但是它发射的是Observable而不是列表。下图展示了window()如何缓存3个数据项并把它们作为一个新的Observable发射出去。
List<String> list = new ArrayList<>();for (int i = 0; i < 30; i++) { list.add("hello i:" + i);}Observable.from(list).window(4).subscribe(new Subscriber<Observable<String>>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Observable<String> stringObservable) { stringObservable.subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.i(tag, s); } }); Log.i(tag, "/n next group"); }});这些Observables中的每一个都发射原始Observable数据的一个子集,数量由count指定,最后发射一个onCompleted()结束。正如buffer()一样,window()也有一个skip变体,如下图所示:

RxJava的cast()函数是本章中最后一个操作符。它是map()操作符的特殊版本。它将源Observable中的每一项数据都转换为新的类型,把它变成了不同的Class。
public void cast() { List<Person> list = new ArrayList<>(); for (int i = 0; i < 30; i++) { list.add(new Male(true, "i" + i)); } Observable.from(list).cast(Male.class).subscribe(new Subscriber<Male>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Male s) { Log.i(tag, "name:" + s.name + "/n hasJJ:" + s.hasJJ); } });}class Person { String name; public Person(String name) { this.name = name; }}class Male extends Person { public boolean hasJJ; public Male(boolean hasJJ, String name) { super(name); this.hasJJ = hasJJ; }}//多态,我们用Person接收Male对象,然后通过cast转成Male类型。有点instanceof的意思。我们学到如何转换可观测序列。我们也看到了map(),scan(),groupBY(),以及更多有用的函数的实际例子,它们帮助我们操作Observable来创建我们想要的Observable。
我们将研究组合函数并学习如何同时处理多个Observables来创建我们想要的Observable。
RxJava的merge()方法将帮助你把两个甚至更多的Observables合并到他们发射的数据项里。下图给出了把两个序列合并在一个最终发射的Observable。

举个例子
List<String> list1 = new ArrayList<>();for (int i = 0; i < 30; i++) { list1.add("hello i:" + i);}List<String> list = new ArrayList<>();for (int j = 0; j < 30; j++) { list.add("world j:" + j);}List<String> list2 = new ArrayList<>();for (int m = 0; m < 30; m++) { list2.add("fuck m:" + m);}Observable<String> world = Observable.from(list);Observable<String> hello = Observable.from(list1);Observable<String> fuck = Observable.from(list2);Observable.merge(world, hello, fuck).subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.i(tag, s); }});注意错误时的toast消息,你可以认为每个Observable抛出的错误都将会打断合并。如果你需要避免这种情况,RxJava提供了mergeDelayError(),它能从一个Observable中继续发射数据即便是其中有一个抛出了错误。当所有的Observables都完成时,mergeDelayError()将会发射onError(),如下图所示:

我们在处理多源时可能会带来这样一种场景:多从个Observables接收数据,处理它们,然后将它们合并成一个新的可观测序列来使用。RxJava有一个特殊的方法可以完成:zip() 合并两个或者多个Observables发射出的数据项,根据指定的函数Func* 变换它们,并发射一个新值。下图展示了zip() 方法如何处理发射的“numbers”和“letters”然后将它们合并一个新的数据项:

举个例子
List<String> hellos = new ArrayList<>();for (int i = 0; i < 30; i++) { hellos.add("hello i:" + i);}List<Integer> worlds = new ArrayList<>();for (int j = 0; j < 20; j++) { worlds.add(j);}Observable.zip(Observable.from(hellos).subscribeOn(Schedulers.io()), Observable.from(worlds).subscribeOn(Schedulers.io()), new Func2<String, Integer, String>() { @Override public String call(String s, Integer integer) { return "index:" + integer + "/t s:" + s; }}).subscribe(new Subscriber<String>() { @Override public void onCompleted() { Log.i(tag, "onCompleted"); } @Override public void onError(Throwable e) { Log.i(tag, "error"); } @Override public void onNext(String s) { Log.i(tag, s); }});//这里只会打印20条,而不是30条,应该是有个observable完成就完成我们提升标准看看如何使用RxJava的调度器来处理多线程和并发编程的问题。我们将学习到如何以响应式的方式创建网络操作,内存访问,以及耗时任务。
为了获得更多出现在代码中的关于公共问题的信息,我们激活了StrictMode模式。
StrictMode帮助我们侦测敏感的活动,如我们无意的在主线程执行磁盘访问或者网络调用。正如你所知道的,在主线程执行繁重的或者长时的任务是不可取的。因为Android应用的主线程时UI线程,它被用来处理和UI相关的操作:这也是获得更平滑的动画体验和响应式App的唯一方法。
为了在我们的App中激活StrictMode,我们只需要在MainActivity中添加几行代码,即onCreate()方法中这样:
阻塞I/O的操作会导致App必须等待结果返回(阻塞结束)才能进行下一步操作。在UI线程上执行一个阻塞操作会将UI强行卡住,直接造成很糟糕的用户体验。
我们激活StrictMode后,我们开始收到了关于我们的App错误操作磁盘I/O的不良信息。
上一条信息告诉我们Utils.storeBitmap()函数执行完耗时998ms:在UI线程上近1秒的不必要的工作和App上近1秒不必要的迟钝。这是因为我们以阻塞的方式访问磁盘。我们的storeBitmap()函数包含了:
它直接访问智能手机的固态存储然后就慢了。我们该如何提高访问速度呢?storeBitmap()函数保存了已安装App的图标。他的返回值类型为void,因此在执行下一个操作前我们毫无理由去等待直到它完成。我们可以启动它并让它执行在不同的线程。近几年来Android的线程管理发生了许多变化,导致App出现诡异的行为。我们可以使用AsyncTask,但是我们要避免掉入前几章里的onPre... onPost...doInBackGround地狱。下面我们将换用RxJava的方式。调度器万岁!
调度器以一种最简单的方式将多线程用在你的Apps的中。它们是RxJava重要的一部分并能很好地与Observables协同工作。它们无需处理实现、同步、线程、平台限制、平台变化而可以提供一种灵活的方式来创建并发程序。
RxJava提供了5种调度器:
.io().computation().immediate().newThread().trampoline()让我们一个一个的来看下它们:
这个调度器时用于I/O操作。它基于根据需要,增长或缩减来自适应的线程池。我们将使用它来修复我们之前看到的StrictMode违规做法。由于它专用于I/O操作,所以并不是RxJava的默认方法;正确的使用它是由开发者决定的。
重点需要注意的是线程池是无限制的,大量的I/O调度操作将创建许多个线程并占用内存。一如既往的是,我们需要在性能和简捷两者之间找到一个有效的平衡点。
这个是计算工作默认的调度器,它与I/O操作无关。它也是许多RxJava方法的默认调度器:buffer(),debounce(),delay(),interval(),sample(),skip()。
这个调度器允许你立即在当前线程执行你指定的工作。它是timeout(),timeInterval(),以及timestamp()方法默认的调度器。
这个调度器正如它所看起来的那样:它为指定任务启动一个新的线程。
当我们想在当前线程执行一个任务时,并不是立即,我们可以用.trampoline()将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。它是repeat()和retry()方法默认的调度器。
我们如何利用它来和Observables一起工作呢?RxJava提供了subscribeOn()方法来用于每个Observable对象。subscribeOn()方法用Scheduler来作为参数并在这个Scheduler上执行Observable调用。
在“真实世界”这个例子中,我们调整loadList()函数。首先,我们需要一个新的getApps()方法来检索已安装的应用列表:
getApps()方法返回一个AppInfo的Observable。它先从Android的SharePreferences读取到已安装的应用程序列表。反序列化,并一个接一个的发射AppInfo数据。使用新的方法来检索列表,loadList()函数改成下面这样:
如果我们运行代码,StrictMode将会报告一个不合规操作,这是因为SharePreferences会减慢I/O操作。我们所需要做的是指定getApps()需要在调度器上执行:
Schedulers.io()将会去掉StrictMode的不合规操作,但是我们的App现在崩溃了是因为:
Only the original thread that created a view hierarchy can touch its views.
我们再次回到Android的世界。这条信息简单的告诉我们我们试图在一个非UI线程来修改UI操作。意思是我们需要在I/O调度器上执行我们的代码。因此我们需要和I/O调度器一起执行代码,但是当结果返回时我们需要在UI线程上操作。RxJava让你能够订阅一个指定的调度器并观察它。我们只需在loadList()函数添加几行代码,那么每一项就都准备好了:
observeOn()方法将会在指定的调度器上返回结果:如例子中的UI线程。onBackpressureBuffer()方法将告诉Observable发射的数据如果比观察者消费的数据要更快的话,它必须把它们存储在缓存中并提供一个合适的时间给它们。做完这些工作之后,如果我们运行App,就会出现已安装的程序列表。
总结:
RxJava为此提供了极其实用的工具:调度器。调度器以及不同应用场景下的优化方案一起,将我们从StrictMode中的不合法操作以及阻塞I/O的方法中解放出来。我们现在可以用简单的,响应式的,并在整个App中保持一致的方式来访问本地存储和网络。
我们的旅程结束了。相信你已经准备好将你的Java应用带到一个新的代码质量水平。你可以享受一个新的编程模式并把更流畅的思维方式应用到日常编程生活中。RxJava提供了一种以面向时序的方式考虑数据的机会:所有事情都是持续变化的,数据在更新,事件在触发,然后你就可以创建事件响应式的、灵活的、运行流畅的App。
刚开始切换到RxJava看起来困难并且耗时,但我们已经体验到了如何通过响应式的方式有效地处理日常问题。现在你可以把你的旧代码迁移到RxJava上:给这些同步getters一种新的响应式。
RxJava是一个正在不断发展和扩大的世界。还有许多方法我们还没有去探索。有些方法甚至还没有,通过RxJava,你可以创建你自己的操作符并把他们发展地更远。
Android是一个好玩的地方,但是它也有局限性。作为一个Android开发者,你可以用RxJava和RxAndroid克服其中的许多。我们用AndroidScheduler只简单提了下RxAndroid,除了在最后一章,你了解了ViewObservable。RxAndroid给了你许多:例如,WidgetObservable,LifecycleObservable。往后将它发展地更长远的任务就取决于你了。
谨记可观测序列就像一条河:它们是流动的。你可以“过滤”(filter)一条河,你可以“转换”(transform)一条河,你可以将两条河合并(combine)成一个,然后依然畅流如初。最后,它就成了你想要的那条河。
“Be Water,my friend” --Bruce Lee
新闻热点
疑难解答