SE情報技術研究会’s blog

http://se-info-tech.connpass.com

2015-12-14 『Learning Reactive Programming with Java 8』 もくもく読書会(Chapter 1)の振り返り

Learning Reactive Programming With Java 8

Learning Reactive Programming With Java 8

前回の続き(Chapter 1)

Implementing the reactive sum

次の条件を満たすReactiveな足し算(a+b)をするプログラムを実装してみる

  • ターミナルで動くアプリケーション
  • ユーザが「exit」を入力すまでプログラムは軌道し続ける
  • 「a:数値」を入力すると結果が更新される
  • 「b:数値」を入力すると結果が更新される
  • それ以外が入力されても何もしない

実行例:

Reacitve Sum. Type 'a: <number>' and 'b: <number>' to try it.
a:1
b:2
update : a + b = 3.0
a:5
update : a + b = 7.0
b:3
update : a + b = 8.0

実装の手順としては

  1. 入力値を受け取るObsevableインスタンスを生成
  2. 入力値を分析しaもしくはbの値を取得するObservableインスタンスを生成
  3. aとbのObservableインスタンスを使ってa + bを行うObserverインスタンスを生成
  4. 入力値を受け取るObsevableインスタンスのconnectメソッドを呼び出し処理を開始する

ただし、このChapter 1のサンプルでは入力があるまでスレッドが止まることになるが、この回避方法はChapter 6で説明しているらしい。

入力値を受け取るObsevableインスタンス

  private static ConnectableObservable<String> from(
      final BufferedReader reader) {
    // Observableインスタンスの生成。
    return Observable.create(new OnSubscribe<String>() {
      @Override
      public void call(
          Subscriber<? super String> subscriber) {
        // Observableの登録から解除されている場合は何もしない
        if (subscriber.isUnsubscribed()) {
          return;
        }

        try {
          String line;
          // 入力取得ループ
          while (!subscriber.isUnsubscribed()
              && (line = reader.readLine()) != null) {
            // 入力文字列が「exit」の場合、ループから抜ける
            if (line == null || line.equals("exit")) {
              break;
            }
            // 取得した文字列を発行する
            subscriber.onNext(line);
          }
        } catch (IOException e) {
          // 例外時の通知を発行する
          subscriber.onError(e);
        }

        // ループを抜けたらSubscriberを終了
        if (!subscriber.isUnsubscribed()) {
          subscriber.onCompleted();
        }
      }
    })
    // Observable → ConnectableObservable
    .publish(); 
  }

コード上、入力値を受け取るObsevableインスタンスはConnectableObservableが明示的に指定されている。これはconnectメソッドが呼ばれて初めてイベントを開始するものである。

まず、Observable.create(OnSubcribe)メソッドでObservableインスタンスを生成している。またこのObservableインスタンスがpublishメソッドを事項することでこのObservableインスタンスからConnectableObservableインスタンスを生成している。

メッセージの通知は、Observable#createメソッドのOnSubcribeインターフェイスが持つcallメソッドの引数のSubscriberインスタンスにて行う。SubscriberインスタンスのonNextメソッドに値を渡すことで、変更があったことを通知する。

また、このSubscriberに対する登録の解除が既にされていた場合は何もしないし、入力を待っている状態から抜けたらSubscriberの解除をonCompletedメソッドを呼ぶことで行っている。

入力値の取得

  private static Observable<Double> varStream(
      final String varName, Observable<String> input) {
    return input.map(new Func1<String, Matcher>() {
      @Override
      public Matcher call(String str) {
        return 正規表現で対象の文字列を解析するMatcher
      }
    }).filter(new Func1<Matcher, Boolean>() {
      @Override
      public Boolean call(Matcher matcher) {
        return 正規表現で値が取得できるかチェック
      };
    }).map(new Func1<Matcher, Double>() {
      @Override
      public Double call(Matcher matcher) {
        return 文字列からDoubleに変換
      }
    });
  }

入力値を受け取るConnectableObservableインスタンスからmapやfilterメソッドを使ってaもしくはbの入力値を取りDoubleの値を取得するObserverを生成している。

実装的には正規表現を使って対象の文字列を取ってきてDoubleに変換するObservableインスタンスを生成している。

また、これはラムダ式を使えばもっと簡素に書けると思うが、このあたりはChapter 2でJava 8の話をしているようなので、あえてこう書いているものと思われる。実際、このほうが何のインターフェースを使っているのかわかりやすいので初見の人には優しいと思われる。

  private static Observable<Double> varStream(
      final String varName, Observable<String> input) {
    return input.map(str -> 正規表現で対象の文字列を解析するMatcher)
                .filter(matcher -> 正規表現で値が取得できるかチェック)
                .map(matcher -> 文字列からDoubleに変換);
  }

aとbのObservableインスタンスを使ってa + bを行うObserverインスタンス

final class ReactiveSum implements Observer<Double> {

  private double sum;

  public ReactiveSum(Observable<Double> a,
      Observable<Double> b) {
    this.sum = 0;
    // a と bを受け取りFunc2の処理を実行するObservableインスタンスを生成
    Observable.combineLatest(a, b,
        new Func2<Double, Double, Double>() {
          @Override
          public Double call(Double a, Double b) {
            return a + b;
          }
        })
        // Observableインスタンスのメッセージを受け取るObserverを登録する
        .subscribe(this);
  }

  @Override
  public void onCompleted() {
    System.out
        .println("Exisiting last sum was : " + this.sum);
  }

  @Override
  public void onError(Throwable e) {
    System.err.println("Got an error!");
  }

  @Override
  public void onNext(Double sum) {
    this.sum = sum;
    System.out.println("update : a + b = " + sum);
  }
}

ここではObservableの変更が通知されたら何らかの処理を行うObserverオブジェクトを実装する。ObserverにはObservable#subscrive(Observer)メソッドでメッセージを受信するように登録することができる。これでObserverインスタンスはObserveble内のSubscriberがメッセージを発行した際にその情報が通知されることになる。

またa + bを行うためaの値およびbの値を取得するObservaleインスタンスを保持しそれらの2つから処理を行う新たなObservableインスタンスを生成している。これはObserveble#combineLateset(Observable<A>, Observable<B>, Func2<A, B, R>)メソッドで生成できる。このObservableインスタンスにObserverインスタンスを登録することで、a + bが実行されるたびにObserverインスタンスに通知が行くことになる。

また、onErrorメソッドで例外時の処理をonCompletedメソッドで終了時の処理を実装している。

Summary

※ ちなみにBen Christensenによるプレゼンは下記にいろいろある

https://speakerdeck.com/benjchristensen