2015-12-23 『Learning Reactive Programming with Java 8』 もくもく読書会(Chapter 3)の振り返り
Learning Reactive Programming With Java 8
- 作者: Nickolay Tsvetinov
- 出版社/メーカー: Packt Publishing
- 発売日: 2015/06/24
- メディア: ペーパーバック
- この商品を含むブログを見る
Chapter 3: Creating and Connecting Observables, Observers, and Subjects
Introduction
学ぶ必要があるもの
- ユーザ入力だけではなく様々なデータをObservableインスタンスを通じて扱えるか?
- 振る舞いによってどのようなObservableインスタンスの種類があるのか?
- Observableインスタンスを、いつ、どのように登録解除(unsubscribe)すべきか?
- どのようにsubscriptionとObserverインスタンスを使うのか?
- Subject typeについて
この章では下記を学んでいく
- Observable のファクトリーメソッド
- Observer と Subscriber
- Hot Observable と Cold Observable:Connectable Observable
- Subjectとは?
- Observable の生成
The Observable.from method
引数の型
- Iterable
- 配列
Iterableオブジェクトによってはiteratorメソッドが一度しか呼ぶことができないものもあり、一度しか呼ぶことができないソースの場合、Observable#subscribeメソッドを呼ぶとエラーになる。
Listは複数回呼んでも問題ない
List<String> list = Arrays.asList("blue", "red", "green", "yellow", "Orange", "cyan", "purple"); Observable<String> observable = Observable.from(list); observable.subscribe( color -> System.out.print(color + "|"), System.out::println, System.out::println); observable .subscribe(color -> System.out.print(color + "/"));
DirectStreamは複数回呼べない
Path resources = Paths.get("src", "resources"); try (DirectoryStream<Path> dStream = Files .newDirectoryStream(resources)) { Observable<Path> dirObservable = Observable .from(dStream); dirObservable.subscribe(System.out::println); // 下記でエラーが発生する dirObservable.subscribe(System.out::println); } catch (IOException e) { e.printStackTrace(); }
http://docs.oracle.com/javase/jp/7/api/java/nio/file/DirectoryStream.html
DirectoryStream は Iterable を拡張しますが、それがサポートする Iterator は 1 つだけなので、それは汎用の Iterable ではありません。つまり、iterator メソッドを呼び出して、2 つめまたはそれに続くイテレータを取得すると、IllegalStateException がスローされます。
subscriveメソッド
この本で主に使われているものは下記の2パターン
public Subscription subscribe(Action1<? super T> onNext)
- onNextで実行する処理のみ定義
public Subscription subscribe(Action1<? super T> onNext, Action1<? super T> onError, Action0 onComplete)
- onNextで実行する処理を定義。※ 処理に渡される引数が1つある
- onErrorでエラー時の処理を定義。※ 処理に渡される引数が1つある
- onCompleteで処理終了時の処理を定義。※ 処理に渡される引数なし
The Observable.just method
- 対象がオブジェクトが複数データをあらわすのではなくひとつのオブジェクトをあらわす場合にObservableインスタンスを生成するメソッド
- justメソッドの引数には複数の9つまでの値を設定できる
- 複数の値を設定した場合はonCompleteメソッドは全引数の処理が終わった後に実行される
例
Observable.just('S').subscribe(System.out::println); Observable.just('R', 'x', 'J', 'a', 'v', 'a').subscribe( value -> System.out.print("*" + value + "*"), // System.err::println, // () -> System.out.println("/"));
実行結果
S *R**x**J**a**v**a*/
Other Observable factory methods
※ 実行する際に処理の最後にThread.sleepで処理を停止させないとObservableの処理が終わる前にメインスレッドが終了していしまい、結果が表示されないので注意
Observable#interval
- 指定したインターバルで処理を行うObservableインスタンスを生成する
- 処理を行うAction1の引数には0からのカウントが渡される
例
Helpers.subscribePrint( Observable.interval(1000L, TimeUnit.MILLISECONDS), "interval");
実行結果 ※ 1秒後に実行される
interval:0 interval:1 interval:2 …略
Observable#timer(long, long, TimeUnit)
public static final Observable<java.lang.Long> timer(long initialDelay, long period, java.util.concurrent.TimeUnit unit)
- 指定した待ち時間(initialDelay)を経過した後に指定したインターバル(period)で処理を行うObservableインスタンスを生成する
- 処理を行うAction1の引数には0からのカウントが渡される
ただしこのメソッドは今使っているバージョン(1.0.16)だとDeprecatedになっている。
※ が、WebにあるRxJavaのJavaDocだと何も書いていない
※ このサイトはバージョンも書いてないので、そもそもメンテされてない?
※ どうもObservable#interval
のほうに移動したもよう
ちなみに第1引数のperiodがないtimerメソッドはDeprecatedされていないので、こちらは今後も使われるもよう
Observable#timer(long, TimeUnit)
Observable#error
Observable#empty
- onNext処理が行われないObserverbleインスタンスを生成する
- onCompleted処理はすぐに実行される
Observable#never
- onNext処理もonCompleted処理も行われないObserverbleインスタンスを生成する
Observable#range
public static final Observable<Integer> range(int start, int count)
- subscribeされるとすぐにstratからcount分実行されるObserverインスタンスを生成する
- onNext処理に渡される値はstratから始まり1つずつ加算された値
例:
Helpers.subscribePrint(Observable.range(10, 3), "range");
実行結果:
range:10 range:11 range:12 range ended!
The Observable.create method
Observable.from(Iterableメソッドを見てみる
実際のソースとは違うが本によるとざっくりとした実装イメージは下記
<T> Observable<T> fromIterable(final Iterable<T> iterable) { return Observable.create(new OnSubscribe<T>(){ @Override public void call(Subscriber<? super T> subscriber){ try { Iterator<T> iterator = iterable.iterator(); while (iterator.hasNext()){ subscriber.onNext(iterator.next()); } subscriber.onCompleted(); } catch(Exception e){ subscriber.onError(e); } } }); }
※ この処理のままだとunsubscribeされた際の処理が抜けているので注意
Subscribing and unsubscribing
subscribeメソッド
subscribe()
Observable#create(OnSubscribe)
のOnSubscribeインスタンスのcallメソッドを呼ぶためだけのメソッド- OnErrorの通知があるとOnErrorNotImplementedExceptionがthrowされる
subscribe(Action1<? super T>)
OnSubscribe#call(Subscriber)
のsubscriber#onNextで実行する処理を第1引数のAction1にさせる- OnErrorの通知があるとOnErrorNotImplementedExceptionがthrowされる
- onCompletedの通知があっても何もしない
- 著者によるとエラーがないことを保証するのは厳しいので製品としてリリースするコードにはこのメソッドは使わないほうが良いらしい
実装:
public final Subscription subscribe( final Action1<? super T> onNext) { if (onNext == null) { throw new IllegalArgumentException( "onNext can not be null"); } subscribe(new Subscriber() { public final void onCompleted() { } public final void onError(Throwable e) { throw new OnErrorNotImplementedException(e); } public final void onNext(T args) { onNext.call(args); } }); }
subscribe(Action1<? super T>, Action1)
- subscriber#onNextで実行する処理を第1引数のAction1にさせる
- subscriber#onErrorで実行する処理を第2引数のAction1にさせる
subscribe(Action1<? super T>, Action1, Action0)
- subscriber#onNextで実行する処理を第1引数のAction1にさせる
- subscriber#onErrorで実行する処理を第2引数のAction1にさせる
- subscriber#onCompletedで実行する処理を第3引数のAction0にさせる
subscribe(Observer<? super T>)
Observable.create(new OnSubscribe<Integer>() { @Override public void call( Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onCompleted(); } }).subscribe(new Observer<Integer>() { @Override public void onCompleted() { System.out.println("onCompleted()"); } @Override public void onError(Throwable paramThrowable) { System.err.println("onError()"); } @Override public void onNext(Integer param) { System.out.println("onNext(" + param + ")"); } });
実行結果
onNext(1)
onCompleted()
subscribe(Subscripter<? super T>)
subscribe(Observer<? super T>)
と基本的には同じ- SubscripterはObserverインターフェイスを実装した抽象クラス
- Subscripterにはいくつかの有用な機能が装備されている
- ほとんどのsubscribeメソッドは内部でこのsubscribeメソッドを呼んでいる
- RxContractに沿った処理が保証される
- この本ではこれ以降の
Observable#subscribe
はこのメソッドを意味する
unsafeSubscribe(Subscripter<? super T>)
subscribe(Subscripter<? super T>)
と基本的には同じだがRxContractによる保護はない- 通常のsubscribeメソッドによるオーバーヘッドを軽減するために独自実装する際に使われる
- 基本使うべきではない
unsubscribeメソッドとisUnscribedメソッド
- subscribeメソッドの戻り値のSubscriptionインターフェイスのメソッド
- unsubscribeメソッドは登録解除を行う
- isUnscribedメソッドはunsubscribeされていたらtrueを返す
- unsubscribeメソッドが実行されるとonCompletedメソッドは実行されない
→ unsubscribeメソッドはOnCompletedの通知は飛ばさない
Observable.from(Iterableメソッドを見てみる
先に出ていたfromIterableメソッドの実装例にunsubscribeの対応を実装したイメージは下記
<T> Observable<T> fromIterable(final Iterable<T> iterable) { return Observable.create(new OnSubscribe<T>(){ @Override public void call(Subscriber<? super T> subscriber){ try { Iterator<T> iterator = iterable.iterator(); while (iterator.hasNext()){ if (subscriber.isUnscribed()){ return; } subscriber.onNext(iterator.next()); } if (!subscriber.isUnscribed()){ subscriber.onCompleted(); } } catch(Exception e){ if (!subscriber.isUnscribed()){ subscriber.onError(e); } } } }); }
- どのタイミングでunsubscribeされるかはわからないので全行程にunsubscribeのチェックはいれるべき
コメント
- 可能性とはしてはかなり低いがisUnscribed直後にunsubscribeされるとどうしようもない気がする
- そういうのが非同期のほぼ再現しないエラーになる気がする
Subscriber.add(Subscription)
- Subscriberがunsubscribeされると追加されたSubscriptionは全てunsucribeされる
※ 詳しくはChpter 8のリソース管理にて学ぶらしい
Hot and cold Observable instances
Cold Observableインスタンス
- Observableのcreateメソッド、fromメソッド、justメソッドはsubscribeメソッドは実行されるまで何もしない
- subscribeされるたびに毎回通知を発行する
- それぞれ独立して通知を受け処理をする
Hot Observableインスタンス
- 開始するとOnCompletedの通知がくるまで動き続けるObservableインスタンス
- subscriptionは気にしない
- 全てのSubscriberが同じSubscriptionを受け取る
- Chapter 1の入力を受け取るObservable
Observable<Double> a = varStream("a", input);
がHot Observableインスタンス
Cold と Hot のObservableインスタンスの例え
- Cold Observableインスタンスを音楽で例えるならCDでたくさんの人はCDを買うが聴くのはそれぞれ
- Hot Observableインスタンスを音楽で例えるならラジオでそのラジオを聴いている人は全て同じ曲を同じタイミングで聞いている
The ConnectableObservable class
- ConnectableObservableはconnectメソッドを呼ばれるまで何もしない
- どのObservableインスタンスでもpublishメソッドを実行するとConnectableObservableになる
- publishメソッドはColdからHotのObservableインスタンスを生成する
publishメソッド
本の例だとわかりづらかったのでちょっと変更
Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS); ConnectableObservable<Long> published = interval .publish(); Subscription sub1 = Helpers.subscribePrint(published, "First"); Subscription sub2 = Helpers.subscribePrint(published, "Second"); Thread.sleep(3000L); published.connect(); Thread.sleep(5000L); sub1.unsubscribe(); Subscription sub3 = Helpers.subscribePrint(published, "Third"); Thread.sleep(3000L); sub2.unsubscribe(); sub3.unsubscribe();
実行結果
First:0 Second:0 First:1 Second:1 First:2 Second:2 First:3 Second:3 First:4 Second:4 Second:5 Third:5 Second:6 Third:6 Second:7 Third:7
- connectメソッドを呼ばれた後にsubscribeメソッドを呼び出すと既に動いている状態でSubscriptionが生成されている
- publishedが発行しているOnNextの通知が複数で受け取ることができている
replayメソッド
Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS); ConnectableObservable<Long> replay = interval.replay(); Subscription sub1 = Helpers.subscribePrint(replay, "First"); Thread.sleep(3000L); replay.connect(); Thread.sleep(5000L); sub1.unsubscribe(); Subscription sub2 = Helpers.subscribePrint(replay, "Second"); Thread.sleep(3000L); sub2.unsubscribe();
実行結果
First:0 First:1 First:2 First:3 First:4 Second:0 Second:1 Second:2 Second:3 Second:4 Second:5 Second:6 Second:7
refCountメソッド
Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS); Observable<Long> refCount = interval.publish() .refCount(); Subscription sub1 = Helpers.subscribePrint(refCount, "First"); Subscription sub2 = Helpers.subscribePrint(refCount, "Second"); Thread.sleep(3000L); sub1.unsubscribe(); sub2.unsubscribe(); Subscription sub3 = Helpers.subscribePrint(refCount, "Third"); Thread.sleep(5000L); sub3.unsubscribe();
実行結果:
First:0 Second:0 First:1 Second:1 First:2 Second:2 Third:0 Third:1 Third:2 Third:3 Third:4
shareメソッド
- refCountメソッドのAlias
public final Observable<T> share() { return publish().refCount(); }
The Subject instances
- ObservableインスタンスでありObserverインスタンスでもある
- Observableインスタンスのように複数のObserverインスタンスを持つことができる
- ObserverインスタンスのようにonNextメソッド、onErrorメソッド,onCompletedメソッドを実行できる
- 基本的にSubjectは使わないほうが良い
- 使う必要があるなら最低でもObservableを返すメソッドを用意すべき(asObserableメソッドでObservableを生成できる)
4種類のSubjectがある
- PublishSubject
- ReplaySubject
- BehaviorSubject
- AsyncSubject
PublishSubject
- publishメソッドで作られたConnectableObservableのように振る舞う
本のサンプルだとわかりづらかったので少し変更
Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS); Subject<Long, Long> pubishSubject = PublishSubject .create(); interval.subscribe(pubishSubject); Subscription sub1 = Helpers .subscribePrint(pubishSubject, "First"); Thread.sleep(3000L); Subscription sub2 = Helpers .subscribePrint(pubishSubject, "Second"); Thread.sleep(3000L); pubishSubject.onNext(555L); sub1.unsubscribe(); Subscription sub3 = Helpers .subscribePrint(pubishSubject, "Third"); Thread.sleep(3000L); sub2.unsubscribe(); sub3.unsubscribe();
実行結果:
First:0 First:1 First:2 First:3 Second:3 First:4 Second:4 First:5 Second:5 First:555 Second:555 Second:6 Third:6 Second:7 Third:7 Second:8 Third:8
ReplaySubject
- replayメソッドで作られたConnectableObservableのように振る舞い過去に発行された通知を新しいSubjectが開始時に受け取ることができる
- ReplaySubjectはいくつものファクトリーメソッドを持つ
- デフォルトのファクトリーメソッドは全ての通知を覚えているためメモリを食う
- 元となるObservableインスタンスがなくても生成可能
Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS); Subject<Long, Long> subject = ReplaySubject.create(); interval.subscribe(subject); Subscription sub1 = Helpers.subscribePrint(subject, "First"); Thread.sleep(3000L); Subscription sub2 = Helpers.subscribePrint(subject, "Second"); Thread.sleep(3000L); subject.onNext(555L); sub1.unsubscribe(); Subscription sub3 = Helpers.subscribePrint(subject, "Third"); Thread.sleep(3000L); sub2.unsubscribe(); sub3.unsubscribe();
実行結果
First:0 First:1 First:2 Second:0 Second:1 Second:2 First:3 Second:3 First:4 Second:4 First:5 Second:5 First:555 Second:555 Third:0 Third:1 Third:2 Third:3 Third:4 Third:5 Third:555 Second:6 Third:6 Second:7 Third:7 Second:8 Third:8
BehaviorSubject
- キャッシュサイズが1しかないReplaySubjectと同じように振る舞う
- 元となるObservableインスタンスがなくても生成可能
Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS); Subject<Long, Long> subject = BehaviorSubject.create(); interval.subscribe(subject); Subscription sub1 = Helpers.subscribePrint(subject, "First"); Thread.sleep(3000L); Subscription sub2 = Helpers.subscribePrint(subject, "Second"); Thread.sleep(3000L); subject.onNext(555L); sub1.unsubscribe(); Subscription sub3 = Helpers.subscribePrint(subject, "Third"); Thread.sleep(3000L); sub2.unsubscribe(); sub3.unsubscribe();
実行結果
First:0 First:1 First:2 Second:2 First:3 Second:3 First:4 Second:4 First:5 Second:5 First:555 Second:555 Third:555 Second:6 Third:6 Second:7 Third:7 Second:8 Third:8
AsyncSubject
- 元のObservableがOnCompletedが実行された後に起動するSubject
- Observableインスタンスの最後の値がonNextに渡される
- 元のObservableが処理が終わらない場合、何も起動されない
- on*メソッドを使って、元となるObservableインスタンスがなくても生成可能
// intervaleのSubjectはonCompletedを呼ぶまで処理が終わらないので実行されない Observable<Long> interval = Observable.interval(1L, TimeUnit.SECONDS); Helpers.subscribePrint(interval, "intervale"); Subject<Long, Long> intervalSubject = AsyncSubject .create(); interval.subscribe(intervalSubject); Helpers.subscribePrint(intervalSubject, "intervalSubject"); // 下記のSubjectは処理終了後に実行される Long[] array = { 1L, 2L, 3L }; Observable<Long> observable = Observable.from(array); observable.subscribe(value -> { try { Thread.sleep(1000L); System.out.println("value=" + value); } catch (Exception e) { } } , System.err::println, () -> System.out.println("onCompleted called")); Subject<Long, Long> asyncSubject = AsyncSubject .create(); observable.subscribe(asyncSubject); Helpers.subscribePrint(asyncSubject, "Async1"); Helpers.subscribePrint(asyncSubject, "Async2"); Thread.sleep(5000L);
実行結果
intervale:0 value=1 intervale:1 value=2 intervale:2 value=3 onCompleted called Async1:3 Async1 ended! Async2:3 Async2 ended! intervale:3 intervale:4 intervale:5 intervale:6 intervale:7