首页 > 编程 > Java > 正文

RXJava学习(二):Observable 和observel订阅流程的详细介绍

2019-11-09 14:43:32
字体:
来源:转载
供稿:网友

   在上篇文章简单的介绍了RXjava的使用与概念,在这篇文章中将会大概的介绍Observable和Observel、Observel的实现类Subscriber,详细地分析subscribe订阅的代码及流程。

Observel和Subscriber

  Observel是一个接口,至于为什么是一个接口,自行体会微笑。Observel其中有三个回调的方法:

        OnNext():其作用相当于监听的OnClick(),在该方法中做具体的事件处理功能。

        OnCompelted():队列事件的完结。在RxJAVA中规定只有不再有新的OnNext()事件的发送,才会执行该方法。RxJava不仅会单独处理每个事件,还会把他们当作队列来看待

        OnError():事件队列异常。在事件队列处理过程中出现异常,触发onError()。且队列事件的处理也会随之中断。

   注:OnCompleted和OnError是互斥的,只会触发其中一个方法

   subscriber实现了Observel,所以回调方法也有Observel的回调方法,除此之外还有一一个onStart回调方法。也因为它还实现了Subscription所特有了unSubscribe()和isUnsubscribed()方法。

    Onstart():该方法执行在创建订阅关系后,发送事件处理前。可作数据的初始和清零的处理。

    unsubscribe():取消observel和Observable的订阅,取消后将不再接受事件的处理。

    isUnsubscribed():在取消之前判断是否有订阅关系。

    注:在订阅后一定要取消订阅,以防止内存泄露的风险。

Observable:

Observable的创建有三种办法:create,from,just三种办法(具体的创建请看RXjava学习(一) )。

   Create()方法RXjava提供了不同参数的多种方法,可根据需要选择。

   Observable与observel则是通过Subscribe(observel/subscriber/actionX)来完成定阅关系。

   ActionX()是subscribe支持不完整定义的回调,RXjava会自动根据定义创建出Subscriber。见下图

 

下面是Subscribe的流程:

   Observable.subscribe()方法的具体实现

  If的判断目的是若是传递来的参数就是Subscriber的话不再新建subscripter。在该处可以看出若是传递来的参数时Observel的话也是会转换成subscriber来接受事物的。

接下来subscribe处理:

接下的处理

PRivatestatic<T> Subscription subscribe(Subscriber<?super T>subscriber, Observable<T>observable) { // validate and proceed    if (subscriber ==null) {        throw new IllegalArgumentException("observer can not benull");    }    if (observable.onSubscribe== null) {        throw new IllegalStateException("onSubscribe functioncan not be null.");        /*         * the subscribe function can also beoverridden but generally that's not the appropriate approach         * so I won't mention that in theexception         */    }    // new Subscriber so onStart it    subscriber.onStart();该方法就是subscriber中Onstart的执行的地方    /*     * Seehttps://github.com/ReactiveX/RxJava/issues/216 for discussion on"Guideline 6.4: Protect calls     * to user code from within anObserver"     */    // if not already wrapped

   //这段代码是什么意思呢,标注为SafaSubscriber后面介绍    if (!(subscriber instanceof SafeSubscriber)) {       

//assign to `observer` so we return the protected version        subscriber = new SafeSubscriber<T>(subscriber);    }    // The code below is exactly the same anunsafeSubscribe but not used because it would    // add a significant depth to alreadyhuge call stacks.    try {        // allow the hook to intercept and/ordecorate       hook.onSubscribeStart(observable,observable.onSubscribe).call(subscriber);        return hook.onSubscribeReturn(subscriber);    } catch (Throwable e) {        // special handling for certainThrowable/Error/Exception types        Exceptions.throwIfFatal(e);        // if an unhandled error occurs executingthe onSubscribe we will propagate it        try {            subscriber.onError(hook.onSubscribeError(e));        } catch (Throwable e2) {            Exceptions.throwIfFatal(e2);            // if this happens it means the onErroritself failed (perhaps an invalid function implementation)            // so we are unable to propagate theerror correctly and will just throw            RuntimeException r = newRuntimeException("Error occurredattempting to subscribe ["+ e.getMessage() + "] and then again while trying topass to onError.", e2);            // TODO could the hook be the cause ofthe error in the on error handling.            hook.onSubscribeError(r);            // TODO why aren't we throwing the hook'sreturn value.            throw r;        }        return Subscriptions.unsubscribed();    }}

其中重要的代码就是

//allow the hook to intercept and/or decorate        hook.onSubscribeStart(observable,observable.onSubscribe).call(subscriber);        return hook.onSubscribeReturn(subscriber);那hook是RxJavaObservableExecutionHook的对象,这是一个什么类呢,察看该类并未有什么特别的处理就是传递的什么参数返回什么值的一个类。

 hook.onSubscribeStart(observable,observable.onSubscribe).call(subscriber)在该处掉用之前observabel的实现call方法。 最后在调用OnSubScriberReturn返回subscription。

 至于观察者Observel的回调方法是怎么调用的,那就是在call方法中调用哪些方法执行哪些方法了。。。

在上面subscribe中标注了一个safesubscriber的代码那块是什么意思(红色字体)??

原来是如果传过来的不实safesubscriber的话会转换类型后在处理,那转换类型后有什么不同呢?带着问题进行下面的分析。

其中的oncompleted()方法的实现:

 

在OnNext()和OnError()中都回执行_onError()方法,不同的是onNext是出现异常的时候会触发。

 protectedvoid_onError(Throwablee) {    RxJavaPluginUtils.handleException(e);    try {       actual.onError(e);    } catch (Throwable e2) {        if (e2instanceof OnErrorNotImplementedException) {            /*             * onError isn't implementedso throw             *  https://github.com/ReactiveX/RxJava/issues/198             * Rx Design Guidelines 5.2             * "when calling theSubscribe method that only has an onNext argument, the OnError behavior             * will be to rethrow theexception on the thread that the message comes out from the observable             * sequence. The OnCompletedbehavior in this case is to do nothing."             */            try {                unsubscribe();            } catch (Throwable unsubscribeException) {                RxJavaPluginUtils.handleException(unsubscribeException);                throw new RuntimeException("Observer.onError notimplemented and error while unsubscribing.",new CompositeException(Arrays.asList(e,unsubscribeException)));            }            throw (OnErrorNotImplementedException) e2;        } else {            /*             * throw since the Rxcontract is broken if onError failed             *              *https://github.com/ReactiveX/RxJava/issues/198             */            RxJavaPluginUtils.handleException(e2);            try {                unsubscribe();            } catch (Throwable unsubscribeException) {                RxJavaPluginUtils.handleException(unsubscribeException);                throw new OnErrorFailedException("Error occurred whentrying to propagate error to Observer.onError and during unsubscription.",new CompositeException(Arrays.asList(e,e2, unsubscribeException)));            }            throw new OnErrorFailedException("Error occurred whentrying to propagate error to Observer.onError",new CompositeException(Arrays.asList(e,e2)));        }    }    // if we did not throw above we willunsubscribe here, if onError failed then unsubscribe happens in the catch    try {        unsubscribe();    } catch (RuntimeException unsubscribeException) {        RxJavaPluginUtils.handleException(unsubscribeException);        throw new OnErrorFailedException(unsubscribeException);    }}

仔细的看该类的代码发现在最后都会触发unSubscribe()方法,取消订阅。

在这里Observer和Subscriber作为参数传递处理第二次的事件队列请求时就会出现问题了。

Observable.subscribe(subscriber)就会出现当你在执行了一次事件的处理后,再进行第二的时候就不会执行了。而Observable.subscribe(observel)就不会出现给问题。

在使用的时候注意该处的区别。。

 

参考文章:http://gank.io/post/560e15be2dca930e00da1083#toc_1


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表