SE情報技術研究会’s blog

http://se-info-tech.connpass.com

2015-12-23 『Learning Reactive Programming with Java 8』 もくもく読書会(Chapter 3)の振り返り

Learning Reactive Programming With Java 8

Learning Reactive Programming With Java 8

Chapter 3: Creating and Connecting Observables, Observers, and Subjects

Introduction

学ぶ必要があるもの

  • ユーザ入力だけではなく様々なデータをObservableインスタンスを通じて扱えるか?
  • 振る舞いによってどのようなObservableインスタンスの種類があるのか?
  • Observableインスタンスを、いつ、どのように登録解除(unsubscribe)すべきか?
  • どのようにsubscriptionとObserverインスタンスを使うのか?
  • Subject typeについて

この章では下記を学んでいく

  • Observable のファクトリーメソッド
  • Observer と Subscriber
  • Hot Observable と Cold Observable:Connectable Observable
  • Subjectとは?
  • Observable の生成

The Observable.from method

引数の型

  • Iterable
  • 配列

Iterableオブジェクトによってはiteratorメソッドが一度しか呼ぶことができないものもあり、一度しか呼ぶことができないソースの場合、Observable#subscribeメソッドを呼ぶとエラーになる。

Listは複数回呼んでも問題ない

    List<String> list = Arrays.asList("blue", "red",
        "green", "yellow", "Orange", "cyan", "purple");
    Observable<String> observable = Observable.from(list);
    observable.subscribe(
        color -> System.out.print(color + "|"),
        System.out::println, System.out::println);
    observable
        .subscribe(color -> System.out.print(color + "/"));

DirectStreamは複数回呼べない

    Path resources = Paths.get("src", "resources");
    try (DirectoryStream<Path> dStream = Files
        .newDirectoryStream(resources)) {
      Observable<Path> dirObservable = Observable
          .from(dStream);
      dirObservable.subscribe(System.out::println);
      // 下記でエラーが発生する
      dirObservable.subscribe(System.out::println); 
    } catch (IOException e) {
      e.printStackTrace();
    }

http://docs.oracle.com/javase/jp/7/api/java/nio/file/DirectoryStream.html

DirectoryStream は Iterable を拡張しますが、それがサポートする Iterator は 1 つだけなので、それは汎用の Iterable ではありません。つまり、iterator メソッドを呼び出して、2 つめまたはそれに続くイテレータを取得すると、IllegalStateException がスローされます。

subscriveメソッド

この本で主に使われているものは下記の2パターン

public Subscription subscribe(Action1<? super T> onNext)
  • onNextで実行する処理のみ定義
public Subscription subscribe(Action1<? super T> onNext, Action1<? super T> onError, Action0 onComplete)
  • onNextで実行する処理を定義。※ 処理に渡される引数が1つある
  • onErrorでエラー時の処理を定義。※ 処理に渡される引数が1つある
  • onCompleteで処理終了時の処理を定義。※ 処理に渡される引数なし

The Observable.just method

  • 対象がオブジェクトが複数データをあらわすのではなくひとつのオブジェクトをあらわす場合にObservableインスタンスを生成するメソッド
  • justメソッドの引数には複数の9つまでの値を設定できる
  • 複数の値を設定した場合はonCompleteメソッドは全引数の処理が終わった後に実行される

    Observable.just('S').subscribe(System.out::println);
    Observable.just('R', 'x', 'J', 'a', 'v', 'a').subscribe(
        value -> System.out.print("*" + value + "*"), //
        System.err::println, //
        () -> System.out.println("/"));

実行結果

S
*R**x**J**a**v**a*/

Other Observable factory methods

※ 実行する際に処理の最後にThread.sleepで処理を停止させないとObservableの処理が終わる前にメインスレッドが終了していしまい、結果が表示されないので注意

Observable#interval

  • 指定したインターバルで処理を行うObservableインスタンスを生成する
  • 処理を行うAction1の引数には0からのカウントが渡される

    Helpers.subscribePrint(
        Observable.interval(1000L, TimeUnit.MILLISECONDS),
        "interval");

実行結果 ※ 1秒後に実行される

interval:0
interval:1
interval:2
…略

Observable#timer(long, long, TimeUnit)

public static final Observable<java.lang.Long> timer(long initialDelay,
                               long period,
                               java.util.concurrent.TimeUnit unit)
  • 指定した待ち時間(initialDelay)を経過した後に指定したインターバル(period)で処理を行うObservableインスタンスを生成する
  • 処理を行うAction1の引数には0からのカウントが渡される

ただしこのメソッドは今使っているバージョン(1.0.16)だとDeprecatedになっている。

※ が、WebにあるRxJavaのJavaDocだと何も書いていない
※ このサイトはバージョンも書いてないので、そもそもメンテされてない?
※ どうもObservable#intervalのほうに移動したもよう

ちなみに第1引数のperiodがないtimerメソッドはDeprecatedされていないので、こちらは今後も使われるもよう

Observable#timer(long, TimeUnit)

  • 第1引数の待ち時間経過後、1度だけ実行されるObservableインスタンスを生成する
  • 実行メソッドに渡される引数の値は「0」である

Observable#error

Observable#empty

  • onNext処理が行われないObserverbleインスタンスを生成する
  • onCompleted処理はすぐに実行される

Observable#never

  • onNext処理もonCompleted処理も行われないObserverbleインスタンスを生成する

Observable#range

public static final Observable<Integer> range(int start, int count)
  • subscribeされるとすぐにstratからcount分実行されるObserverインスタンスを生成する
  • onNext処理に渡される値はstratから始まり1つずつ加算された値

例:

Helpers.subscribePrint(Observable.range(10, 3), "range");

実行結果:

range:10
range:11
range:12
range ended!

The Observable.create method

Observable.from(Iterableメソッドを見てみる

実際のソースとは違うが本によるとざっくりとした実装イメージは下記

<T> Observable<T> fromIterable(final Iterable<T> iterable) {
  return Observable.create(new OnSubscribe<T>(){
    @Override
    public void call(Subscriber<? super T> subscriber){
      try {
        Iterator<T> iterator = iterable.iterator();
        while (iterator.hasNext()){
          subscriber.onNext(iterator.next());
        }
        subscriber.onCompleted();
      }
      catch(Exception e){
        subscriber.onError(e);
      }
    }
  });
}

※ この処理のままだとunsubscribeされた際の処理が抜けているので注意

Subscribing and unsubscribing

subscribeメソッド

subscribe()

subscribe(Action1<? super T>)

  • OnSubscribe#call(Subscriber)のsubscriber#onNextで実行する処理を第1引数のAction1にさせる
  • OnErrorの通知があるとOnErrorNotImplementedExceptionがthrowされる
  • onCompletedの通知があっても何もしない
  • 著者によるとエラーがないことを保証するのは厳しいので製品としてリリースするコードにはこのメソッドは使わないほうが良いらしい

実装:

  public final Subscription subscribe(
      final Action1<? super T> onNext) {
    if (onNext == null) {
      throw new IllegalArgumentException(
          "onNext can not be null");
    }
    subscribe(new Subscriber() {
      public final void onCompleted() {
      }

      public final void onError(Throwable e) {
        throw new OnErrorNotImplementedException(e);
      }

      public final void onNext(T args) {
        onNext.call(args);
      }
    });
  }

subscribe(Action1<? super T>, Action1)

  • subscriber#onNextで実行する処理を第1引数のAction1にさせる
  • subscriber#onErrorで実行する処理を第2引数のAction1にさせる

subscribe(Action1<? super T>, Action1, Action0)

  • subscriber#onNextで実行する処理を第1引数のAction1にさせる
  • subscriber#onErrorで実行する処理を第2引数のAction1にさせる
  • subscriber#onCompletedで実行する処理を第3引数のAction0にさせる

subscribe(Observer<? super T>)

    Observable.create(new OnSubscribe<Integer>() {
      @Override
      public void call(
          Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onCompleted();
      }
    }).subscribe(new Observer<Integer>() {

      @Override
      public void onCompleted() {
        System.out.println("onCompleted()");
      }

      @Override
      public void onError(Throwable paramThrowable) {
        System.err.println("onError()");
      }

      @Override
      public void onNext(Integer param) {
        System.out.println("onNext(" + param + ")");
      }
    });

実行結果

onNext(1)
onCompleted()

subscribe(Subscripter<? super T>)

  • subscribe(Observer<? super T>)と基本的には同じ
  • SubscripterはObserverインターフェイスを実装した抽象クラス
  • Subscripterにはいくつかの有用な機能が装備されている
  • ほとんどのsubscribeメソッドは内部でこのsubscribeメソッドを呼んでいる
  • RxContractに沿った処理が保証される
  • この本ではこれ以降のObservable#subscribeはこのメソッドを意味する

unsafeSubscribe(Subscripter<? super T>)

  • subscribe(Subscripter<? super T>)と基本的には同じだがRxContractによる保護はない
  • 通常のsubscribeメソッドによるオーバーヘッドを軽減するために独自実装する際に使われる
  • 基本使うべきではない

unsubscribeメソッドとisUnscribedメソッド

Observable.from(Iterableメソッドを見てみる

先に出ていたfromIterableメソッドの実装例にunsubscribeの対応を実装したイメージは下記

<T> Observable<T> fromIterable(final Iterable<T> iterable) {
  return Observable.create(new OnSubscribe<T>(){
    @Override
    public void call(Subscriber<? super T> subscriber){
      try {
        Iterator<T> iterator = iterable.iterator();
        while (iterator.hasNext()){
          if (subscriber.isUnscribed()){
            return;
          }
          
          subscriber.onNext(iterator.next());
        }
        if (!subscriber.isUnscribed()){
          subscriber.onCompleted();
        }
      }
      catch(Exception e){
        if (!subscriber.isUnscribed()){
          subscriber.onError(e);
        }
      }
    }
  });
}
  • どのタイミングでunsubscribeされるかはわからないので全行程にunsubscribeのチェックはいれるべき
    コメント
  • 可能性とはしてはかなり低いがisUnscribed直後にunsubscribeされるとどうしようもない気がする
  • そういうのが非同期のほぼ再現しないエラーになる気がする

Subscriber.add(Subscription)

  • Subscriberがunsubscribeされると追加されたSubscriptionは全てunsucribeされる

※ 詳しくはChpter 8のリソース管理にて学ぶらしい

Hot and cold Observable instances

Cold Observableインスタンス

  • Observableのcreateメソッド、fromメソッド、justメソッドはsubscribeメソッドは実行されるまで何もしない
  • subscribeされるたびに毎回通知を発行する
  • それぞれ独立して通知を受け処理をする

Hot Observableインスタンス

  • 開始するとOnCompletedの通知がくるまで動き続けるObservableインスタンス
  • subscriptionは気にしない
  • 全てのSubscriberが同じSubscriptionを受け取る
  • Chapter 1の入力を受け取るObservableObservable<Double> a = varStream("a", input);がHot Observableインスタンス

Cold と Hot のObservableインスタンスの例え

  • Cold Observableインスタンスを音楽で例えるならCDでたくさんの人はCDを買うが聴くのはそれぞれ
  • Hot Observableインスタンスを音楽で例えるならラジオでそのラジオを聴いている人は全て同じ曲を同じタイミングで聞いている

The ConnectableObservable class

publishメソッド

本の例だとわかりづらかったのでちょっと変更

    Observable<Long> interval = Observable.interval(1L,
        TimeUnit.SECONDS);

    ConnectableObservable<Long> published = interval
        .publish();
    Subscription sub1 = Helpers.subscribePrint(published,
        "First");
    Subscription sub2 = Helpers.subscribePrint(published,
        "Second");

    Thread.sleep(3000L);

    published.connect();
    Thread.sleep(5000L);
    sub1.unsubscribe();

    Subscription sub3 = Helpers.subscribePrint(published,
        "Third");
    Thread.sleep(3000L);

    sub2.unsubscribe();
    sub3.unsubscribe();

実行結果

First:0
Second:0
First:1
Second:1
First:2
Second:2
First:3
Second:3
First:4
Second:4
Second:5
Third:5
Second:6
Third:6
Second:7
Third:7
  • connectメソッドを呼ばれた後にsubscribeメソッドを呼び出すと既に動いている状態でSubscriptionが生成されている
  • publishedが発行しているOnNextの通知が複数で受け取ることができている

replayメソッド

  • publishメソッドとほぼ同じだが既に開始しているConnectableObservableインスタンスから新たにsubscribeした際にそれまでの通知を全て受け取ることできる
    Observable<Long> interval = Observable.interval(1L,
        TimeUnit.SECONDS);

    ConnectableObservable<Long> replay = interval.replay();
    Subscription sub1 = Helpers.subscribePrint(replay,
        "First");
    Thread.sleep(3000L);

    replay.connect();
    Thread.sleep(5000L);
    sub1.unsubscribe();

    Subscription sub2 = Helpers.subscribePrint(replay,
        "Second");
    Thread.sleep(3000L);

    sub2.unsubscribe();

実行結果

First:0
First:1
First:2
First:3
First:4
Second:0
Second:1
Second:2
Second:3
Second:4
Second:5
Second:6
Second:7

refCountメソッド

  • subscribeしている全てのObservableインスタンスがunsubscribeされた後にそのインスタンスをsubscribeしたら、今までの通知がなくなり最初から通知を始める
    Observable<Long> interval = Observable.interval(1L,
        TimeUnit.SECONDS);

    Observable<Long> refCount = interval.publish()
        .refCount();
    Subscription sub1 = Helpers.subscribePrint(refCount,
        "First");
    Subscription sub2 = Helpers.subscribePrint(refCount,
        "Second");

    Thread.sleep(3000L);
    sub1.unsubscribe();
    sub2.unsubscribe();

    Subscription sub3 = Helpers.subscribePrint(refCount,
        "Third");

    Thread.sleep(5000L);
    sub3.unsubscribe();

実行結果:

First:0
Second:0
First:1
Second:1
First:2
Second:2
Third:0
Third:1
Third:2
Third:3
Third:4

shareメソッド

  public final Observable<T> share() {
    return publish().refCount();
  }

The Subject instances

4種類のSubjectがある

  • PublishSubject
  • ReplaySubject
  • BehaviorSubject
  • AsyncSubject

PublishSubject

  • publishメソッドで作られたConnectableObservableのように振る舞う

本のサンプルだとわかりづらかったので少し変更

    Observable<Long> interval = Observable.interval(1L,
        TimeUnit.SECONDS);
    Subject<Long, Long> pubishSubject = PublishSubject
        .create();
    interval.subscribe(pubishSubject);

    Subscription sub1 = Helpers
        .subscribePrint(pubishSubject, "First");
    Thread.sleep(3000L);

    Subscription sub2 = Helpers
        .subscribePrint(pubishSubject, "Second");

    Thread.sleep(3000L);

    pubishSubject.onNext(555L);

    sub1.unsubscribe();

    Subscription sub3 = Helpers
        .subscribePrint(pubishSubject, "Third");
    Thread.sleep(3000L);

    sub2.unsubscribe();
    sub3.unsubscribe();

実行結果:

First:0
First:1
First:2
First:3
Second:3
First:4
Second:4
First:5
Second:5
First:555
Second:555
Second:6
Third:6
Second:7
Third:7
Second:8
Third:8

ReplaySubject

  • replayメソッドで作られたConnectableObservableのように振る舞い過去に発行された通知を新しいSubjectが開始時に受け取ることができる
  • ReplaySubjectはいくつものファクトリーメソッドを持つ
  • デフォルトのファクトリーメソッドは全ての通知を覚えているためメモリを食う
  • 元となるObservableインスタンスがなくても生成可能
    Observable<Long> interval = Observable.interval(1L,
        TimeUnit.SECONDS);
    Subject<Long, Long> subject = ReplaySubject.create();
    interval.subscribe(subject);

    Subscription sub1 = Helpers.subscribePrint(subject,
        "First");
    Thread.sleep(3000L);

    Subscription sub2 = Helpers.subscribePrint(subject,
        "Second");
    Thread.sleep(3000L);

    subject.onNext(555L);
    sub1.unsubscribe();

    Subscription sub3 = Helpers.subscribePrint(subject,
        "Third");
    Thread.sleep(3000L);

    sub2.unsubscribe();
    sub3.unsubscribe();

実行結果

First:0
First:1
First:2
Second:0
Second:1
Second:2
First:3
Second:3
First:4
Second:4
First:5
Second:5
First:555
Second:555
Third:0
Third:1
Third:2
Third:3
Third:4
Third:5
Third:555
Second:6
Third:6
Second:7
Third:7
Second:8
Third:8

BehaviorSubject

  • キャッシュサイズが1しかないReplaySubjectと同じように振る舞う
  • 元となるObservableインスタンスがなくても生成可能
    Observable<Long> interval = Observable.interval(1L,
        TimeUnit.SECONDS);
    Subject<Long, Long> subject = BehaviorSubject.create();
    interval.subscribe(subject);

    Subscription sub1 = Helpers.subscribePrint(subject,
        "First");
    Thread.sleep(3000L);

    Subscription sub2 = Helpers.subscribePrint(subject,
        "Second");
    Thread.sleep(3000L);

    subject.onNext(555L);
    sub1.unsubscribe();

    Subscription sub3 = Helpers.subscribePrint(subject,
        "Third");
    Thread.sleep(3000L);

    sub2.unsubscribe();
    sub3.unsubscribe();

実行結果

First:0
First:1
First:2
Second:2
First:3
Second:3
First:4
Second:4
First:5
Second:5
First:555
Second:555
Third:555
Second:6
Third:6
Second:7
Third:7
Second:8
Third:8

AsyncSubject

  • 元のObservableがOnCompletedが実行された後に起動するSubject
  • Observableインスタンスの最後の値がonNextに渡される
  • 元のObservableが処理が終わらない場合、何も起動されない
  • on*メソッドを使って、元となるObservableインスタンスがなくても生成可能
    // intervaleのSubjectはonCompletedを呼ぶまで処理が終わらないので実行されない
    Observable<Long> interval = Observable.interval(1L,
        TimeUnit.SECONDS);
    Helpers.subscribePrint(interval, "intervale");
    Subject<Long, Long> intervalSubject = AsyncSubject
        .create();
    interval.subscribe(intervalSubject);

    Helpers.subscribePrint(intervalSubject,
        "intervalSubject");

    // 下記のSubjectは処理終了後に実行される
    Long[] array = { 1L, 2L, 3L };
    Observable<Long> observable = Observable.from(array);
    observable.subscribe(value -> {
      try {
        Thread.sleep(1000L);
        System.out.println("value=" + value);
      } catch (Exception e) {
      }
    } , System.err::println,
        () -> System.out.println("onCompleted called"));
    Subject<Long, Long> asyncSubject = AsyncSubject
        .create();
    observable.subscribe(asyncSubject);

    Helpers.subscribePrint(asyncSubject, "Async1");
    Helpers.subscribePrint(asyncSubject, "Async2");

    Thread.sleep(5000L);

実行結果

intervale:0
value=1
intervale:1
value=2
intervale:2
value=3
onCompleted called
Async1:3
Async1 ended!
Async2:3
Async2 ended!
intervale:3
intervale:4
intervale:5
intervale:6
intervale:7

Subjectの危険性

  • onNextメソッド、onErrorメソッド、onCompletedメソッドへの直接アクセスができるため、ロジックが汚くなり、間違った使い方がされる可能性が高い