Observable的执行流程
前两篇文章都是在对RxJava中的几个重要的类做说明,这篇文章来个总结,并对Observable的subscribeOn和observeOn和subsribe()几个方法的流程分析一下。
为了加深理解,做了个流程图=。 = 创建一个Observable后,我们会调用它的subscribeOn,observeOn,subscribe方法。
首先我们要清楚lift到底做了什么事情,我觉得可以总结为两点:
- 它接收一个Operator操作符,operator有一个call方法,接收一个Subsriber
,返回一个新的Subscriber ,实现的是一个变换。 - 它返回一个新的Observable,这个Observable的唯一成员OnSubscribe
的call方法会使用原Observable的OnSubscribe调用call方法,并传入Operator返回的Subscriber作为参数。
重点在于,lift返回的是一个新的Observable,但并没有和原来的Observable断绝关系,当他的OnSubscribe 的call方法被调用时,原Observable的OnSubscribe的call方法也会被调用,这意味着,无论我们调用用多少次lift操作,原始的Observable的OnSubscribe的call方法一定会被调用到。
还有Observable的成员OnSubscribe的call方法,它的参数是Subscriber
接下来看看Observable的几个重要方法了。
subscribeOn
它调用了nest().lift(new OperatorSubscribeOn<T>(scheduler))
。
- 首先是一个nest()转换,nest()调用just(this),最后调用ScalarSynchronousObservable的create方法,返回了一个ScalarSynchronousObservable,这是Observable的子类。
- 所以,向上图描述的那样,nest调用后,之前的Observable变成了ScalarSynchronousObservable,而之前的Observable作为ScalarSynchronousObservable类的构造函数参数,ScalarSynchronousObservable的成员OnSubscribe 是重新创建的一个,它的call方法发射ScalarSynchronousObservable类传入的参数。
- 第3步是一个lift(OperatorSubscribeOn)变换,OperatorSubscribeOn的call方法将传入的subscriber放入到指定的线程中去执行,这样完成线程的切换。
observableOn
它也经历了一个lift(OperatorObserveOn)操作,当observableOn的Schedule传入AndroidSchedulers .mainThread ()时,OperatorObserveOn的call方法返回一个ObserveOnSubscriber,ObserveOnSubscriber的onNext,onError,onCompleted等方法最终都会到Schedule的线程去执行。
subscribe
这是最后的方法,它传入的参数是一个Subscriber,这个Subscriber会作为Observable的成员OnSubscribe的call方法的参数被调用。而经过上面的流程图Observable和OnSubscribe都是重新创建的,新的OnSubscribe的call方法会调用之前的OnSubscribe的call方法也会被调用,所以,所有lift操作生成的OnSubscribe的call方法都会被调用。
算了,还是用代码来测试下执行流程=。=
怎么开始呢,我发现Observable的lift,subscribe,unsafeSubscribe等方法都有使用private static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook() ;
这个hook的相关方法的调用。而RxJavaPlugins也有提供registerObservableExecutionHook等方法来注册hook。于是第一个想法就是重写RxJavaObservableExecutionHook来添加我们的东东。
package com.fangler.rxdemo.hook;
import android.util.Log;
import rx.Observable;
import rx.Subscription;
import rx.plugins.RxJavaObservableExecutionHook;
/**
* Created by fangler on 2015/11/2.
*/
public class RxJavaObservableSelfHook extends RxJavaObservableExecutionHook {
private static final String TAG = "MainActivity";
private void log(String period) {
Log.d(TAG, "RxJavaObservableSelfHook "+ period + ".." + Thread.currentThread().getName());
}
@Override public <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> f) {
log("onCreate");
return super.onCreate(f);
}
@Override
public <T, R> Observable.Operator<? extends R, ? super T> onLift(Observable.Operator<? extends R, ? super T> lift) {
log("onLift");
return super.onLift(lift);
}
@Override public <T> Throwable onSubscribeError(Throwable e) {
log("onSubscribeError");
return super.onSubscribeError(e);
}
@Override public <T> Subscription onSubscribeReturn(Subscription subscription) {
log("onSubscribeReturn");
return super.onSubscribeReturn(subscription);
}
@Override
public <T> Observable.OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, Observable.OnSubscribe<T> onSubscribe) {
log("onSubscribeStart");
return super.onSubscribeStart(observableInstance, onSubscribe);
}
}
然后调用RxJavaPlugins. getInstance().registerObservableExecutionHook( new RxJavaObservableSelfHook())
来注册。运行发现报错了“Another strategy was already registered”,看来Observable肯定已经注册了一个而且只允许注册一个。 在看看RxJavaPlugins这个类,发现有一个reset方法可以重置所有注册的hook,不过这个函数外部无法访问,我们可以用反射来试试。
private void hookObservable() {
RxJavaPlugins instance = RxJavaPlugins.getInstance();
Class clazz = instance.getClass();
Method method = null;
try {
method = clazz.getDeclaredMethod("reset");
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
if (method != null) {
try {
method.setAccessible(true);
method.invoke(instance);
instance.registerObservableExecutionHook(new RxJavaObservableSelfHook());
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
再次运行时正常了,这下我们可以hook的Log了。
那我们来多测试几种情况来看看执行流程=。=
一次subscribeOn,一次observeOn
Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(Subscriber<? super String> subscriber) {
Log.d(TAG, "Producer in " + Thread.currentThread().getName());
subscriber.onNext("1");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<String>() {
@Override public void call(String s) {
Log.d(TAG, "Consumer in " + Thread.currentThread().getName());
}
})
输出的log是这样的
11-03 10:17:32.247 664-664/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onCreate..main
11-03 10:17:32.252 664-664/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onSubscribeStart..main
11-03 10:17:32.252 664-664/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onLift..main
11-03 10:17:32.257 664-664/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onLift..main
11-03 10:17:32.260 664-664/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onSubscribeReturn..main
11-03 10:17:32.260 664-705/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onSubscribeStart..RxCachedThreadScheduler-1
11-03 10:17:32.260 664-705/com.fangler.rxdemo D/MainActivity﹕ Producer in RxCachedThreadScheduler-1
11-03 10:17:32.261 664-705/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onSubscribeReturn..RxCachedThreadScheduler-1
11-03 10:17:32.294 664-664/com.fangler.rxdemo D/MainActivity﹕ Consumer in main
按函数执行过程分析:在main线程调用create(hook对应onCreate)创建Observable,但是执行流程是反过来的,所以调用subscribe时才会真正执行,subscribe会依次调用hook的onSubscribeStart,onSubscribe的call方法(对应subscribeOn和onserveOn两次lift操作),hook的onSubscribeReturn。但是subscribeOn的执行放到io线程去执行了,所以Log上有一个新的线程RxCachedThreadScheduler-1生成,而数据的生成在这个线程执行。
两次subscribeOn,一次observeOn
Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(Subscriber<? super String> subscriber) {
Log.d(TAG, "Producer in " + Thread.currentThread().getName());
subscriber.onNext("1");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io()).subscribeOn(Schedulers. newThread())
.observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<String>() {
@Override public void call(String s) {
Log.d(TAG, "Consumer in " + Thread.currentThread().getName());
}
})
输出Log如下
11-03 10:32:03.827 6079-6079/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onCreate..main
11-03 10:32:03.832 6079-6079/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onSubscribeStart..main
11-03 10:32:03.832 6079-6079/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onLift..main
11-03 10:32:03.837 6079-6079/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onLift..main
11-03 10:32:03.837 6079-6079/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onSubscribeReturn..main
11-03 10:32:03.838 6079-6095/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onSubscribeStart..RxNewThreadScheduler-1
11-03 10:32:03.838 6079-6095/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onLift..RxNewThreadScheduler-1
11-03 10:32:03.841 6079-6095/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onSubscribeReturn..RxNewThreadScheduler-1
11-03 10:32:03.842 6079-6097/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onSubscribeStart..RxCachedThreadScheduler-1
11-03 10:32:03.842 6079-6097/com.fangler.rxdemo D/MainActivity﹕ Producer in RxCachedThreadScheduler-1
11-03 10:32:03.842 6079-6097/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onSubscribeReturn..RxCachedThreadScheduler-1
11-03 10:32:03.851 6079-6079/com.fangler.rxdemo D/MainActivity﹕ Consumer in main
由于调用了两次suscribeOn且Schedule不同,所以执行流程上多了一个线程。最后生成数据是在里create方法最近的subscribeOn方法对应的Schedule上执行。
两次subscribeOn,两次observeOn
private Subscription subscribeTwice() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(Subscriber<? super String> subscriber) {
Log.d(TAG, "Producer in " + Thread.currentThread().getName());
subscriber.onNext("1");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread()).subscribe(action1);
}
先看看对应的Log..
11-03 10:38:05.085 8248-8248/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onCreate..main
11-03 10:38:05.090 8248-8248/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onSubscribeStart..main
11-03 10:38:05.090 8248-8248/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onLift..main
11-03 10:38:05.096 8248-8248/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onLift..main
11-03 10:38:05.097 8248-8248/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onSubscribeReturn..main
11-03 10:38:05.097 8248-8268/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onSubscribeStart..RxNewThreadScheduler-2
11-03 10:38:05.098 8248-8268/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onLift..RxNewThreadScheduler-2
11-03 10:38:05.099 8248-8268/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onLift..RxNewThreadScheduler-2
11-03 10:38:05.102 8248-8268/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onSubscribeReturn..RxNewThreadScheduler-2
11-03 10:38:05.102 8248-8270/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onSubscribeStart..RxCachedThreadScheduler-1
11-03 10:38:05.102 8248-8270/com.fangler.rxdemo D/MainActivity﹕ Producer in RxCachedThreadScheduler-1
11-03 10:38:05.102 8248-8270/com.fangler.rxdemo D/MainActivity﹕ RxJavaObservableSelfHook onSubscribeReturn..RxCachedThreadScheduler-1
11-03 10:38:05.112 8248-8267/com.fangler.rxdemo D/MainActivity﹕ Consumer in RxNewThreadScheduler-1
这次subscribeOn和observeOn分别嵌套的调用了两次,反过来看看执行顺序,最后的observeOn传入一个newThread的Schedule,所以最后数据的消费在线程RxCachedThreadScheduler-1中执行,倒数第二的subscribeOn传入的是一个newThread的Schedule,所以生成RxNewThreadScheduler-2线程,它会导致它之前的动作都切换到这个线程来执行,所以我们看到两次onlift都在这个线程执行。在往前的observeOn传入AndroidSchedulers.mainThread()的Schedule,发现它似乎没被用到。最前面的subscribeOn出传入的是io的schedule,同前面一样,最后生成数据是在里create方法最近的subscribeOn方法对应的Schedule上执行。
经过上面的测试我们得出的结论是: subscribeOn(schedule),会导致它之前的操作会在shedule对应的线程去执行,而数据的生产只会在离create方法最近的对应的线程执行,数据的消费在离subscribe最近的线程执行。
或者这样
数据的生成在里数据创建最近的地方执行,而数据的消费在离数据的订阅最近的地方执行。中间所有的线程只是执行数据变换的地方。
当然了我们一般也不会用那么多次subscribeOn和observeOn来这么任性的切换线程执行,所以上面的例子只是来加深对这样执行流程和线程切换的理解~~
最后附上测试代码地址:https://github.com/fangler/RxJavaDemo