`
hzy3774
  • 浏览: 984786 次
  • 性别: Icon_minigender_1
  • 来自: 珠海
社区版块
存档分类
最新评论

RxJava在Android中使用笔记

阅读更多

使用RxJava可以方便实现观察者模式,数据转换和线程之间通信

https://github.com/ReactiveX/RxJava

在Android中使用RxAndroid增加安卓主线程支持:

https://github.com/ReactiveX/RxAndroid

实际开发中在gradle中注册对应的依赖即可:

compile 'io.reactivex:rxandroid:1.1.0'

 观察者模式一般要有两个对象:

1.被观察者:Obserable

2.观察者:Observer

观察者和被观察者绑定后,被观察者在某种事件发生时会向所有观察它的观察者发送事件,即调用观察者的回调函数。

 

使用java.util里的观察者应该是这样的:

 

import android.util.Log;

import java.util.Observable;
import java.util.Observer;

public class ObserverTest {

    void test0() {
        //创建被观察者
        DemoObserable obserable = new DemoObserable();
        //创建一个观察者
        Observer observer1 = new Observer() {
            //观察者接收到事件时的操作
            @Override
            public void update(Observable observable, Object data) {
                Log.e("TAG", "ToObserver1: " + data);
            }
        };
        //添加绑定
        obserable.addObserver(observer1);
        //被观察者发送事件
        obserable.subscribe("Hello World!!");
        //被观察者将观察者移除
        obserable.deleteObserver(observer1);
    }

    //被监听者
    public class DemoObserable extends Observable {

        //被监听者发送数据给所有观察者
        void subscribe(String data) {
            setChanged();
            notifyObservers(data);
        }

    }
}

 假设观察者有多个,可以依次注册,删除:

 

//创建被观察者
        DemoObserable obserable = new DemoObserable();
        //创建一个观察者
        Observer observer1 = new Observer() {
            //观察者接收到事件时的操作
            @Override
            public void update(Observable observable, Object data) {
                Log.e("TAG", "ToObserver1: " + data);
            }
        };
        //创建一个观察者
        Observer observer2 = new Observer() {
            //观察者接收到事件时的操作
            @Override
            public void update(Observable observable, Object data) {
                Log.e("TAG", "ToObserver2: " + data);
            }
        };
        //添加绑定
        obserable.addObserver(observer1);
        obserable.addObserver(observer2);
然而,我们接下来使用 rx.Observer, rx.Observable, 思想也是类似的:
1.观察者与被观察者
void test0() {

        Observable.OnSubscribe<String> onSubscribe = new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("hello world!!");
                subscriber.onCompleted();
            }
        };

        Observable<String> observable = Observable.create(onSubscribe);

        Observer<String> observer = new Observer<String>() {
            @Override
            public void onNext(String s) {
                Log.e("TAG", s);
            }

            @Override
            public void onCompleted() {
                Log.e("TAG", "completed");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("TAG", "error" + e);
            }
        };

        observable.subscribe(observer);

    }
 完成了类似的功能。
Observable构造方法是保护的,只能通过提供的静态方法创建对象,如Observable.create()
RxJava可以使用链式调用简化代码,所以也可以写成:
Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("hello world!!");
                subscriber.onCompleted();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onNext(String s) {
                Log.e("TAG", s);
            }

            @Override
            public void onCompleted() {
                Log.e("TAG", "completed");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("TAG", "error" + e);
            }
        });
 如果直接知道被监听者发送事件onNext()时的参数,可以使用just()来创建Observable,效果和上例一样
Observable.just("Hello World!")
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        Log.e("TAG", "complete");
                    }
                    @Override
                    public void onError(Throwable e) {

                    }
                    @Override
                    public void onNext(String s) {
                        Log.e("TAG", s);
                    }
                });
 just函数可以传入多个参数,事件依次调用后最后执行onComplete()
Observable.just("Hello World!", "RxJava Demo")
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        Log.e("TAG", "complete");
                    }
                    @Override
                    public void onError(Throwable e) {

                    }
                    @Override
                    public void onNext(String s) {
                        Log.e("TAG", s);
                    }
                });
 如果观察者只需要重写onNext()方法,可以直接用ActionX来节省掉onError和onComplete的代码:
Observable.just("Hello World!", "RxJava Demo")
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.e("TAG", s);
                    }
                });
 现在观察者也变的好简单,使用多个参数just()来创建也可以用from(T[])来创建,与上例等价:
String[] arr = {"Hello World!", "RxJava Demo"};
        Observable.from(arr)
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.e("TAG", s);
                    }
                });
 如果不想用Observer,又想处理onError和onComplete,就需要在subscribe()函数传多个ActionX:
String[] arr = {"Hello World!", "RxJava Demo"};
        Action1 nextAction, errorAction;
        Action0 completeAction;
        nextAction = new Action1<String>() {
            @Override
            public void call(String s) {
                Log.e("TAG", s);
            }
        };
        errorAction = new Action1<Exception>() {
            @Override
            public void call(Exception e) {
                e.printStackTrace();
            }
        };
        completeAction = new Action0() {
            @Override
            public void call() {
                Log.e("TAG", "complete");
            }
        };
        Observable.from(arr)
                .subscribe(nextAction, errorAction, completeAction);
 2.观察者与被观察者在不同线程
加上线程控制语句subscribeOn,observeOn,就可以将被观察者和观察者指定在不同线程中:
Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                Log.e("TAG", "I am in computation");
                subscriber.onNext("Hello World!!");
            }
        })
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.e("TAG", s);
                    }
                });
 两个操作发生在不同线程中,通过log可以看出来:
03-29 21:00:39.333 13153-13226/com.hzy.rxjavademo E/TAG: I am in computation
03-29 21:00:39.334 13153-13153/com.hzy.rxjavademo E/TAG: Hello World!!
03-29 21:00:39.914 13153-13227/com.hzy.rxjavademo E/TAG: I am in computation
03-29 21:00:39.915 13153-13153/com.hzy.rxjavademo E/TAG: Hello World!!
03-29 21:00:40.607 13153-13228/com.hzy.rxjavademo E/TAG: I am in computation
03-29 21:00:40.608 13153-13153/com.hzy.rxjavademo E/TAG: Hello World!!
 通过日志pid可以看到,Observable执行会开启新的线程,而Observer观察者收到消息后执行的操作在UI线程执行,所以可以方便的替代之前Android的new Thread加Handler发送消息的机制。
如下代码就可以完成一个完整的异步网络请求,并把结果和出错信息通知到UI线程的观察者:
Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    URL url = new URL("https://m.baidu.com/");
                    InputStream is = url.openStream();
                    InputStreamReader isr = new InputStreamReader(is);
                    BufferedReader br = new BufferedReader(isr);
                    String line;
                    StringBuilder builder = new StringBuilder();
                    while ((line = br.readLine()) != null) {
                        builder.append(line);
                    }
                    subscriber.onNext(builder.toString());
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.e("TAG", s);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });
 异步操作变得好简单,而且很容易理解
3.数据的过滤
数据过滤使用filter方法:
String[] arr = {"Hello World!", "RxJava Demo", "RxAndroid"};
        Observable.from(arr)
                .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        return s.startsWith("Rx");
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.e("TAG", s);
                    }
                });
 这时,如例子中所有返回以Rx开头字符串的事件,才会去通知观察者。
4.数据的转换
消息发送者返回的数据用map()进行转化后再发送给观察者
Observable.just(123)
                .map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        String ret = integer + ":" + integer * 2;
                        return ret;
                    }
                }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.e("TAG", s);
            }
        });
 把消息发送者发来的 Integer类型转成String类型再传给观察者去处理。
使用lift()把一个观察者转成另一个:
Observable.just(123)
                .lift(new Observable.Operator<String, Integer>() {
                    @Override
                    public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
                        return new Subscriber<Integer>() {
                            @Override
                            public void onCompleted() {
                                subscriber.onCompleted();
                            }
                            @Override
                            public void onError(Throwable e) {
                                subscriber.onError(e);
                            }
                            @Override
                            public void onNext(Integer integer) {
                                subscriber.onNext("hello" + integer);
                            }
                        };
                    }
                }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.e("TAG", s);
            }
        });
 把一个Integer类型的Subscriber转成String类型,Subscriber实现Observer接口,所以Subscriber也是观察者。

使用compose直接把被监听者转换掉,Transformer规定了转换的规则,Obserable怎么转换呢,还是用个map来转吧,Transformer在此显得多此一举了:

Observable.just(123)
                .compose(new Observable.Transformer<Integer, String>() {
                    @Override
                    public Observable<String> call(Observable<Integer> integerObservable) {
                        return integerObservable.map(new Func1<Integer, String>() {
                            @Override
                            public String call(Integer integer) {
                                return integer + "hello";
                            }
                        });
                    }
                }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.e("TAG", s);
            }
        });

 把Integer类型转换成String类型

 

5.条件判断

条件判断的Observable范型都会被转换成Boolean类型

判断是否存在某种类型用exists()函数

Observable.just(123, 20, 34, 4, 0, 2)
                .exists(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer == 0;
                    }
                })
                .subscribe(new Action1<Boolean>() {
                    @Override
                    public void call(Boolean aBoolean) {
                        Log.e("TAG", "" + aBoolean);
                    }
                });

判断那些值中是否存在为0的Integer,显然存在,返回true

用all()判断是否全都是某种情况:

Observable.just(123, 20, 34, 4, 0, 2)
                .all(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer > 0;
                    }
                })
                .subscribe(new Action1<Boolean>() {
                    @Override
                    public void call(Boolean aBoolean) {
                        Log.e("TAG", "" + aBoolean);
                    }
                });

上面那些值全都大于0么,显然不是,返回false。

 

6.观察者嵌套

有一种场景,观察者收到消息后,不能完全处理掉消息,自身又作为消息发送者给其他观察者发消息。

Observable.just(123)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Observable.just(integer)
                                .subscribe(new Action1<Integer>() {
                                    @Override
                                    public void call(Integer integer) {
                                        Log.e("TAG", "" + integer);
                                    }
                                });
                    }
                });

 两层嵌套之后缩进已经很长了,为解决这个问题,可以用flatMap()来减少缩进:

Observable.just(123)
                .flatMap(new Func1<Integer, Observable<Integer>>() {
                    @Override
                    public Observable<Integer> call(Integer integer) {
                        return Observable.just(integer);
                    }
                }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.e("TAG", "" + integer);
            }
        });

 这样,缩进好多了。

最后是一个综合实例:
//使用IO线程读取SD卡上所有png文件并在主线程上打印
    private void testy() {
        Observable.create(new Observable.OnSubscribe<File>() {
            @Override
            public void call(Subscriber<? super File> subscriber) {
                //获取外部存储上的所有文件
                File exDir = Environment.getExternalStorageDirectory();
                traceFiles(exDir, subscriber);
            }
        }).subscribeOn(Schedulers.io())
                //过滤出png文件
                .filter(new Func1<File, Boolean>() {
                    @Override
                    public Boolean call(File file) {
                        return file.isFile() && file.getName().endsWith(".png");
                    }
                    //使用buffer防止消息发送过快
                }).onBackpressureBuffer()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<File>() {
                    @Override
                    public void call(File file) {
                        Log.e("TAG", file.getAbsolutePath());
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.e("TAG", "Error:" + throwable);
                        throwable.printStackTrace();
                    }
                });
    }

    private void traceFiles(File file, Subscriber<? super File> subscriber) {
        subscriber.onNext(file);
        if (file.isDirectory()) {
            for (File f : file.listFiles()) {
                traceFiles(f, subscriber);
            }
        }
    }
  
结束
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics