현재 약 2년동안 실무에 RxJava를 사용하고 있지만, 아직까지(부끄럽게도) 항상 동작이 헷갈리는 함수 2개가 있다.
바로 subscribeOn() 과 observeOn()이다. 이미 많은 블로그들의 글을 읽어봤지만, 여전히 헷갈리는 듯하여 RxJava 내부적인 구현의 관점에서 나름대로 한번 정리를 해볼려고 한다.
Reactive Streams
먼저 RxJava의 subscribeOn() 과 observeOn()의 동작을 이해하기 위해서는, Reactive Streams 의 기본 메커니즘을 이해하여야 한다. RxJava도 결국 이 Reactive Streams의 규격을 따르는 구현체의 하나일 뿐이기 때문이다.
API Specification
Reactive Streams API 명세를 살펴보면, 아래와 같이 3개의 interface Publisher, Subscription, Subscriber 가 있다. (사실 Processor도 있지만, 여기서는 무시)
public interface Publisher<T> {
// 새로운 Subscription을 하나 만들어 Subscribe의 onSubscribe() 호출
public void subscribe(Subscriber<? super T> s);
}
public interface Subscription {
// Publisher에게 n개의 데이터 발행 요청
public void request(long n);
// Publisher에게 데이터 발행 중단 요청
public void cancel();
}
public interface Subscriber<T> {
// Subscription의 request() 호출
// Publisher는 그에 대한 응답을 내려줌
public void onSubscribe(Subscription s);
// Subscription의 request() 요청에 대한 Publisher의 데이터 응답
public void onNext(T t)
// Publisher의 데이터 발행 중의 에러 공지
public void onError(Throwable t);
// Publisher의 데이터 발행 완료 공지
public void onComplete();
}
Flow

Subscriber생성하여Publisher의subscribe(Subscriber s)실행Publisher의subscribe(Subscriber s):Subscription생성하여Subscriber의onSubscribe(Subscription s)실행Subscriber의onSubscribe(Subscription s):Subscription의request()실행Subscription의request():Publisher에게 데이터 요청Publisher은Subscription을 통해Subscriber의onNext()/onError()/onComplete()실행
RxJava2의 Flowable 구현
코드를 통해 직접 확인해보기 위해 Reactive Streams의 규격을 잘 따르고 있는 RxJava2의 Flowable의 구현을 한번 살펴보자.
(Single, Mabye 의 경우는 Reactive Streams의 규격을 따르지 않고 있음.)
Rxjava2 2.2.19 버젼을 사용하였습니다.
subscribeOn
Flowable.just(1)
.subscribeOn(Schedulers.io()) // HERE
.subcribe()
subscribeOn(Schedulers.io()) 코드의 구현을 따라가다 보면, SubscribeOnSubscriber이라는 FlowableSubscribeOn 클래스의 내부 클래스를 만나게 된다.
내부 코드 일부분을 살펴보면,
@Override
public void onNext(T t) {
downstream.onNext(t); // NOTE
}
@Override
public void onError(Throwable t) {
downstream.onError(t); // NOTE
worker.dispose();
}
@Override
public void onComplete() {
downstream.onComplete(); // NOTE
worker.dispose();
}
일단 downstream의 Subscriber의 동작을 실행하는 부분에는 설정한 Scheduler로 별도로 스케줄링하지 않고 있다.
@Override
public void request(final long n) {
if (SubscriptionHelper.validate(n)) {
Subscription s = this.upstream.get();
if (s != null) {
requestUpstream(n, s); // NOTE
} else {
// ... 생략 ...
}
}
}
void requestUpstream(final long n, final Subscription s) {
if (nonScheduledRequests || Thread.currentThread() == get()) {
s.request(n);
} else {
worker.schedule(new Request(s, n)); // NOTE
}
}
static final class Request implements Runnable {
final Subscription upstream;
final long n;
Request(Subscription s, long n) {
this.upstream = s;
this.n = n;
}
@Override
public void run() {
upstream.request(n); // NOTE
}
}
request() 함수를 따라가보면, scheduler.createWorker()(위에 코드는 넣지 않았지만)로 생성한 Worker에 의해 upstream Subscription의 request()이 스케쥴링됨을 확인할 수 있다.
observeOn
Flowable.just(1)
.observeOn(Schedulers.io()) // HERE
.subcribe()
마찬가지로 observeOn(Schedulers.io()) 코드의 구현을 따라가다 보면, ObserveOnSubscriber이라는 FlowableObserveOn 클래스의 내부 클래스를 만나게 된다. 그리고 이 ObserveOnSubscriber 상속하고 있는 추상 클래스 BaseObserveOnSubscriber 를 살펴보면,
@Override public final void onNext(T t) {
// ... 생략 ...
trySchedule(); // NOTE
}
@Override
public final void onError(Throwable t) {
// ... 생략 ...
trySchedule(); // NOTE
}
@Override
public final void onComplete() {
// ... 생략 ...
trySchedule(); // NOTE
}
Subscriber 모든 함수의 마지막에 trySchedule()가 실행됨을 볼 수 있다. trySchedule() 의 구현을 보면,
final void trySchedule() {
if (getAndIncrement() != 0) {
return;
}
worker.schedule(this); // NOTE
}
역시 scheduler.createWorker()를 통해 생성된 Worker를 통해 실행되는데, 여기서 this는 Runnable 를 구현한 자신이다.
그럼 Runnable를 구현한 부분을 보면,
@Override
public final void run()
// ... 생략 ...
runAsync(); // NOTE
}
runAsync()이라는 추상 메쏘드를 실행한다. 처음으로 돌아와서 이 추상 클래스를 상속한 ObserveOnSubscriber에서 구현한 runAsync() 을 확인해보면,
@Override
void runAsync() {
// ... 생략 ...
final Subscriber<? super T> a = downstream;
final SimpleQueue<T> q = queue;
// ... 생략 ...
for (;;) {
try {
v = q.poll();
} catch (Throwable ex) {
// ... 생략 ...
a.onError(ex); // NOTE
worker.dispose();
return;
}
boolean empty = v == null;
// NOTE: checkTerminated() 반환값이 true인 경우, 반환하기 전에 함수 내에서 a.onError() 또는 a.onComplete()실행됨
if (checkTerminated(d, empty, a)) {
return;
}
// ... 생략 ...
a.onNext(v); // NOTE
}
}
무한 반복문을 돌면서, queue에 있는 value들을 하나씩 처리하면서 적절한 downstream Subscriber의 함수를 실행하게 된다.
즉, downstream Subscriber의 함수들은 설정한 Schedule의 Worker에 의해 스케쥴링된다.
정리
구현에서 중요한 부분만 최대한 요약할려고 했음에도, 설명이 다소 길어졌던 것 같다. 구현 부분에 대한 설명이 길었지만, 확인한 결과를 간단히 요약해서 정리하면(Flowable 기준),
subscribeOn(Scheduler scheduler): upstreamSubscription의reqeust()를 실행할Scheduler를 설정observeOn(Scheduler scheduler): downstreamSubscriber의 동작(onNext(),onError(),onComplete()) 을 실행할Scheduler를 설정
이 2가지만 기억한다면, 하나 이상의 subscribeOn() 또는 observeOn()을 붙였을 때 일어나는 스케쥴링 동작들을 이해하는데 큰 도움이 될 것이다.
아래는 추가로 Reactive Streams의 규격을 따르고 있지는 않지만, Rxjava2의 Single, Maybe에서의 동작에 대해서도 적어보았다.
Reactive Streams의 Publiser, Subscriber가 각각 SingleSource(또는 MaybeSource), SingleObserver(또는 MaybeObserver)에 대응된다고 생각하면 Flowable과 거의 유사하다.
subscribeOn(Scheduler scheduler): upstreamSingleSource(또는MaybeSource)의subscribe()를 실행할Scheduler를 설정observeOn(Scheduler scheduler): downstreamSingleObserver(또는MaybeObserver)의 동작(onSuccess(),onError(),onComplete()) 을 실행할Scheduler를 설정
다음 글 에서는 실제 로그를 통해 이 동작을 확인해보도록 할 것이다.