首页 > 编程 > Java > 正文

RxJava: Reactive Extensions for the JVM

2019-11-09 18:55:50
字体:
来源:转载
供稿:网友

Rxjava是Reactive Extensions的Java VM实现:用于通过使用可观察序列来编译异步和基于事件的程序的库。它扩展了观察者模式以支持数据/事件序列,并添加运算符,允许您以声明方式组合序列,同时抽象出对低级线程,同步,线程安全和并发数据结构等问题的关注。

入门

第一步是将RxJava 2包含到您的项目中,例如,作为Gradle编译依赖关系:
compile "io.reactivex.rxjava2:rxjava:2.x.y"二是编写Hello World程序:
package rxjava.examples;import io.reactivex.*;public class HelloWorld {    public static void main(String[] args) {        Flowable.just("Hello world").subscribe(System.out::PRintln);    }}如果您的平台不支持Java 8 lambdas(尚未),则必须手动创建Consumer的内部类:
Flowable.just("Hello world")  .subscribe(new Consumer<String>() {      @Override public void accept(String s) {          System.out.println(s);      }  );RxJava 2具有几个基类,您可以发现运算符:
io.reactivex.Flowable : 0..N flows, supporting Reactive-Streams and backpressureio.reactivex.Observable: 0..N flows, no backpressureio.reactivex.Single: a flow of exactly 1 item or an errorio.reactivex.Completable: a flow without items but only a completion or error signalio.reactivex.Maybe: a flow with no items, exactly one item or an errorRxJava的一个常见用例是在后台线程上运行一些计算,网络请求,并在UI线程上显示结果(或错误):
Flowable.fromCallable(() -> {    Thread.sleep(1000); //  imitate expensive computation    return "Done";})  .subscribeOn(Schedulers.io())  .observeOn(Schedulers.single())  .subscribe(System.out::println, Throwable::printStackTrace);Thread.sleep(2000); // <--- wait for the flow to finish这种类型的链接方法称为流利的API,类似于构建器模式。 然而,RxJava的反应类型是不可变的;每个方法调用返回一个新的具有添加行为的Flowable。 为了说明,该示例可以重写如下:
Flowable<String> source = Flowable.fromCallable(() -> {    Thread.sleep(1000); //  imitate expensive computation    return "Done";});Flowable<String> runBackground = source.subscribeOn(Schedulers.io());Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());showForeground.subscribe(System.out::println, Throwable::printStackTrace);Thread.sleep(2000);通常,您可以通过subscribeOn将计算或阻止IO移动到其他线程。一旦数据准备就绪,您可以确保它们在前台或GUI线程上通过observeOn处理。RxJava操作符不直接工作线程或执行服务器,而是使用所谓的调度器抽象出统一API后面的并发源。RxJava 2具有多个可通过Schedulers实用程序类访问的标准调度程序。这些在所有JVM平台上都可用,但是某些特定平台(如Android)有自己的典型调度器定义:AndroidSchedulers.mainThread(),SwingScheduler.instance()或JavaFXSchedulers.gui()。Thread.sleep(2000);到底是没有意外。在RxJava中,默认调度程序在守护线程上运行,这意味着一旦Java主线程退出,它们都会停止,并且后台计算可能永远不会发生。在这个例子中,休眠一段时间,让我们看看控制台上的流的输出,有时间。RxJava中的流在本质上是顺序的,分成可以彼此并行运行的处理阶段:
Flowable.range(1, 10)  .observeOn(Schedulers.computation())  .map(v -> v * v)  .blockingSubscribe(System.out::println);此示例流在计算调度程序上将1到10的数字进行平方,并在“主”线程(更准确地说是blockingSubscribe的调用程序线程)上使用结果。然而,对于该流程,lambda v→v * v不并行运行; 它接收相同计算线程上的值1到10。并行处理数字1到10:
Flowable.range(1, 10)  .flatMap(v ->      Flowable.just(v)        .subscribeOn(Schedulers.computation())        .map(w -> w * w)  ).blockingSubscribe(System.out::println);实际上,RxJava中的并行性意味着运行独立流并将其结果合并回单个流。 运算符flatMap通过首先将从1到10的每个数字映射到其自己的单独的Flowable中,运行它们并且合并计算的平方。从2.0.5开始,有一个实验运算符parallel()和类型ParallelFlowable,有助于实现相同的并行处理模式:
Flowable.range(1, 10).parallel().runOn(Schedulers.computation()).map(v -> v * v).sequential().blockingSubscribe(System.out::println);flatMap是一个强大的运算符,在很多情况下都有帮助。 例如,给定返回Flowable的服务,我们想使用第一个服务发出的值调用另一个服务:
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();inventorySource.flatMap(inventoryItem ->    erp.getDemandAsync(inventoryItem.getId())    .map(demand         -> System.out.println("Item " + inventoryItem.getName() + " has demand " + demand));).subscribe();但是,请注意,flatMap不保证任何顺序,内部流的最终结果可能会交织。 还有其他操作符:concatMap,用于映射和运行一个内部流concatMap Eager,它“一次”运行所有内部流,但输出流将按照创建内部流的顺序。
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表