앵귤러 고급 강좌 #4 RxJS 심화 — 커스텀 operator와 Scheduler

10 분 소요

중급 #3 RxJS 기초에서 Observable의 구조, map/filter/switchMap 같은 기본 operator, Subject/BehaviorSubject까지 정리했습니다. 거기까지가 RxJS의 절반이라면, 이번 글이 다루는 게 나머지 절반입니다. higher-order Observable의 본 모습, 동시성 전략을 결정하는 4가지 *Map operator, 직접 만드는 operator, Scheduler로 시간을 통제하는 법, 그리고 marble testing까지 — 실무에서 RxJS가 진짜 무기로 바뀌는 지점들입니다.

Higher-order Observable이란 #

Observable이 값으로 또 다른 Observable을 흘리면 어떻게 될까요? 이게 higher-order Observable입니다. Observable<Observable<T>> 모양입니다. 처음 보면 “왜 이런 게 필요한가” 싶지만, 실은 우리는 이미 매일 만들고 있습니다.

src/app/higher-order-demo.ts
import { fromEvent, map, Observable } from 'rxjs';
import { ajax } from 'rxjs/ajax';

// click$는 Observable<Event>
const click$ = fromEvent(document, 'click');

// 클릭이 일어날 때마다 'ajax' Observable을 만들어 흘립니다.
// 결과 타입: Observable<Observable<Response>>
const requests$$: Observable<Observable<unknown>> = click$.pipe(
  map(() => ajax('/api/me')),
);

requests$$를 그대로 구독하면 흘러오는 건 응답 데이터가 아니라 응답 Observable 그 자체입니다. 우리가 원하는 건 안쪽 Observable을 풀어 진짜 값을 꺼내는 일입니다. 이걸 flattening이라고 부릅니다.

flattening 전략이 4가지 있습니다.

  • mergeAll — 들어오는 안쪽 Observable을 모두 동시에 구독. 결과가 뒤섞여도 상관 없을 때.
  • switchAll — 새 안쪽 Observable이 오면 이전 것은 즉시 취소하고 새 것만 본다.
  • concatAll — 한 번에 하나씩, 앞엣것이 끝나야 다음 것을 시작.
  • exhaustAll — 진행 중인 안쪽 Observable이 있으면 새로 들어오는 것은 무시.

실제로는 map + *All을 매번 같이 쓰니까, 둘을 합친 단축형이 우리가 늘 쓰는 mergeMap/switchMap/concatMap/exhaustMap입니다. 즉 — switchMapmap(...) + switchAll의 단축입니다.

concatMap vs mergeMap vs switchMap vs exhaustMap #

이름이 비슷하니까 익숙해지기 전까진 헷갈립니다. 동시성을 어떻게 다룰지로 구분하면 명확해집니다.

operator새 값이 올 때사용 패턴
concatMap줄 세워 순서대로 처리순서가 중요한 작업 (저장 큐, 트랜잭션)
mergeMap병렬로 동시에 진행순서 무관 병렬 요청 (벌크 업로드)
switchMap이전 것 취소, 새 것만검색 자동완성, 라우트 파라미터 변화
exhaustMap진행 중이면 무시폼 submit 더블 클릭 방지, 로그인 버튼

특히 exhaustMap은 의외로 자주 만나는 상황을 가장 깔끔하게 풉니다. 사용자가 로그인 버튼을 빠르게 두 번 눌렀을 때, 두 번째 클릭은 그냥 버려야 맞습니다.

src/app/login.component.ts
import { Component, inject } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { Subject, exhaustMap } from 'rxjs';
import { AuthService } from './auth.service';

@Component({ /* ... */ })
export class LoginComponent {
  private auth = inject(AuthService);
  private submit$ = new Subject<Credentials>();

  constructor() {
    this.submit$
      .pipe(
        exhaustMap(creds => this.auth.login(creds)),
        takeUntilDestroyed(),
      )
      .subscribe(user => { /* 라우팅 등 */ });
  }

  onSubmit(creds: Credentials) {
    this.submit$.next(creds);
  }
}

mergeMap이었다면 두 번 로그인이 날아갑니다. switchMap이었다면 첫 번째 요청이 응답 직전에 취소돼 사용자 경험이 어색해집니다. exhaustMap은 “이미 보내고 있으니까 너는 무시"가 됩니다.

실전 결정 기준: 마지막 것만 의미 있는가switchMap. 모두 보내야 하는가mergeMap. 순서가 중요한가concatMap. 중복 발사를 막고 싶은가exhaustMap. 헷갈릴 때 이 네 줄이 90%를 가립니다.

커스텀 operator 만들기 #

같은 파이프라인 조각을 여러 컴포넌트에서 반복하고 있다면, 함수로 빼낼 때입니다. RxJS의 operator는 결국 Observable<T>를 받아 Observable<R>을 돌려주는 함수일 뿐입니다. 타입 이름도 그대로입니다 — OperatorFunction<T, R>.

검색 패턴(debounce → distinct → trim → 길이 필터)을 매번 적는다고 해봅시다.

src/app/operators/search-input.ts
import { OperatorFunction, pipe } from 'rxjs';
import { debounceTime, distinctUntilChanged, map, filter } from 'rxjs';

export function searchInput(
  debounceMs = 300,
  minLength = 2,
): OperatorFunction<string, string> {
  return pipe(
    debounceTime(debounceMs),
    map(q => q.trim()),
    distinctUntilChanged(),
    filter(q => q.length >= minLength),
  );
}

pipe()는 함수 합성 도구입니다. operator들을 받아 하나의 operator로 묶어 줍니다. 사용처는 단순해집니다.

src/app/search.component.ts
results = toSignal(
  this.query.valueChanges.pipe(
    searchInput(300, 2),
    switchMap(q => this.userService.search(q)),
  ),
  { initialValue: [] },
);

타입 추론도 그대로 흐릅니다. stringstring이라는 시그니처가 명시되어 있으니, 다음 단계의 switchMap에서 q는 자연스레 string으로 잡힙니다. 팀에 RxJS가 익숙한 사람이 적어질수록 이런 의미 있는 이름의 operator가 코드 가독성을 크게 끌어올립니다.

Scheduler — 비동기의 시간 제어 #

RxJS의 거의 모든 operator는 두 번째 인자로 Scheduler를 받을 수 있습니다. Scheduler는 “이 작업을 언제, 어떤 큐에 올려 실행할까"를 결정하는 객체입니다. 아무것도 지정하지 않으면 RxJS가 동기적/마이크로태스크 기반으로 알아서 굴립니다.

자주 쓰는 Scheduler는 다섯 가지입니다.

  • asyncSchedulersetTimeout 기반. 매크로태스크.
  • asapSchedulerPromise.resolve() 기반. 마이크로태스크.
  • queueScheduler — 동기, FIFO 큐. 재귀를 평탄화.
  • animationFrameSchedulerrequestAnimationFrame 기반. 60fps에 맞춰 움직이는 흐름에.
  • VirtualTimeScheduler — 테스트용. 시간을 가짜로 흘립니다.

실전 예시 하나. 1만 개짜리 데이터를 한 번에 그리면 메인 스레드가 멈춰버리니, 프레임 단위로 끊어 점진 렌더하고 싶습니다.

src/app/lib/chunked-render.ts
import { from, observeOn, animationFrameScheduler, bufferCount } from 'rxjs';

export function renderInChunks<T>(items: T[], chunkSize = 100) {
  return from(items).pipe(
    bufferCount(chunkSize),
    observeOn(animationFrameScheduler),
  );
}

bufferCount로 100개씩 묶고, observeOn이 그 묶음의 emit을 매 애니메이션 프레임으로 옮깁니다. UI는 부드럽게 채워지고 메인 스레드는 한가해집니다.

에러 처리 패턴 #

기초에서 catchError만 짧게 다뤘습니다. 실전에서는 재시도 정책이 거의 항상 같이 갑니다. 옛날 코드에서는 retryWhen을 자주 봤지만, RxJS 7부터 retry({ count, delay }) 형태가 표준입니다. retryWhen은 deprecated에 가깝습니다.

src/app/services/api.service.ts
import { Injectable, inject } from '@angular/core';
import { HttpClient, HttpErrorResponse } from '@angular/common/http';
import { Observable, retry, timer, catchError, throwError } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class ApiService {
  private http = inject(HttpClient);

  get<T>(url: string): Observable<T> {
    return this.http.get<T>(url).pipe(
      retry({
        count: 3,
        delay: (err: HttpErrorResponse, attempt) => {
          // 4xx는 재시도해봐야 소용없으니 즉시 포기
          if (err.status >= 400 && err.status < 500) {
            return throwError(() => err);
          }
          // 5xx와 네트워크 오류는 지수 백오프
          return timer(2 ** attempt * 500);
        },
        resetOnSuccess: true,
      }),
      catchError(err => {
        // 모든 재시도 실패 후의 최종 폴백
        console.error('API 실패', err);
        return throwError(() => err);
      }),
    );
  }
}

핵심은 delay 콜백입니다. 에러 종류에 따라 재시도 여부를 다르게 두고 싶을 때 이 패턴을 그대로 가져다 씁니다. resetOnSuccess: true는 한 번 성공하면 카운터를 리셋하니, 장기 구독에서도 안전합니다.

멀티캐스팅 — 한 번 흘리고 여러 곳에서 받기 #

기본 Observable은 cold입니다. 구독할 때마다 처음부터 다시 실행됩니다. HTTP 요청이라면 — 같은 데이터를 두 컴포넌트가 구독했을 때 요청이 두 번 날아갑니다. 의도한 게 아니라면 멀티캐스팅으로 막아야 합니다.

shareReplay가 가장 자주 쓰는 도구입니다.

src/app/services/config.service.ts
import { Injectable, inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { shareReplay } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class ConfigService {
  private http = inject(HttpClient);

  config$ = this.http.get<AppConfig>('/api/config').pipe(
    shareReplay({ bufferSize: 1, refCount: true }),
  );
}
  • bufferSize: 1 — 마지막 1개의 값을 기억해뒀다가, 늦게 구독한 사람에게도 즉시 흘려줍니다.
  • refCount: true — 구독자가 0이 되면 내부 구독을 정리합니다. 이게 없으면 한 번 살아난 흐름이 영영 메모리에 남습니다.

refCount를 끄면(기본값) Observable은 한 번 시작된 후 절대 멈추지 않습니다. 거의 항상 refCount: true로 켜두세요. 끄는 게 의도일 때만 끄는 겁니다.

share()shareReplay보다 가벼운 형제입니다. 값을 기억하지 않고 흘리는 시점부터의 값만 공유합니다. 이벤트 스트림에 적합합니다.

Subject 변형 — ReplaySubjectAsyncSubject #

중급에서 SubjectBehaviorSubject만 다뤘는데, 두 변형이 더 있습니다.

ReplaySubject — 최근 N개의 값을 기억합니다. BehaviorSubject가 1개만 기억하는 거라면, ReplaySubject는 N개를 기억하는 셈입니다. 초기값이 없어도 됩니다.

const log$ = new ReplaySubject<string>(5);
log$.next('a');
log$.next('b');
log$.next('c');
// 새 구독자도 a, b, c를 모두 받음
log$.subscribe(console.log);

세션 로그, 최근 알림 N개, 채팅 히스토리 같은 데 어울립니다.

AsyncSubjectcomplete되는 순간의 마지막 값 하나만 흘립니다. 완료 전에는 아무것도 흘리지 않습니다. 한 번에 결과 하나만 의미가 있는 작업 — 예를 들어 큰 계산의 최종 결과 — 에 적합한데, 사실 그런 케이스는 Promise나 단발 Observable로 풀리는 일이 많아 빈도는 가장 낮습니다. “이런 게 있다” 정도로 알아두면 됩니다.

Marble testing — 시간을 가짜로 #

비동기 흐름은 테스트하기가 까다롭습니다. setTimeout을 진짜로 기다리면 테스트가 느려지고, 타이밍이 맞물리는 버그는 재현조차 어렵습니다. TestScheduler를 쓰면 시간을 가상으로 흘릴 수 있습니다. RxJS 진영에서는 이걸 marble diagram 문법과 함께 씁니다.

src/app/operators/search-input.spec.ts
import { TestScheduler } from 'rxjs/testing';
import { searchInput } from './search-input';

describe('searchInput', () => {
  let scheduler: TestScheduler;

  beforeEach(() => {
    scheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });

  it('debounce 후 trim,distinct,minLength 통과', () => {
    scheduler.run(({ cold, expectObservable }) => {
      // ' a ' (0ms) → 'ab' (10ms) → 'ab' (20ms) → 'ab ' (40ms)
      const input$ = cold('a-b-b--c|', { a: ' a ', b: 'ab', c: 'ab ' });
      const expected =    '----------- 290ms b|';
      expectObservable(input$.pipe(searchInput(300, 2))).toBe(expected, { b: 'ab' });
    });
  });
});

-는 1프레임(가상의 1ms), 글자는 emit, |은 complete를 뜻합니다. 진짜로 300ms를 기다리지 않고도 debounce 동작을 검증할 수 있습니다. 상세 문법은 처음엔 낯설어도, 한 파일 통째로 한 번 짜보면 빠르게 익숙해집니다.

cold는 구독 시점부터 흐르는 Observable이고, hot은 이미 흐르고 있는 Observable입니다. HTTP 응답 같은 cold 스트림에는 cold, 마우스 이벤트나 WebSocket 같은 hot 스트림에는 hot을 씁니다.

흔한 함정 #

오래 쓰다 보면 마주치는 발자국 몇 가지를 정리해두겠습니다.

1. subscribe 안에 subscribe — operator로 풀어야 할 흐름을 콜백 중첩으로 짜는 패턴입니다. 거의 항상 switchMap이나 mergeMap으로 평탄화됩니다.

// 안 좋은 예
this.userId$.subscribe(id => {
  this.api.getUser(id).subscribe(user => {
    this.user = user;
  });
});

// 좋은 예
this.user$ = this.userId$.pipe(switchMap(id => this.api.getUser(id)));

2. switchMap 안의 raceswitchMap은 이전 안쪽 Observable을 취소하지만, 이전 effect의 부수 효과는 취소하지 않습니다. 캐시에 쓰거나 전역 상태를 건드리는 작업이 안쪽에 있다면, 취소된 흐름이 만든 부수 효과가 그대로 남을 수 있습니다.

3. cold/hot 혼동http.get()은 cold라서 subscribe마다 요청이 새로 나갑니다. 두 곳에서 구독하면 호출이 두 번 갑니다. 이걸 모르면 “왜 API가 두 번 불러요?” 하고 한참 헤맵니다. shareReplay({ bufferSize: 1, refCount: true })로 hot으로 만들거나, 서비스에서 한 번만 구독한 결과를 BehaviorSubject로 뿌려주면 됩니다.

4. takeUntilDestroyed 누락 — 직접 subscribe하는 곳에서 takeUntilDestroyed를 깜빡하면 메모리 누수입니다. 코드 리뷰 체크리스트에 한 줄 적어두세요.

노트
손이 익으면 subscribe를 거의 쓰지 않게 됩니다. 템플릿에서는 async pipetoSignal이 자동 구독,해제를 맡고, 컴포넌트 클래스에서는 effecttoObservable로 시그널과 잇습니다. subscribe가 늘어난다면 “operator로 풀 수 있는 게 있지 않나?” 한 번 의심해보세요.

마무리 #

이번 글에서는 RxJS 심화 주제를 정리했습니다.

  • Higher-order Observable과 4가지 flattening — mergeAll/switchAll/concatAll/exhaustAll
  • 동시성 결정 기준: concatMap(순서) / mergeMap(병렬) / switchMap(마지막) / exhaustMap(중복 무시)
  • 커스텀 operator는 OperatorFunction<T, R>을 돌려주는 함수. pipe로 합성
  • Scheduler로 시간 제어 — animationFrameScheduler로 점진 렌더, VirtualTimeScheduler로 테스트
  • retry({ count, delay })가 표준. retryWhen은 더 안 씁니다
  • 멀티캐스팅은 shareReplay({ bufferSize, refCount: true }). refCount는 거의 항상 켜기
  • ReplaySubject로 최근 N개, AsyncSubject는 거의 안 씀
  • TestScheduler와 marble 문법으로 가상 시간 테스트
  • 함정: subscribe 중첩, switchMap race, cold/hot 혼동, takeUntilDestroyed 누락

다음 글은 **“앵귤러 고급 강좌 #5 NgRx 입문”**입니다. 컴포넌트 트리를 넘어서는 전역 상태 — Store, Action, Reducer, Selector, Effects — 의 구조와, “정말 NgRx가 필요한가?“라는 질문을 함께 다루겠습니다.

X