RxJava의 subscribeOn와 observeOn (2)

이전 글 에서 RxJava의 subscribeOn(), observeOn()의 내부 구현을 한번 살펴보며 쓰레드의 동작에 대해 정리해보았다. 이번 글에서는 로그를 남겨보며 이 실제 동작을 한번 살펴 볼것이다.

Rxjava2 2.2.19 버젼을 사용하였습니다.

준비하기

먼저 편리한 테스트를 위해, 아래와 같이 JUnit Test 클래스 안에 private method, class를 준비했다.

private Scheduler newScheduler(int nThreads, String pattern) {  
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(pattern).build();
    ExecutorService pool = Executors.newFixedThreadPool(nThreads, threadFactory);
    return Schedulers.from(pool);
}

private static class LogManager {  
    private long start;

    public LogManager(long start) {
        this.start = start;
    }

    private void log(String message) {
        System.out.println(System.currentTimeMillis() - start + "\t| " +
                           Thread.currentThread().getName() + "\t| " + message);
    }
}

TEST1: suscribeOn, observeOn 미사용

일단 하나의 Flowable를 구독하는 간단한 테스트 method를 작성해보았다.

@Test
public void test1() throws Exception {  
    LogManager logManager = new LogManager(System.currentTimeMillis());
    logManager.log("Starting");
    Flowable.just(1, 2, 3)
            .doOnRequest(r -> logManager.log("request-2: " + r))
            .doOnNext(i -> logManager.log("onNext-1: " + i))
            .flatMap(i -> Flowable.fromCallable(() -> {
                logManager.log("flatMap: " + i);
                Thread.sleep(300);
                return i * 5;
            }))
            .doOnRequest(r -> logManager.log("request-1: " + r))
            .doOnNext(i -> logManager.log("onNext-2: " + i))
            .subscribe(i -> logManager.log("Got " + i),
                       t -> logManager.log("Error"),
                       () -> logManager.log("Completed"));
    logManager.log("Exiting");
}

하나의 flatMap() 연산만이 있는 간단한 코드이다. 중간중간 request(), onNext()가 동작하는 쓰레드를 확인하기 위해 doOnRequest(), doOnNext() 안에 로그를 남겼고, flatMap()이 시간이 오래 걸리는 상황이라고 가정하기 위해 임의로 Thread.sleep(300)을 넣었다.

테스트를 실행하면,

0    | main  | Starting  
161    | main  | request-1: 9223372036854775807  
161    | main  | request-2: 128  
161    | main  | onNext-1: 1  
162    | main  | flatMap: 1  
466    | main  | onNext-2: 5  
466    | main  | Got 5  
466    | main  | onNext-1: 2  
466    | main  | flatMap: 2  
767    | main  | onNext-2: 10  
767    | main  | Got 10  
767    | main  | onNext-1: 3  
767    | main  | flatMap: 3  
1068    | main  | onNext-2: 15  
1069    | main  | Got 15  
1071    | main  | Completed  
1071    | main  | Exiting  

어떤 Scheduler 설정도 하지 않았기 때문에, 모든 연산이 main 쓰레드에서 돌아감을 확인할 수 있다. 하나의 쓰레드에서 모든 것을 하고 있기 때문에, 연산들이 모두 순서대로 동작하고, 약 1000ms 뒤에 구독이 완전히 완료된 후에야 "Exiting" 로그가 main 쓰레드에 의해 출력됨을 볼 수 있다.

추가로, doOnRequest() 로그를 통해 subscribe()는 기본적으로 upstream에 9223372036854775807(= Long.MAX_VALUE) 개의 item을 request()로 요청하고, flatMap()은 upstream에 128개의 item을 request()로 요청한다는 것도 알수 있다.


TEST2: subscribeOn 1개 사용

위의 테스트에서 subscribeOn()subscribe() 위에 하나 넣어보았다.

@Test
public void test2() throws Exception {  
    Scheduler schedulerA = newScheduler(1, "Sched-A-%d"); // NEW!

    LogManager logManager = new LogManager(System.currentTimeMillis());
    logManager.log("Starting");
    Flowable.just(1, 2, 3)
            .doOnRequest(r -> logManager.log("request-2: " + r))
            .doOnNext(i -> logManager.log("onNext-1: " + i))
            .flatMap(i -> Flowable.fromCallable(() -> {
                logManager.log("flatMap: " + i);
                Thread.sleep(300);
                return i * 5;
            }))
            .doOnRequest(r -> logManager.log("request-1: " + r))
            .subscribeOn(schedulerA) // NEW!
            .doOnNext(i -> logManager.log("onNext-2: " + i))
            .subscribe(i -> logManager.log("Got " + i),
                       t -> logManager.log("Error"),
                       () -> logManager.log("Completed"));
    logManager.log("Exiting");

    Thread.sleep(2000); // main thread 종료 방지
}
0    | main  | Starting  
144    | main  | Exiting  
148    | Sched-A-0 | request-1: 9223372036854775807  
148    | Sched-A-0 | request-2: 128  
148    | Sched-A-0 | onNext-1: 1  
149    | Sched-A-0 | flatMap: 1  
454    | Sched-A-0 | onNext-2: 5  
454    | Sched-A-0 | Got 5  
454    | Sched-A-0 | onNext-1: 2  
454    | Sched-A-0 | flatMap: 2  
759    | Sched-A-0 | onNext-2: 10  
759    | Sched-A-0 | Got 10  
759    | Sched-A-0 | onNext-1: 3  
759    | Sched-A-0 | flatMap: 3  
1064    | Sched-A-0 | onNext-2: 15  
1064    | Sched-A-0 | Got 15  
1066    | Sched-A-0 | Completed  

Flowable를 생성한 후 구독완료와 상관없이 main 쓰레드에서 바로 "Exiting" 로그를 출력한다. 그리고 request(), onNext()가 모두 Sched-A-0 쓰레드에서 동작함을 확인할 수 있다.

이번엔 subscribeOn()의 위치를 위로 옮겨보자.

@Test
public void test2() throws Exception {  
    Scheduler schedulerA = newScheduler(1, "Sched-A-%d"); // NEW!

    LogManager logManager = new LogManager(System.currentTimeMillis());
    logManager.log("Starting");
    Flowable.just(1, 2, 3)
            .doOnRequest(r -> logManager.log("request:" + r))
            .subscribeOn(schedulerA) // NEW!
            .doOnNext(i -> logManager.log("onNext:" + i))
            .flatMap(i -> Flowable.fromCallable(() -> {
                logManager.log("flatMap:" + i);
                Thread.sleep(300);
                return i * 5;
            }))
            .doOnRequest(r -> logManager.log("request:" + r))
            .doOnNext(i -> logManager.log("onNext:" + i))
            .subscribe(i -> logManager.log("Got " + i),
                       t -> logManager.log("Error"),
                       () -> logManager.log("Completed"));
    logManager.log("Exiting");

    Thread.sleep(2000); // main thread 종료 방지
}
0    | main  | Starting  
141    | main  | request:9223372036854775807  
142    | main  | Exiting  
143    | Sched-A-0 | request:128  
143    | Sched-A-0 | onNext:1  
144    | Sched-A-0 | flatMap:1  
446    | Sched-A-0 | onNext:5  
446    | Sched-A-0 | Got 5  
446    | Sched-A-0 | onNext:2  
447    | Sched-A-0 | flatMap:2  
749    | Sched-A-0 | onNext:10  
749    | Sched-A-0 | Got 10  
749    | Sched-A-0 | onNext:3  
750    | Sched-A-0 | flatMap:3  
1052    | Sched-A-0 | onNext:15  
1052    | Sched-A-0 | Got 15  
1054    | Sched-A-0 | Completed  

첫번째 request() 요청까지는 main 쓰레드에서 실행되고 이후에는 모두 Sched-A-0 쓰레드에서 실행된다.


TEST3: subscribeOn 2개 사용

@Test
public void test3() throws Exception {  
    Scheduler schedulerA = newScheduler(1, "Sched-A-%d");
    Scheduler schedulerB = newScheduler(1, "Sched-B-%d"); // NEW!

    LogManager logManager = new LogManager(System.currentTimeMillis());
    logManager.log("Starting");
    Flowable.just(1, 2, 3)
            .doOnRequest(r -> logManager.log("request-2: " + r))
            .subscribeOn(schedulerB) // NEW!
            .doOnNext(i -> logManager.log("onNext-1: " + i))
            .flatMap(i -> Flowable.fromCallable(() -> {
                logManager.log("flatMap: " + i);
                Thread.sleep(300);
                return i * 5;
            }))
            .doOnRequest(r -> logManager.log("request-1: " + r))
            .subscribeOn(schedulerA)
            .doOnNext(i -> logManager.log("onNext-2: " + i))
            .subscribe(i -> logManager.log("Got " + i),
                       t -> logManager.log("Error"),
                       () -> logManager.log("Completed"));
    logManager.log("Exiting");

    Thread.sleep(2000); // main thread 종료 방지
}
0    | main  | Starting  
150    | main  | Exiting  
154    | Sched-A-0 | request-1: 9223372036854775807  
155    | Sched-B-0 | request-2: 128  
155    | Sched-B-0 | onNext-1: 1  
156    | Sched-B-0 | flatMap: 1  
460    | Sched-B-0 | onNext-2: 5  
460    | Sched-B-0 | Got 5  
460    | Sched-B-0 | onNext-1: 2  
460    | Sched-B-0 | flatMap: 2  
761    | Sched-B-0 | onNext-2: 10  
761    | Sched-B-0 | Got 10  
761    | Sched-B-0 | onNext-1: 3  
761    | Sched-B-0 | flatMap: 3  
1063    | Sched-B-0 | onNext-2: 15  
1063    | Sched-B-0 | Got 15  
1065    | Sched-B-0 | Completed  

downstream에서 위로 올라가며 subscribeOn()를 만날때마다 request()가 실행되는 쓰레드가 변경됨을 확인할 수 있다. 최상단의 upstream에 도달해서 downstream으로 onNext()가 실행되기 시작할때는 결국 마지막으로 request()가 실행된 쓰레드, Sched-B-0에서 모두 동작한다.

이러한 이유로 최상단의 subscribeOn() 외에 아래에 추가적으로 subscribeOn()을 넣는 것은 큰 의미가 없다.


TEST4: subscribeOn, observeOn 모두 사용

이번에는 중간에 observeOn()도 넣어보았다.

@Test
public void test4() throws Exception {  
    Scheduler schedulerA = newScheduler(1, "Sched-A-%d");
    Scheduler schedulerB = newScheduler(1, "Sched-B-%d");
    Scheduler schedulerC = newScheduler(1, "Sched-C-%d");
    Scheduler schedulerD = newScheduler(1, "Sched-D-%d");

    LogManager logManager = new LogManager(System.currentTimeMillis());
    logManager.log("Starting");
    Flowable.just(1, 2, 3)
            .doOnRequest(r -> logManager.log("request-2: " + r))
            .subscribeOn(schedulerB)
            .observeOn(schedulerC)
            .doOnNext(i -> logManager.log("onNext-1: " + i))
            .flatMap(i -> Flowable.fromCallable(() -> {
                logManager.log("flatMap: " + i);
                Thread.sleep(300);
                return i * 5;
            }))
            .doOnRequest(r -> logManager.log("request-1: " + r))
            .subscribeOn(schedulerA)
            .observeOn(schedulerD)
            .doOnNext(i -> logManager.log("onNext-2: " + i))
            .subscribe(i -> logManager.log("Got " + i),
                       t -> logManager.log("Error"),
                       () -> logManager.log("Completed"));
    logManager.log("Exiting");

    Thread.sleep(5000); // main thread 종료 방지
}
1    | main  | Starting  
142    | main  | Exiting  
144    | Sched-A-0 | request-1: 128  
145    | Sched-B-0 | request-2: 128  
145    | Sched-C-0 | onNext-1: 1  
146    | Sched-C-0 | flatMap: 1  
447    | Sched-C-0 | onNext-1: 2  
447    | Sched-D-0 | onNext-2: 5  
447    | Sched-C-0 | flatMap: 2  
447    | Sched-D-0 | Got 5  
747    | Sched-C-0 | onNext-1: 3  
747    | Sched-D-0 | onNext-2: 10  
747    | Sched-C-0 | flatMap: 3  
747    | Sched-D-0 | Got 10  
1047    | Sched-D-0 | onNext-2: 15  
1047    | Sched-D-0 | Got 15  
1049    | Sched-D-0 | Completed  

upstream에서 아래로 내려가며 observeOn()를 만날때마다 onNext()가 실행되는 쓰레드가 변경됨을 확인할 수 있다. flatMap()까지는 Sched-C-0 쓰레드에서 실행되고, 그 아래 downstream의 onNext()부터는 Sched-D-0 쓰레드에서 독립적으로 실행된다.

추가로 observeOn()은 기본적으로 upstream에 128개의 item을 request()로 요청한다는 것도 알수 있다.


TEST5: flatMap 안에 subscribeOn 사용

전체 실행시간이 약 1000ms가 나오는 이유는 flatMap()을 실행하는 쓰레드가 오직 하나(TEST4에서는 Sched-C-0)이기 때문에 sleep 300ms에 의해 다음 item에 대한 onNext()가 바로 실행되지 못하고 block되기 때문이다.

이러한 문제를 개선하기 위해선, flatMap() 안의 Flowable 에도 subscribeOn() 을 적용해 별도의 쓰레드에서 돌아가게 하면 된다.

@Test
public void test5() throws Exception {  
    Scheduler schedulerA = newScheduler(1, "Sched-A-%d");
    Scheduler schedulerB = newScheduler(1, "Sched-B-%d");
    Scheduler schedulerC = newScheduler(1, "Sched-C-%d");
    Scheduler schedulerD = newScheduler(1, "Sched-D-%d");
    Scheduler schedulerE = newScheduler(1, "Sched-E-%d"); // NEW!

    LogManager logManager = new LogManager(System.currentTimeMillis());
    logManager.log("Starting");
    Flowable.just(1, 2, 3)
            .doOnRequest(r -> logManager.log("request-2: " + r))
            .subscribeOn(schedulerB)
            .observeOn(schedulerC)
            .doOnNext(i -> logManager.log("onNext-1: " + i))
            .flatMap(i -> Flowable.fromCallable(() -> {
                logManager.log("flatMap: " + i);
                Thread.sleep(300);
                return i * 5;
            })
                                  .doOnRequest(r -> logManager.log("flatMap request: " + r)) // NEW!
                                  .doOnNext(x -> logManager.log("flatMap onNext: " + x)) // NEW!
                                  .subscribeOn(schedulerE))  // NEW!
            .doOnRequest(r -> logManager.log("request-1: " + r))
            .subscribeOn(schedulerA)
            .observeOn(schedulerD)
            .doOnNext(i -> logManager.log("onNext-2: " + i))
            .subscribe(i -> logManager.log("Got " + i),
                       t -> logManager.log("Error"),
                       () -> logManager.log("Completed"));
    logManager.log("Exiting");

    Thread.sleep(2000); // main thread 종료 방지
}
0    | main  | Starting  
139    | main  | Exiting  
141    | Sched-A-0 | request-1: 128  
143    | Sched-B-0 | request-2: 128  
143    | Sched-C-0 | onNext-1: 1  
145    | Sched-C-0 | onNext-1: 2  
145    | Sched-C-0 | onNext-1: 3  
145    | Sched-E-0 | flatMap request: 128  
146    | Sched-E-0 | flatMap: 1  
447    | Sched-E-0 | flatMap onNext: 5  
447    | Sched-E-0 | flatMap request: 128  
447    | Sched-D-0 | onNext-2: 5  
447    | Sched-E-0 | flatMap: 2  
447    | Sched-D-0 | Got 5  
747    | Sched-E-0 | flatMap onNext: 10  
748    | Sched-D-0 | onNext-2: 10  
748    | Sched-D-0 | Got 10  
748    | Sched-E-0 | flatMap request: 128  
748    | Sched-E-0 | flatMap: 3  
1051    | Sched-E-0 | flatMap onNext: 15  
1051    | Sched-D-0 | onNext-2: 15  
1051    | Sched-D-0 | Got 15  
1052    | Sched-D-0 | Completed  

이렇게 돌리면 당연히 실행시간은 변화가 없다. 왜냐하면 schedulerE의 쓰레드 수가 1개이기 때문에 결국 sleep을 만났을 때 block 될수 밖에 없다.

아래처럼 schedulerE의 쓰레드 수를 item 수에 맞게 3으로 늘려서 다시 돌려보자.

Scheduler schedulerE = newScheduler(3, "Sched-E-%d");  
0    | main  | Starting  
149    | main  | Exiting  
152    | Sched-A-0 | request-1: 128  
153    | Sched-B-0 | request-2: 128  
153    | Sched-C-0 | onNext-1: 1  
155    | Sched-C-0 | onNext-1: 2  
155    | Sched-C-0 | onNext-1: 3  
155    | Sched-E-0 | flatMap request: 128  
155    | Sched-E-1 | flatMap request: 128  
155    | Sched-E-1 | flatMap: 2  
155    | Sched-E-0 | flatMap: 1  
155    | Sched-E-2 | flatMap request: 128  
155    | Sched-E-2 | flatMap: 3  
459    | Sched-E-1 | flatMap onNext: 10  
459    | Sched-D-0 | onNext-2: 10  
459    | Sched-D-0 | Got 10  
460    | Sched-E-0 | flatMap onNext: 5  
460    | Sched-E-2 | flatMap onNext: 15  
460    | Sched-D-0 | onNext-2: 5  
460    | Sched-D-0 | Got 5  
460    | Sched-D-0 | onNext-2: 15  
460    | Sched-D-0 | Got 15  
461    | Sched-D-0 | Completed  

Sched-E-0, Sched-E-1, Sched-E-2 3개 쓰레드에서 병렬로 flatMap()이 돌아가게 되며 실행시간이 단축됨을 볼 수 있다.