首页 > 系统 > Android > 正文

RxJava取消订阅的各种方式的实现

2019-12-12 00:05:36
字体:
来源:转载
供稿:网友

手动取消订阅

Consumer类型

Observable创建返回Disposable取消

public class SecondActivity extends AppCompatActivity {  private static final String TAG = "SecondActivity";  private Disposable disposable;  @Override  protected void onCreate(Bundle savedInstanceState) {    super.onCreate(savedInstanceState);    setContentView(R.layout.activity_second);    disposable = Observable.create(new ObservableOnSubscribe<String>() {      @Override      public void subscribe(ObservableEmitter<String> emitter) throws Exception {        try {          Thread.sleep(5000);        } catch (InterruptedException e) {          e.printStackTrace();        }      }    }).subscribeOn(Schedulers.io())        .observeOn(AndroidSchedulers.mainThread())        .subscribe(new Consumer<String>() {          @Override          public void accept(String s) throws Exception {            Log.d(TAG, "accept: "+s);          }        });  }  @Override  protected void onDestroy() {    super.onDestroy();    Log.d(TAG, "onDestroy: ");    //取消订阅    if(disposable != null && !disposable.isDisposed()){      disposable.dispose();      Log.d(TAG, "onDestroy: dispose");    }  }}

普通类型Observer

在Observer中获取Disposable然后取消

public class ThirdActivity extends AppCompatActivity {  private static final String TAG = "ThirdActivity";  Disposable disposable;  @Override  protected void onCreate(Bundle savedInstanceState) {    super.onCreate(savedInstanceState);    setContentView(R.layout.activity_third);    Observable.create(new ObservableOnSubscribe<String>() {      @Override      public void subscribe(ObservableEmitter<String> emitter) throws Exception {        try {          Thread.sleep(5000);          emitter.onNext("testInfo");        } catch (InterruptedException e) {          e.printStackTrace();        }      }    }).subscribeOn(Schedulers.io())        .observeOn(AndroidSchedulers.mainThread())        .subscribe(new Observer<String>() {          @Override          public void onSubscribe(Disposable d) {            disposable = d;          }          @Override          public void onNext(String s) {            Log.d(TAG, "onNext: "+s);          }          @Override          public void onError(Throwable e) {            Log.d(TAG, "onError: ");          }          @Override          public void onComplete() {            Log.d(TAG, "onComplete: ");          }        });  }  @Override  protected void onDestroy() {    super.onDestroy();    Log.d(TAG, "onDestroy: ");    //然后在需要取消订阅的地方调用即可    if (disposable != null && !disposable.isDisposed()) {      Log.d(TAG, "dispose: ");      disposable.dispose();    }  }}

DisposableObserver类型

利用DisposableObserver和subscribeWith直接返回Disposable,然后取消

/**
 * Demonstrates cancellation through a {@link DisposableObserver}: {@code subscribeWith}
 * returns the observer it was given, and that observer is disposed in onDestroy.
 */
public class FourthActivity extends AppCompatActivity {
  private static final String TAG = "FourthActivity";

  /** The observer returned by subscribeWith; it is itself the cancellation handle. */
  private DisposableObserver<String> observer;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_fourth);

    Observable<String> scheduledSource = Observable.create(newDelayedEmitter())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

    observer = scheduledSource.subscribeWith(new DisposableObserver<String>() {
      @Override
      public void onNext(String o) {
        Log.d(TAG, "onNext: " + o);
      }

      @Override
      public void onError(Throwable e) {
        Log.d(TAG, "onError: ");
      }

      @Override
      public void onComplete() {
        Log.d(TAG, "onComplete: ");
      }
    });
  }

  /** Builds the source callback: sleeps five seconds, then emits a single "testInfo" item. */
  private ObservableOnSubscribe<String> newDelayedEmitter() {
    return new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        try {
          Thread.sleep(5000);
          emitter.onNext("testInfo");
        } catch (InterruptedException e) {
          // An interrupt during the sleep skips the emission.
          e.printStackTrace();
        }
      }
    };
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    // Nothing to cancel if no observer was created or it is already disposed.
    if (observer == null || observer.isDisposed()) {
      return;
    }
    Log.d(TAG, "dispose: ");
    observer.dispose();
  }
}

取消多个Observer

把多个Observer的Disposable添加到CompositeDisposable,一次性取消

public class ComDisposableActivity extends AppCompatActivity {  private Disposable disposable1;  private Disposable disposable2;  private static final String TAG = "ComDisposableActivity";  @Override  protected void onCreate(Bundle savedInstanceState) {    super.onCreate(savedInstanceState);    setContentView(R.layout.activity_com_disposable);    Observable.create(new ObservableOnSubscribe<String>() {      @Override      public void subscribe(ObservableEmitter<String> emitter) throws Exception {        try {          Thread.sleep(5000);          emitter.onNext("testInfo");        } catch (InterruptedException e) {          e.printStackTrace();        }      }    }).subscribeOn(Schedulers.io())        .observeOn(AndroidSchedulers.mainThread())        .doOnDispose(new Action() {          @Override          public void run() throws Exception {            Log.d(TAG, "run: Unsubscribing subscription from onCreate()");          }        })        .subscribe(new Observer<String>() {          @Override          public void onSubscribe(Disposable d) {            disposable1 = d;          }          @Override          public void onNext(String s) {            Log.d(TAG, "onNext: "+s);          }          @Override          public void onError(Throwable e) {            Log.d(TAG, "onError: ");          }          @Override          public void onComplete() {            Log.d(TAG, "onComplete: ");          }        });    Observable.create(new ObservableOnSubscribe<String>() {      @Override      public void subscribe(ObservableEmitter<String> emitter) throws Exception {        try {          Thread.sleep(5000);          emitter.onNext("testInfo");        } catch (InterruptedException e) {          e.printStackTrace();        }      }    }).subscribeOn(Schedulers.io())        .observeOn(AndroidSchedulers.mainThread())        .subscribe(new Observer<String>() {          @Override          public void onSubscribe(Disposable d) {            disposable2 = d;          }          
@Override          public void onNext(String s) {            Log.d(TAG, "onNext: "+s);          }          @Override          public void onError(Throwable e) {            Log.d(TAG, "onError: ");          }          @Override          public void onComplete() {            Log.d(TAG, "onComplete: ");          }        });  }  @Override  protected void onDestroy() {    super.onDestroy();    CompositeDisposable compositeDisposable = new CompositeDisposable();    //批量添加    compositeDisposable.add(disposable1);    compositeDisposable.add(disposable2);    //最后一次性全部取消订阅    compositeDisposable.dispose();  }}

RxLifecycle取消

onDestroy取消

// One-second interval source that logs when it is disposed.
Observable<Long> lifecycleTicks = Observable.interval(1, TimeUnit.SECONDS)
    .doOnDispose(new Action() {
      @Override
      public void run() throws Exception {
        Log.d(TAG, "Unsubscribing bindToLifecycle from onDestroy()");
      }
    });

// NOTE(review): bindToLifecycle() presumably ends the stream at the lifecycle event that
// mirrors the subscription point (onDestroy, per the log message) - confirm against RxLifecycle.
Observable<Long> lifecycleBoundTicks = lifecycleTicks.compose(this.<Long>bindToLifecycle());

lifecycleBoundTicks.subscribe(new Consumer<Long>() {
  @Override
  public void accept(Long num) throws Exception {
    Log.d(TAG, "accept: " + num);
  }
});

指定生命周期取消

// One-second interval source that logs when it is disposed.
Observable<Long> untilPauseTicks = Observable.interval(1, TimeUnit.SECONDS)
    .doOnDispose(new Action() {
      @Override
      public void run() throws Exception {
        Log.d(TAG, "Unsubscribing UbindUntilEvent from onPause()");
      }
    });

// Bind the stream to an explicitly chosen lifecycle event (PAUSE) rather than the default.
Observable<Long> pauseBoundTicks =
    untilPauseTicks.compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE));

pauseBoundTicks.subscribe(new Consumer<Long>() {
  @Override
  public void accept(Long aLong) throws Exception {
    Log.d(TAG, "bindUntilEvent accept: " + aLong);
  }
});

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持武林网。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表