首页 > 系统 > Android > 正文

详解用RxJava实现事件总线(Event Bus)

2019-10-22 18:22:43
字体:
来源:转载
供稿:网友

目前大多数开发者使用EventBus或者Otto作为事件总线通信库,对于RxJava使用者来说,RxJava也可以轻松实现事件总线,因为它们都依据于观察者模式。

不多说,上代码

/*** RxBus* Created by YoKeyword on 2015/6/17.*/public class RxBus {  private static volatile RxBus defaultInstance;  private final Subject<Object, Object> bus;  // PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者  public RxBus() {   bus = new SerializedSubject<>(PublishSubject.create());  }  // 单例RxBus  public static RxBus getDefault() {    if (defaultInstance == null) {      synchronized (RxBus.class) {        if (defaultInstance == null) {          defaultInstance = new RxBus();        }      }    }    return defaultInstance ;  }  // 发送一个新的事件  public void post (Object o) {    bus.onNext(o);  }  // 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者  public <T> Observable<T> toObservable (Class<T> eventType) {    return bus.ofType(eventType);//    这里感谢小鄧子的提醒: ofType = filter + cast//    return bus.filter(new Func1<Object, Boolean>() {//      @Override//      public Boolean call(Object o) {//        return eventType.isInstance(o);//      }//    }) .cast(eventType);  }}

注:

1、Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,需要将 Subject转换为一个 SerializedSubject ,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。

2、PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

3、ofType操作符只发射指定类型的数据,其内部就是filter+cast(这里非常感谢@小鄧子  的提醒)

public final <R> Observable<R> ofType(final Class<R> klass) {  return filter(new Func1<T, Boolean>() {    @Override    public final Boolean call(T t) {      return klass.isInstance(t);    }  }).cast(klass);}

filter操作符可以使你提供一个指定的测试数据项,只有通过测试的数据才会被“发射”。

cast操作符可以将一个Observable转换成指定类型的Observable。

分析:

RxJava实现事件总线,RxJava,Event,Bus

RxBus工作流程图

1、首先创建一个可同时充当Observer和Observable的Subject;

2、在需要接收事件的地方,订阅该Subject(此时Subject是作为Observable),在这之后,一旦Subject接收到事件,立即发射给该订阅者;

3、在我们需要发送事件的地方,将事件post至Subject,此时Subject作为Observer接收到事件(onNext),然后会发射给所有订阅该Subject的订阅者。

对于RxBus的使用,就和普通的RxJava订阅事件很相似了。

先看发送事件的代码:

RxBus.getDefault().post(new UserEvent (1, "yoyo"));

userEvent是要发送的事件,如果你用过EventBus, 很容易理解,UserEvent的代码:

public class UserEvent {  long id;  String name;  public UserEvent(long id,String name) {    this.id= id;    this.name= name;  }  public long getId() {    return id;  }  public String getName() {    return name;  }}

再看接收事件的代码:

// rxSubscription是一个Subscription的全局变量,这段代码可以在onCreate/onStart等生命周期内rxSubscription = RxBus.getDefault().toObserverable(UserEvent.class)    .subscribe(new Action1<UserEvent>() {        @Override        public void call(UserEvent userEvent) {          long id = userEvent.getId();          String name = userEvent.getName();          ...        }      },    new Action1<Throwable>() {      @Override      public void call(Throwable throwable) {        // TODO: 处理异常      }        });

最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

@Overrideprotected void onDestroy() {  super.onDestroy();  if(!rxSubscription.isUnsubscribed()) {    rxSubscription.unsubscribe();  }}

这样,一个简单的Event Bus就实现了!如果你的项目已经开始使用RxJava,也许可以考虑替换掉EventBus或Otto,减小项目体积。

RxBus、EventBus因为解耦太彻底,滥用的话,项目可维护性会越来越低;一些简单场景更推荐用回调、Subject来代替事件总线。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持VEVB武林网。


注:相关教程知识阅读请移步到Android开发频道。
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表