현재 약 2년동안 실무에 RxJava를 사용하고 있지만, 아직까지(부끄럽게도) 항상 동작이 헷갈리는 함수 2개가 있다.
바로 subscribeOn()
과 observeOn()
이다. 이미 많은 블로그들의 글을 읽어봤지만, 여전히 헷갈리는 듯하여 RxJava 내부적인 구현의 관점에서 나름대로 한번 정리를 해볼려고 한다.
Reactive Streams
먼저 RxJava의 subscribeOn()
과 observeOn()
의 동작을 이해하기 위해서는, Reactive Streams 의 기본 메커니즘을 이해하여야 한다. RxJava도 결국 이 Reactive Streams의 규격을 따르는 구현체의 하나일 뿐이기 때문이다.
API Specification
Reactive Streams API 명세를 살펴보면, 아래와 같이 3개의 interface Publisher
, Subscription
, Subscriber
가 있다. (사실 Processor
도 있지만, 여기서는 무시)
public interface Publisher<T> {
// 새로운 Subscription을 하나 만들어 Subscribe의 onSubscribe() 호출
public void subscribe(Subscriber<? super T> s);
}
public interface Subscription {
// Publisher에게 n개의 데이터 발행 요청
public void request(long n);
// Publisher에게 데이터 발행 중단 요청
public void cancel();
}
public interface Subscriber<T> {
// Subscription의 request() 호출
// Publisher는 그에 대한 응답을 내려줌
public void onSubscribe(Subscription s);
// Subscription의 request() 요청에 대한 Publisher의 데이터 응답
public void onNext(T t)
// Publisher의 데이터 발행 중의 에러 공지
public void onError(Throwable t);
// Publisher의 데이터 발행 완료 공지
public void onComplete();
}
Flow
Subscriber
생성하여Publisher
의subscribe(Subscriber s)
실행Publisher
의subscribe(Subscriber s)
:Subscription
생성하여Subscriber
의onSubscribe(Subscription s)
실행Subscriber
의onSubscribe(Subscription s)
:Subscription
의request()
실행Subscription
의request()
:Publisher
에게 데이터 요청Publisher
은Subscription
을 통해Subscriber
의onNext()
/onError()
/onComplete()
실행
RxJava2의 Flowable 구현
코드를 통해 직접 확인해보기 위해 Reactive Streams의 규격을 잘 따르고 있는 RxJava2의 Flowable
의 구현을 한번 살펴보자.
(Single
, Mabye
의 경우는 Reactive Streams의 규격을 따르지 않고 있음.)
Rxjava2 2.2.19 버젼을 사용하였습니다.
subscribeOn
Flowable.just(1)
.subscribeOn(Schedulers.io()) // HERE
.subcribe()
subscribeOn(Schedulers.io())
코드의 구현을 따라가다 보면, SubscribeOnSubscriber
이라는 FlowableSubscribeOn
클래스의 내부 클래스를 만나게 된다.
내부 코드 일부분을 살펴보면,
@Override
public void onNext(T t) {
downstream.onNext(t); // NOTE
}
@Override
public void onError(Throwable t) {
downstream.onError(t); // NOTE
worker.dispose();
}
@Override
public void onComplete() {
downstream.onComplete(); // NOTE
worker.dispose();
}
일단 downstream의 Subscriber
의 동작을 실행하는 부분에는 설정한 Scheduler
로 별도로 스케줄링하지 않고 있다.
@Override
public void request(final long n) {
if (SubscriptionHelper.validate(n)) {
Subscription s = this.upstream.get();
if (s != null) {
requestUpstream(n, s); // NOTE
} else {
// ... 생략 ...
}
}
}
void requestUpstream(final long n, final Subscription s) {
if (nonScheduledRequests || Thread.currentThread() == get()) {
s.request(n);
} else {
worker.schedule(new Request(s, n)); // NOTE
}
}
static final class Request implements Runnable {
final Subscription upstream;
final long n;
Request(Subscription s, long n) {
this.upstream = s;
this.n = n;
}
@Override
public void run() {
upstream.request(n); // NOTE
}
}
request()
함수를 따라가보면, scheduler.createWorker()
(위에 코드는 넣지 않았지만)로 생성한 Worker
에 의해 upstream Subscription
의 request()
이 스케쥴링됨을 확인할 수 있다.
observeOn
Flowable.just(1)
.observeOn(Schedulers.io()) // HERE
.subcribe()
마찬가지로 observeOn(Schedulers.io())
코드의 구현을 따라가다 보면, ObserveOnSubscriber
이라는 FlowableObserveOn
클래스의 내부 클래스를 만나게 된다. 그리고 이 ObserveOnSubscriber
상속하고 있는 추상 클래스 BaseObserveOnSubscriber
를 살펴보면,
@Override public final void onNext(T t) {
// ... 생략 ...
trySchedule(); // NOTE
}
@Override
public final void onError(Throwable t) {
// ... 생략 ...
trySchedule(); // NOTE
}
@Override
public final void onComplete() {
// ... 생략 ...
trySchedule(); // NOTE
}
Subscriber
모든 함수의 마지막에 trySchedule()
가 실행됨을 볼 수 있다. trySchedule()
의 구현을 보면,
final void trySchedule() {
if (getAndIncrement() != 0) {
return;
}
worker.schedule(this); // NOTE
}
역시 scheduler.createWorker()
를 통해 생성된 Worker
를 통해 실행되는데, 여기서 this
는 Runnable
를 구현한 자신이다.
그럼 Runnable
를 구현한 부분을 보면,
@Override
public final void run()
// ... 생략 ...
runAsync(); // NOTE
}
runAsync()
이라는 추상 메쏘드를 실행한다. 처음으로 돌아와서 이 추상 클래스를 상속한 ObserveOnSubscriber
에서 구현한 runAsync()
을 확인해보면,
@Override
void runAsync() {
// ... 생략 ...
final Subscriber<? super T> a = downstream;
final SimpleQueue<T> q = queue;
// ... 생략 ...
for (;;) {
try {
v = q.poll();
} catch (Throwable ex) {
// ... 생략 ...
a.onError(ex); // NOTE
worker.dispose();
return;
}
boolean empty = v == null;
// NOTE: checkTerminated() 반환값이 true인 경우, 반환하기 전에 함수 내에서 a.onError() 또는 a.onComplete()실행됨
if (checkTerminated(d, empty, a)) {
return;
}
// ... 생략 ...
a.onNext(v); // NOTE
}
}
무한 반복문을 돌면서, queue
에 있는 value들을 하나씩 처리하면서 적절한 downstream Subscriber
의 함수를 실행하게 된다.
즉, downstream Subscriber
의 함수들은 설정한 Schedule
의 Worker
에 의해 스케쥴링된다.
정리
구현에서 중요한 부분만 최대한 요약할려고 했음에도, 설명이 다소 길어졌던 것 같다. 구현 부분에 대한 설명이 길었지만, 확인한 결과를 간단히 요약해서 정리하면(Flowable
기준),
subscribeOn(Scheduler scheduler)
: upstreamSubscription
의reqeust()
를 실행할Scheduler
를 설정observeOn(Scheduler scheduler)
: downstreamSubscriber
의 동작(onNext()
,onError()
,onComplete()
) 을 실행할Scheduler
를 설정
이 2가지만 기억한다면, 하나 이상의 subscribeOn()
또는 observeOn()
을 붙였을 때 일어나는 스케쥴링 동작들을 이해하는데 큰 도움이 될 것이다.
아래는 추가로 Reactive Streams의 규격을 따르고 있지는 않지만, Rxjava2의 Single
, Maybe
에서의 동작에 대해서도 적어보았다.
Reactive Streams의 Publiser
, Subscriber
가 각각 SingleSource
(또는 MaybeSource
), SingleObserver
(또는 MaybeObserver
)에 대응된다고 생각하면 Flowable
과 거의 유사하다.
subscribeOn(Scheduler scheduler)
: upstreamSingleSource
(또는MaybeSource
)의subscribe()
를 실행할Scheduler
를 설정observeOn(Scheduler scheduler)
: downstreamSingleObserver
(또는MaybeObserver
)의 동작(onSuccess()
,onError()
,onComplete()
) 을 실행할Scheduler
를 설정
다음 글 에서는 실제 로그를 통해 이 동작을 확인해보도록 할 것이다.