前提知識
RxJavaにはバックプレッシャと呼ばれる、流量制御の仕組みがある。
参考 詳解RxJava2:Backpressureで流速制御
onBackpressureBuffer
PublisherがSubscriberの消費スピードよりも早くデータを作った場合、バッファに生成されたデータを溜めておきたい。 このために、onBackpressureBufferメソッドが存在する。
このとき、引数でバッファの上限を設定できる。 onBackpressureBuffer(int capacity)
observeOn
Subscriberの実行スレッドを切り替えたい。 このためにobserveOnメソッドが存在する。
第3引数にbufferSizeが指定でき、**request(bufferSize)**が裏で実行される。
observeOn(Scheduler scheduler, boolean delayError, int bufferSize)
サンプル
Publisherが100ms毎にデータを生成し、Subscriberが1000msかけて消費する。
@Test
public void バッファが溢れる() throws InterruptedException {
Flowable.interval(100, TimeUnit.MILLISECONDS)
//.doOnNext(x -> logger.info("1 doOnNext " + x))
//.doOnRequest(t -> logger.info("1 doOnRequest " + t))
//.doOnError(e -> logger.info("1 doOnError " + e))
.onBackpressureBuffer(5)
//.doOnNext(x -> logger.info("2 doOnNext " + x))
//.doOnRequest(t -> logger.info("2 doOnRequest " + t))
//.doOnError(e -> logger.info("2 doOnError " + e))
.observeOn(Schedulers.computation(), false, 3)
//.doOnNext(x -> logger.info("3 doOnNext " + x))
//.doOnRequest(t -> logger.info("3 doOnRequest " + t))
//.doOnError(e -> logger.info("3 doOnError " + e))
.subscribe(e -> {
TimeUnit.SECONDS.sleep(1);
}, Throwable::printStackTrace);
TimeUnit.MINUTES.sleep(10);
}
図にすると以下のようになる。
これを実行すると、以下のようなログが出力される。
22:11:12.850 [main] INFO TestRxJava - 3 doOnRequest 9223372036854775807
22:11:12.856 [main] INFO TestRxJava - 2 doOnRequest 3
22:11:12.856 [main] INFO TestRxJava - 1 doOnRequest 9223372036854775807
22:11:12.958 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 0
22:11:12.959 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 0
22:11:12.959 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnNext 0
22:11:13.058 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 1
22:11:13.058 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 1
22:11:13.158 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 2
22:11:13.158 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 2
22:11:13.258 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 3
22:11:13.359 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 4
22:11:13.460 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 5
22:11:13.558 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 6
22:11:13.659 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 7
22:11:13.759 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 8
22:11:13.859 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 9
22:11:13.959 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 10
22:11:13.960 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnNext 1
22:11:14.059 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 11
22:11:45.094 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnNext 2
io.reactivex.exceptions.MissingBackpressureException: Buffer is full
at io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.onNext(FlowableOnBackpressureBuffer.java:99)
at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:92)
at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79)
at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:92)
at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:93)
at io.reactivex.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:38)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
22:11:47.421 [RxComputationThreadPool-1] INFO TestRxJava - 2 doOnRequest 3
22:11:47.422 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnError io.reactivex.exceptions.MissingBackpressureException: Buffer is full
22:11:47.422 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnError io.reactivex.exceptions.MissingBackpressureException: Buffer is full
ログから、11のデータが生成されたときにBuffer is fullになっている。
しかし、Subscriberは2のデータまでしか消費していない。 22:11:13.158 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 2
したがって、onBackpressureBufferに9のデータを溜めようとしたときにエラーになっている。 設定したcapacityは5なのに・・・。
onBackpressureBuffer(int capacity)
バッファが溢れるタイミング
onBackpressureBufferメソッドは裏でFlowableOnBackpressureBufferを作る。 このとき、内部でバッファ管理するために以下のようにQueueが作られていた。
FlowableOnBackpressureBuffer.javaのGithubソースコード
で、このQueue実装がこんなかんじになっていて、キューサイズの指定が2のx乗(1,2,4,8,16,32…)に切り上げた値になっていた。なので今回の場合は、4 < 5 < 8 で実際のキャパシティが8。
別パターンで試す。onBackpressureBuffer(10)
にすると、 8 < 10 < 16のため、16に切り上げられるはず。
@Test
public void バッファが溢れる() throws InterruptedException {
Flowable.interval(100, TimeUnit.MILLISECONDS)
.doOnNext(x -> logger.info("1 doOnNext " + x))
.doOnRequest(t -> logger.info("1 doOnRequest " + t))
.doOnError(e -> logger.info("1 doOnError " + e))
.onBackpressureBuffer(10)
.doOnNext(x -> logger.info("2 doOnNext " + x))
.doOnRequest(t -> logger.info("2 doOnRequest " + t))
.doOnError(e -> logger.info("2 doOnError " + e))
.observeOn(Schedulers.computation(), false, 3)
.doOnNext(x -> logger.info("3 doOnNext " + x))
.doOnRequest(t -> logger.info("3 doOnRequest " + t))
.doOnError(e -> logger.info("3 doOnError " + e))
.subscribe(e -> {
TimeUnit.SECONDS.sleep(1);
}, Throwable::printStackTrace);
TimeUnit.MINUTES.sleep(10);
}
ログを見ると、17こめのデータをバッファに溜めるときに例外が出ているのがわかる。
07:55:03.742 [main] INFO TestRxJava - 3 doOnRequest 9223372036854775807
07:55:03.742 [main] INFO TestRxJava - 2 doOnRequest 3
07:55:03.742 [main] INFO TestRxJava - 1 doOnRequest 9223372036854775807
07:55:03.852 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 0
07:55:03.852 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 0
07:55:03.852 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnNext 0
07:55:03.977 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 1
07:55:03.977 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 1
07:55:04.071 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 2
07:55:04.071 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 2
07:55:04.180 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 3
07:55:04.258 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 4
07:55:04.352 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 5
07:55:04.461 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 6
07:55:04.555 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 7
07:55:04.658 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 8
07:55:04.752 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 9
07:55:04.861 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnNext 1
07:55:04.861 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 10
07:55:04.950 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 11
07:55:05.054 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 12
07:55:05.151 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 13
07:55:05.264 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 14
07:55:05.357 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 15
07:55:05.451 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 16
07:55:05.561 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 17
07:55:05.657 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 18
07:55:05.751 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 19
07:55:05.751 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnError io.reactivex.exceptions.MissingBackpressureException: Buffer is full
io.reactivex.exceptions.MissingBackpressureException: Buffer is full
07:55:05.876 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnError io.reactivex.exceptions.MissingBackpressureException: Buffer is full
at io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.onNext(FlowableOnBackpressureBuffer.java:99)
at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:92)
at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79)
at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:92)
at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:93)
at io.reactivex.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:38)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
結論
onBackpressureBuffer の capacity は2のx乗(1,2,4,8,16,32…)に切り上げるため、厳密な値ではないらしい。