앵귤러 중급 강좌 #3 RxJS 기초 — Observable과 Operator
기초 #7에서 HttpClient를 다루며 Observable을 슬쩍 만났습니다. 그때는 “결과가 Observable로 오니까 subscribe하거나 toSignal로 변환해서 쓰자” 정도로 넘어갔습니다. 사실 RxJS는 앵귤러의 절반을 떠받치는 도구입니다. 이번 글에서는 미뤄뒀던 RxJS의 핵심 개념을 본격적으로 들여다보겠습니다.
Observable이란 #
Observable은 시간 위에 늘어선 값의 흐름입니다. 한 번에 하나가 아니라 여러 개의 값이 차례로 흘러올 수 있고, 흘러온 값은 누군가 구독하고 있을 때만 흘러갑니다.
Promise와 비교하면 차이가 또렷해집니다.
| Promise | Observable | |
|---|---|---|
| 값의 개수 | 단 하나 | 0개 이상 (여러 개 가능) |
| 실행 시점 | 즉시(생성과 동시에) | 지연(구독 시점에) |
| 취소 | 불가 | 구독 해제로 취소 가능 |
| 연산자 | .then 체이닝 | pipe()로 다양한 operator 조합 |
값이 한 번만 나오면 Promise로 충분하지만, 사용자의 입력 이벤트나 WebSocket 메시지처럼 시간을 따라 계속 흘러오는 데이터는 Observable이 더 자연스럽습니다. HttpClient의 GET 요청도 결과가 한 번 오고 끝이지만, 앵귤러는 일관성을 위해 모든 비동기를 Observable로 통일했습니다.
import { Observable } from 'rxjs';
const numbers$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
numbers$.subscribe({
next: v => console.log('값:', v),
complete: () => console.log('끝'),
});
// 값: 1
// 값: 2
// 값: 3
// 끝
subscribe를 호출하기 전까지는 아무 일도 일어나지 않습니다. 호출하는 순간에야 비로소 함수 본문이 실행되고 값이 흘러나옵니다. **Observable은 “지연된 값의 파이프라인”**이라고 생각하면 편합니다.
$를 붙이는 컨벤션을 자주 보게 됩니다. “이건 Observable이다"라는 표시입니다. 강제는 아니지만 앵귤러 커뮤니티에서 널리 통용되는 관습입니다.subscribe와 unsubscribe — 메모리 누수의 출처 #
Observable은 구독한 만큼 살아 있습니다. 컴포넌트가 사라져도 구독을 풀지 않으면 흐름이 그대로 살아남아 메모리 누수가 생깁니다. 기초 #7에서 짧게 언급했던 그 문제입니다.
직접 풀고 싶다면 subscribe가 돌려준 Subscription에 unsubscribe()를 호출하면 됩니다.
private sub?: Subscription;
ngOnInit() {
this.sub = interval(1000).subscribe(v => console.log(v));
}
ngOnDestroy() {
this.sub?.unsubscribe();
}문제는 컴포넌트마다 Subscription 변수와 ngOnDestroy를 챙기는 게 금방 지저분해진다는 점입니다. 앵귤러 16부터 들어온 takeUntilDestroyed가 이 일을 한 줄로 끝내줍니다.
import { Component, inject, DestroyRef } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { interval } from 'rxjs';
@Component({ /* ... */ })
export class ClockComponent {
private destroyRef = inject(DestroyRef);
constructor() {
interval(1000)
.pipe(takeUntilDestroyed(this.destroyRef))
.subscribe(v => console.log(v));
}
}컴포넌트가 사라지면 takeUntilDestroyed가 흐름을 자동으로 끊어줍니다. ngOnDestroy도, 변수도 필요 없습니다.
subscribe해야 한다면, 반사적으로 takeUntilDestroyed를 같이 적으세요. 메모리 누수의 가장 흔한 출처가 사라집니다.핵심 operator (1) — 변환 #
operator는 Observable의 흐름 위에 끼워 넣는 가공 단계입니다. pipe() 안에 줄 세워 적용합니다. 가장 먼저 익혀야 할 세 가지부터 보겠습니다.
map — 값을 다른 값으로 바꿉니다.
of(1, 2, 3).pipe(map(n => n * 10)).subscribe(console.log);
// 10, 20, 30
filter — 조건에 맞는 값만 통과시킵니다.
of(1, 2, 3, 4).pipe(filter(n => n % 2 === 0)).subscribe(console.log);
// 2, 4
tap — 값을 바꾸지 않고 곁눈질만 합니다. 디버깅이나 부수 효과에 씁니다.
of(1, 2, 3).pipe(
tap(v => console.log('흘러간 값:', v)),
map(n => n * 2),
).subscribe();배열의 map, filter와 모양이 같습니다. 다만 RxJS는 시간 위의 배열에 대해 동작한다는 점이 다릅니다.
핵심 operator (2) — 결합 #
여러 Observable을 합치거나 한 Observable의 값을 다른 Observable로 바꿔치기하는 operator들입니다. 실전에서 가장 자주 쓰입니다.
switchMap — 새 값이 오면 이전 내부 Observable을 취소하고 새 것으로 바꿉니다. 검색어가 바뀔 때마다 이전 API 호출을 버리는 패턴에 딱 맞습니다.
mergeMap — 새 값이 와도 이전 것을 취소하지 않고 병렬로 흘립니다. 모든 결과를 다 받아야 할 때 씁니다.
combineLatest — 여러 Observable의 최신 값을 한 묶음으로 합칩니다. 어느 한쪽이 새 값을 내면 다시 한 묶음이 흘러나옵니다.
검색어 입력에서 API 호출로 이어지는 흐름은 RxJS의 진면목이 드러나는 단골 예제입니다.
import { Component, inject } from '@angular/core';
import { FormControl, ReactiveFormsModule } from '@angular/forms';
import { toSignal } from '@angular/core/rxjs-interop';
import { debounceTime, distinctUntilChanged, switchMap, startWith } from 'rxjs';
import { UserService } from './user.service';
@Component({
selector: 'app-search',
standalone: true,
imports: [ReactiveFormsModule],
template: `
<input [formControl]="query" placeholder="검색어 입력" />
@for (user of results(); track user.id) {
<div>{{ user.name }}</div>
}
`,
})
export class SearchComponent {
private userService = inject(UserService);
query = new FormControl('', { nonNullable: true });
results = toSignal(
this.query.valueChanges.pipe(
startWith(''),
debounceTime(300),
distinctUntilChanged(),
switchMap(q => this.userService.search(q)),
),
{ initialValue: [] },
);
}흐름을 한국어로 풀면 이렇습니다.
- 사용자가 입력할 때마다
valueChanges가 새 값을 흘립니다. debounceTime(300)이 300ms 동안 잠잠해지길 기다려 연타를 묶습니다.distinctUntilChanged가 같은 검색어 반복을 거릅니다.switchMap이 새 검색어로 API를 호출하고, 이전 호출은 자동 취소합니다.- 결과를
toSignal이 시그널로 변환해 템플릿에 흘려줍니다.
이걸 명령형으로 짜면 setTimeout, AbortController, 상태 변수 너댓 개가 필요합니다. operator로는 7줄짜리 파이프라인 한 줄에 끝납니다.
switchMap과 mergeMap을 잘못 고르면 결과가 뒤섞이거나 이전 요청이 나중 결과를 덮어쓰는 버그가 생깁니다. 검색,자동완성처럼 “마지막 것만 쓰면 된다” 면 switchMap, 여러 요청 결과를 모두 모아야 한다면 mergeMap입니다.Subject와 BehaviorSubject #
지금까지의 Observable은 누군가 만들어준 흐름을 받아 쓰는 쪽이었습니다. 내가 직접 값을 밀어 넣고 싶을 때(push) 쓰는 게 Subject입니다. Subject는 Observable이면서 동시에 옵저버라서, next()로 값을 흘려보낼 수 있습니다.
import { Injectable } from '@angular/core';
import { Subject } from 'rxjs';
@Injectable({ providedIn: 'root' })
export class EventBus {
private events$ = new Subject<string>();
on$ = this.events$.asObservable();
emit(event: string) {
this.events$.next(event);
}
}다른 컴포넌트에서 bus.on$을 구독하면, bus.emit('login')이 호출될 때마다 알림을 받습니다.
BehaviorSubject는 여기에 두 가지가 더 붙은 변형입니다.
- 초기값이 필수입니다.
- 현재 값을 항상 기억합니다. 새로 구독한 사람도 즉시 가장 최근 값을 받습니다.
상태 저장소로 쓰기에 잘 맞습니다.
import { BehaviorSubject } from 'rxjs';
@Injectable({ providedIn: 'root' })
export class AuthService {
private user$$ = new BehaviorSubject<User | null>(null);
user$ = this.user$$.asObservable();
login(user: User) { this.user$$.next(user); }
logout() { this.user$$.next(null); }
get current() { return this.user$$.value; }
}Subject로 했다면 늦게 구독한 컴포넌트가 현재 로그인 상태를 모릅니다. BehaviorSubject는 마지막 값을 기억하고 있다가 즉시 흘려 줍니다.
_$$ 같은 컨벤션으로 private에 두고, 외부에는 asObservable()로 읽기 전용 Observable만 공개합니다. 그래야 아무 데서나 next()를 호출하지 못합니다.HttpClient와의 결합 #
operator들이 HttpClient와 만나면 진가가 드러납니다. 검색,필터,재시도를 한 파이프에 엮어보겠습니다.
import { Injectable, inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, retry, timer, catchError, of } from 'rxjs';
@Injectable({ providedIn: 'root' })
export class UserService {
private http = inject(HttpClient);
search(q: string): Observable<User[]> {
return this.http.get<User[]>('/api/users', { params: { q } }).pipe(
retry({ count: 2, delay: (_, i) => timer(500 * (i + 1)) }),
catchError(() => of([])),
);
}
}retry가 실패한 요청을 두 번까지 재시도합니다.delay로 백오프(500ms, 1000ms)를 줬습니다.- 그래도 실패하면
catchError가 빈 배열로 폴백시킵니다. UI는 “결과 없음"으로 자연스럽게 처리됩니다.
이런 흐름을 명령형으로 짜려면 try/catch 중첩과 카운터 변수가 한참 필요한데, operator 두 개로 끝납니다.
async pipe와 toSignal
#
템플릿에서 Observable을 다루는 두 가지 표준 방법이 있습니다.
**async pipe**는 오랜 표준입니다. 템플릿이 자동으로 구독하고, 컴포넌트가 사라지면 자동으로 해제합니다.
@Component({
imports: [AsyncPipe],
template: `
@if (users$ | async; as list) {
@for (u of list; track u.id) { <div>{{ u.name }}</div> }
}
`,
})
export class UserListComponent {
private userService = inject(UserService);
users$ = this.userService.getUsers();
}**toSignal**은 Angular 16+의 새 표준입니다. Observable을 시그널로 변환하니, 템플릿에서는 그냥 시그널처럼 호출하면 됩니다.
users = toSignal(this.userService.getUsers(), { initialValue: [] });
// 템플릿: {{ users() }}
새 코드에서는 toSignal을 우선 고려하세요. 시그널 기반 변경 감지와 잘 맞고, computed로 다른 시그널과 자연스럽게 조합됩니다. 다만 라이브러리,예제 코드 다수가 여전히 async pipe를 쓰니 둘 다 읽을 줄은 알아야 합니다.
시그널 vs Observable — 언제 무엇을 #
기초 #1에서 “RxJS와 Signals를 상황에 따라 골라 쓰는 시대"라고 했습니다. 짧은 가이드를 정리해두겠습니다.
- 시그널: 단순한 컴포넌트 상태, 동기 값,
computed로 파생되는 값. UI 상태를 들고 있을 때 압도적으로 편합니다. - Observable: 시간 흐름이 끼어드는 비동기 — HTTP 응답, 사용자 입력 스트림, WebSocket, 라우터 이벤트. operator로 변환,결합,재시도가 필요하면 Observable의 영역입니다.
경계가 애매할 때는 데이터의 출처가 비동기 스트림이면 Observable로 받고, 화면에 바로 쓸 단계에서 toSignal로 시그널화하면 자연스럽게 두 세계가 이어집니다.
마무리 #
이번 글에서는 RxJS의 기초를 정리했습니다.
- Observable은 시간 위의 값 흐름을 정리합니다. Promise와 달리 여러 값,지연 실행,취소 가능
subscribe는 메모리 누수의 출처.takeUntilDestroyed한 줄로 정리- 변환 operator:
map,filter,tap - 결합 operator:
switchMap,mergeMap,combineLatest— 검색,병렬,조합 Subject로 값을 push,BehaviorSubject로 현재 상태 보관- HttpClient와 결합해 검색,재시도,폴백을 파이프 한 줄로
- 템플릿에서는
async pipe또는toSignal. 새 코드는toSignal을 우선 - 단순 상태는 시그널, 비동기 스트림은 Observable. 경계에선
toSignal로 잇기
다음 글은 **“앵귤러 중급 강좌 #4 라이프사이클 훅”**입니다. OnInit 너머의 라이프사이클 — 입력 변경, 변경 감지 전후, 뷰 초기화 — 와 OnPush 전략, 시그널이 라이프사이클을 어떻게 단순화하는지를 다루겠습니다.