SIer だけど技術やりたいブログ

Rxjava subscribeOnとobserveOnで実行スレッドを指定する

課題

Rxjavaで以下のようなコードを実行すると、

    @Test
    public void 実行スレッドを切り替える() throws InterruptedException {
        Flowable.just("hello")
                .doOnNext(val -> logger.info("emit"))
                .map(o -> o)
                .doOnNext(val -> logger.info("map"))
                .subscribe(o -> logger.info("subscribe"));
    }

すべて同じスレッドで実行される。

14:19:26.038 [main] INFO TestRxJava - emit
14:19:26.043 [main] INFO TestRxJava - map
14:19:26.043 [main] INFO TestRxJava - subscribe

各オペレータの実行スレッドを指定したい。

検証バージョン

pom.xml

        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.1.14</version>
        </dependency>

解決方法

  • オペレータの先頭から実行スレッドを切り替えるときはsubscribeOn
  • 現在のオペレータ以降から実行スレッドを切り替えるときはobserveOn
  • 実行スレッドの指定はSchedulers

Schedulers

SchedulersはSchedulerのファクトリで、実行スレッドの指定に使う。
参考 Schedulers Javadoc

用途ごとに複数のSchedulerが用意されている。

メソッド用途
computation()Returns a default, shared Scheduler instance intended for computational work.
from(Executor executor)Wraps an Executor into a new Scheduler instance and delegates schedule() calls to it.
io()Returns a default, shared Scheduler instance intended for IO-bound work.
newThread()Returns a default, shared Scheduler instance that creates a new Thread for each unit of work.
single()Returns a default, shared, single-thread-backed Scheduler instance for work requiring strongly-sequential execution on the same background thread
trampoline()Returns a default, shared Scheduler instance whose Scheduler.Worker instances queue work and execute them in a FIFO manner on one of the participating threads.

computation()はJavadocの以下の記載のとおり、JVMが認識しているプロセッサ数と同じスレッド数のみキャッシュする。

The default instance has a backing pool of single-threaded ScheduledExecutorService instances equal to the number of available processors (Runtime.availableProcessors()) to the Java VM.

そのため、cpu処理がメインの場合は効果的(コンテキストスイッチが減る)だが、ブロッキング時間が長い処理(外部APIとのやり取りなど)に使うと実行スレッドがすぐに枯渇する。ブロッキング時間が長い処理にはcomputation()ではなくio()を利用することが推奨されている。

It is not recommended to perform blocking, IO-bound work on this scheduler. Use io() instead.

subscribeOn

オペレータの先頭から実行スレッドを切り替える。

subscribeOn
補足: 矢印の色が実行スレッドを表している

subscribeOn(Schedulers.computation())を利用すると、

    @Test
    public void 実行スレッドを切り替える() throws InterruptedException {
        Flowable.just("hello") //ここからスレッドが切り替わる
                .doOnNext(val -> logger.info("emit"))
                .subscribeOn(Schedulers.computation())
                .map(o -> o)
                .doOnNext(val -> logger.info("map"))
                .subscribe(o -> logger.info("subscribe"));
        TimeUnit.SECONDS.sleep(10);
    }

実行スレッドが切り替わる。

14:57:50.395 [RxComputationThreadPool-1] INFO TestRxJava - emit
14:57:50.398 [RxComputationThreadPool-1] INFO TestRxJava - map
14:57:50.398 [RxComputationThreadPool-1] INFO TestRxJava - subscribe

注意点 その1

subscribeOnが複数呼ばれた場合は、最初に呼ばれたものだけが有効になる。

    @Test
    public void 実行スレッドを切り替える() throws InterruptedException {
        Flowable.just("hello")
                .doOnNext(val -> logger.info("emit"))
                .subscribeOn(Schedulers.computation())
                .subscribeOn(Schedulers.io())
                .map(o -> o)
                .doOnNext(val -> logger.info("map"))
                .subscribe(o -> logger.info("subscribe"));
        TimeUnit.SECONDS.sleep(10);
    }
15:24:13.682 [RxComputationThreadPool-1] INFO TestRxJava - emit
15:24:13.688 [RxComputationThreadPool-1] INFO TestRxJava - map
15:24:13.688 [RxComputationThreadPool-1] INFO TestRxJava - subscribe

注意点 その2

オペレータにはデフォルトのSchedulerが指定されているものがある。そのため、オペレータによっては、subscribeOnが無視される。

    @Test
    public void 実行スレッドを切り替える() throws InterruptedException {
        Flowable.interval(1000,TimeUnit.MILLISECONDS)
                .doOnNext(val -> logger.info("emit"))
                .subscribeOn(Schedulers.io())
                .map(o -> o)
                .doOnNext(val -> logger.info("map"))
                .subscribe(o -> logger.info("subscribe"));
        TimeUnit.SECONDS.sleep(10);
    }
16:04:08.288 [RxComputationThreadPool-1] INFO TestRxJava - emit
16:04:08.297 [RxComputationThreadPool-1] INFO TestRxJava - map
16:04:08.297 [RxComputationThreadPool-1] INFO TestRxJava - subscribe
...

デフォルトのSchedulerは@SchedulerSupportで明示され、この値が@SchedulerSupport(value="none")以外のときはsubscribeOnが無視される。
参考 SchedulerSupport Javadoc

代わりに、Schedulerを指定するためのメソッドが用意されている。

    @Test
    public void 実行スレッドを切り替える() throws InterruptedException {
        Flowable.interval(1000,TimeUnit.MILLISECONDS, Schedulers.io())
                .doOnNext(val -> logger.info("emit"))
                .map(o -> o)
                .doOnNext(val -> logger.info("map"))
                .subscribe(o -> logger.info("subscribe"));
        TimeUnit.SECONDS.sleep(10);
    }
17:20:14.921 [RxCachedThreadScheduler-1] INFO TestRxJava - emit
17:20:14.924 [RxCachedThreadScheduler-1] INFO TestRxJava - map
17:20:14.924 [RxCachedThreadScheduler-1] INFO TestRxJava - subscribe
...

observeOn

現在のオペレータ以降の実行スレッドを切り替える。

observeOn 補足: 矢印の色が実行スレッドを表している

observeOnは複数実行できる。

    @Test
    public void 実行スレッドを切り替える() throws InterruptedException {
        Flowable.just("hello")
                .doOnNext(val -> logger.info("emit"))
                .observeOn(Schedulers.computation()) //以降のスレッドがSchedulers.computation()になる
                .map(o -> o)
                .doOnNext(val -> logger.info("map"))
                .observeOn(Schedulers.io()) //以降のスレッドがSchedulers.io()になる
                .subscribe(o -> logger.info("subscribe"));
        TimeUnit.SECONDS.sleep(10);
    }
17:46:00.632 [main] INFO TestRxJava - emit
17:46:00.636 [RxComputationThreadPool-1] INFO TestRxJava - map
17:46:00.637 [RxCachedThreadScheduler-1] INFO TestRxJava - subscribe