2015-12-29 『Learning Reactive Programming with Java 8』 もくもく読書会(Chapter 5)の振り返り
Learning Reactive Programming With Java 8
- 作者: Nickolay Tsvetinov
- 出版社/メーカー: Packt Publishing
- 発売日: 2015/06/24
- メディア: ペーパーバック
- この商品を含むブログを見る
Chapter 5: Combinators, Conditionals, and Error Handling
Introduction
- 多くのプログラムでは複数の違うソースからのデータをまとめて使うことがある
- プログラムではこれらの違うソースからのデータ同士の依存関係は必要になることがある
- エラーに対して対応できるプログラムは耐性が強いものである
この章で学ぶことは下記のことである
Combining the Observable Operator
※ Combinatorsはマーブルダイアグラム見たほうがわかりやすいので本で確認しならがら読んだほうが良い
The zip operator
- 複数のOservableインスタンスを要素の順毎に組み合わせて処理をしていくメソッド
- 生成される組み合わせは2つのみだけではない
- ひとつのObservableインスタンスが全ての要素を発行してもObservableインスタンスのその順(Index)の要素が揃うまで待つ
- ひとつのObservableインスタンスが要素を全て使ったら処理は終わり
Observable<String> zip = Observable.zip( // Observable.from(Arrays.asList("Z", "I", "P", "P")), // Observable.interval(1L, TimeUnit.SECONDS), // (a, b) -> a + "/" + b); Helpers.subscribePrint(zip, "zip"); Thread.sleep(5000L);
実行結果
zip:Z/0 zip:I/1 zip:P/2 zip:P/3 zip ended!
Observable<String> zipWith = Observable .from(Arrays.asList("a", "b"))// .zipWith(Observable.interval(1L, TimeUnit.SECONDS), // (a, b) -> a + "/" + b); Helpers.subscribePrint(zipWith, "zip"); Thread.sleep(5000L);
実行結果
zip:a/0 zip:b/1 zip ended!
The combineLatest operator
- 複数のOservableインスタンスを要素を発行されるたびに組み合わせて処理をしていくメソッド
- zipと違い要素の順ではなく処理時点のすれぞれの要素を使う
- Oservableインスタンスの要素が揃う前に既に発行された要素は使われない
Observable<String> combineLatest = Observable .combineLatest( // Observable .from(Arrays.asList("A", "B", "C", "D")), // Observable.interval(1L, TimeUnit.SECONDS), // (a, b) -> a + "/" + b); Helpers.subscribePrint(combineLatest, "combineLatest"); Thread.sleep(5000L);
実行結果
combineLatest:D/0 combineLatest:D/1 combineLatest:D/2 combineLatest:D/3 combineLatest:D/4
上の例ではintervalメソッドのObservableインスタンスが最初の要素を発行する前にfromメソッドのObservableインスタンスは既にすべての要素を発行しているので、最後の要素は「D」になっている。
- 本のサンプルも長いが十分わかりやすい
T The merge operator
- それぞれのObservableインスタンスが発行する要素を発行した順に一つObservableインスタンスにまとめるメソッド
- Observableインスタンスの一つが途中でOnErrorになるとエラー処理をしmerge処理を終了する
- エラーが発生してもそれ以外のObservableインスタンスの処理は続けたい場合は
mergeDelayError
メソッドを使う
public static void main(String[] args) throws InterruptedException { Observable<String> alphabets = Observable .just("A", "B", "C", "D") .zipWith(Observable.interval(1L, TimeUnit.SECONDS), // MergeSample::onlyFirstArg); Observable<?> merge = Observable.merge(alphabets, Observable.interval(400L, TimeUnit.MILLISECONDS)); Helpers.subscribePrint(merge, "merge"); TimeUnit.SECONDS.sleep(5L); } static <T, R> T onlyFirstArg(T arg1, R arg2) { return arg1; }
実行結果
merge:0 merge:1 merge:A merge:2 merge:3 merge:4 merge:B merge:5 merge:6 merge:C merge:7 merge:8 merge:9 merge:D merge:10 merge:11
concat
- 引数のObservableインスタンスの要素を途中で混ぜることなく1つにまとめたObservableインスタンスを生成する
- [1,2,3]の要素と['a','b','c']の要素を持つObservableインスタンスがある場合、concatは[1,2,3,'a','b','c']の要素を持つObservableインスタンスを生成する
startWith
- 呼び出し元のObservableインスタンスに引数のObservableインスタンスの要素を追加したObservableインスタンスを生成する。
- よくある使われ方としてObservableインスタンスに初期値の要素を追加する場合に使われる
The conditional operators
- Observableインスタンスが他のObservableインスタンスが処理を行っているときにしか処理をしないなどのケースがある
- コンディショナルなメソッドは特定の条件で実行するしないを管理することができる
The amb operator
コメント
- このメソッドの使い道がわからない…
The takeUntil(), takeWhile(), skipUntil(), and skipWhile() conditional operators
The takeUntil
Observable<String> words = Observable .just("one", "way", "or", "another", "I'll", "learn", "Rxjava") .zipWith(Observable.interval(500L, TimeUnit.MILLISECONDS), (x, y) -> x); Observable<Long> interval = Observable.interval(2000L, TimeUnit.MILLISECONDS); Helpers.subscribePrint(words.takeUntil(interval), "takeUntil"); TimeUnit.SECONDS.sleep(5L);
実行結果
takeUntil:one takeUntil:way takeUntil:or takeUntil ended!
※ intervaleが2秒後に要素を発行するためwords.takeUntilの処理は終了する
takeWhile
- 引数の関数がtrueを開けす限り処理を実行する
Observable<String> words = Observable .just("one", "way", "or", "another", "I'll", "learn", "Rxjava") .zipWith(Observable.interval(500L, TimeUnit.MILLISECONDS), (x, y) -> x); Helpers.subscribePrint( words.takeWhile(word -> word.length() > 2), "takeWhile"); TimeUnit.SECONDS.sleep(5L);
実行結果
takeWhile:one takeWhile:way takeWhile ended!
skipUntil
- 引数のObservableインスタンスが最初の要素を発行するまで要素は発行しない
- 引数のObservableインスタンスが最初の要素を発行したら元のObservableインスタンスの要素がそれまで発行していた分を除いて発行される
Observable<String> words = Observable .just("one", "way", "or", "another", "I'll", "learn", "Rxjava") .zipWith(Observable.interval(500L, TimeUnit.MILLISECONDS), (x, y) -> x); Observable<Long> interval = Observable.interval(2000L, TimeUnit.MILLISECONDS); Helpers.subscribePrint(words.skipUntil(interval), "skipUntil"); TimeUnit.SECONDS.sleep(5L);
実行結果
skipUntil:another skipUntil:I'll skipUntil:learn skipUntil:Rxjava skipUntil ended!
※ 最初の要素からではなく、それまで処理していたと思われるのを除いて、結果が発行される
コメント
The defaultEmpty() operator
Handling errors
- エラーが発生するとObservableインスタンスは処理のチェインを終了する
- ただし、通常のプログラムのcatchブロックでの処理のように、エラー時に何かするロジックは実行可能
主なメソッドは3つに分類できる
- return*
- retry*
- resume*
The return and resume operators
onErrorReturn
- エラー発生時にOnErrorに行かずにonErrorReturnメソッドで処理を行い、Observableインスタスの処理を終了する
p.87のサンプル
誤:Observable<String> numbers = … ↓ 正:Observable<Integer> numbers = …
Observable<Integer> numbers = Observable .just("1", "2", "Three", "4") // .map(Integer::parseInt) // .onErrorReturn(e -> -1); Helpers.subscribePrint(numbers, "Error returned");
実行結果
Error returned:1 Error returned:2 Error returned:-1 Error returned ended!
onExceptionResumeNext
- 例外発生時にOnErrorに行かずにonExceptionResumeNextメソッドの引数のObservableインスタンスの要素を使ってそれ以降の処理を行う
- 対応するのはExceptionとそのサブクラスのみ
- ErrorはonErrorに通知される
Observable<Integer> numbers = Observable .just("1", "2", "Three", "4") // .map(Integer::parseInt) // .onExceptionResumeNext( Observable.just(50, 40, 30, 20, 10)); Helpers.subscribePrint(numbers, "Exception resume");
実行結果
Exception resume:1 Exception resume:2 Exception resume:50 Exception resume:40 Exception resume:30 Exception resume:20 Exception resume:10 Exception resume ended!
onErrorResumeNext
その他の副作用オペレータ(side effect operator)
- 要素の変更は行わない
主に次のものがある
- doOnNext
- doOnError
- doOnCompleted
- finallyDo
コメント
- finallyDo以外はOnNext、OnError、OnCompletedらの処理をに追加されるので煩雑になるので?(ロジックが一か所ではない)
The retry technique
- 例えば複数サーバが並列的に稼働している場合、ネットワークの問題などのエラーで全ての処理が行えなくなるより、別のサーバに再実行するなど重要な技術である
retry
- エラーが発生したらObservableインスタンスの処理のチェインを最初から行う
- retryメソッドに引数がない場合、永遠に再実行する
- retryメソッドの引数に数値を渡した場合、その回数だけ再実行する
retryWhen
- 引数にエラー情報を持ったObservableインスタンスが与えられ、それを使ったRetryロジックを持ったObservableインスタンスを返すことで最初から再実行するメソッド
- 返すObservableインスタンスが空だった場合、処理は終わる
retry(Func2)
- 第1引数が再実行回数
- 第2引数がエラー情報
- 戻り値に再実行ならtrue、OnErrorで終了ならfalseを返す
An Http Client Example
下記のJarが必要
- gson
- http-async-client
- rx-apache-http
- commons-logging
- http-client
- http-core
コメント
- 意図した挙動にならない際のデバッグが難しい