SE情報技術研究会’s blog

http://se-info-tech.connpass.com

2015-12-29 『Learning Reactive Programming with Java 8』 もくもく読書会(Chapter 5)の振り返り

Learning Reactive Programming With Java 8

Learning Reactive Programming With Java 8

Chapter 5: Combinators, Conditionals, and Error Handling

Introduction

  • 多くのプログラムでは複数の違うソースからのデータをまとめて使うことがある
  • プログラムではこれらの違うソースからのデータ同士の依存関係は必要になることがある
  • エラーに対して対応できるプログラムは耐性が強いものである

この章で学ぶことは下記のことである

Combining the Observable Operator

※ Combinatorsはマーブルダイアグラム見たほうがわかりやすいので本で確認しならがら読んだほうが良い

The zip operator

    Observable<String> zip = Observable.zip( //
        Observable.from(Arrays.asList("Z", "I", "P", "P")), //
        Observable.interval(1L, TimeUnit.SECONDS), //
        (a, b) -> a + "/" + b);
    Helpers.subscribePrint(zip, "zip");

    Thread.sleep(5000L);

実行結果

zip:Z/0
zip:I/1
zip:P/2
zip:P/3
zip ended!
    Observable<String> zipWith = Observable
        .from(Arrays.asList("a", "b"))//
        .zipWith(Observable.interval(1L, TimeUnit.SECONDS), //
            (a, b) -> a + "/" + b);
    Helpers.subscribePrint(zipWith, "zip");

    Thread.sleep(5000L);

実行結果

zip:a/0
zip:b/1
zip ended!

The combineLatest operator

  • 複数のOservableインスタンスを要素を発行されるたびに組み合わせて処理をしていくメソッド
  • zipと違い要素の順ではなく処理時点のすれぞれの要素を使う
  • Oservableインスタンスの要素が揃う前に既に発行された要素は使われない
    Observable<String> combineLatest = Observable
        .combineLatest( //
            Observable
                .from(Arrays.asList("A", "B", "C", "D")), //
            Observable.interval(1L, TimeUnit.SECONDS), //
            (a, b) -> a + "/" + b);
    Helpers.subscribePrint(combineLatest, "combineLatest");

    Thread.sleep(5000L);

実行結果

combineLatest:D/0
combineLatest:D/1
combineLatest:D/2
combineLatest:D/3
combineLatest:D/4

上の例ではintervalメソッドのObservableインスタンスが最初の要素を発行する前にfromメソッドのObservableインスタンスは既にすべての要素を発行しているので、最後の要素は「D」になっている。

  • 本のサンプルも長いが十分わかりやすい

T The merge operator

  public static void main(String[] args)
      throws InterruptedException {
    Observable<String> alphabets = Observable
        .just("A", "B", "C", "D")
        .zipWith(Observable.interval(1L, TimeUnit.SECONDS), //
            MergeSample::onlyFirstArg);

    Observable<?> merge = Observable.merge(alphabets,
        Observable.interval(400L, TimeUnit.MILLISECONDS));

    Helpers.subscribePrint(merge, "merge");

    TimeUnit.SECONDS.sleep(5L);
  }

  static <T, R> T onlyFirstArg(T arg1, R arg2) {
    return arg1;
  }

実行結果

merge:0
merge:1
merge:A
merge:2
merge:3
merge:4
merge:B
merge:5
merge:6
merge:C
merge:7
merge:8
merge:9
merge:D
merge:10
merge:11

concat

startWith

The conditional operators

  • Observableインスタンスが他のObservableインスタンスが処理を行っているときにしか処理をしないなどのケースがある
  • コンディショナルなメソッドは特定の条件で実行するしないを管理することができる

The amb operator

コメント

The takeUntil(), takeWhile(), skipUntil(), and skipWhile() conditional operators

The takeUntil

  • 引数のObservableインスタンスが最初の要素を発行するまで処理をし続ける* 引数のObservableインスタンスが最初の要素を発行したら処理は終了する
    Observable<String> words = Observable
        .just("one", "way", "or", "another", "I'll",
            "learn", "Rxjava")
        .zipWith(Observable.interval(500L,
            TimeUnit.MILLISECONDS), (x, y) -> x);

    Observable<Long> interval = Observable.interval(2000L,
        TimeUnit.MILLISECONDS);

    Helpers.subscribePrint(words.takeUntil(interval),
        "takeUntil");

    TimeUnit.SECONDS.sleep(5L);

実行結果

takeUntil:one
takeUntil:way
takeUntil:or
takeUntil ended!

※ intervaleが2秒後に要素を発行するためwords.takeUntilの処理は終了する

takeWhile

  • 引数の関数がtrueを開けす限り処理を実行する
    Observable<String> words = Observable
        .just("one", "way", "or", "another", "I'll",
            "learn", "Rxjava")
        .zipWith(Observable.interval(500L,
            TimeUnit.MILLISECONDS), (x, y) -> x);

    Helpers.subscribePrint(
        words.takeWhile(word -> word.length() > 2),
        "takeWhile");

    TimeUnit.SECONDS.sleep(5L);

実行結果

takeWhile:one
takeWhile:way
takeWhile ended!

skipUntil

  • 引数のObservableインスタンスが最初の要素を発行するまで要素は発行しない
  • 引数のObservableインスタンスが最初の要素を発行したら元のObservableインスタンスの要素がそれまで発行していた分を除いて発行される
    Observable<String> words = Observable
        .just("one", "way", "or", "another", "I'll",
            "learn", "Rxjava")
        .zipWith(Observable.interval(500L,
            TimeUnit.MILLISECONDS), (x, y) -> x);

    Observable<Long> interval = Observable.interval(2000L,
        TimeUnit.MILLISECONDS);

    Helpers.subscribePrint(words.skipUntil(interval),
        "skipUntil");

    TimeUnit.SECONDS.sleep(5L);

実行結果

skipUntil:another
skipUntil:I'll
skipUntil:learn
skipUntil:Rxjava
skipUntil ended!

※ 最初の要素からではなく、それまで処理していたと思われるのを除いて、結果が発行される

コメント
  • xxxUntil系のメソッドは最初の要素が発行されたタイミングなので注意(Observableインスタンスの処理が完了するまでではない)

The defaultEmpty() operator

Handling errors

  • エラーが発生するとObservableインスタンスは処理のチェインを終了する
  • ただし、通常のプログラムのcatchブロックでの処理のように、エラー時に何かするロジックは実行可能

主なメソッドは3つに分類できる

  • return*
  • retry*
  • resume*

The return and resume operators

onErrorReturn

  • エラー発生時にOnErrorに行かずにonErrorReturnメソッドで処理を行い、Observableインスタスの処理を終了する

p.87のサンプル

誤:Observable<String> numbers = …
↓
正:Observable<Integer> numbers = …
    Observable<Integer> numbers = Observable
        .just("1", "2", "Three", "4") //
        .map(Integer::parseInt) //
        .onErrorReturn(e -> -1);
    Helpers.subscribePrint(numbers, "Error returned");

実行結果

Error returned:1
Error returned:2
Error returned:-1
Error returned ended!

onExceptionResumeNext

  • 例外発生時にOnErrorに行かずにonExceptionResumeNextメソッドの引数のObservableインスタンスの要素を使ってそれ以降の処理を行う
  • 対応するのはExceptionとそのサブクラスのみ
  • ErrorはonErrorに通知される
    Observable<Integer> numbers = Observable
        .just("1", "2", "Three", "4") //
        .map(Integer::parseInt) //
        .onExceptionResumeNext(
            Observable.just(50, 40, 30, 20, 10));
    Helpers.subscribePrint(numbers, "Exception resume");

実行結果

Exception resume:1
Exception resume:2
Exception resume:50
Exception resume:40
Exception resume:30
Exception resume:20
Exception resume:10
Exception resume ended!

onErrorResumeNext

その他の副作用オペレータ(side effect operator)

  • 要素の変更は行わない

主に次のものがある

  • doOnNext
  • doOnError
  • doOnCompleted
  • finallyDo
コメント
  • finallyDo以外はOnNext、OnError、OnCompletedらの処理をに追加されるので煩雑になるので?(ロジックが一か所ではない)

The retry technique

  • 例えば複数サーバが並列的に稼働している場合、ネットワークの問題などのエラーで全ての処理が行えなくなるより、別のサーバに再実行するなど重要な技術である

retry

  • エラーが発生したらObservableインスタンスの処理のチェインを最初から行う
  • retryメソッドに引数がない場合、永遠に再実行する
  • retryメソッドの引数に数値を渡した場合、その回数だけ再実行する

retryWhen

retry(Func2)

  • 第1引数が再実行回数
  • 第2引数がエラー情報
  • 戻り値に再実行ならtrue、OnErrorで終了ならfalseを返す

An Http Client Example

下記のJarが必要

  • gson
  • http-async-client
  • rx-apache-http
  • commons-logging
  • http-client
  • http-core
コメント
  • 意図した挙動にならない際のデバッグが難しい