課題
Rxjavaで以下のようなコードを実行すると、
@Test
public void 実行スレッドを切り替える() throws InterruptedException {
Flowable.just("hello")
.doOnNext(val -> logger.info("emit"))
.map(o -> o)
.doOnNext(val -> logger.info("map"))
.subscribe(o -> logger.info("subscribe"));
}
すべて同じスレッドで実行される。
14:19:26.038 [main] INFO TestRxJava - emit
14:19:26.043 [main] INFO TestRxJava - map
14:19:26.043 [main] INFO TestRxJava - subscribe
各オペレータの実行スレッドを指定したい。
検証バージョン
pom.xml
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.1.14</version>
</dependency>
解決方法
- オペレータの先頭から実行スレッドを切り替えるときは
subscribeOn
- 現在のオペレータ以降から実行スレッドを切り替えるときは
observeOn
- 実行スレッドの指定は
Schedulers
Schedulers
SchedulersはSchedulerのファクトリで、実行スレッドの指定に使う。
参考 Schedulers Javadoc
用途ごとに複数のSchedulerが用意されている。
メソッド 用途 computation() Returns a default, shared Scheduler instance intended for computational work. from(Executor executor) Wraps an Executor into a new Scheduler instance and delegates schedule() calls to it. io() Returns a default, shared Scheduler instance intended for IO-bound work. newThread() Returns a default, shared Scheduler instance that creates a new Thread for each unit of work. single() Returns a default, shared, single-thread-backed Scheduler instance for work requiring strongly-sequential execution on the same background thread trampoline() Returns a default, shared Scheduler instance whose Scheduler.Worker instances queue work and execute them in a FIFO manner on one of the participating threads.
computation()
はJavadocの以下の記載のとおり、JVMが認識しているプロセッサ数と同じスレッド数のみキャッシュする。
The default instance has a backing pool of single-threaded ScheduledExecutorService instances equal to the number of available processors (Runtime.availableProcessors()) to the Java VM.
そのため、cpu処理がメインの場合は効果的(コンテキストスイッチが減る)だが、ブロッキング時間が長い処理(外部APIとのやり取りなど)に使うと実行スレッドがすぐに枯渇する。ブロッキング時間が長い処理にはcomputation()
ではなくio()
を利用することが推奨されている。
It is not recommended to perform blocking, IO-bound work on this scheduler. Use io() instead.
subscribeOn
オペレータの先頭から実行スレッドを切り替える。
補足: 矢印の色が実行スレッドを表している
subscribeOn(Schedulers.computation())
を利用すると、
@Test
public void 実行スレッドを切り替える() throws InterruptedException {
Flowable.just("hello") //ここからスレッドが切り替わる
.doOnNext(val -> logger.info("emit"))
.subscribeOn(Schedulers.computation())
.map(o -> o)
.doOnNext(val -> logger.info("map"))
.subscribe(o -> logger.info("subscribe"));
TimeUnit.SECONDS.sleep(10);
}
実行スレッドが切り替わる。
14:57:50.395 [RxComputationThreadPool-1] INFO TestRxJava - emit
14:57:50.398 [RxComputationThreadPool-1] INFO TestRxJava - map
14:57:50.398 [RxComputationThreadPool-1] INFO TestRxJava - subscribe
注意点 その1
subscribeOn
が複数呼ばれた場合は、最初に呼ばれたものだけが有効になる。
@Test
public void 実行スレッドを切り替える() throws InterruptedException {
Flowable.just("hello")
.doOnNext(val -> logger.info("emit"))
.subscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.io())
.map(o -> o)
.doOnNext(val -> logger.info("map"))
.subscribe(o -> logger.info("subscribe"));
TimeUnit.SECONDS.sleep(10);
}
15:24:13.682 [RxComputationThreadPool-1] INFO TestRxJava - emit
15:24:13.688 [RxComputationThreadPool-1] INFO TestRxJava - map
15:24:13.688 [RxComputationThreadPool-1] INFO TestRxJava - subscribe
注意点 その2
オペレータにはデフォルトのSchedulerが指定されているものがある。そのため、オペレータによっては、subscribeOn
が無視される。
@Test
public void 実行スレッドを切り替える() throws InterruptedException {
Flowable.interval(1000,TimeUnit.MILLISECONDS)
.doOnNext(val -> logger.info("emit"))
.subscribeOn(Schedulers.io())
.map(o -> o)
.doOnNext(val -> logger.info("map"))
.subscribe(o -> logger.info("subscribe"));
TimeUnit.SECONDS.sleep(10);
}
16:04:08.288 [RxComputationThreadPool-1] INFO TestRxJava - emit
16:04:08.297 [RxComputationThreadPool-1] INFO TestRxJava - map
16:04:08.297 [RxComputationThreadPool-1] INFO TestRxJava - subscribe
...
デフォルトのSchedulerは@SchedulerSupport
で明示され、この値が@SchedulerSupport(value="none")
以外のときはsubscribeOn
が無視される。
参考 SchedulerSupport Javadoc
代わりに、Schedulerを指定するためのメソッドが用意されている。
@Test
public void 実行スレッドを切り替える() throws InterruptedException {
Flowable.interval(1000,TimeUnit.MILLISECONDS, Schedulers.io())
.doOnNext(val -> logger.info("emit"))
.map(o -> o)
.doOnNext(val -> logger.info("map"))
.subscribe(o -> logger.info("subscribe"));
TimeUnit.SECONDS.sleep(10);
}
17:20:14.921 [RxCachedThreadScheduler-1] INFO TestRxJava - emit
17:20:14.924 [RxCachedThreadScheduler-1] INFO TestRxJava - map
17:20:14.924 [RxCachedThreadScheduler-1] INFO TestRxJava - subscribe
...
observeOn
現在のオペレータ以降の実行スレッドを切り替える。
補足: 矢印の色が実行スレッドを表している
observeOn
は複数実行できる。
@Test
public void 実行スレッドを切り替える() throws InterruptedException {
Flowable.just("hello")
.doOnNext(val -> logger.info("emit"))
.observeOn(Schedulers.computation()) //以降のスレッドがSchedulers.computation()になる
.map(o -> o)
.doOnNext(val -> logger.info("map"))
.observeOn(Schedulers.io()) //以降のスレッドがSchedulers.io()になる
.subscribe(o -> logger.info("subscribe"));
TimeUnit.SECONDS.sleep(10);
}
17:46:00.632 [main] INFO TestRxJava - emit
17:46:00.636 [RxComputationThreadPool-1] INFO TestRxJava - map
17:46:00.637 [RxCachedThreadScheduler-1] INFO TestRxJava - subscribe