Rxjava是Reactive Extensions的Java VM实现:用于通过使用可观察序列来编译异步和基于事件的程序的库。它扩展了观察者模式以支持数据/事件序列,并添加运算符,允许您以声明方式组合序列,同时抽象出对低级线程,同步,线程安全和并发数据结构等问题的关注。
compile "io.reactivex.rxjava2:rxjava:2.x.y"二是编写Hello World程序:package rxjava.examples;import io.reactivex.*;public class HelloWorld { public static void main(String[] args) { Flowable.just("Hello world").subscribe(System.out::PRintln); }}如果您的平台不支持Java 8 lambdas(尚未),则必须手动创建Consumer的内部类:Flowable.just("Hello world") .subscribe(new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } );RxJava 2具有几个基类,您可以发现运算符:io.reactivex.Flowable : 0..N flows, supporting Reactive-Streams and backpressureio.reactivex.Observable: 0..N flows, no backpressureio.reactivex.Single: a flow of exactly 1 item or an errorio.reactivex.Completable: a flow without items but only a completion or error signalio.reactivex.Maybe: a flow with no items, exactly one item or an errorRxJava的一个常见用例是在后台线程上运行一些计算,网络请求,并在UI线程上显示结果(或错误):Flowable.fromCallable(() -> { Thread.sleep(1000); // imitate expensive computation return "Done";}) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.single()) .subscribe(System.out::println, Throwable::printStackTrace);Thread.sleep(2000); // <--- wait for the flow to finish这种类型的链接方法称为流利的API,类似于构建器模式。 然而,RxJava的反应类型是不可变的;每个方法调用返回一个新的具有添加行为的Flowable。 为了说明,该示例可以重写如下:Flowable<String> source = Flowable.fromCallable(() -> { Thread.sleep(1000); // imitate expensive computation return "Done";});Flowable<String> runBackground = source.subscribeOn(Schedulers.io());Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());showForeground.subscribe(System.out::println, Throwable::printStackTrace);Thread.sleep(2000);通常,您可以通过subscribeOn将计算或阻止IO移动到其他线程。一旦数据准备就绪,您可以确保它们在前台或GUI线程上通过observeOn处理。RxJava操作符不直接工作线程或执行服务器,而是使用所谓的调度器抽象出统一API后面的并发源。RxJava 2具有多个可通过Schedulers实用程序类访问的标准调度程序。这些在所有JVM平台上都可用,但是某些特定平台(如Android)有自己的典型调度器定义:AndroidSchedulers.mainThread(),SwingScheduler.instance()或JavaFXSchedulers.gui()。Thread.sleep(2000);到底是没有意外。在RxJava中,默认调度程序在守护线程上运行,这意味着一旦Java主线程退出,它们都会停止,并且后台计算可能永远不会发生。在这个例子中,休眠一段时间,让我们看看控制台上的流的输出,有时间。RxJava中的流在本质上是顺序的,分成可以彼此并行运行的处理阶段:Flowable.range(1, 10) .observeOn(Schedulers.computation()) .map(v -> v * v) .blockingSubscribe(System.out::println);此示例流在计算调度程序上将1到10的数字进行平方,并在“主”线程(更准确地说是blockingSubscribe的调用程序线程)上使用结果。然而,对于该流程,lambda v→v * v不并行运行; 它接收相同计算线程上的值1到10。并行处理数字1到10:Flowable.range(1, 10) .flatMap(v -> Flowable.just(v) .subscribeOn(Schedulers.computation()) .map(w -> w * w) ).blockingSubscribe(System.out::println);实际上,RxJava中的并行性意味着运行独立流并将其结果合并回单个流。 运算符flatMap通过首先将从1到10的每个数字映射到其自己的单独的Flowable中,运行它们并且合并计算的平方。从2.0.5开始,有一个实验运算符parallel()和类型ParallelFlowable,有助于实现相同的并行处理模式:Flowable.range(1, 10).parallel().runOn(Schedulers.computation()).map(v -> v * v).sequential().blockingSubscribe(System.out::println);flatMap是一个强大的运算符,在很多情况下都有帮助。 例如,给定返回Flowable的服务,我们想使用第一个服务发出的值调用另一个服务:Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();inventorySource.flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId()) .map(demand -> System.out.println("Item " + inventoryItem.getName() + " has demand " + demand));).subscribe();但是,请注意,flatMap不保证任何顺序,内部流的最终结果可能会交织。 还有其他操作符:concatMap,用于映射和运行一个内部流concatMap Eager,它“一次”运行所有内部流,但输出流将按照创建内部流的顺序。
新闻热点
疑难解答