PRivate void test1() { //注册观察活动 Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext(sayName()); } }); //第一种订阅方式:传入Observer对象 observable.subscribe(new Observer<String>() { @Override public void onCompleted() { Log.e("TAG", "completed"); } @Override public void onError(Throwable e) { Log.e("TAG", "error"); } @Override public void onNext(String s) { Log.e("TAG", "name=" + s); } }); //第二种订阅方式:传入Subscribe对象 observable.subscribe(new Subscriber<String>() { @Override public void onCompleted() { Log.e("TAG", "completed"); } @Override public void onError(Throwable e) { Log.e("TAG", "error"); } @Override public void onNext(String s) { Log.e("TAG", "name=" + s); } }); } private String sayName() { return "I am Beautiful"; }2:Observable.from();
可以通过列表或数组来创建对象,并通过循环发射每一个对象,或者也可以从传入javaFeature类。
List<String> lists = new ArrayList<>(); lists.add("aa"); lists.add("bb"); lists.add("cc"); Observable<String> observable = Observable.from(lists); observable.subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.e("TAG", "string=" + s); } });3:Observable.just();
你可以传入1~9个参数(也可以传入函数)。它们会被按顺序依次发射出去。
private void testJust() { Observable.just(a(), b()) .subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.e("TAG", "string=" + s); } }); } private String a() { return ""; } private String b() { return ""; }二、过滤Observables
1:通过filter()过滤序列。
private void testFilter() { List<String> lists = new ArrayList<>(); lists.add("aa"); lists.add("bb"); lists.add("cc"); Observable.from(lists) .filter(new Func1<String, Boolean>() { @Override public Boolean call(String s) { return s.startsWith("b"); } }) .subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { } }); }2:通过take、takeLast取开头或结尾的数据
private void testTake() { List<String> lists = new ArrayList<>(); lists.add("aa"); lists.add("bb"); lists.add("cc"); Observable.from(lists) .take(2) .takeLast(1) .subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { } }); }3:使用distinct去除重复的数据
private void testDistinct() { List<Integer> lists = new ArrayList<>(); lists.add(1); lists.add(2); lists.add(3); lists.add(4); Observable.from(lists) .take(3)//取出前三个数据 .repeat(3)//重复3次,即此时有9个数据待发射 .distinct() .subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer num) { } }); }4:获取第一个或者最后一个:first(),last(),firstOrDefalt(T),lastOrDefault(T)
private void testFirtLast(){ List<String> lists = new ArrayList<>(); lists.add("aa"); lists.add("bb"); lists.add("cc"); Observable.from(lists) .firstOrDefault("") .subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { } }); }5:通过ElemtAt(),elementAtOrDefault() 发射指定位置数据
private void testFirtLast(){ List<String> lists = new ArrayList<>(); lists.add("aa"); lists.add("bb"); lists.add("cc"); Observable.from(lists) .firstOrDefault("default") .subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { } }); }6:通过sample()每隔一段时间发送最近一次的值
List<String> list = new ArrayList<>(); for (int i = 0; i < 1000; i++) { list.add("i = " + i); } Observable<String> observable = Observable.from(list); observable.sample(50, TimeUnit.MILLISECONDS).subscribe(new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.i(tag, s); } }); //50毫秒取一次最近的消息进行打印7:timeout,debounce
timeout如果在指定的时间间隔内Observable不发射值的话,它监听的原始的Observable时就会触发
onError()
函数。我们可以使用timeout()
函数来监听源可观测序列,就是在我们设定的时间间隔内如果没有得到一个值则发射一个错误。debounce:debounce()函数过滤掉由Observable发射的速率过快的数据;如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个。
新闻热点
疑难解答