首页 > 编程 > Java > 正文

RxJava 入门(三)-- 操作符简介

2019-11-06 09:34:38
字体:
来源:转载
供稿:网友

在 Rxjava 中,如果把整个 事件流看成 是工厂的流水线,Observable 就是原料,Observer 就是我们的产品经理。产品如何交到我们的产品经理手上?其中重要的就是操作工人(Operator 操作符),它负责在 Observable 发出的事件 和 Observer 的响应之间做一些处理。

操作符分类

Creating ObservablesTransforming ObservablesFiltering ObservablesCombining Observables(合并)Error Handling OperatorsObservable Utility Operators(辅助类)Conditional and Boolean OperatorsMathematical and Aggregate Operators(算数)Connectable Observable Operators(背压)Operators to Convert Observables(连接)自定义操作符

1. Observable的创建 – 创建性操作符

1.1 使用create( ),最基本的创建方式:

Observable<String> observable = Observable.create(new OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("create1"); // 发射一个"create1"的String subscriber.onNext("create2"); // 发射一个"create2"的String subscriber.onCompleted(); // 发射完成,这种方法需要手动调用onCompleted,才会回调Observer的onCompleted方法 } });

1.2 使用just( ),将为你创建一个Observable并自动为你调用onNext( )发射数据:

justObservable = Observable.just("just1","just2");//依次发送"just1"和"just2"

1.3 使用from( ),遍历集合,发送每个item

List<String> list = new ArrayList<>();list.add("from1");list.add("from2");list.add("from3");fromObservable = Observable.from(list); //遍历list 每次发送一个/** 注意,just()方法也可以传list,但是发送的是整个list对象,而from()发送的是list的一个item** /

1.4 使用defer( ),有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable

deferObservable = Observable.defer(new Func0<Observable<String>>() { @Override //注意此处的call方法没有Subscriber参数 public Observable<String> call() { return Observable.just("deferObservable"); }});

1.5 使用interval( ),创建一个按固定时间间隔发射整数序列的Observable,可用作定时器

intervalObservable = Observable.interval(1, TimeUnit.SECONDS);//每隔一秒发送一次

1.6 使用range( ),创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常

rangeObservable = Observable.range(10, 5);//将发送整数10,11,12,13,14

1.7 使用timer( ),创建一个Observable,它在一个给定的延迟后发射一个特殊的值,等同于Android中Handler的postDelay( )方法

timeObservable = Observable.timer(3, TimeUnit.SECONDS); //3秒后发射一个值

1.8 使用repeat( ),创建一个重复发射特定数据的Observable:

repeatObservable = Observable.just("repeatObservable").repeat(3);//重复发射3次

2. Observer的创建

// 2.创建观察者Observer Observer<Student> observer = new Observer<Student>() { @Override public void onCompleted() { System.out.PRintln("onCompleted"); } @Override public void onError(Throwable arg0) { System.out.println("onError"); } @Override public void onNext(Student arg0) { System.out.println("onNext = " + arg0); } };

3. 关联Observable 和 Observer 形成 RxJava

有了Observable和Obsever,我们就可以随便玩了,任取一个已创建的Observable和Observer关联上,即形成一个RxJava的例子

// 3.被观察者订阅观察者 observable // 建议在这修改数据 .map(new Func1<String, String>() { // 第一个参数决定 call方法类型,第二个参数决定返回值类型 @Override public String call(String arg0) { return arg0 + "汤圆"; } }) .subscribe(observer);

observeronNext方法将会依次收到来自observable的数据”just1”、”just2”,另外,如果你不在意数据是否接收完或者是否出现错误,即不需要ObserveronCompleted()onError()方法,可使用Action1subscribe()支持将Action1作为参数传入,RxJava将会调用它的call方法来接收数据,代码如下:

observable.subscribe(new Action1<String>() { @Override public void call(String s) { LogUtil.log(s); }});

4. talk is cheaper without code

4.1 获取SD卡中所有以.jpg结尾的文件

public void getFiles() { String basePath = Environment.getExternalStorageDirectory().getPath(); File rootFile = new File(basePath); Observable.just(rootFile) .flatMap(new Func1<File, Observable<File>>() { @Override public Observable<File> call(File file) { return listFiles(file); } }) .filter(new Func1<File, Boolean>() { @Override public Boolean call(File file) { return file.getName().endsWith(".jpg"); } }) .map(new Func1<File, String>() { @Override public String call(File file) { return file.getName(); } }) .toList() .subscribe(new Observer<List<String>>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(List<String> lists) { Log.e("test", "onNext: size =" + lists.size()); Log.e("test", "onNext: " + lists); } }); } //递归判断是不是文件夹 private Observable<File> listFiles(File file){ if (file.isDirectory()){ return Observable.from(file.listFiles()).flatMap(new Func1<File, Observable<File>>() { @Override public Observable<File> call(File file) { return listFiles(file); } }); }else { return Observable.just(file); } }

4.2 从数据库的用户表查找用户数据

Observable.create(new Observable.OnSubscribe<List<User>>() { @Override public void call(Subscriber<? super List<User>> subscriber) { List<User> userList = null; ··· //从数据库获取用户表数据并赋给userList ··· subscriber.onNext(userList); } }).subscribe(new Action1<List<User>>() { @Override public void call(List<User> users) { //获取到用户信息列表 } });

但是,领导突然又不想要所有用户了,只要名字叫“小明”的用户,行吧,领导最大,我改(假设名字唯一):

Observable.create(new Observable.OnSubscribe<List<User>>() { @Override public void call(Subscriber<? super List<User>> subscriber) { List<User> userList = null; ··· //从数据库获取用户表数据并赋给userList ··· subscriber.onNext(userList); } }).flatMap(new Func1<List<User>, Observable<User>>() { @Override public Observable<User> call(List<User> users) { return Observable.from(users); } }).filter(new Func1<User, Boolean>() { @Override public Boolean call(User user) { return user.getName().equals("小明"); } }).subscribe(new Action1<User>() { @Override public void call(User user) { //拿到谜之小明的数据 } });

搞定,这时候领导又说,我不要小明了,我要小明的爸爸的数据,(坑爹啊~~),我继续改:

Observable.create(new Observable.OnSubscribe<List<User>>() { @Override public void call(Subscriber<? super List<User>> subscriber) { List<User> userList = null; ··· //从数据库获取用户表数据并赋给userList ··· subscriber.onNext(userList); } }).flatMap(new Func1<List<User>, Observable<User>>() { @Override public Observable<User> call(List<User> users) { return Observable.from(users); } }).filter(new Func1<User, Boolean>() { @Override public Boolean call(User user) { return user.getName().equals("小明"); } }).map(new Func1<User, User>() { @Override public User call(User user) { //根据小明的数据user从数据库查找出小明的父亲user2 return user2; } }).subscribe(new Action1<User>() { @Override public void call(User user2) { //拿到谜之小明的爸爸的数据 } });

RxJava在需求不断变更、逻辑愈加复杂的情况下,依旧可以保持代码简洁、可阅读性强的一面,没有各种回调,也没有谜之缩进!


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表