앵귤러 고급 강좌 #4 RxJS 심화 — 커스텀 operator와 Scheduler
중급 #3 RxJS 기초에서 Observable의 구조, map/filter/switchMap 같은 기본 operator, Subject/BehaviorSubject까지 정리했습니다. 거기까지가 RxJS의 절반이라면, 이번 글이 다루는 게 나머지 절반입니다. higher-order Observable의 본 모습, 동시성 전략을 결정하는 4가지 *Map operator, 직접 만드는 operator, Scheduler로 시간을 통제하는 법, 그리고 marble testing까지 — 실무에서 RxJS가 진짜 무기로 바뀌는 지점들입니다.
Higher-order Observable이란 #
Observable이 값으로 또 다른 Observable을 흘리면 어떻게 될까요? 이게 higher-order Observable입니다. Observable<Observable<T>> 모양입니다. 처음 보면 “왜 이런 게 필요한가” 싶지만, 실은 우리는 이미 매일 만들고 있습니다.
import { fromEvent, map, Observable } from 'rxjs';
import { ajax } from 'rxjs/ajax';
// click$는 Observable<Event>
const click$ = fromEvent(document, 'click');
// 클릭이 일어날 때마다 'ajax' Observable을 만들어 흘립니다.
// 결과 타입: Observable<Observable<Response>>
const requests$$: Observable<Observable<unknown>> = click$.pipe(
map(() => ajax('/api/me')),
);requests$$를 그대로 구독하면 흘러오는 건 응답 데이터가 아니라 응답 Observable 그 자체입니다. 우리가 원하는 건 안쪽 Observable을 풀어 진짜 값을 꺼내는 일입니다. 이걸 flattening이라고 부릅니다.
flattening 전략이 4가지 있습니다.
mergeAll— 들어오는 안쪽 Observable을 모두 동시에 구독. 결과가 뒤섞여도 상관 없을 때.switchAll— 새 안쪽 Observable이 오면 이전 것은 즉시 취소하고 새 것만 본다.concatAll— 한 번에 하나씩, 앞엣것이 끝나야 다음 것을 시작.exhaustAll— 진행 중인 안쪽 Observable이 있으면 새로 들어오는 것은 무시.
실제로는 map + *All을 매번 같이 쓰니까, 둘을 합친 단축형이 우리가 늘 쓰는 mergeMap/switchMap/concatMap/exhaustMap입니다. 즉 — switchMap은 map(...) + switchAll의 단축입니다.
concatMap vs mergeMap vs switchMap vs exhaustMap
#
이름이 비슷하니까 익숙해지기 전까진 헷갈립니다. 동시성을 어떻게 다룰지로 구분하면 명확해집니다.
| operator | 새 값이 올 때 | 사용 패턴 |
|---|---|---|
concatMap | 줄 세워 순서대로 처리 | 순서가 중요한 작업 (저장 큐, 트랜잭션) |
mergeMap | 병렬로 동시에 진행 | 순서 무관 병렬 요청 (벌크 업로드) |
switchMap | 이전 것 취소, 새 것만 | 검색 자동완성, 라우트 파라미터 변화 |
exhaustMap | 진행 중이면 무시 | 폼 submit 더블 클릭 방지, 로그인 버튼 |
특히 exhaustMap은 의외로 자주 만나는 상황을 가장 깔끔하게 풉니다. 사용자가 로그인 버튼을 빠르게 두 번 눌렀을 때, 두 번째 클릭은 그냥 버려야 맞습니다.
import { Component, inject } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { Subject, exhaustMap } from 'rxjs';
import { AuthService } from './auth.service';
@Component({ /* ... */ })
export class LoginComponent {
private auth = inject(AuthService);
private submit$ = new Subject<Credentials>();
constructor() {
this.submit$
.pipe(
exhaustMap(creds => this.auth.login(creds)),
takeUntilDestroyed(),
)
.subscribe(user => { /* 라우팅 등 */ });
}
onSubmit(creds: Credentials) {
this.submit$.next(creds);
}
}mergeMap이었다면 두 번 로그인이 날아갑니다. switchMap이었다면 첫 번째 요청이 응답 직전에 취소돼 사용자 경험이 어색해집니다. exhaustMap은 “이미 보내고 있으니까 너는 무시"가 됩니다.
switchMap. 모두 보내야 하는가 → mergeMap. 순서가 중요한가 → concatMap. 중복 발사를 막고 싶은가 → exhaustMap. 헷갈릴 때 이 네 줄이 90%를 가립니다.커스텀 operator 만들기 #
같은 파이프라인 조각을 여러 컴포넌트에서 반복하고 있다면, 함수로 빼낼 때입니다. RxJS의 operator는 결국 Observable<T>를 받아 Observable<R>을 돌려주는 함수일 뿐입니다. 타입 이름도 그대로입니다 — OperatorFunction<T, R>.
검색 패턴(debounce → distinct → trim → 길이 필터)을 매번 적는다고 해봅시다.
import { OperatorFunction, pipe } from 'rxjs';
import { debounceTime, distinctUntilChanged, map, filter } from 'rxjs';
export function searchInput(
debounceMs = 300,
minLength = 2,
): OperatorFunction<string, string> {
return pipe(
debounceTime(debounceMs),
map(q => q.trim()),
distinctUntilChanged(),
filter(q => q.length >= minLength),
);
}pipe()는 함수 합성 도구입니다. operator들을 받아 하나의 operator로 묶어 줍니다. 사용처는 단순해집니다.
results = toSignal(
this.query.valueChanges.pipe(
searchInput(300, 2),
switchMap(q => this.userService.search(q)),
),
{ initialValue: [] },
);타입 추론도 그대로 흐릅니다. string → string이라는 시그니처가 명시되어 있으니, 다음 단계의 switchMap에서 q는 자연스레 string으로 잡힙니다. 팀에 RxJS가 익숙한 사람이 적어질수록 이런 의미 있는 이름의 operator가 코드 가독성을 크게 끌어올립니다.
Scheduler — 비동기의 시간 제어 #
RxJS의 거의 모든 operator는 두 번째 인자로 Scheduler를 받을 수 있습니다. Scheduler는 “이 작업을 언제, 어떤 큐에 올려 실행할까"를 결정하는 객체입니다. 아무것도 지정하지 않으면 RxJS가 동기적/마이크로태스크 기반으로 알아서 굴립니다.
자주 쓰는 Scheduler는 다섯 가지입니다.
asyncScheduler—setTimeout기반. 매크로태스크.asapScheduler—Promise.resolve()기반. 마이크로태스크.queueScheduler— 동기, FIFO 큐. 재귀를 평탄화.animationFrameScheduler—requestAnimationFrame기반. 60fps에 맞춰 움직이는 흐름에.VirtualTimeScheduler— 테스트용. 시간을 가짜로 흘립니다.
실전 예시 하나. 1만 개짜리 데이터를 한 번에 그리면 메인 스레드가 멈춰버리니, 프레임 단위로 끊어 점진 렌더하고 싶습니다.
import { from, observeOn, animationFrameScheduler, bufferCount } from 'rxjs';
export function renderInChunks<T>(items: T[], chunkSize = 100) {
return from(items).pipe(
bufferCount(chunkSize),
observeOn(animationFrameScheduler),
);
}bufferCount로 100개씩 묶고, observeOn이 그 묶음의 emit을 매 애니메이션 프레임으로 옮깁니다. UI는 부드럽게 채워지고 메인 스레드는 한가해집니다.
에러 처리 패턴 #
기초에서 catchError만 짧게 다뤘습니다. 실전에서는 재시도 정책이 거의 항상 같이 갑니다. 옛날 코드에서는 retryWhen을 자주 봤지만, RxJS 7부터 retry({ count, delay }) 형태가 표준입니다. retryWhen은 deprecated에 가깝습니다.
import { Injectable, inject } from '@angular/core';
import { HttpClient, HttpErrorResponse } from '@angular/common/http';
import { Observable, retry, timer, catchError, throwError } from 'rxjs';
@Injectable({ providedIn: 'root' })
export class ApiService {
private http = inject(HttpClient);
get<T>(url: string): Observable<T> {
return this.http.get<T>(url).pipe(
retry({
count: 3,
delay: (err: HttpErrorResponse, attempt) => {
// 4xx는 재시도해봐야 소용없으니 즉시 포기
if (err.status >= 400 && err.status < 500) {
return throwError(() => err);
}
// 5xx와 네트워크 오류는 지수 백오프
return timer(2 ** attempt * 500);
},
resetOnSuccess: true,
}),
catchError(err => {
// 모든 재시도 실패 후의 최종 폴백
console.error('API 실패', err);
return throwError(() => err);
}),
);
}
}핵심은 delay 콜백입니다. 에러 종류에 따라 재시도 여부를 다르게 두고 싶을 때 이 패턴을 그대로 가져다 씁니다. resetOnSuccess: true는 한 번 성공하면 카운터를 리셋하니, 장기 구독에서도 안전합니다.
멀티캐스팅 — 한 번 흘리고 여러 곳에서 받기 #
기본 Observable은 cold입니다. 구독할 때마다 처음부터 다시 실행됩니다. HTTP 요청이라면 — 같은 데이터를 두 컴포넌트가 구독했을 때 요청이 두 번 날아갑니다. 의도한 게 아니라면 멀티캐스팅으로 막아야 합니다.
shareReplay가 가장 자주 쓰는 도구입니다.
import { Injectable, inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { shareReplay } from 'rxjs';
@Injectable({ providedIn: 'root' })
export class ConfigService {
private http = inject(HttpClient);
config$ = this.http.get<AppConfig>('/api/config').pipe(
shareReplay({ bufferSize: 1, refCount: true }),
);
}bufferSize: 1— 마지막 1개의 값을 기억해뒀다가, 늦게 구독한 사람에게도 즉시 흘려줍니다.refCount: true— 구독자가 0이 되면 내부 구독을 정리합니다. 이게 없으면 한 번 살아난 흐름이 영영 메모리에 남습니다.
refCount를 끄면(기본값) Observable은 한 번 시작된 후 절대 멈추지 않습니다. 거의 항상 refCount: true로 켜두세요. 끄는 게 의도일 때만 끄는 겁니다.
share()는 shareReplay보다 가벼운 형제입니다. 값을 기억하지 않고 흘리는 시점부터의 값만 공유합니다. 이벤트 스트림에 적합합니다.
Subject 변형 — ReplaySubject와 AsyncSubject
#
중급에서 Subject와 BehaviorSubject만 다뤘는데, 두 변형이 더 있습니다.
ReplaySubject — 최근 N개의 값을 기억합니다. BehaviorSubject가 1개만 기억하는 거라면, ReplaySubject는 N개를 기억하는 셈입니다. 초기값이 없어도 됩니다.
const log$ = new ReplaySubject<string>(5);
log$.next('a');
log$.next('b');
log$.next('c');
// 새 구독자도 a, b, c를 모두 받음
log$.subscribe(console.log);세션 로그, 최근 알림 N개, 채팅 히스토리 같은 데 어울립니다.
AsyncSubject — complete되는 순간의 마지막 값 하나만 흘립니다. 완료 전에는 아무것도 흘리지 않습니다. 한 번에 결과 하나만 의미가 있는 작업 — 예를 들어 큰 계산의 최종 결과 — 에 적합한데, 사실 그런 케이스는 Promise나 단발 Observable로 풀리는 일이 많아 빈도는 가장 낮습니다. “이런 게 있다” 정도로 알아두면 됩니다.
Marble testing — 시간을 가짜로 #
비동기 흐름은 테스트하기가 까다롭습니다. setTimeout을 진짜로 기다리면 테스트가 느려지고, 타이밍이 맞물리는 버그는 재현조차 어렵습니다. TestScheduler를 쓰면 시간을 가상으로 흘릴 수 있습니다. RxJS 진영에서는 이걸 marble diagram 문법과 함께 씁니다.
import { TestScheduler } from 'rxjs/testing';
import { searchInput } from './search-input';
describe('searchInput', () => {
let scheduler: TestScheduler;
beforeEach(() => {
scheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
});
it('debounce 후 trim,distinct,minLength 통과', () => {
scheduler.run(({ cold, expectObservable }) => {
// ' a ' (0ms) → 'ab' (10ms) → 'ab' (20ms) → 'ab ' (40ms)
const input$ = cold('a-b-b--c|', { a: ' a ', b: 'ab', c: 'ab ' });
const expected = '----------- 290ms b|';
expectObservable(input$.pipe(searchInput(300, 2))).toBe(expected, { b: 'ab' });
});
});
});-는 1프레임(가상의 1ms), 글자는 emit, |은 complete를 뜻합니다. 진짜로 300ms를 기다리지 않고도 debounce 동작을 검증할 수 있습니다. 상세 문법은 처음엔 낯설어도, 한 파일 통째로 한 번 짜보면 빠르게 익숙해집니다.
cold는 구독 시점부터 흐르는 Observable이고, hot은 이미 흐르고 있는 Observable입니다. HTTP 응답 같은 cold 스트림에는 cold, 마우스 이벤트나 WebSocket 같은 hot 스트림에는 hot을 씁니다.
흔한 함정 #
오래 쓰다 보면 마주치는 발자국 몇 가지를 정리해두겠습니다.
1. subscribe 안에 subscribe — operator로 풀어야 할 흐름을 콜백 중첩으로 짜는 패턴입니다. 거의 항상 switchMap이나 mergeMap으로 평탄화됩니다.
// 안 좋은 예
this.userId$.subscribe(id => {
this.api.getUser(id).subscribe(user => {
this.user = user;
});
});
// 좋은 예
this.user$ = this.userId$.pipe(switchMap(id => this.api.getUser(id)));2. switchMap 안의 race — switchMap은 이전 안쪽 Observable을 취소하지만, 이전 effect의 부수 효과는 취소하지 않습니다. 캐시에 쓰거나 전역 상태를 건드리는 작업이 안쪽에 있다면, 취소된 흐름이 만든 부수 효과가 그대로 남을 수 있습니다.
3. cold/hot 혼동 — http.get()은 cold라서 subscribe마다 요청이 새로 나갑니다. 두 곳에서 구독하면 호출이 두 번 갑니다. 이걸 모르면 “왜 API가 두 번 불러요?” 하고 한참 헤맵니다. shareReplay({ bufferSize: 1, refCount: true })로 hot으로 만들거나, 서비스에서 한 번만 구독한 결과를 BehaviorSubject로 뿌려주면 됩니다.
4. takeUntilDestroyed 누락 — 직접 subscribe하는 곳에서 takeUntilDestroyed를 깜빡하면 메모리 누수입니다. 코드 리뷰 체크리스트에 한 줄 적어두세요.
subscribe를 거의 쓰지 않게 됩니다. 템플릿에서는 async pipe나 toSignal이 자동 구독,해제를 맡고, 컴포넌트 클래스에서는 effect나 toObservable로 시그널과 잇습니다. subscribe가 늘어난다면 “operator로 풀 수 있는 게 있지 않나?” 한 번 의심해보세요.마무리 #
이번 글에서는 RxJS 심화 주제를 정리했습니다.
- Higher-order Observable과 4가지 flattening —
mergeAll/switchAll/concatAll/exhaustAll - 동시성 결정 기준:
concatMap(순서) /mergeMap(병렬) /switchMap(마지막) /exhaustMap(중복 무시) - 커스텀 operator는
OperatorFunction<T, R>을 돌려주는 함수.pipe로 합성 - Scheduler로 시간 제어 —
animationFrameScheduler로 점진 렌더,VirtualTimeScheduler로 테스트 retry({ count, delay })가 표준.retryWhen은 더 안 씁니다- 멀티캐스팅은
shareReplay({ bufferSize, refCount: true }).refCount는 거의 항상 켜기 ReplaySubject로 최근 N개,AsyncSubject는 거의 안 씀TestScheduler와 marble 문법으로 가상 시간 테스트- 함정: subscribe 중첩, switchMap race, cold/hot 혼동,
takeUntilDestroyed누락
다음 글은 **“앵귤러 고급 강좌 #5 NgRx 입문”**입니다. 컴포넌트 트리를 넘어서는 전역 상태 — Store, Action, Reducer, Selector, Effects — 의 구조와, “정말 NgRx가 필요한가?“라는 질문을 함께 다루겠습니다.