Angular中級 #3 RxJS 基礎 — Observable と Operator

読了 8分

基礎 #7HttpClient を扱いながら Observable にちらっと触れました。そのときは「結果が Observable で来るので subscribe するか toSignal で変換して使おう」程度で済ませましたね。実は RxJS は Angular の半分を支える道具です。今回の記事では、保留にしていた RxJS の核となる概念を本格的にのぞいてみます。

Observable とは #

Observable は 時間軸の上に並ぶ値の流れ です。一度に 1 つではなく、複数の値が順に流れてきうるもので、流れてきた値は誰かが購読しているときにだけ流れていきます。

Promise と比較すると違いがはっきりします。

PromiseObservable
値の数ただ 1 つ0 個以上 (複数も可)
実行時点即時 (生成と同時)遅延 (購読時点)
キャンセル不可購読解除でキャンセル可能
演算子.then チェーンpipe() で多様な operator を組み合わせ

値が一度しか出ないのなら Promise で十分ですが、ユーザーの入力イベントや WebSocket メッセージのように時間とともに流れてくるデータは Observable のほうが自然です。HttpClient の GET リクエストも結果が一度来て終わりですが、Angular は一貫性のためにすべての非同期を Observable で統一しました。

src/app/observable-demo.ts
import { Observable } from 'rxjs';

const numbers$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

numbers$.subscribe({
  next: v => console.log('値:', v),
  complete: () => console.log('終わり'),
});
// 値: 1
// 値: 2
// 値: 3
// 終わり

subscribe を呼び出すまでは何も起こりません。呼び出した瞬間に初めて関数本体が実行されて値が流れ出すのです。Observable は「遅延評価される値のパイプライン」 だと考えると楽です。

注記
変数名の末尾に $ を付ける慣習をよく見かけます。「これは Observable だ」という印です。強制ではありませんが、Angular コミュニティで広く通用する慣習です。

subscribe と unsubscribe — メモリリークの原因 #

Observable は購読した分だけ生きています。コンポーネントが消えても購読を解かなければ流れがそのまま生き残り、メモリリーク が生じます。基礎 #7 で短く言及したあの問題です。

直接解きたいなら、subscribe が返した Subscriptionunsubscribe() を呼び出せば構いません。

良くない例 — 直接管理
private sub?: Subscription;

ngOnInit() {
  this.sub = interval(1000).subscribe(v => console.log(v));
}

ngOnDestroy() {
  this.sub?.unsubscribe();
}

問題は、コンポーネントごとに Subscription 変数と ngOnDestroy を用意するのがすぐに散らかってしまうことです。Angular 16 から入った takeUntilDestroyed がこの仕事を 1 行で終わらせてくれます。

src/app/clock.component.ts
import { Component, inject, DestroyRef } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { interval } from 'rxjs';

@Component({ /* ... */ })
export class ClockComponent {
  private destroyRef = inject(DestroyRef);

  constructor() {
    interval(1000)
      .pipe(takeUntilDestroyed(this.destroyRef))
      .subscribe(v => console.log(v));
  }
}

コンポーネントが消えると、takeUntilDestroyed が流れを自動で切ってくれます。ngOnDestroy も変数も必要ありません。

ヒント
新しいコードで直接 subscribe する必要があるなら、反射的に takeUntilDestroyed も一緒に書いてください。メモリリークの最もよくある出どころが消えます。

主要な operator (1) — 変換 #

operator は Observable の流れの上に挟み込む加工ステップです。pipe() の中に並べて適用します。最初に身につけるべき 3 つから見ていきます。

map — 値を別の値に変えます。

of(1, 2, 3).pipe(map(n => n * 10)).subscribe(console.log);
// 10, 20, 30

filter — 条件に合う値だけ通します。

of(1, 2, 3, 4).pipe(filter(n => n % 2 === 0)).subscribe(console.log);
// 2, 4

tap — 値を変えずに横目で見るだけです。デバッグや副作用に使います。

of(1, 2, 3).pipe(
  tap(v => console.log('流れた値:', v)),
  map(n => n * 2),
).subscribe();

配列の mapfilter と形が同じですね。ただし RxJS は 時間軸上の配列 に対して動作するという点が違います。

主要な operator (2) — 結合 #

複数の Observable を合わせたり、ある Observable の値を別の Observable で差し替えたりする operator たちです。実戦でもっともよく使われます。

switchMap — 新しい値が来たら、前の内部 Observable を キャンセル して新しいものに切り替えます。検索ワードが変わるたびに前の API 呼び出しを捨てるパターンにぴったりです。

mergeMap — 新しい値が来ても、前のものをキャンセルせずに 並列に 流します。すべての結果を受け取らなければならないときに使います。

combineLatest — 複数の Observable の 最新の値 を 1 つの束にまとめます。どちらか一方が新しい値を出すたびに、また 1 つの束が流れてきます。

検索入力 → API 呼び出しが、RxJS の真価が現れる定番の例です。

src/app/search.component.ts
import { Component, inject } from '@angular/core';
import { FormControl, ReactiveFormsModule } from '@angular/forms';
import { toSignal } from '@angular/core/rxjs-interop';
import { debounceTime, distinctUntilChanged, switchMap, startWith } from 'rxjs';
import { UserService } from './user.service';

@Component({
  selector: 'app-search',
  standalone: true,
  imports: [ReactiveFormsModule],
  template: `
    <input [formControl]="query" placeholder="検索ワード入力" />
    @for (user of results(); track user.id) {
      <div>{{ user.name }}</div>
    }
  `,
})
export class SearchComponent {
  private userService = inject(UserService);
  query = new FormControl('', { nonNullable: true });

  results = toSignal(
    this.query.valueChanges.pipe(
      startWith(''),
      debounceTime(300),
      distinctUntilChanged(),
      switchMap(q => this.userService.search(q)),
    ),
    { initialValue: [] },
  );
}

流れを日本語でほどくとこうなります。

  1. ユーザーが入力するたびに valueChanges が新しい値を流します。
  2. debounceTime(300) が 300ms の静止を待ち、連打を束ねます。
  3. distinctUntilChanged が同じ検索ワードの繰り返しを除きます。
  4. switchMap が新しい検索ワードで API を呼び出し、前の呼び出しは自動でキャンセル します。
  5. 結果を toSignal がシグナルに変換し、テンプレートに流してくれます。

これを命令型で書くと、setTimeout、AbortController、状態変数 4〜5 個が必要です。operator では 7 行のパイプライン 1 つで終わります。

注記
switchMapmergeMap を取り違えると、結果が混ざったり、前のリクエストが後の結果を上書きしたりするバグが生まれます。検索・オートコンプリートのように「最後のものだけ使えばよい」 なら switchMap複数のリクエスト結果をすべて集める必要がある なら mergeMap です。

Subject と BehaviorSubject #

ここまでの Observable は、誰かが作ってくれた流れを受け取って使う側でした。自分から値を 押し込みたい (push) とき に使うのが Subject です。Subject は Observable であると同時にオブザーバーでもあるので、next() で値を流せます。

src/app/event-bus.ts
import { Injectable } from '@angular/core';
import { Subject } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class EventBus {
  private events$ = new Subject<string>();
  on$ = this.events$.asObservable();

  emit(event: string) {
    this.events$.next(event);
  }
}

他のコンポーネントで bus.on$ を購読すると、bus.emit('login') が呼ばれるたびに通知を受け取れます。

BehaviorSubject はこれに 2 つの特徴が加わった変種です。

  1. 初期値が必須 です。
  2. 現在の値を常に記憶 します。新たに購読した者にも、すぐに最新の値を渡します。

状態ストアとして使うのによく合います。

src/app/auth.service.ts
import { BehaviorSubject } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class AuthService {
  private user$$ = new BehaviorSubject<User | null>(null);
  user$ = this.user$$.asObservable();

  login(user: User) { this.user$$.next(user); }
  logout()          { this.user$$.next(null); }
  get current()     { return this.user$$.value; }
}

Subject だと、遅れて購読したコンポーネントは現在のログイン状態を知りません。BehaviorSubject は最後の値を記憶していて、すぐに流してくれます。

ヒント
Subject 変数は外部に公開しないでください。_$$ のような慣習で private に置き、外部には asObservable() で読み取り専用の Observable だけを公開します。そうすればどこからでも next() を呼べなくなります。

HttpClient との結合 #

operator たちが HttpClient と出会うと真価が現れます。検索・フィルタ・リトライを 1 つのパイプにまとめてみましょう。

src/app/user.service.ts
import { Injectable, inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, retry, timer, catchError, of } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class UserService {
  private http = inject(HttpClient);

  search(q: string): Observable<User[]> {
    return this.http.get<User[]>('/api/users', { params: { q } }).pipe(
      retry({ count: 2, delay: (_, i) => timer(500 * (i + 1)) }),
      catchError(() => of([])),
    );
  }
}
  • retry が失敗したリクエストを 2 回までリトライします。delay でバックオフ (500ms、1000ms) を設定しました。
  • それでも失敗すれば、catchError が空配列にフォールバックします。UI は「結果なし」として自然に処理されます。

こういった流れを命令型で書くなら、try/catch のネストとカウンタ変数がしばらく必要ですが、operator 2 つで終わります。

async pipetoSignal #

テンプレートで Observable を扱う 2 つの標準的な方法があります。

async pipe は古くからの標準です。テンプレートが自動で購読し、コンポーネントが消えると自動で解除します。

async pipe
@Component({
  imports: [AsyncPipe],
  template: `
    @if (users$ | async; as list) {
      @for (u of list; track u.id) { <div>{{ u.name }}</div> }
    }
  `,
})
export class UserListComponent {
  private userService = inject(UserService);
  users$ = this.userService.getUsers();
}

toSignal は Angular 16+ の新しい標準です。Observable をシグナルに変換するので、テンプレートでは普通にシグナルのように呼び出せばよいです。

toSignal
users = toSignal(this.userService.getUsers(), { initialValue: [] });
// テンプレート: {{ users() }}

新しいコードでは toSignal を優先して検討してください。シグナルベースの変更検知とよく合いますし、computed で他のシグナルと自然に組み合わせられます。ただし、ライブラリやサンプルコードの多くが依然として async pipe を使っているので、両方読めるようにはなっておくべきです。

Signals vs Observable — いつ何を #

基礎 #1 で「RxJS と Signals を状況に応じて使い分ける時代」と言いましたね。短いガイドを整理しておきます。

  • Signals: 単純なコンポーネント状態、同期的な値、computed で派生する値。UI 状態を持つときに圧倒的に楽です。
  • Observable: 時間の流れが絡む非同期 — HTTP レスポンス、ユーザー入力ストリーム、WebSocket、ルーターイベント。operator で変換・結合・リトライが必要なら Observable の領域です。

境界が曖昧なときは、データの出どころが非同期ストリーム であれば Observable で受け取り、画面にすぐ使う段階で toSignal でシグナル化すると、自然に 2 つの世界がつながります。

まとめ #

今回の記事では RxJS の基礎を整理しました。

  • Observable は時間軸上の値の流れ。Promise と違って複数の値・遅延実行・キャンセル可能
  • subscribe はメモリリークの原因。takeUntilDestroyed 1 行で整理
  • 変換 operator: mapfiltertap
  • 結合 operator: switchMapmergeMapcombineLatest — 検索・並列・組み合わせ
  • Subject で値を push、BehaviorSubject で現在の状態を保持
  • HttpClient と結合して、検索・リトライ・フォールバックをパイプ 1 つで
  • テンプレートでは async pipe または toSignal。新しいコードは toSignal 優先
  • 単純な状態は Signals、非同期ストリームは Observable。境界では toSignal でつなぐ

次の記事は 「Angular中級 #4 ライフサイクルフック」 です。OnInit の向こう側のライフサイクル — 入力変更、変更検知の前後、ビュー初期化 — と OnPush 戦略、Signals がライフサイクルをどう単純化するかを扱います。

X