RxJava의 subscribeOn와 observeOn (1)

현재 약 2년동안 실무에 RxJava를 사용하고 있지만, 아직까지(부끄럽게도) 항상 동작이 헷갈리는 함수 2개가 있다.
바로 subscribeOn()observeOn()이다. 이미 많은 블로그들의 글을 읽어봤지만, 여전히 헷갈리는 듯하여 RxJava 내부적인 구현의 관점에서 나름대로 한번 정리를 해볼려고 한다.


Reactive Streams

먼저 RxJava의 subscribeOn()observeOn()의 동작을 이해하기 위해서는, Reactive Streams 의 기본 메커니즘을 이해하여야 한다. RxJava도 결국 이 Reactive Streams의 규격을 따르는 구현체의 하나일 뿐이기 때문이다.

API Specification

Reactive Streams API 명세를 살펴보면, 아래와 같이 3개의 interface Publisher, Subscription, Subscriber 가 있다. (사실 Processor도 있지만, 여기서는 무시)

public interface Publisher<T> {  
    // 새로운 Subscription을 하나 만들어 Subscribe의 onSubscribe() 호출 
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscription {  
    // Publisher에게 n개의 데이터 발행 요청
    public void request(long n);
    // Publisher에게 데이터 발행 중단 요청
    public void cancel();
}

public interface Subscriber<T> {  
    // Subscription의 request() 호출
    // Publisher는 그에 대한 응답을 내려줌
    public void onSubscribe(Subscription s);
    // Subscription의 request() 요청에 대한 Publisher의 데이터 응답
    public void onNext(T t)
    // Publisher의 데이터 발행 중의 에러 공지
    public void onError(Throwable t);
    // Publisher의 데이터 발행 완료 공지
    public void onComplete();
}


Flow

  1. Subscriber 생성하여 Publishersubscribe(Subscriber s) 실행
  2. Publishersubscribe(Subscriber s): Subscription 생성하여 SubscriberonSubscribe(Subscription s) 실행
  3. SubscriberonSubscribe(Subscription s): Subscriptionrequest() 실행
  4. Subscriptionrequest(): Publisher에게 데이터 요청
  5. PublisherSubscription을 통해 SubscriberonNext()/onError()/onComplete() 실행

RxJava2의 Flowable 구현

코드를 통해 직접 확인해보기 위해 Reactive Streams의 규격을 잘 따르고 있는 RxJava2의 Flowable의 구현을 한번 살펴보자.
(Single, Mabye 의 경우는 Reactive Streams의 규격을 따르지 않고 있음.)

Rxjava2 2.2.19 버젼을 사용하였습니다.

subscribeOn

Flowable.just(1)  
        .subscribeOn(Schedulers.io()) // HERE
        .subcribe()

subscribeOn(Schedulers.io()) 코드의 구현을 따라가다 보면, SubscribeOnSubscriber이라는 FlowableSubscribeOn 클래스의 내부 클래스를 만나게 된다.
내부 코드 일부분을 살펴보면,

@Override
public void onNext(T t) {  
    downstream.onNext(t); // NOTE
}

@Override
public void onError(Throwable t) {  
    downstream.onError(t); // NOTE
    worker.dispose();
}

@Override
public void onComplete() {  
    downstream.onComplete(); // NOTE
    worker.dispose();
}

일단 downstream의 Subscriber의 동작을 실행하는 부분에는 설정한 Scheduler로 별도로 스케줄링하지 않고 있다.

@Override
public void request(final long n) {  
    if (SubscriptionHelper.validate(n)) {
        Subscription s = this.upstream.get();
        if (s != null) {
            requestUpstream(n, s); // NOTE
        } else {
            // ... 생략 ...
        }
    }
}

void requestUpstream(final long n, final Subscription s) {  
    if (nonScheduledRequests || Thread.currentThread() == get()) {
        s.request(n);
    } else {
        worker.schedule(new Request(s, n)); // NOTE
    }
}

static final class Request implements Runnable {  
    final Subscription upstream;
    final long n;

    Request(Subscription s, long n) {
        this.upstream = s;
        this.n = n;
    }

    @Override
    public void run() {
        upstream.request(n); // NOTE
    }
}

request() 함수를 따라가보면, scheduler.createWorker()(위에 코드는 넣지 않았지만)로 생성한 Worker에 의해 upstream Subscriptionrequest()이 스케쥴링됨을 확인할 수 있다.

observeOn

Flowable.just(1)  
        .observeOn(Schedulers.io()) // HERE
        .subcribe()

마찬가지로 observeOn(Schedulers.io()) 코드의 구현을 따라가다 보면, ObserveOnSubscriber이라는 FlowableObserveOn 클래스의 내부 클래스를 만나게 된다. 그리고 이 ObserveOnSubscriber 상속하고 있는 추상 클래스 BaseObserveOnSubscriber 를 살펴보면,

@Override public final void onNext(T t) {
    // ... 생략 ...
    trySchedule(); // NOTE
}

@Override
public final void onError(Throwable t) {  
    // ... 생략 ...
    trySchedule(); // NOTE
}

@Override
public final void onComplete() {  
    // ... 생략 ...
    trySchedule(); // NOTE
}

Subscriber 모든 함수의 마지막에 trySchedule()가 실행됨을 볼 수 있다. trySchedule() 의 구현을 보면,

final void trySchedule() {  
    if (getAndIncrement() != 0) {
        return;
    }
    worker.schedule(this); // NOTE
}

역시 scheduler.createWorker()를 통해 생성된 Worker를 통해 실행되는데, 여기서 thisRunnable 를 구현한 자신이다.
그럼 Runnable를 구현한 부분을 보면,

@Override
public final void run()  
    // ... 생략 ...
    runAsync(); // NOTE
}

runAsync()이라는 추상 메쏘드를 실행한다. 처음으로 돌아와서 이 추상 클래스를 상속한 ObserveOnSubscriber에서 구현한 runAsync() 을 확인해보면,

@Override
void runAsync() {  
    // ... 생략 ...
    final Subscriber<? super T> a = downstream;
    final SimpleQueue<T> q = queue;
    // ... 생략 ...
    for (;;) {
        try {
            v = q.poll();
        } catch (Throwable ex) {
            // ... 생략 ...
            a.onError(ex); // NOTE
            worker.dispose();
            return;
        }

        boolean empty = v == null;

        // NOTE: checkTerminated() 반환값이 true인 경우, 반환하기 전에 함수 내에서 a.onError() 또는 a.onComplete()실행됨
        if (checkTerminated(d, empty, a)) {
            return;
        }
        // ... 생략 ...
        a.onNext(v); // NOTE
    }
}

무한 반복문을 돌면서, queue에 있는 value들을 하나씩 처리하면서 적절한 downstream Subscriber의 함수를 실행하게 된다.
즉, downstream Subscriber의 함수들은 설정한 ScheduleWorker에 의해 스케쥴링된다.


정리

구현에서 중요한 부분만 최대한 요약할려고 했음에도, 설명이 다소 길어졌던 것 같다. 구현 부분에 대한 설명이 길었지만, 확인한 결과를 간단히 요약해서 정리하면(Flowable 기준),

  • subscribeOn(Scheduler scheduler): upstream Subscriptionreqeust()를 실행할 Scheduler 를 설정
  • observeOn(Scheduler scheduler): downstream Subscriber 의 동작(onNext(), onError(), onComplete()) 을 실행할 Scheduler를 설정

이 2가지만 기억한다면, 하나 이상의 subscribeOn() 또는 observeOn()을 붙였을 때 일어나는 스케쥴링 동작들을 이해하는데 큰 도움이 될 것이다.

아래는 추가로 Reactive Streams의 규격을 따르고 있지는 않지만, Rxjava2의 Single, Maybe에서의 동작에 대해서도 적어보았다.
Reactive Streams의 Publiser, Subscriber가 각각 SingleSource(또는 MaybeSource), SingleObserver(또는 MaybeObserver)에 대응된다고 생각하면 Flowable과 거의 유사하다.

  • subscribeOn(Scheduler scheduler): upstream SingleSource(또는 MaybeSource)의 subscribe()를 실행할 Scheduler 를 설정
  • observeOn(Scheduler scheduler): downstream SingleObserver(또는 MaybeObserver)의 동작(onSuccess(), onError(), onComplete()) 을 실행할 Scheduler를 설정

다음 글 에서는 실제 로그를 통해 이 동작을 확인해보도록 할 것이다.


참고