The examples below were verified with the following version.
pom.xml
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.1.14</version>
</dependency>
ConnectableFlowable
ConnectableFlowable is a hot stream. Use this class when multiple Subscribers should share the same stream.
Reference: ConnectableFlowable Javadoc
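The tests in this post pass a DebugSubscriber to subscribe(), but its source is not shown. The following is only a minimal sketch of what such a class might look like, assuming it requests an unbounded amount and logs every callback together with the thread name. The `events` list and the use of System.out instead of a logging framework are assumptions made for illustration, not the author's implementation.

```java
import io.reactivex.Flowable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the DebugSubscriber used in the tests.
class DebugSubscriber<T> implements Subscriber<T> {

    // Recorded callbacks, kept so the behaviour can be inspected (assumption).
    final List<String> events = new CopyOnWriteArrayList<>();

    private void log(String event) {
        events.add(event);
        System.out.printf("[%s] DebugSubscriber - %s%n",
                Thread.currentThread().getName(), event);
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        log("onSubscribe");
        // Request everything; no backpressure handling in this sketch.
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
        log("onNext " + item);
    }

    @Override
    public void onError(Throwable error) {
        log("onError " + error);
    }

    @Override
    public void onComplete() {
        log("onComplete");
    }

    public static void main(String[] args) {
        // Logs onSubscribe, onNext 1, onNext 2, onComplete on the main thread.
        Flowable.just(1, 2).subscribe(new DebugSubscriber<>());
    }
}
```

Logging the thread name makes it easy to see which callbacks run on the calling thread and which run on an RxComputationThreadPool thread, as in the outputs below.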
publish(), connect()
publish() converts a cold stream into a hot one. A ConnectableFlowable does not emit any data until connect() is called.
@Test
public void publish() throws InterruptedException {
    ConnectableFlowable flowable = Flowable
            .interval(100, TimeUnit.MILLISECONDS)
            .take(2)
            .publish();
    flowable.subscribe(new DebugSubscriber());
    flowable.subscribe(new DebugSubscriber());
    // Emission starts here; both Subscribers receive the same items.
    flowable.connect();
    // Keep the test thread alive while interval() emits on a computation thread.
    TimeUnit.MILLISECONDS.sleep(10000);
}
09:11:31.433 [main] INFO DebugSubscriber - onSubscribe
09:11:31.438 [main] INFO DebugSubscriber - onSubscribe
09:11:31.550 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
09:11:31.550 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
09:11:31.649 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
09:11:31.649 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
09:11:31.651 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
09:11:31.651 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
Without connect(), no data flows.
@Test
public void publish() throws InterruptedException {
    ConnectableFlowable flowable = Flowable
            .interval(100, TimeUnit.MILLISECONDS)
            .take(2)
            .publish();
    flowable.subscribe(new DebugSubscriber());
    // connect() is intentionally not called, so only onSubscribe is logged.
    // flowable.connect();
    TimeUnit.MILLISECONDS.sleep(10000);
}
16:40:27.981 [main] INFO DebugSubscriber - onSubscribe
If you subscribe() to a stream that connect() has already started, you receive data from the middle of the stream.
@Test
public void publish() throws InterruptedException {
    ConnectableFlowable flowable = Flowable
            .interval(100, TimeUnit.MILLISECONDS)
            .take(3)
            .publish();
    flowable.connect();
    // Subscribe late; items emitted before this point are not delivered.
    TimeUnit.MILLISECONDS.sleep(200);
    flowable.subscribe(new DebugSubscriber());
    TimeUnit.MILLISECONDS.sleep(10000);
}
16:42:15.367 [main] INFO DebugSubscriber - onSubscribe
16:42:15.466 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
16:42:15.467 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
If you subscribe() to a stream that has already finished, no data flows.
@Test
public void publish() throws InterruptedException {
    ConnectableFlowable flowable = Flowable
            .interval(100, TimeUnit.MILLISECONDS)
            .take(3)
            .publish();
    flowable.connect();
    flowable.subscribe(new DebugSubscriber());
    // Wait until the stream has completed before subscribing again.
    TimeUnit.MILLISECONDS.sleep(400);
    flowable.subscribe(new DebugSubscriber());
    TimeUnit.MILLISECONDS.sleep(10000);
}
17:13:12.205 [main] INFO DebugSubscriber - onSubscribe
17:13:12.304 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
17:13:12.403 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
17:13:12.508 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
17:13:12.510 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
17:13:12.609 [main] INFO DebugSubscriber - onSubscribe
Calling connect() again makes the data flow from the beginning.
@Test
public void publish() throws InterruptedException {
    ConnectableFlowable flowable = Flowable
            .interval(100, TimeUnit.MILLISECONDS)
            .take(3)
            .publish();
    flowable.connect();
    flowable.subscribe(new DebugSubscriber());
    TimeUnit.MILLISECONDS.sleep(400);
    // The first run has completed; connect again and subscribe a second Subscriber.
    flowable.connect();
    flowable.subscribe(new DebugSubscriber());
    TimeUnit.MILLISECONDS.sleep(10000);
}
18:20:07.100 [main] INFO DebugSubscriber - onSubscribe
18:20:07.198 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
18:20:07.297 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
18:20:07.397 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
18:20:07.398 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
18:20:07.503 [main] INFO DebugSubscriber - onSubscribe
18:20:07.604 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 0
18:20:07.704 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 1
18:20:07.805 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 2
18:20:07.805 [RxComputationThreadPool-2] INFO DebugSubscriber - onComplete
Calling dispose() on the Disposable returned by connect() tears down the stream.
@Test
public void publish() throws InterruptedException {
    ConnectableFlowable flowable = Flowable
            .interval(100, TimeUnit.MILLISECONDS)
            .publish();
    flowable.subscribe(new DebugSubscriber());
    Disposable disposable = flowable.connect();
    TimeUnit.MILLISECONDS.sleep(200);
    // Stop the running stream; the first Subscriber receives nothing more.
    disposable.dispose();
    flowable.connect();
    flowable.subscribe(new DebugSubscriber());
    TimeUnit.MILLISECONDS.sleep(10000);
}
17:07:51.631 [main] INFO DebugSubscriber - onSubscribe
17:07:51.758 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
17:07:51.858 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
17:07:51.858 [main] INFO DebugSubscriber - onSubscribe
17:07:51.959 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 0
17:07:52.060 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 1
17:07:52.168 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 2
replay()
A stream created with publish() can deliver to a Subscriber only the data emitted after that Subscriber called subscribe(). To buffer the stream's data and also deliver the buffered data on subscribe(), use replay().
@Test
public void replay() throws InterruptedException {
    ConnectableFlowable flowable = Flowable
            .interval(100, TimeUnit.MILLISECONDS)
            .take(3)
            .replay();
    flowable.connect();
    flowable.subscribe(new DebugSubscriber());
    TimeUnit.MILLISECONDS.sleep(200);
    // The late Subscriber first receives the buffered items, then the live ones.
    flowable.subscribe(new DebugSubscriber());
    TimeUnit.MILLISECONDS.sleep(10000);
}
09:51:37.879 [main] INFO DebugSubscriber - onSubscribe
09:51:37.977 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
09:51:38.077 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
09:51:38.083 [main] INFO DebugSubscriber - onSubscribe
09:51:38.083 [main] INFO DebugSubscriber - onNext 0
09:51:38.083 [main] INFO DebugSubscriber - onNext 1
09:51:38.176 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
09:51:38.176 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
09:51:38.177 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
09:51:38.177 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
The buffer policy can be specified by buffer size or by time.
The no-argument replay() caches items without limit, so choose a buffer policy that fits the stream.
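As a rough illustration of a size-bounded buffer, the sketch below uses replay(2) so that a late Subscriber only receives the two most recent items. It is not taken from the original tests: the PublishProcessor source, the class name ReplayBufferSizeSketch, and the method run() are assumptions chosen to keep the result deterministic instead of depending on interval() timing. A time-based policy would use the replay(long, TimeUnit) overload instead.

```java
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.processors.PublishProcessor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical example class; not part of the original post.
class ReplayBufferSizeSketch {

    static List<List<Integer>> run() {
        // A manually driven source, so emission order is fully controlled.
        PublishProcessor<Integer> source = PublishProcessor.create();

        // Keep at most the 2 most recent items for late Subscribers.
        ConnectableFlowable<Integer> flowable = source.replay(2);
        flowable.connect();

        // The first Subscriber is present from the start and sees every item.
        List<Integer> first = new ArrayList<>();
        flowable.subscribe(first::add);

        for (int i = 0; i < 5; i++) {
            source.onNext(i);
        }

        // The late Subscriber gets only the buffered tail (3 and 4) ...
        List<Integer> late = new ArrayList<>();
        flowable.subscribe(late::add);

        // ... followed by items emitted after it subscribed.
        source.onNext(5);

        return Arrays.asList(first, late);
    }

    public static void main(String[] args) {
        List<List<Integer>> result = run();
        System.out.println("first = " + result.get(0));
        System.out.println("late  = " + result.get(1));
    }
}
```

Under these assumptions the first Subscriber collects 0 through 5, while the late one collects 3, 4, 5. With the no-argument replay() the late Subscriber would receive all six items, at the cost of an ever-growing buffer on a long-running stream.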
ConnectableFlowable -> Flowable
A ConnectableFlowable emits no data unless connect() is called. Several methods are provided that convert it into a Flowable that calls connect() automatically.
refCount()
Creates a Flowable that calls connect() when a Subscriber calls subscribe().
@Test
public void refCount() throws InterruptedException {
    Flowable flowable = Flowable
            .interval(100, TimeUnit.MILLISECONDS)
            .take(3)
            .publish().refCount();
    // The first subscribe() triggers connect().
    flowable.subscribe(new DebugSubscriber());
    TimeUnit.MILLISECONDS.sleep(200);
    // The second Subscriber joins the already running stream.
    flowable.subscribe(new DebugSubscriber());
    TimeUnit.MILLISECONDS.sleep(10000);
}
10:14:43.912 [main] INFO DebugSubscriber - onSubscribe
10:14:44.027 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
10:14:44.125 [main] INFO DebugSubscriber - onSubscribe
10:14:44.126 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
10:14:44.126 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
10:14:44.227 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
10:14:44.227 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
10:14:44.228 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
10:14:44.229 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
You can adjust how many Subscribers must subscribe() before connect() is called.
@Test
public void refCount() throws InterruptedException {
    Flowable flowable = Flowable
            .interval(100, TimeUnit.MILLISECONDS)
            .take(3)
            .publish().refCount(2);
    flowable.subscribe(new DebugSubscriber());
    TimeUnit.MILLISECONDS.sleep(400);
    // connect() happens only now, when the second Subscriber arrives.
    flowable.subscribe(new DebugSubscriber());
    TimeUnit.MILLISECONDS.sleep(10000);
}
10:26:58.425 [main] INFO DebugSubscriber - onSubscribe
10:26:58.829 [main] INFO DebugSubscriber - onSubscribe
10:26:58.945 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
10:26:58.945 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
10:26:59.044 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
10:26:59.044 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
10:26:59.144 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
10:26:59.144 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
10:26:59.146 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
10:26:59.146 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
If you subscribe() to a stream that has already finished, the data flows again from the beginning. (When the number of Subscribers drops to 0, the connection is disposed. When the number of Subscribers reaches the threshold, it connects.)
@Test
public void refCount() throws InterruptedException {
    Flowable flowable = Flowable
            .interval(100, TimeUnit.MILLISECONDS)
            .take(3)
            .publish().refCount();
    flowable.subscribe(new DebugSubscriber());
    // Wait until the first run has completed.
    TimeUnit.MILLISECONDS.sleep(400);
    flowable.subscribe(new DebugSubscriber());
    TimeUnit.MILLISECONDS.sleep(10000);
}
10:22:59.397 [main] INFO DebugSubscriber - onSubscribe
10:22:59.511 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
10:22:59.610 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
10:22:59.712 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
10:22:59.714 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
10:22:59.812 [main] INFO DebugSubscriber - onSubscribe
10:22:59.913 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 0
10:23:00.014 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 1
10:23:00.113 [RxComputationThreadPool-2] INFO DebugSubscriber - onNext 2
10:23:00.113 [RxComputationThreadPool-2] INFO DebugSubscriber - onComplete
autoConnect()
Almost the same as refCount().
However, if you subscribe() to a stream that has already completed, the data does not flow again (it does not connect a second time).
@Test
public void autoConnect() throws InterruptedException {
    Flowable flowable = Flowable
            .interval(100, TimeUnit.MILLISECONDS)
            .take(3)
            .publish().autoConnect();
    flowable.subscribe(new DebugSubscriber());
    // Wait until the stream has completed.
    TimeUnit.MILLISECONDS.sleep(400);
    flowable.subscribe(new DebugSubscriber());
    TimeUnit.MILLISECONDS.sleep(10000);
}
10:52:27.423 [main] INFO DebugSubscriber - onSubscribe
10:52:27.534 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 0
10:52:27.634 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 1
10:52:27.740 [RxComputationThreadPool-1] INFO DebugSubscriber - onNext 2
10:52:27.742 [RxComputationThreadPool-1] INFO DebugSubscriber - onComplete
10:52:27.835 [main] INFO DebugSubscriber - onSubscribe
share()
An alias for Flowable.publish().refCount().
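The post gives no test for share(), so here is a small sketch of how it might be exercised. It assumes a manually driven PublishProcessor as the source and counts upstream subscriptions with doOnSubscribe(); the class name ShareSketch and the method run() are hypothetical and exist only for this illustration.

```java
import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class; not part of the original post.
class ShareSketch {

    static String run() {
        AtomicInteger upstreamSubscriptions = new AtomicInteger();
        PublishProcessor<Integer> source = PublishProcessor.create();

        // share() is publish().refCount(): the first Subscriber connects,
        // later Subscribers reuse the same upstream subscription.
        Flowable<Integer> shared = source
                .doOnSubscribe(s -> upstreamSubscriptions.incrementAndGet())
                .share();

        List<Integer> a = new ArrayList<>();
        List<Integer> b = new ArrayList<>();

        shared.subscribe(a::add);   // triggers connect()
        source.onNext(1);           // only a is subscribed at this point

        shared.subscribe(b::add);   // joins the running stream
        source.onNext(2);           // delivered to both a and b

        return "a=" + a + " b=" + b
                + " upstreamSubscriptions=" + upstreamSubscriptions.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the upstream is subscribed once even though two Subscribers are attached, and the second Subscriber misses the item emitted before it joined, which matches the refCount() behaviour shown above.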