首页 > 编程 > Java > 正文

Java扩展库RxJava的基本结构与适用场景小结

2019-11-26 14:10:36
字体:
来源:转载
供稿:网友

基本结构

我们先来看一段最基本的代码,分析这段代码在RxJava中是如何实现的。

Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() {  @Override  public void call(Subscriber<? super String> subscriber) {    subscriber.onNext("1");    subscriber.onCompleted();  }};Subscriber<String> subscriber1 = new Subscriber<String>() {  @Override  public void onCompleted() {  }  @Override  public void onError(Throwable e) {  }  @Override  public void onNext(String s) {  }};Observable.create(onSubscriber1)    .subscribe(subscriber1);

首先我们来看一下Observable.create的代码

public final static <T> Observable<T> create(OnSubscribe<T> f) {  return new Observable<T>(hook.onCreate(f));}protected Observable(OnSubscribe<T> f) {  this.onSubscribe = f;}

直接就是调用了Observable的构造函数来创建一个新的Observable对象,这个对象我们暂时标记为observable1,以便后面追溯。
同时,会将我们传入的OnSubscribe对象onSubscribe1保存在observable1的onSubscribe属性中,这个属性在后面的上下文中很重要,大家留心一下。

接下来我们来看看subscribe方法。

public final Subscription subscribe(Subscriber<? super T> subscriber) {  return Observable.subscribe(subscriber, this);}private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {  ...  subscriber.onStart();  hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);  return hook.onSubscribeReturn(subscriber);}

可以看到,subscribe之后,就直接调用了observable1.onSubscribe.call方法,也就是我们代码中的onSubscribe1对象的call方法
,传入的参数就是我们代码中定义的subscriber1对象。call方法中所做的事情就是调用传入的subscriber1对象的onNext和onComplete方法。
这样就实现了观察者和被观察者之间的通讯,是不是很简单?

public void call(Subscriber<? super String> subscriber) {  subscriber.onNext("1");  subscriber.onCompleted();}

RxJava使用场景小结

1.取数据先检查缓存的场景
取数据,首先检查内存是否有缓存
然后检查文件缓存中是否有
最后才从网络中取
前面任何一个条件满足,就不会执行后面的

final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {  @Override  public void call(Subscriber<? super String> subscriber) {    if (memoryCache != null) {      subscriber.onNext(memoryCache);    } else {      subscriber.onCompleted();    }  }});Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {  @Override  public void call(Subscriber<? super String> subscriber) {    String cachePref = rxPreferences.getString("cache").get();    if (!TextUtils.isEmpty(cachePref)) {      subscriber.onNext(cachePref);    } else {      subscriber.onCompleted();    }  }});Observable<String> network = Observable.just("network");//主要就是靠concat operator来实现Observable.concat(memory, disk, network).first().subscribeOn(Schedulers.newThread()).subscribe(s -> {  memoryCache = "memory";  System.out.println("--------------subscribe: " + s);});

2.界面需要等到多个接口并发取完数据,再更新

//拼接两个Observable的输出,不保证顺序,按照事件产生的顺序发送给订阅者private void testMerge() {  Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread());  Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread());  Observable.merge(observable1, observable2)      .subscribeOn(Schedulers.newThread())      .subscribe(System.out::println);}

3.一个接口的请求依赖另一个API请求返回的数据

举个例子,我们经常在需要登陆之后,根据拿到的token去获取消息列表。

这里用RxJava主要解决嵌套回调的问题,有一个专有名词叫Callback hell

NetworkService.getToken("username", "password")  .flatMap(s -> NetworkService.getMessage(s))  .subscribe(s -> {    System.out.println("message: " + s);  });

4.界面按钮需要防止连续点击的情况

RxView.clicks(findViewById(R.id.btn_throttle))  .throttleFirst(1, TimeUnit.SECONDS)  .subscribe(aVoid -> {    System.out.println("click");  });

5.响应式的界面

比如勾选了某个checkbox,自动更新对应的preference

SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);RxCompoundButton.checkedChanges(checkBox)    .subscribe(checked.asAction());

6.复杂的数据变换

Observable.just("1", "2", "2", "3", "4", "5")  .map(Integer::parseInt)  .filter(s -> s > 1)  .distinct()  .take(3)  .reduce((integer, integer2) -> integer.intValue() + integer2.intValue())  .subscribe(System.out::println);//9

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表