RxJava의 subscribeOn와 observeOn (2)
이전 글 에서 RxJava의 subscribeOn()
, observeOn()
의 내부 구현을 한번 살펴보며 쓰레드의 동작에 대해 정리해보았다. 이번 글에서는 로그를 남겨보며 이 실제 동작을 한번 살펴 볼것이다.
Rxjava2 2.2.19 버젼을 사용하였습니다.
준비하기
먼저 편리한 테스트를 위해, 아래와 같이 JUnit Test 클래스 안에 private method, class를 준비했다.
private Scheduler newScheduler(int nThreads, String pattern) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(pattern).build();
ExecutorService pool = Executors.newFixedThreadPool(nThreads, threadFactory);
return Schedulers.from(pool);
}
private static class LogManager {
private long start;
public LogManager(long start) {
this.start = start;
}
private void log(String message) {
System.out.println(System.currentTimeMillis() - start + "\t| " +
Thread.currentThread().getName() + "\t| " + message);
}
}
TEST1: suscribeOn, observeOn 미사용
일단 하나의 Flowable
를 구독하는 간단한 테스트 method를 작성해보았다.
@Test
public void test1() throws Exception {
LogManager logManager = new LogManager(System.currentTimeMillis());
logManager.log("Starting");
Flowable.just(1, 2, 3)
.doOnRequest(r -> logManager.log("request-2: " + r))
.doOnNext(i -> logManager.log("onNext-1: " + i))
.flatMap(i -> Flowable.fromCallable(() -> {
logManager.log("flatMap: " + i);
Thread.sleep(300);
return i * 5;
}))
.doOnRequest(r -> logManager.log("request-1: " + r))
.doOnNext(i -> logManager.log("onNext-2: " + i))
.subscribe(i -> logManager.log("Got " + i),
t -> logManager.log("Error"),
() -> logManager.log("Completed"));
logManager.log("Exiting");
}
하나의 flatMap()
연산만이 있는 간단한 코드이다. 중간중간 request()
, onNext()
가 동작하는 쓰레드를 확인하기 위해 doOnRequest()
, doOnNext()
안에 로그를 남겼고, flatMap()
이 시간이 오래 걸리는 상황이라고 가정하기 위해 임의로 Thread.sleep(300)
을 넣었다.
테스트를 실행하면,
0 | main | Starting
161 | main | request-1: 9223372036854775807
161 | main | request-2: 128
161 | main | onNext-1: 1
162 | main | flatMap: 1
466 | main | onNext-2: 5
466 | main | Got 5
466 | main | onNext-1: 2
466 | main | flatMap: 2
767 | main | onNext-2: 10
767 | main | Got 10
767 | main | onNext-1: 3
767 | main | flatMap: 3
1068 | main | onNext-2: 15
1069 | main | Got 15
1071 | main | Completed
1071 | main | Exiting
어떤 Scheduler
설정도 하지 않았기 때문에, 모든 연산이 main
쓰레드에서 돌아감을 확인할 수 있다. 하나의 쓰레드에서 모든 것을 하고 있기 때문에, 연산들이 모두 순서대로 동작하고, 약 1000ms 뒤에 구독이 완전히 완료된 후에야 "Exiting" 로그가 main
쓰레드에 의해 출력됨을 볼 수 있다.
추가로, doOnRequest()
로그를 통해 subscribe()
는 기본적으로 upstream에 9223372036854775807(= Long.MAX_VALUE
) 개의 item을 request()
로 요청하고, flatMap()
은 upstream에 128개의 item을 request()
로 요청한다는 것도 알수 있다.
TEST2: subscribeOn 1개 사용
위의 테스트에서 subscribeOn()
을 subscribe()
위에 하나 넣어보았다.
@Test
public void test2() throws Exception {
Scheduler schedulerA = newScheduler(1, "Sched-A-%d"); // NEW!
LogManager logManager = new LogManager(System.currentTimeMillis());
logManager.log("Starting");
Flowable.just(1, 2, 3)
.doOnRequest(r -> logManager.log("request-2: " + r))
.doOnNext(i -> logManager.log("onNext-1: " + i))
.flatMap(i -> Flowable.fromCallable(() -> {
logManager.log("flatMap: " + i);
Thread.sleep(300);
return i * 5;
}))
.doOnRequest(r -> logManager.log("request-1: " + r))
.subscribeOn(schedulerA) // NEW!
.doOnNext(i -> logManager.log("onNext-2: " + i))
.subscribe(i -> logManager.log("Got " + i),
t -> logManager.log("Error"),
() -> logManager.log("Completed"));
logManager.log("Exiting");
Thread.sleep(2000); // main thread 종료 방지
}
0 | main | Starting
144 | main | Exiting
148 | Sched-A-0 | request-1: 9223372036854775807
148 | Sched-A-0 | request-2: 128
148 | Sched-A-0 | onNext-1: 1
149 | Sched-A-0 | flatMap: 1
454 | Sched-A-0 | onNext-2: 5
454 | Sched-A-0 | Got 5
454 | Sched-A-0 | onNext-1: 2
454 | Sched-A-0 | flatMap: 2
759 | Sched-A-0 | onNext-2: 10
759 | Sched-A-0 | Got 10
759 | Sched-A-0 | onNext-1: 3
759 | Sched-A-0 | flatMap: 3
1064 | Sched-A-0 | onNext-2: 15
1064 | Sched-A-0 | Got 15
1066 | Sched-A-0 | Completed
Flowable
를 생성한 후 구독완료와 상관없이 main
쓰레드에서 바로 "Exiting" 로그를 출력한다. 그리고 request()
, onNext()
가 모두 Sched-A-0
쓰레드에서 동작함을 확인할 수 있다.
이번엔 subscribeOn()
의 위치를 위로 옮겨보자.
@Test
public void test2() throws Exception {
Scheduler schedulerA = newScheduler(1, "Sched-A-%d"); // NEW!
LogManager logManager = new LogManager(System.currentTimeMillis());
logManager.log("Starting");
Flowable.just(1, 2, 3)
.doOnRequest(r -> logManager.log("request:" + r))
.subscribeOn(schedulerA) // NEW!
.doOnNext(i -> logManager.log("onNext:" + i))
.flatMap(i -> Flowable.fromCallable(() -> {
logManager.log("flatMap:" + i);
Thread.sleep(300);
return i * 5;
}))
.doOnRequest(r -> logManager.log("request:" + r))
.doOnNext(i -> logManager.log("onNext:" + i))
.subscribe(i -> logManager.log("Got " + i),
t -> logManager.log("Error"),
() -> logManager.log("Completed"));
logManager.log("Exiting");
Thread.sleep(2000); // main thread 종료 방지
}
0 | main | Starting
141 | main | request:9223372036854775807
142 | main | Exiting
143 | Sched-A-0 | request:128
143 | Sched-A-0 | onNext:1
144 | Sched-A-0 | flatMap:1
446 | Sched-A-0 | onNext:5
446 | Sched-A-0 | Got 5
446 | Sched-A-0 | onNext:2
447 | Sched-A-0 | flatMap:2
749 | Sched-A-0 | onNext:10
749 | Sched-A-0 | Got 10
749 | Sched-A-0 | onNext:3
750 | Sched-A-0 | flatMap:3
1052 | Sched-A-0 | onNext:15
1052 | Sched-A-0 | Got 15
1054 | Sched-A-0 | Completed
첫번째 request()
요청까지는 main
쓰레드에서 실행되고 이후에는 모두 Sched-A-0
쓰레드에서 실행된다.
TEST3: subscribeOn 2개 사용
@Test
public void test3() throws Exception {
Scheduler schedulerA = newScheduler(1, "Sched-A-%d");
Scheduler schedulerB = newScheduler(1, "Sched-B-%d"); // NEW!
LogManager logManager = new LogManager(System.currentTimeMillis());
logManager.log("Starting");
Flowable.just(1, 2, 3)
.doOnRequest(r -> logManager.log("request-2: " + r))
.subscribeOn(schedulerB) // NEW!
.doOnNext(i -> logManager.log("onNext-1: " + i))
.flatMap(i -> Flowable.fromCallable(() -> {
logManager.log("flatMap: " + i);
Thread.sleep(300);
return i * 5;
}))
.doOnRequest(r -> logManager.log("request-1: " + r))
.subscribeOn(schedulerA)
.doOnNext(i -> logManager.log("onNext-2: " + i))
.subscribe(i -> logManager.log("Got " + i),
t -> logManager.log("Error"),
() -> logManager.log("Completed"));
logManager.log("Exiting");
Thread.sleep(2000); // main thread 종료 방지
}
0 | main | Starting
150 | main | Exiting
154 | Sched-A-0 | request-1: 9223372036854775807
155 | Sched-B-0 | request-2: 128
155 | Sched-B-0 | onNext-1: 1
156 | Sched-B-0 | flatMap: 1
460 | Sched-B-0 | onNext-2: 5
460 | Sched-B-0 | Got 5
460 | Sched-B-0 | onNext-1: 2
460 | Sched-B-0 | flatMap: 2
761 | Sched-B-0 | onNext-2: 10
761 | Sched-B-0 | Got 10
761 | Sched-B-0 | onNext-1: 3
761 | Sched-B-0 | flatMap: 3
1063 | Sched-B-0 | onNext-2: 15
1063 | Sched-B-0 | Got 15
1065 | Sched-B-0 | Completed
downstream에서 위로 올라가며 subscribeOn()
를 만날때마다 request()
가 실행되는 쓰레드가 변경됨을 확인할 수 있다. 최상단의 upstream에 도달해서 downstream으로 onNext()
가 실행되기 시작할때는 결국 마지막으로 request()
가 실행된 쓰레드, Sched-B-0
에서 모두 동작한다.
이러한 이유로 최상단의 subscribeOn()
외에 아래에 추가적으로 subscribeOn()
을 넣는 것은 큰 의미가 없다.
TEST4: subscribeOn, observeOn 모두 사용
이번에는 중간에 observeOn()
도 넣어보았다.
@Test
public void test4() throws Exception {
Scheduler schedulerA = newScheduler(1, "Sched-A-%d");
Scheduler schedulerB = newScheduler(1, "Sched-B-%d");
Scheduler schedulerC = newScheduler(1, "Sched-C-%d");
Scheduler schedulerD = newScheduler(1, "Sched-D-%d");
LogManager logManager = new LogManager(System.currentTimeMillis());
logManager.log("Starting");
Flowable.just(1, 2, 3)
.doOnRequest(r -> logManager.log("request-2: " + r))
.subscribeOn(schedulerB)
.observeOn(schedulerC)
.doOnNext(i -> logManager.log("onNext-1: " + i))
.flatMap(i -> Flowable.fromCallable(() -> {
logManager.log("flatMap: " + i);
Thread.sleep(300);
return i * 5;
}))
.doOnRequest(r -> logManager.log("request-1: " + r))
.subscribeOn(schedulerA)
.observeOn(schedulerD)
.doOnNext(i -> logManager.log("onNext-2: " + i))
.subscribe(i -> logManager.log("Got " + i),
t -> logManager.log("Error"),
() -> logManager.log("Completed"));
logManager.log("Exiting");
Thread.sleep(5000); // main thread 종료 방지
}
1 | main | Starting
142 | main | Exiting
144 | Sched-A-0 | request-1: 128
145 | Sched-B-0 | request-2: 128
145 | Sched-C-0 | onNext-1: 1
146 | Sched-C-0 | flatMap: 1
447 | Sched-C-0 | onNext-1: 2
447 | Sched-D-0 | onNext-2: 5
447 | Sched-C-0 | flatMap: 2
447 | Sched-D-0 | Got 5
747 | Sched-C-0 | onNext-1: 3
747 | Sched-D-0 | onNext-2: 10
747 | Sched-C-0 | flatMap: 3
747 | Sched-D-0 | Got 10
1047 | Sched-D-0 | onNext-2: 15
1047 | Sched-D-0 | Got 15
1049 | Sched-D-0 | Completed
upstream에서 아래로 내려가며 observeOn()
를 만날때마다 onNext()
가 실행되는 쓰레드가 변경됨을 확인할 수 있다. flatMap()
까지는 Sched-C-0
쓰레드에서 실행되고, 그 아래 downstream의 onNext()
부터는 Sched-D-0
쓰레드에서 독립적으로 실행된다.
추가로 observeOn()
은 기본적으로 upstream에 128개의 item을 request()로 요청한다는 것도 알수 있다.
TEST5: flatMap 안에 subscribeOn 사용
전체 실행시간이 약 1000ms가 나오는 이유는 flatMap()
을 실행하는 쓰레드가 오직 하나(TEST4에서는 Sched-C-0
)이기 때문에 sleep 300ms에 의해 다음 item에 대한 onNext()
가 바로 실행되지 못하고 block되기 때문이다.
이러한 문제를 개선하기 위해선, flatMap()
안의 Flowable
에도 subscribeOn()
을 적용해 별도의 쓰레드에서 돌아가게 하면 된다.
@Test
public void test5() throws Exception {
Scheduler schedulerA = newScheduler(1, "Sched-A-%d");
Scheduler schedulerB = newScheduler(1, "Sched-B-%d");
Scheduler schedulerC = newScheduler(1, "Sched-C-%d");
Scheduler schedulerD = newScheduler(1, "Sched-D-%d");
Scheduler schedulerE = newScheduler(1, "Sched-E-%d"); // NEW!
LogManager logManager = new LogManager(System.currentTimeMillis());
logManager.log("Starting");
Flowable.just(1, 2, 3)
.doOnRequest(r -> logManager.log("request-2: " + r))
.subscribeOn(schedulerB)
.observeOn(schedulerC)
.doOnNext(i -> logManager.log("onNext-1: " + i))
.flatMap(i -> Flowable.fromCallable(() -> {
logManager.log("flatMap: " + i);
Thread.sleep(300);
return i * 5;
})
.doOnRequest(r -> logManager.log("flatMap request: " + r)) // NEW!
.doOnNext(x -> logManager.log("flatMap onNext: " + x)) // NEW!
.subscribeOn(schedulerE)) // NEW!
.doOnRequest(r -> logManager.log("request-1: " + r))
.subscribeOn(schedulerA)
.observeOn(schedulerD)
.doOnNext(i -> logManager.log("onNext-2: " + i))
.subscribe(i -> logManager.log("Got " + i),
t -> logManager.log("Error"),
() -> logManager.log("Completed"));
logManager.log("Exiting");
Thread.sleep(2000); // main thread 종료 방지
}
0 | main | Starting
139 | main | Exiting
141 | Sched-A-0 | request-1: 128
143 | Sched-B-0 | request-2: 128
143 | Sched-C-0 | onNext-1: 1
145 | Sched-C-0 | onNext-1: 2
145 | Sched-C-0 | onNext-1: 3
145 | Sched-E-0 | flatMap request: 128
146 | Sched-E-0 | flatMap: 1
447 | Sched-E-0 | flatMap onNext: 5
447 | Sched-E-0 | flatMap request: 128
447 | Sched-D-0 | onNext-2: 5
447 | Sched-E-0 | flatMap: 2
447 | Sched-D-0 | Got 5
747 | Sched-E-0 | flatMap onNext: 10
748 | Sched-D-0 | onNext-2: 10
748 | Sched-D-0 | Got 10
748 | Sched-E-0 | flatMap request: 128
748 | Sched-E-0 | flatMap: 3
1051 | Sched-E-0 | flatMap onNext: 15
1051 | Sched-D-0 | onNext-2: 15
1051 | Sched-D-0 | Got 15
1052 | Sched-D-0 | Completed
이렇게 돌리면 당연히 실행시간은 변화가 없다. 왜냐하면 schedulerE
의 쓰레드 수가 1개이기 때문에 결국 sleep을 만났을 때 block 될수 밖에 없다.
아래처럼 schedulerE
의 쓰레드 수를 item 수에 맞게 3으로 늘려서 다시 돌려보자.
Scheduler schedulerE = newScheduler(3, "Sched-E-%d");
0 | main | Starting
149 | main | Exiting
152 | Sched-A-0 | request-1: 128
153 | Sched-B-0 | request-2: 128
153 | Sched-C-0 | onNext-1: 1
155 | Sched-C-0 | onNext-1: 2
155 | Sched-C-0 | onNext-1: 3
155 | Sched-E-0 | flatMap request: 128
155 | Sched-E-1 | flatMap request: 128
155 | Sched-E-1 | flatMap: 2
155 | Sched-E-0 | flatMap: 1
155 | Sched-E-2 | flatMap request: 128
155 | Sched-E-2 | flatMap: 3
459 | Sched-E-1 | flatMap onNext: 10
459 | Sched-D-0 | onNext-2: 10
459 | Sched-D-0 | Got 10
460 | Sched-E-0 | flatMap onNext: 5
460 | Sched-E-2 | flatMap onNext: 15
460 | Sched-D-0 | onNext-2: 5
460 | Sched-D-0 | Got 5
460 | Sched-D-0 | onNext-2: 15
460 | Sched-D-0 | Got 15
461 | Sched-D-0 | Completed
Sched-E-0
, Sched-E-1
, Sched-E-2
3개 쓰레드에서 병렬로 flatMap()
이 돌아가게 되며 실행시간이 단축됨을 볼 수 있다.