2015-12-14 『Learning Reactive Programming with Java 8』 もくもく読書会(Chapter 1)の振り返り
Learning Reactive Programming With Java 8
- 作者: Nickolay Tsvetinov
- 出版社/メーカー: Packt Publishing
- 発売日: 2015/06/24
- メディア: ペーパーバック
- この商品を含むブログを見る
前回の続き(Chapter 1)
Implementing the reactive sum
次の条件を満たすReactiveな足し算(a+b)をするプログラムを実装してみる
- ターミナルで動くアプリケーション
- ユーザが「exit」を入力すまでプログラムは軌道し続ける
- 「a:数値」を入力すると結果が更新される
- 「b:数値」を入力すると結果が更新される
- それ以外が入力されても何もしない
実行例:
Reacitve Sum. Type 'a: <number>' and 'b: <number>' to try it. a:1 b:2 update : a + b = 3.0 a:5 update : a + b = 7.0 b:3 update : a + b = 8.0
実装の手順としては
- 入力値を受け取るObsevableインスタンスを生成
- 入力値を分析しaもしくはbの値を取得するObservableインスタンスを生成
- aとbのObservableインスタンスを使って
a + b
を行うObserverインスタンスを生成 - 入力値を受け取るObsevableインスタンスのconnectメソッドを呼び出し処理を開始する
ただし、このChapter 1のサンプルでは入力があるまでスレッドが止まることになるが、この回避方法はChapter 6で説明しているらしい。
入力値を受け取るObsevableインスタンス
private static ConnectableObservable<String> from( final BufferedReader reader) { // Observableインスタンスの生成。 return Observable.create(new OnSubscribe<String>() { @Override public void call( Subscriber<? super String> subscriber) { // Observableの登録から解除されている場合は何もしない if (subscriber.isUnsubscribed()) { return; } try { String line; // 入力取得ループ while (!subscriber.isUnsubscribed() && (line = reader.readLine()) != null) { // 入力文字列が「exit」の場合、ループから抜ける if (line == null || line.equals("exit")) { break; } // 取得した文字列を発行する subscriber.onNext(line); } } catch (IOException e) { // 例外時の通知を発行する subscriber.onError(e); } // ループを抜けたらSubscriberを終了 if (!subscriber.isUnsubscribed()) { subscriber.onCompleted(); } } }) // Observable → ConnectableObservable .publish(); }
コード上、入力値を受け取るObsevableインスタンスはConnectableObservableが明示的に指定されている。これはconnectメソッドが呼ばれて初めてイベントを開始するものである。
まず、Observable.create(OnSubcribe)
メソッドでObservableインスタンスを生成している。またこのObservableインスタンスがpublishメソッドを事項することでこのObservableインスタンスからConnectableObservableインスタンスを生成している。
メッセージの通知は、Observable#createメソッドのOnSubcribeインターフェイスが持つcallメソッドの引数のSubscriberインスタンスにて行う。SubscriberインスタンスのonNextメソッドに値を渡すことで、変更があったことを通知する。
また、このSubscriberに対する登録の解除が既にされていた場合は何もしないし、入力を待っている状態から抜けたらSubscriberの解除をonCompletedメソッドを呼ぶことで行っている。
入力値の取得
private static Observable<Double> varStream( final String varName, Observable<String> input) { return input.map(new Func1<String, Matcher>() { @Override public Matcher call(String str) { return 正規表現で対象の文字列を解析するMatcher } }).filter(new Func1<Matcher, Boolean>() { @Override public Boolean call(Matcher matcher) { return 正規表現で値が取得できるかチェック }; }).map(new Func1<Matcher, Double>() { @Override public Double call(Matcher matcher) { return 文字列からDoubleに変換 } }); }
入力値を受け取るConnectableObservableインスタンスからmapやfilterメソッドを使ってaもしくはbの入力値を取りDoubleの値を取得するObserverを生成している。
実装的には正規表現を使って対象の文字列を取ってきてDoubleに変換するObservableインスタンスを生成している。
また、これはラムダ式を使えばもっと簡素に書けると思うが、このあたりはChapter 2でJava 8の話をしているようなので、あえてこう書いているものと思われる。実際、このほうが何のインターフェースを使っているのかわかりやすいので初見の人には優しいと思われる。
private static Observable<Double> varStream( final String varName, Observable<String> input) { return input.map(str -> 正規表現で対象の文字列を解析するMatcher) .filter(matcher -> 正規表現で値が取得できるかチェック) .map(matcher -> 文字列からDoubleに変換); }
aとbのObservableインスタンスを使ってa + b
を行うObserverインスタンス
final class ReactiveSum implements Observer<Double> { private double sum; public ReactiveSum(Observable<Double> a, Observable<Double> b) { this.sum = 0; // a と bを受け取りFunc2の処理を実行するObservableインスタンスを生成 Observable.combineLatest(a, b, new Func2<Double, Double, Double>() { @Override public Double call(Double a, Double b) { return a + b; } }) // Observableインスタンスのメッセージを受け取るObserverを登録する .subscribe(this); } @Override public void onCompleted() { System.out .println("Exisiting last sum was : " + this.sum); } @Override public void onError(Throwable e) { System.err.println("Got an error!"); } @Override public void onNext(Double sum) { this.sum = sum; System.out.println("update : a + b = " + sum); } }
ここではObservableの変更が通知されたら何らかの処理を行うObserverオブジェクトを実装する。ObserverにはObservable#subscrive(Observer)
メソッドでメッセージを受信するように登録することができる。これでObserverインスタンスはObserveble内のSubscriberがメッセージを発行した際にその情報が通知されることになる。
またa + b
を行うためaの値およびbの値を取得するObservaleインスタンスを保持しそれらの2つから処理を行う新たなObservableインスタンスを生成している。これはObserveble#combineLateset(Observable<A>, Observable<B>, Func2<A, B, R>)
メソッドで生成できる。このObservableインスタンスにObserverインスタンスを登録することで、a + b
が実行されるたびにObserverインスタンスに通知が行くことになる。
また、onErrorメソッドで例外時の処理をonCompletedメソッドで終了時の処理を実装している。
Summary
- この例を見ると複雑で混乱するように見えるが、実際にやってることはシンプル
- 新しいことが多いが、Chapterを進めるごとに詳細を説明していくので問題ない
- もっとReactive Programmingについて知りたいならReactive Programming in the Netflix API with RxJava by Ben Christensen and Jafar HusainとReactive Programming with Rx at QConSF 2014 by Ben Christensenがおすすめ
※ ちなみにBen Christensenによるプレゼンは下記にいろいろある