首页 > 编程 > Java > 正文

Rxjava功能操作符的使用方法详解

2019-11-26 10:57:38
字体:
来源:转载
供稿:网友

Rxjava功能个人感觉很好用,里面的一些操作符很方便,Rxjava有:被观察者,观察者,订阅者,

被观察者通过订阅者订阅观察者,从而实现观察者监听被观察者返回的数据

下面把Rxjava常用的模型代码列出来,还有一些操作符的运用:

依赖:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'// Because RxAndroid releases are few and far between, it is recommended you also// explicitly depend on RxJava's latest version for bug fixes and new features.  compile 'io.reactivex.rxjava2:rxjava:2.1.5'

这个是另一种解析数据的方法,阿里巴巴旗下的,听说是解析最快的解析器。。。。

compile 'com.alibaba:fastjson:1.2.39'
import android.os.Bundle;import android.support.v7.app.AppCompatActivity;import android.view.View;import android.widget.TextView; import com.alibaba.fastjson.JSONObject; import java.io.IOException;import java.util.concurrent.TimeUnit; import io.reactivex.BackpressureStrategy;import io.reactivex.Flowable;import io.reactivex.FlowableEmitter;import io.reactivex.FlowableOnSubscribe;import io.reactivex.Observable;import io.reactivex.ObservableEmitter;import io.reactivex.ObservableOnSubscribe;import io.reactivex.Observer;import io.reactivex.android.schedulers.AndroidSchedulers;import io.reactivex.annotations.NonNull;import io.reactivex.disposables.Disposable;import io.reactivex.functions.BiFunction;import io.reactivex.functions.Consumer;import io.reactivex.functions.Function;import io.reactivex.schedulers.Schedulers;import okhttp3.Call;import okhttp3.Callback;import okhttp3.OkHttpClient;import okhttp3.Request;import okhttp3.Response; public class MainActivity extends AppCompatActivity {   private TextView name;   @Override  protected void onCreate(Bundle savedInstanceState) {    super.onCreate(savedInstanceState);    setContentView(R.layout.activity_main);     name = (TextView) findViewById(R.id.name);    //用来调用下面的方法,监听。    name.setOnClickListener(new View.OnClickListener() {      @Override      public void onClick(View v) {         interval();      }    });  }   //例1:Observer  public void observer() {    //观察者    Observer<string> observer = new Observer<string>() {      @Override      public void onSubscribe(@NonNull Disposable d) {       }      @Override      public void onNext(@NonNull String s) {        //接收从被观察者中返回的数据        System.out.println("onNext :" + s);      }      @Override      public void onError(@NonNull Throwable e) {       }      @Override      public void onComplete() {       }    };    //被观察者    Observable<string> observable = new Observable<string>() {      @Override      protected void subscribeActual(Observer<!--? super String--> observer) {        observer.onNext("11111");        observer.onNext("22222");        observer.onComplete();      }    };    //产生了订阅    observable.subscribe(observer);  }   //例2:Flowable  private void flowable(){    //被观察者    Flowable.create(new FlowableOnSubscribe<string>() {      @Override      public void subscribe(@NonNull FlowableEmitter<string> e) throws Exception {        for (int i = 0; i < 100; i++) {          e.onNext(i+"");        }      }      //背压的策略,buffer缓冲区        观察者      //背压一共给了五种策略      // BUFFER、      // DROP、打印前128个,后面的删除      // ERROR、      // LATEST、打印前128个和最后一个,其余删除      // MISSING      //这里的策略若不是BUFFER 那么,会出现著名的:MissingBackpressureException错误    }, BackpressureStrategy.BUFFER).subscribe(new Consumer<string>() {      @Override      public void accept(String s) throws Exception {        System.out.println("subscribe accept"+s);        Thread.sleep(1000);      }    });  }   //例3:线程调度器 Scheduler  public void flowable1(){    Flowable.create(new FlowableOnSubscribe<string>() {      @Override      public void subscribe(@NonNull FlowableEmitter<string> e) throws Exception {        for (int i = 0; i < 100; i++) {          //输出在哪个线程          System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());          e.onNext(i+"");        }      }    },BackpressureStrategy.BUFFER)        //被观察者一般放在子线程        .subscribeOn(Schedulers.io())        //观察者一般放在主线程        .observeOn(AndroidSchedulers.mainThread())        .subscribe(new Consumer<string>() {          @Override          public void accept(String s) throws Exception {            System.out.println("s"+ s);            Thread.sleep(100);            //输出在哪个线程            System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());          }        });  }    //例4:http请求网络,map转化器,fastjson解析器  public void map1(){    Observable.create(new ObservableOnSubscribe<string>() {      @Override      public void subscribe(@NonNull final ObservableEmitter<string> e) throws Exception {        OkHttpClient client = new OkHttpClient();        Request request = new Request.Builder()            .url("https://qhb.2dyt.com/Bwei/login")            .build();        client.newCall(request).enqueue(new Callback() {          @Override          public void onFailure(Call call, IOException e) {           }           @Override          public void onResponse(Call call, Response response) throws IOException {            String result = response.body().string();            e.onNext(result);          }        });      }    })        //map转换器 flatmap(无序),concatmap(有序)        .map(new Function<string, bean="">() {      @Override      public Bean apply(@NonNull String s) throws Exception {        //用fastjson来解析数据        return JSONObject.parseObject(s,Bean.class);      }    }).subscribe(new Consumer<bean>() {      @Override      public void accept(Bean bean) throws Exception {        System.out.println("bean = "+ bean.toString() );      }    });  }   //常见rxjava操作符  //例 定时发送消息  public void interval(){    Observable.interval(2,1, TimeUnit.SECONDS)        .take(10)        .subscribe(new Consumer<long>() {          @Override          public void accept(Long aLong) throws Exception {            System.out.println("aLong = " + aLong);          }        });  }    //例 zip字符串合并  public void zip(){    Observable observable1 = Observable.create(new ObservableOnSubscribe<string>() {      @Override      public void subscribe(@NonNull ObservableEmitter<string> e) throws Exception {        e.onNext("1");        e.onNext("2");        e.onNext("3");        e.onNext("4");        e.onComplete();       }    });    Observable observable2 = Observable.create(new ObservableOnSubscribe<string>() {      @Override      public void subscribe(@NonNull ObservableEmitter<string> e) throws Exception {        e.onNext("A");        e.onNext("B");        e.onNext("C");        e.onNext("D");        e.onComplete();      }    });     Observable.zip(observable1, observable2, new BiFunction<string,string,string>() {      @Override      public String apply(@NonNull String o, @NonNull String o2) throws Exception {        return o + o2;      }    }).subscribe(new Consumer<string>() {      @Override      public void accept(String o) throws Exception {        System.out.println("o"+ o);      }    });  }

总结

以上就是本文关于Rxjava功能操作符的使用方法详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:Javaweb应用使用限流处理大量的并发请求详解分享一个简单的java爬虫框架Java线程之线程同步synchronized和volatile详解等,有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表