SE情報技術研究会’s blog

http://se-info-tech.connpass.com

2015-12-26 『Learning Reactive Programming with Java 8』 もくもく読書会(Chapter 4)の振り返り

Learning Reactive Programming With Java 8

Learning Reactive Programming With Java 8

Chapter 4: Transforming, Filtering, and Accumulating Your Data

Introduction

この章で見ていくこと

  • マーブルダイアグラムの紹介
  • マッピングを使ったデータ整形
  • データのフィルタリング
  • scanオペレータを使った値の集計

Observable transformations

Transformations with the various flatMap operators

flatMap

  • flatMapオペレータは受け取った値をfunctionを通してObservableインスタンスに変換する
  • functionを通って結果として生成されたいくつものObservaleインスタンスは一つのObservableインスタンスにマージされる
  • 結果のObservableインスタンスが持つ値は順序通りなっていない可能性があることに注意する

サンプルコード

本ではresourcesフォルダのlorem.txtとletters.txtの2ファイルを読み込み内容を出力するサンプルコードが提示されている。

listFolderメソッド

  • 対象の2ファイルのDirectoryStreamから生成したObservableインスタンスを生成する
  • Subsrcriber#add(Subscription)でSubscriberがunsubcribeされたらSubscritionを実行するようにしている
  • ここではfinlly句のようにSubscriptionをStream#closeを呼ぶために使っている

fromメソッド

  • Pathからファイルを読み込むObservableインスタンスを生成する
  • onNextメソッドに用見込んだ行を渡している
  • ここも同様にSubscriptionはReader#closeを呼び出すfinally句の用に使われている

実行パート

    Observable<String> fsObs = listFolder(
        Paths.get("src", "resources"),
        "{lorem.txt,letters.txt}")
            .flatMap(path -> from(path));
    Helpers.subscribePrint(fsObs, "FS");
  1. listFolderのPathを要素に持つObservableインスタンスが生成される
  2. flatMapで要素のPathをfromメソッドに渡している
  3. Helpers#subscribePrintメソッドで受けとったObservableを起動し、onNextの値を標準出力している

※ Chpter 6で説明するようだが、Schedularを使ったObservableインスタンスの場合、出力結果が順不同になる

flatMapのオーバーロードメソッド

  • flatMap(Action1 onNext, Action1 onError, Action0 onCompleted)
  • flatMap(Func1<T, Observable<U>>, Func2<T, U, R>)

flatMap(Func1<T, Observable<U>>, Func2<T, U, R>)

    Observable<Integer> flatMapped = Observable.just(5, 432)
        .flatMap(v -> Observable.range(v + 1, 3),
            (x, y) -> {
              System.out.println("x=" + x + ", y=" + y);
              return x + y;
            });
    Helpers.subscribePrint(flatMapped, "flatMap");

※ 本のサンプルを若干修正

実行結果

x=5, y=6
flatMap:11
x=5, y=7
flatMap:12
x=5, y=8
flatMap:13
x=432, y=433
flatMap:865
x=432, y=434
flatMap:866
x=432, y=435
flatMap:867
flatMap ended!
  1. 5と432の要素を持つObservableインスタンスを生成
  2. 5の要素が渡されたときrangeメソッドで{6, 7, 8}の要素を持つObservableインスタンスを生成
  3. 2のObservableインスタンスの要素がx+yを実行される関数に渡されxにjustメソッドの値、yにranngeの値が渡される

  4. このflatMapのオーバーロードはソースとなった要素にアクセスしないといけない場合に使いやすい(タプルなどに値を格納してそれを通じて値を渡さなくてよい)

flatMapIterable

    Observable<?> flatMapped = Observable
        .just(Arrays.asList(2, 4),
            Arrays.asList("two", "four"))
        .flatMapIterable(v -> {
          System.out.println("v=" + v);
          return v;
        });
    Helpers.subscribePrint(flatMapped, "flatMapIterable");

※ 本のサンプルを若干修正

実行結果

v=[2, 4]
flatMapIterable:2
flatMapIterable:4
v=[two, four]
flatMapIterable:two
flatMapIterable:four
flatMapIterable ended!
  • flatMap(v -> Observable.from(v))と同じ

concatMap

A significant different between the flatMap and concatMap operator is that the flatMap operator uses the inner Observable instances in parallel, whereas the concatMap operator only subscribes to one of the Observable instances at a time.

  • flatMapとの違いはflatMapが内部のObservableインスタンスを並列に使うのに対し、concatMapはObservableインスタンスを一つずつ使う
    → とあるが、flatMapも一つずつやってるようにしか見えないのだが?
    Observable<Integer> flatMapped = Observable
        .from(Arrays.asList(5, 6, 7)).flatMap(v -> {
          try {
            Thread.sleep(1000L);
            return Observable.just(v * 2);
          } catch (InterruptedException e) {
            e.printStackTrace();
            return Observable.empty();
          }
        });
    Helpers.subscribePrintWithThreadName(flatMapped,
        "flatMap");

    Observable<Integer> concatMapped = Observable
        .from(Arrays.asList(5, 6, 7)).concatMap(v -> {
          try {
            Thread.sleep(1000L);
            return Observable.just(v * 2);
          } catch (InterruptedException e) {
            e.printStackTrace();
            return Observable.empty();
          }
        });
    Helpers.subscribePrintWithThreadName(concatMapped,
        "concatMap");

実行結果

main:flatMap:10
main:flatMap:12
main:flatMap:14
flatMap ended!
main:concatMap:10
main:concatMap:12
main:concatMap:14
concatMap ended!

と思っていたのだが、intervalから生成したObservableインスタンスの場合、結果が違っていた。

    Observable<Object> concatMapped = Observable
        .interval(1000L, TimeUnit.MILLISECONDS)
        .concatMap(v -> Observable
            .interval(300L, TimeUnit.MILLISECONDS)
            .map(u -> "Observable <" + (v) + "> : "
                + (v + u)));
    Subscription subscription = Helpers
        .subscribePrint(concatMapped, "concatMap");

    Thread.sleep(5000L);
    subscription.unsubscribe();

    Observable<Object> flatMapped = Observable
        .interval(1000L, TimeUnit.MILLISECONDS)
        .flatMap(v -> Observable
            .interval(300L, TimeUnit.MILLISECONDS)
            .map(u -> "Observable <" + (v) + "> : "
                + (v + u)));
    Helpers.subscribePrint(flatMapped, "flatMap");
    Thread.sleep(5000L);

実行結果

concatMap:Observable <0> : 0
concatMap:Observable <0> : 1
concatMap:Observable <0> : 2
concatMap:Observable <0> : 3
concatMap:Observable <0> : 4
concatMap:Observable <0> : 5
concatMap:Observable <0> : 6
concatMap:Observable <0> : 7
concatMap:Observable <0> : 8
concatMap:Observable <0> : 9
concatMap:Observable <0> : 10
concatMap:Observable <0> : 11
concatMap:Observable <0> : 12
flatMap:Observable <0> : 0
flatMap:Observable <0> : 1
flatMap:Observable <0> : 2
flatMap:Observable <0> : 3
flatMap:Observable <1> : 1
flatMap:Observable <0> : 4
flatMap:Observable <1> : 2
flatMap:Observable <0> : 5
flatMap:Observable <1> : 3
flatMap:Observable <0> : 6
flatMap:Observable <1> : 4
flatMap:Observable <2> : 2
flatMap:Observable <0> : 7
flatMap:Observable <1> : 5
flatMap:Observable <2> : 3
flatMap:Observable <0> : 8
flatMap:Observable <1> : 6
flatMap:Observable <2> : 4
flatMap:Observable <0> : 9
flatMap:Observable <1> : 7
flatMap:Observable <2> : 5
flatMap:Observable <3> : 3
flatMap:Observable <0> : 10
flatMap:Observable <1> : 8
flatMap:Observable <2> : 6
flatMap:Observable <3> : 4
flatMap:Observable <0> : 11
flatMap:Observable <1> : 9
flatMap:Observable <2> : 7
flatMap:Observable <3> : 5
flatMap:Observable <0> : 12
flatMap:Observable <1> : 10

このようにconcatMapの場合、最初の要素が終わらないと次の要素の処理を始めない。

switchMap

  • flatMapとの違いはswitchMapはソースのObservableインスタンスから新しい要素が発行されるとその前の要素の処理をやめてしまう
    Observable<Object> switchMapped = Observable
        .interval(1000L, TimeUnit.MILLISECONDS)
        .switchMap(v -> Observable
            .interval(300L, TimeUnit.MILLISECONDS)
            .map(u -> "Observable <" + (v) + "> : "
                + (v + u)));
    Subscription subscription = Helpers
        .subscribePrint(switchMapped, "switchMap");

    Thread.sleep(3000L);
    subscription.unsubscribe();

    Observable<Object> flatMapped = Observable
        .interval(1000L, TimeUnit.MILLISECONDS)
        .flatMap(v -> Observable
            .interval(300L, TimeUnit.MILLISECONDS)
            .map(u -> "Observable <" + (v) + "> : "
                + (v + u)));
    Helpers.subscribePrint(flatMapped, "flatMap");
    Thread.sleep(3000L);

実行結果

switchMap:Observable <0> : 0
switchMap:Observable <0> : 1
switchMap:Observable <0> : 2
switchMap:Observable <1> : 1
switchMap:Observable <1> : 2
switchMap:Observable <1> : 3
flatMap:Observable <0> : 0
flatMap:Observable <0> : 1
flatMap:Observable <0> : 2
flatMap:Observable <0> : 3
flatMap:Observable <1> : 1
flatMap:Observable <0> : 4
flatMap:Observable <1> : 2
flatMap:Observable <0> : 5
flatMap:Observable <1> : 3

Grouping items

groupBy(Func1)

  • groupByオペレータはキーでグループ化されたGroupedObservableインスタンスを生成する
  • 引数のFunc1でキーを生成する
  • GroupedObservableインスタンスを生成するとそのインスタンスは要素をバッファするため、気を付けないとメモリーリークが発生する
  • onNextで発行される順番はソースと同じだがGroupedObservbleインスタンスはキー毎に異なる

The order the items are emitted in is the same, but they are emitted by different GroupedObsevable instances.

※ とあるが実行結果のonCompleted時のメッセージがキー毎なのでキー毎に違うインスタンスと言いたいのでは?

  • ソースが終了した時にGroupedObservableも終了になる
    List<String> albums = Arrays.asList(
        "The Piper at the Gates of Dawn",
        "A Sauceful of Secrets", "More", "Ummagumma",
        "Atom Heart Mother", "Meddle", "Obscured by Clouds",
        "The Dark Side of the Moon", "Wish You Were Here",
        "Animals", "The Wall");
    Observable.from(albums)
        .groupBy(album -> album.split(" ").length)
        .subscribe(obs -> Helpers.subscribePrint(obs,
            obs.getKey() + " word(s)"));

    Thread.sleep(2000L);

実行結果

7 word(s):The Piper at the Gates of Dawn
4 word(s):A Sauceful of Secrets
1 word(s):More
1 word(s):Ummagumma
3 word(s):Atom Heart Mother
1 word(s):Meddle
3 word(s):Obscured by Clouds
6 word(s):The Dark Side of the Moon
4 word(s):Wish You Were Here
1 word(s):Animals
2 word(s):The Wall
1 word(s) ended!
2 word(s) ended!
3 word(s) ended!
4 word(s) ended!
6 word(s) ended!
7 word(s) ended!

groupBy(Func1, Func2)

  • 第1引数でキーを作成
  • 第2引数で値を生成

Additional useful transformation operators

cast

  • 要素をキャストする
    List<Number> list = Arrays.asList(1, 2, 3);
    Observable<Integer> observable = Observable.from(list)
        .cast(Integer.class);

timetamp

    Observable<Timestamped<Integer>> observable = Observable
        .from(Arrays.asList(1, 2, 3)).timestamp();
    Helpers.subscribePrint(observable, "timestamp");

    observable.subscribe(item -> {
      long timestamp = item.getTimestampMillis();
      int value = item.getValue();

      System.out.println("timestamp = " + timestamp
          + ", value = " + value);
    });

timeinterval

Filtering data

filter

  • ソースから対象がtrueになる要素しか持たないObserverableインスタンスを生成する

その他のfilter系のメソッド

    Observable<Integer> observable = Observable.range(1, 5);

    // 複数要素を持つ可能性があるもの
    Helpers.subscribePrint(observable.takeLast(3),
        "takeLast");
    Helpers.subscribePrint(observable.take(3), "take");
    Helpers.subscribePrint(observable.skipLast(3),
        "skipLast");
    Helpers.subscribePrint(observable.skip(3), "skip");

    Helpers.subscribePrint(observable.takeLastBuffer(3),
        "takeLastBuffer"); // 要素はList<Integer>

    // 唯一の要素を取得するもの
    Helpers.subscribePrint(observable.last(), "last");
    Helpers.subscribePrint(observable.first(), "first");

    Helpers.subscribePrint(
        observable.takeFirst(value -> value % 2 == 0),
        "takeFirst");

    Helpers.subscribePrint(observable.elementAt(3),
        "elementAt");

    // デフォルト値ありのもの
    Helpers.subscribePrint(observable.lastOrDefault(99),
        "lastOrDefault:値あり");
    Helpers.subscribePrint(
        Observable.empty().lastOrDefault(99),
        "lastOrDefault:値なし");

    Helpers.subscribePrint(observable.firstOrDefault(99),
        "firstOrDefault:値あり");
    Helpers.subscribePrint(
        Observable.empty().firstOrDefault(99),
        "firstOrDefault:値なし");

    Helpers.subscribePrint(
        observable.elementAtOrDefault(3, 99),
        "elementAtOrDefault:値あり");
    Helpers.subscribePrint(
        observable.elementAtOrDefault(10, 99),
        "elementAtOrDefault:値なし");

    // 重複を除く
    Helpers.subscribePrint(
        Observable.from(Arrays.asList(1, 2, 3, 4, 1, 2, 3))
            .distinct(),
        "distinct");

    // 型のフィルター
    Helpers.subscribePrint(Observable
        .from(Arrays.asList("1", 2, 3L, 4.0, 1, "2", 3.0))
        .ofType(Integer.class), "ofType");

実行結果

takeLast:3
takeLast:4
takeLast:5
takeLast ended!
take:1
take:2
take:3
take ended!
skipLast:1
skipLast:2
skipLast ended!
skip:4
skip:5
skip ended!
takeLastBuffer:[3, 4, 5]
takeLastBuffer ended!
last:5
last ended!
first:1
first ended!
takeFirst:2
takeFirst ended!
elementAt:4
elementAt ended!
lastOrDefault:値あり:5
lastOrDefault:値あり ended!
lastOrDefault:値なし:99
lastOrDefault:値なし ended!
firstOrDefault:値あり:1
firstOrDefault:値あり ended!
firstOrDefault:値なし:99
firstOrDefault:値なし ended!
elementAtOrDefault:値あり:4
elementAtOrDefault:値あり ended!
elementAtOrDefault:値なし:99
elementAtOrDefault:値なし ended!
distinct:1
distinct:2
distinct:3
distinct:4
distinct ended!
ofType:2
ofType:1
ofType ended!

Accumulating data

scan

  • scanメソッドは2引数を取る関数を設定し要素を使った集計処理を行う。
  • Observableインスタンスは各要素ごとの結果を持ち、最後のものが最終的な集計結果
  • 関数の第1引数は以前までの結果(最初は第1要素)
  • 関数の第2引数はソースの要素
  • このような関数はaccumulatorと言われる
  • reduce(Func2)scan(Func2).lastと同じ

下記は1~10までの足し算をするサンプル

    Observable<Integer> observable = Observable.range(1, 10)
        .scan((p, v) -> {
          System.out.println("p=" + p + ", v=" + v);
          return p + v;
        });
    Helpers.subscribePrint(observable, "scan");

実行結果

scan:1
p=1, v=2
scan:3
p=3, v=3
scan:6
p=6, v=4
scan:10
p=10, v=5
scan:15
p=15, v=6
scan:21
p=21, v=7
scan:28
p=28, v=8
scan:36
p=36, v=9
scan:45
p=45, v=10
scan:55
scan ended!

コメント

  • マーブルダイアグラムは全部がわかりにくいわけではないが細かいことがわかりにくい(flatMapとconcatMapの違いとか)