Angular上級 #4 RxJS 深掘り — カスタム operator と Scheduler

読了 10分

中級 #3 RxJS 基礎 で Observable の構造、map/filter/switchMap のような基本 operator、Subject/BehaviorSubject までを整理しました。そこまでが RxJS の半分なら、今回扱うのが残りの半分です。higher-order Observable の本来の姿、並行性戦略を決定する 4 つの *Map operator、自分で作る operator、Scheduler で時間を統制する方法、そして marble testing まで — 実戦で RxJS が真の武器に変わる地点です。

Higher-order Observable とは #

Observable が値として また別の Observable を流したらどうなるでしょうか? これが higher-order Observable です。Observable<Observable<T>> の形です。初めて見ると「なぜこんなものが必要なのか」と思いますが、実は私たちは既に毎日作っています。

src/app/higher-order-demo.ts
import { fromEvent, map, Observable } from 'rxjs';
import { ajax } from 'rxjs/ajax';

// click$ は Observable<Event>
const click$ = fromEvent(document, 'click');

// クリックが発生するたびに 'ajax' Observable を作って流します。
// 結果の型: Observable<Observable<Response>>
const requests$$: Observable<Observable<unknown>> = click$.pipe(
  map(() => ajax('/api/me')),
);

requests$$ をそのまま購読すると流れてくるのはレスポンスデータではなく レスポンス Observable そのもの です。私たちが望むのは内側の Observable を解いて本当の値を取り出すことです。これを flattening と呼びます。

flattening 戦略は 4 つあります。

  • mergeAll — 入ってくる内側の Observable をすべて同時に購読。結果が入り混じっても構わないとき。
  • switchAll — 新しい内側の Observable が来たら以前のものは即座にキャンセルし、新しいものだけを見る。
  • concatAll — 一度に 1 つずつ、前のものが終わってから次のものを始める。
  • exhaustAll — 進行中の内側の Observable があれば、新たに入ってくるものは無視。

実際には map + *All を毎回一緒に使うので、2 つを合わせた短縮形が私たちがいつも使っている mergeMap/switchMap/concatMap/exhaustMap です。つまり — switchMapmap(...) + switchAll の短縮 です。

concatMap vs mergeMap vs switchMap vs exhaustMap #

名前が似ているので慣れるまで混同します。並行性をどう扱うかで区別すると明確になります。

operator新しい値が来たとき使用パターン
concatMap並べて 順番どおりに 処理順序が重要な作業 (保存キュー、トランザクション)
mergeMap並列で 同時に進める順序無関の並列リクエスト (バルクアップロード)
switchMap以前のものを キャンセル、新しいものだけ検索オートコンプリート、ルートパラメータの変化
exhaustMap進行中なら 無視フォーム submit のダブルクリック防止、ログインボタン

特に exhaustMap は意外とよく出会う状況を最もすっきりと解きます。ユーザーがログインボタンを素早く 2 回押したとき、2 回目のクリックは そのまま捨てるのが 正しいです。

src/app/login.component.ts
import { Component, inject } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { Subject, exhaustMap } from 'rxjs';
import { AuthService } from './auth.service';

@Component({ /* ... */ })
export class LoginComponent {
  private auth = inject(AuthService);
  private submit$ = new Subject<Credentials>();

  constructor() {
    this.submit$
      .pipe(
        exhaustMap(creds => this.auth.login(creds)),
        takeUntilDestroyed(),
      )
      .subscribe(user => { /* ルーティングなど */ });
  }

  onSubmit(creds: Credentials) {
    this.submit$.next(creds);
  }
}

mergeMap だったら 2 回ログインが飛びます。switchMap だったら 1 回目のリクエストが応答直前にキャンセルされ、ユーザー体験が不自然になります。exhaustMap は「もう送っているから君は無視」になります。

ヒント
実戦の判断基準: 最後のものだけ意味があるかswitchMapすべて送らなければならないかmergeMap順序が重要かconcatMap重複発射を防ぎたいかexhaustMap。迷ったときはこの 4 行で 90% をカバーできます。

カスタム operator を作る #

同じパイプライン断片を複数のコンポーネントで繰り返しているなら、関数として切り出すときです。RxJS の operator は結局 Observable<T> を受け取って Observable<R> を返す関数 に過ぎません。型の名前もそのままです — OperatorFunction<T, R>

検索パターン(debounce → distinct → trim → 長さフィルター)を毎回書いているとしましょう。

src/app/operators/search-input.ts
import { OperatorFunction, pipe } from 'rxjs';
import { debounceTime, distinctUntilChanged, map, filter } from 'rxjs';

export function searchInput(
  debounceMs = 300,
  minLength = 2,
): OperatorFunction<string, string> {
  return pipe(
    debounceTime(debounceMs),
    map(q => q.trim()),
    distinctUntilChanged(),
    filter(q => q.length >= minLength),
  );
}

pipe() は関数合成のツールです。operator たちを受け取って 1 つの operator にまとめてくれます。使用箇所はシンプルになります。

src/app/search.component.ts
results = toSignal(
  this.query.valueChanges.pipe(
    searchInput(300, 2),
    switchMap(q => this.userService.search(q)),
  ),
  { initialValue: [] },
);

型推論もそのまま流れます。stringstring というシグネチャが明示されているので、次のステップの switchMapq は自然に string として捕まえられます。チームで RxJS に慣れた人が少ないほど、こういう意味のある名前を持った operator がコードの可読性を大きく引き上げます。

Scheduler — 非同期の時間制御 #

RxJS のほぼすべての operator は 2 番目の引数として Scheduler を受け取れます。Scheduler は「この作業をいつ、どのキューに乗せて実行するか」を決定するオブジェクトです。何も指定しないと RxJS が同期的/マイクロタスクベースで自動的に回します。

よく使う Scheduler は 5 つです。

  • asyncSchedulersetTimeout ベース。マクロタスク。
  • asapSchedulerPromise.resolve() ベース。マイクロタスク。
  • queueScheduler — 同期、FIFO キュー。再帰を平坦化。
  • animationFrameSchedulerrequestAnimationFrame ベース。60fps に合わせて動く流れに。
  • VirtualTimeScheduler — テスト用。時間を仮想的に流します。

実戦例 1 つ。1 万個のデータを一度に描画するとメインスレッドが止まってしまうので、フレーム単位で区切って漸進的にレンダリング したいです。

src/app/lib/chunked-render.ts
import { from, observeOn, animationFrameScheduler, bufferCount } from 'rxjs';

export function renderInChunks<T>(items: T[], chunkSize = 100) {
  return from(items).pipe(
    bufferCount(chunkSize),
    observeOn(animationFrameScheduler),
  );
}

bufferCount で 100 個ずつまとめ、observeOn がその束の emit を毎アニメーションフレームに移します。UI は滑らかに埋まり、メインスレッドは余裕を持ちます。

エラー処理パターン #

基礎で catchError だけ短く扱いました。実戦ではリトライポリシーがほぼ常にセットで登場します。昔のコードでは retryWhen をよく見ていましたが、RxJS 7 から retry({ count, delay }) 形式が標準です。retryWhen は deprecated に近いです。

src/app/services/api.service.ts
import { Injectable, inject } from '@angular/core';
import { HttpClient, HttpErrorResponse } from '@angular/common/http';
import { Observable, retry, timer, catchError, throwError } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class ApiService {
  private http = inject(HttpClient);

  get<T>(url: string): Observable<T> {
    return this.http.get<T>(url).pipe(
      retry({
        count: 3,
        delay: (err: HttpErrorResponse, attempt) => {
          // 4xx はリトライしても無駄なので即座に諦める
          if (err.status >= 400 && err.status < 500) {
            return throwError(() => err);
          }
          // 5xx とネットワークエラーは指数バックオフ
          return timer(2 ** attempt * 500);
        },
        resetOnSuccess: true,
      }),
      catchError(err => {
        // すべてのリトライ失敗後の最終フォールバック
        console.error('API 失敗', err);
        return throwError(() => err);
      }),
    );
  }
}

ポイントは delay コールバックです。エラーの種類によってリトライの可否を別にしたいときにこのパターンをそのまま持ってきて使います。resetOnSuccess: true は一度成功するとカウンタをリセットするので、長期購読でも安全です。

マルチキャスト — 一度流して複数箇所で受け取る #

基本の Observable は cold です。購読するたびに最初から再実行されます。HTTP リクエストなら — 同じデータを 2 つのコンポーネントが購読したときにリクエストが 2 回飛びます。意図したものでなければマルチキャストでブロックする必要があります。

shareReplay が最もよく使う道具です。

src/app/services/config.service.ts
import { Injectable, inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { shareReplay } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class ConfigService {
  private http = inject(HttpClient);

  config$ = this.http.get<AppConfig>('/api/config').pipe(
    shareReplay({ bufferSize: 1, refCount: true }),
  );
}
  • bufferSize: 1 — 最後の 1 個の値を覚えておき、遅れて購読した人にも即座に流します。
  • refCount: true — 購読者が 0 になると内部購読を片付けます。これがないと一度生き返った流れが永遠にメモリに残ります。

refCount を切ると(デフォルト)、Observable は一度始まったら絶対に止まりません。ほぼ常に refCount: true で点けておいてください。切るのが意図のときだけ切るのです。

share()shareReplay よりも軽い兄弟です。値を覚えずに、流す時点からの値だけを共有します。イベントストリームに適しています。

Subject の変形 — ReplaySubjectAsyncSubject #

中級で SubjectBehaviorSubject だけ扱いましたが、2 つの変形がさらにあります。

ReplaySubject — 最近 N 個の値を覚えます。BehaviorSubject が 1 個だけ覚えるなら、ReplaySubject は N 個を覚えるわけです。初期値がなくても大丈夫です。

const log$ = new ReplaySubject<string>(5);
log$.next('a');
log$.next('b');
log$.next('c');
// 新しい購読者も a、b、c をすべて受け取る
log$.subscribe(console.log);

セッションログ、最近の通知 N 個、チャット履歴のようなものに馴染みます。

AsyncSubjectcomplete される瞬間の 最後の値 1 つだけ 流します。完了前は何も流しません。一度に結果 1 つだけ意味のある作業 — 例えば大きな計算の最終結果 — に適しているのですが、実はそういうケースは Promise や単発 Observable で解けることが多く、頻度は最も低いです。「こういうものがある」程度に覚えておけばよいです。

Marble testing — 時間を仮想的に #

非同期な流れはテストが難しいです。setTimeout を本当に待つとテストが遅くなり、タイミングがかみ合うバグは再現すら難しいです。TestScheduler を使えば時間を 仮想的に 流せます。RxJS 陣営ではこれを marble diagram の文法と一緒に使います。

src/app/operators/search-input.spec.ts
import { TestScheduler } from 'rxjs/testing';
import { searchInput } from './search-input';

describe('searchInput', () => {
  let scheduler: TestScheduler;

  beforeEach(() => {
    scheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });

  it('debounce 後に trim・distinct・minLength を通過', () => {
    scheduler.run(({ cold, expectObservable }) => {
      // ' a ' (0ms) → 'ab' (10ms) → 'ab' (20ms) → 'ab ' (40ms)
      const input$ = cold('a-b-b--c|', { a: ' a ', b: 'ab', c: 'ab ' });
      const expected =    '----------- 290ms b|';
      expectObservable(input$.pipe(searchInput(300, 2))).toBe(expected, { b: 'ab' });
    });
  });
});

- は 1 フレーム(仮想の 1ms)、文字は emit、| は complete を意味します。本当に 300ms を待たなくても debounce の動作を検証できます。詳細な文法は最初は不慣れでも、ファイル 1 つを丸ごと一度書いてみると素早く慣れます。

cold は購読時点から流れる Observable で、hot は既に流れている Observable です。HTTP レスポンスのような cold ストリームには cold、マウスイベントや WebSocket のような hot ストリームには hot を使います。

よくある落とし穴 #

長く使っていると出会う足跡をいくつか整理しておきます。

1. subscribe の中に subscribe — operator で解くべき流れをコールバックの入れ子で書くパターンです。ほぼ常に switchMapmergeMap で平坦化されます。

// 悪い例
this.userId$.subscribe(id => {
  this.api.getUser(id).subscribe(user => {
    this.user = user;
  });
});

// 良い例
this.user$ = this.userId$.pipe(switchMap(id => this.api.getUser(id)));

2. switchMap の中の raceswitchMap は以前の内側の Observable をキャンセルしますが、以前の effect の副作用はキャンセルしません。キャッシュに書いたりグローバル状態に触れる作業が内側にあれば、キャンセルされた流れが作った副作用がそのまま残る可能性があります。

3. cold/hot の混同http.get() は cold なので subscribe のたびにリクエストが新たに飛びます。2 箇所で購読すれば呼び出しが 2 回飛びます。これを知らないと「なぜ API が 2 回呼ばれるんですか?」と長い間悩みます。shareReplay({ bufferSize: 1, refCount: true }) で hot にするか、サービスで一度だけ購読した結果を BehaviorSubject で配れば大丈夫です。

4. takeUntilDestroyed 抜け — 直接 subscribe する所で takeUntilDestroyed を忘れるとメモリリークです。コードレビューのチェックリストに 1 行打ち込んでおいてください。

注記
手が慣れると subscribe をほとんど使わなくなります。テンプレートでは async pipetoSignal が自動購読・解除を担当し、コンポーネントクラスでは effecttoObservable でシグナルとつなぎます。subscribe が増えるなら「operator で解けるものはないか?」と一度疑ってみてください。

まとめ #

今回は RxJS 深掘りトピックを整理しました。

  • Higher-order Observable と 4 つの flattening — mergeAll/switchAll/concatAll/exhaustAll
  • 並行性の決定基準: concatMap(順序) / mergeMap(並列) / switchMap(最後) / exhaustMap(重複無視)
  • カスタム operator は OperatorFunction<T, R> を返す関数。pipe で合成
  • Scheduler で時間制御 — animationFrameScheduler で漸進レンダー、VirtualTimeScheduler でテスト
  • retry({ count, delay }) が標準。retryWhen はもう使いません
  • マルチキャストは shareReplay({ bufferSize, refCount: true })refCount はほぼ常に点ける
  • ReplaySubject で最近 N 個、AsyncSubject はほぼ使わない
  • TestScheduler と marble 文法で仮想時間テスト
  • 落とし穴: subscribe の入れ子、switchMap の race、cold/hot の混同、takeUntilDestroyed 抜け

次回は 「Angular上級 #5 NgRx 入門」 です。コンポーネントツリーを越えるグローバルな状態 — Store、Action、Reducer、Selector、Effects — の構造と、「本当に NgRx が必要か?」という問いを一緒に扱います。

X