|
observeOn和subscribeOn都是對(duì)observable的一種操作,區(qū)別就是subscribeOn改變了observable本身產(chǎn)生事件的schedule以及發(fā)出事件后相關(guān)處理事件的程序所在的scheduler,而obseveron僅僅是改變了對(duì)發(fā)出事件后相關(guān)處理事件的程序所在的scheduler。
或許你會(huì)問(wèn),這有多大的區(qū)別嗎?的確是有的,比如說(shuō)產(chǎn)生observable事件是一件費(fèi)時(shí)可能會(huì)卡主線(xiàn)程的操作(比如說(shuō)獲取網(wǎng)絡(luò)數(shù)據(jù)),那么subscribeOn就是你的選擇,這樣可以避免卡住主線(xiàn)程。
兩者最主要的差別是影響的范圍不同,observeOn is more limited,但是卻是可以多次調(diào)用,多次改變不同的接受者所在的scheduler,在調(diào)用這個(gè)函數(shù)之后的observable造成影響。而subscribeOn則是一次性的,無(wú)論在什么地方調(diào)用,總是從改變最原始的observable開(kāi)始影響整個(gè)observable的處理。
subscribeOn()和observeOn()的區(qū)別
- subscribeOn()主要改變的是訂閱的線(xiàn)程,即call()執(zhí)行的線(xiàn)程;
- observeOn()主要改變的是發(fā)送的線(xiàn)程,即onNext()執(zhí)行的線(xiàn)程。
subscribeOn
我們先看一個(gè)例子。
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("a");
subscriber.onNext("b");
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String integer) {
System.out.println(integer);
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
運(yùn)行如下:
a
b
我們看一下subscribeOn()中,都干了什么
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
很明顯,會(huì)走if之外的方法。
在這里我們可以看到,我們又創(chuàng)建了一個(gè)Observable對(duì)象,但創(chuàng)建時(shí)傳入的參數(shù)為OperatorSubscribeOn(this,scheduler),我們看一下此對(duì)象以及其對(duì)應(yīng)的構(gòu)造方法
OperatorSubscribeOn代碼:
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
source.unsafeSubscribe(s);
}
});
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
可以看到,OperatorSubscribeOn實(shí)現(xiàn)Onsubscribe,并且由其構(gòu)造方法可知,他保存了上一個(gè)Observable對(duì)象,并保存了Scheduler對(duì)象。
這里做個(gè)總結(jié)。
把Observable.create()創(chuàng)建的稱(chēng)之為Observable_1,OnSubscribe_1。把subscribeOn()創(chuàng)建的稱(chēng)之為Observable_2,OnSubscribe_2(= OperatorSubscribeOn)。
那么,前兩步就是創(chuàng)建了兩個(gè)的observable,和OnSubscribe,并且OnSubscribe_2中保存了Observable_1的應(yīng)用,即source。
調(diào)用Observable_2.subscribe()方法會(huì)調(diào)用OnSubscibe_2的call方法,即OperatorSubscribeOn的call()。
下面分析下call()方法。
- inner.schedule()改變了線(xiàn)程,此時(shí)Action的call()在我們指定的線(xiàn)程中運(yùn)行。
- Subscriber被包裝了一層。
- source.unsafeSubscribe(s);,注意source是Observable_1對(duì)象。
unsafeSubscribe方法代碼:
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
代碼很多,關(guān)鍵代碼:
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
該方法即調(diào)用了OnSubscribe_1.call()方法。注意,此時(shí)的call()方法在我們指定的線(xiàn)程中運(yùn)行。那么就起到了改變線(xiàn)程的作用。
對(duì)于以上線(xiàn)程,我們可以總結(jié),其有如下流程:
- Observable.create() : 創(chuàng)建了Observable_1和OnSubscribe_1;
- subscribeOn(): 創(chuàng)建Observable_2和OperatorSubscribeOn(OnSubscribe_2),同時(shí)OperatorSubscribeOn保存了Observable_1的引用。
- observable_2.subscribe(Observer):
- 調(diào)用OperatorSubscribeOn的call()。call()改變了線(xiàn)程的運(yùn)行,并且調(diào)用了Observable_1.unsafeSubscribe(s);
- Observable_1.unsafeSubscribe(s);,該方法的實(shí)現(xiàn)中調(diào)用了OnSubscribe_1的call()。
從這個(gè)可以了解,無(wú)論我們的subscribeOn()放在哪里,他改變的是subscribe()的過(guò)程,而不是onNext()的過(guò)程。
那么如果有多個(gè)subscribeOn(),那么線(xiàn)程會(huì)怎樣執(zhí)行呢。如果按照我們的邏輯,有以下程序
Observable.just("ss")
.subscribeOn(Schedulers.io()) // ----1---
.subscribeOn(Schedulers.newThread()) //----2----
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
});
那么,我們根據(jù)之前的源碼分析其執(zhí)行邏輯。
- Observable.just(“ss”),創(chuàng)建Observable_1,OnSubscribe_1
- Observable_1.subscribeOn(Schedulers.io()):創(chuàng)建observable_2,OperatorSubscribeOn_2并保存Observable_1的引用。
- observable_2.subscribeOn(Schedulers.newThread()):創(chuàng)建Observable_3,OperatorSubscribeOn_3并保存Observable_2的引用。
- Observable_3.subscribe():
- 調(diào)用OperatorSubscribeOn_3.call(),改變線(xiàn)程為Schedulers.newThread()。
- 調(diào)用OperatorSubscribeOn_2.call(),改變線(xiàn)程為Schedulers.io()。
- 調(diào)用OnSubscribe_1.call(),此時(shí)call()運(yùn)行在Schedulers.io()。
根據(jù)以上邏輯分析,會(huì)按照1的線(xiàn)程進(jìn)行執(zhí)行。
subscribeOn如何工作,關(guān)鍵代碼其實(shí)就是一行代碼:
source.unsafeSubscribe(s);
注意它所在的位置,是在worker的call里面,說(shuō)白了,就是把source.subscribe這一行調(diào)用放在指定的線(xiàn)程里,那么總結(jié)起來(lái)的結(jié)論就是:
subscribeOn的調(diào)用,改變了調(diào)用前序列所運(yùn)行的線(xiàn)程。
observeOn
看一下observeOn()源碼:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return observeOn(scheduler, false, bufferSize);
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
這里引出了新的操作符lift
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
這里不再介紹了,詳見(jiàn):http://blog.csdn.net/jdsjlzx/article/details/51686152
在lift()中,有如下關(guān)鍵代碼:
Subscriber<? super T> st = hook.onLift(operator).call(o);
OperatorObserveOn.call()核心代碼:
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
我們看到其返回了ObserveOnSubscriber< T>,注意:此時(shí)只調(diào)用了call()方法,但call()方法中并沒(méi)有改變線(xiàn)程的操作,此時(shí)為subscribe()過(guò)程。
我們直奔重點(diǎn),因?yàn)?,我們了解到其改變的是onNext()過(guò)程,那么我們肯定要看一下ObserveOnSubscriber.onNext()找找在哪改變線(xiàn)程
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
這里做了兩件事,首先把結(jié)果緩存到一個(gè)隊(duì)列里,然后調(diào)用schedule啟動(dòng)傳入的worker
我們這里需要注意下:
在調(diào)用observeOn前的序列,把結(jié)果傳入到onNext就是它的工作,它并不關(guān)心后續(xù)的流程,所以工作就到這里就結(jié)束了,剩下的交給ObserveOnSubscriber繼續(xù)。
onNext方法最后調(diào)用了schedule(),從方法名可以看到,其肯定是改變線(xiàn)程用的,并且該方法經(jīng)過(guò)一番循環(huán)之后,調(diào)用了該類(lèi)的call()方法。
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
recursiveScheduler 就是之前我們傳入的Scheduler,我們一般會(huì)在observeOn傳入AndroidScheluders.mainThread()。
scheduler中調(diào)用的call()方法
// only execute this from schedule()
@Override
public void call() {
long missed = 1L;
long currentEmission = emitted;
// these are accessed in a tight loop around atomics so
// loading them into local variables avoids the mandatory re-reading
// of the constant fields
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
final NotificationLite<T> localOn = this.on;
// requested and counter are not included to avoid JIT issues with register spilling
// and their access is is amortized because they are part of the outer loop which runs
// less frequently (usually after each bufferSize elements)
for (;;) {
long requestAmount = requested.get();
while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;
if (checkTerminated(done, empty, localChild, q)) {
return;
}
if (empty) {
break;
}
localChild.onNext(localOn.getValue(v));
currentEmission++;
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = 0L;
}
}
if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}
emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
call()中有l(wèi)ocalChild.onNext(localOn.getValue(v));調(diào)用。
在Scheduler啟動(dòng)后, 我們?cè)贠bservable.subscribe(a)傳入的a就是這里的child, 我們看到,在call中終于調(diào)用了它的onNext方法,把真正的結(jié)果傳了出去,但是在這里,我們是工作在observeOn的線(xiàn)程上的。
總結(jié)起來(lái)的結(jié)論就是:
- observeOn 對(duì)調(diào)用之前的序列默不關(guān)心,也不會(huì)要求之前的序列運(yùn)行在指定的線(xiàn)程上
- observeOn 對(duì)之前的序列產(chǎn)生的結(jié)果先緩存起來(lái),然后再在指定的線(xiàn)程上,推送給最終的subscriber
observeOn改變的是onNext()調(diào)用。
subcribeOn和observeOn 對(duì)比分析
Observable
.map // 操作1
.flatMap // 操作2
.subscribeOn(io)
.map //操作3
.flatMap //操作4
.observeOn(main)
.map //操作5
.flatMap //操作6
.subscribeOn(io) //!!特別注意
.subscribe(handleData)
有如上邏輯,則我們對(duì)其運(yùn)行進(jìn)行分析。
首先,我們需要先明白其內(nèi)部執(zhí)行的邏輯。
在調(diào)用subscribe之后,邏輯開(kāi)始運(yùn)行。分別調(diào)用每一步OnSubscribe.call(),注意:自下往上。當(dāng)運(yùn)行到最上,即Observable.create()后,我們?cè)谄渲姓{(diào)用了subscriber.onNext(),于是程序開(kāi)始自上往下執(zhí)行每一個(gè)對(duì)象的subscriber.onNext()方法。最終,直到subscribe()中的回調(diào)。
其次,從上面對(duì)subscribeOn()和observeOn()的分析中可以明白,subscribeOn()是在call()方法中起作用,而observeOn()是在onNext()中作用。
那么對(duì)于以上的邏輯,我們可以得出如下結(jié)論:
- 操作1,2,3,4在io線(xiàn)程中,因?yàn)樵谌绻麤](méi)有observeOn()影響,他們的回調(diào)操作默認(rèn)在訂閱的線(xiàn)程中。而我們的訂閱線(xiàn)程在subscribeOn(io)發(fā)生了改變。注意他們執(zhí)行的先后順序。
- 操作5,6在main線(xiàn)程中運(yùn)行。因?yàn)閛bserveOn()改變了onNext().
- 特別注意那一個(gè)邏輯沒(méi)起到作用
再簡(jiǎn)單點(diǎn)總結(jié)就是
- subscribeOn的調(diào)用切換之前的線(xiàn)程。
- observeOn的調(diào)用切換之后的線(xiàn)程。
- observeOn之后,不可再調(diào)用subscribeOn 切換線(xiàn)程
復(fù)雜情況
我們經(jīng)常多次使用subscribeOn切換線(xiàn)程,那么以后是否可以組合observeOn和subscribeOn達(dá)到自由切換的目的呢?
組合是可以的,但是他們的執(zhí)行順序是有條件的,如果仔細(xì)分析的話(huà),可以知道observeOn調(diào)用之后,再調(diào)用subscribeOn是無(wú)效的,原因是什么?
因?yàn)閟ubscribeOn改變的是subscribe這句調(diào)用所在的線(xiàn)程,大多數(shù)情況,產(chǎn)生內(nèi)容和消費(fèi)內(nèi)容是在同一線(xiàn)程的,所以改變了產(chǎn)生內(nèi)容所在的線(xiàn)程,就改變了消費(fèi)內(nèi)容所在的線(xiàn)程。
經(jīng)過(guò)上面的闡述,我們知道,observeOn的工作原理是把消費(fèi)結(jié)果先緩存,再切換到新線(xiàn)程上讓原始消費(fèi)者消費(fèi),它和生產(chǎn)者是沒(méi)有一點(diǎn)關(guān)系的,就算subscribeOn調(diào)用了,也只是改變observeOn這個(gè)消費(fèi)者所在的線(xiàn)程,和OperatorObserveOn中存儲(chǔ)的原始消費(fèi)者一點(diǎn)關(guān)系都沒(méi)有,它還是由observeOn控制。
@扔物線(xiàn) 大神給的總結(jié):
- 下面提到的“操作”包括產(chǎn)生事件、用操作符操作事件以及最終的通過(guò) subscriber 消費(fèi)事件;
- 只有第一subscribeOn() 起作用(所以多個(gè) subscribeOn() 無(wú)意義);
- 這個(gè) subscribeOn() 控制從流程開(kāi)始的第一個(gè)操作,直到遇到第一個(gè) observeOn();
- observeOn() 可以使用多次,每個(gè) observeOn() 將導(dǎo)致一次線(xiàn)程切換(),這次切換開(kāi)始于這次 observeOn() 的下一個(gè)操作;
- 不論是 subscribeOn() 還是 observeOn(),每次線(xiàn)程切換如果不受到下一個(gè) observeOn() 的干預(yù),線(xiàn)程將不再改變,不會(huì)自動(dòng)切換到其他線(xiàn)程。
參考文章:
https://segmentfault.com/a/1190000004856071
http://blog.csdn.net/lisdye2/article/details/51113837
|