Angular Intermediate #3: RxJS Basics — Observable and Operators
In Basics #7, while covering HttpClient, we briefly met Observable. We brushed past it with “the result comes as an Observable, so just subscribe or convert to a signal with toSignal” and moved on. RxJS actually underpins half of Angular. In this post we take the deeper look at RxJS we’ve been putting off.
What is an Observable #
An Observable is a stream of values laid out across time. Not just one value — many values can flow through it in sequence, and they only flow while someone is subscribed.
The contrast with Promise makes the difference clear.
| Promise | Observable | |
|---|---|---|
| Number of values | Exactly one | Zero or more (many possible) |
| When it runs | Immediately (at creation) | Lazily (at subscription) |
| Cancellation | Not possible | Cancel by unsubscribing |
| Operators | .then chaining | pipe() with many composable operators |
If a value comes once and that’s it, a Promise is enough; but for data that flows over time — user input events, WebSocket messages — Observable is more natural. An HttpClient GET request also resolves once, but Angular standardized on Observable across the board for consistency.
import { Observable } from 'rxjs';
const numbers$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
numbers$.subscribe({
next: v => console.log('value:', v),
complete: () => console.log('done'),
});
// value: 1
// value: 2
// value: 3
// done
Nothing happens until subscribe is called. Only at that moment does the function body run and values start flowing. Think of Observable as “a deferred pipeline of values”.
$. It’s a marker that says “this is an Observable.” Not enforced, but widely used in the Angular community.subscribe and unsubscribe — where memory leaks live #
An Observable lives as long as it’s subscribed. If a component disappears but the subscription isn’t released, the stream stays alive and creates a memory leak. That’s the issue we mentioned briefly in Basics #7.
If you want to clean it up by hand, call unsubscribe() on the Subscription returned by subscribe.
private sub?: Subscription;
ngOnInit() {
this.sub = interval(1000).subscribe(v => console.log(v));
}
ngOnDestroy() {
this.sub?.unsubscribe();
}The problem is that tracking a Subscription variable and wiring up ngOnDestroy for every component gets messy fast. takeUntilDestroyed, introduced in Angular 16, finishes the job in one line.
import { Component, inject, DestroyRef } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { interval } from 'rxjs';
@Component({ /* ... */ })
export class ClockComponent {
private destroyRef = inject(DestroyRef);
constructor() {
interval(1000)
.pipe(takeUntilDestroyed(this.destroyRef))
.subscribe(v => console.log(v));
}
}When the component is destroyed, takeUntilDestroyed cuts the stream automatically. No ngOnDestroy, no extra variable.
subscribe directly, reflexively pair it with takeUntilDestroyed. The most common source of memory leaks just disappears.Core operators (1) — transformation #
Operators are processing steps you slot into the Observable’s flow. You line them up inside pipe(). Start with these three.
map — turns a value into another value.
of(1, 2, 3).pipe(map(n => n * 10)).subscribe(console.log);
// 10, 20, 30
filter — only lets matching values through.
of(1, 2, 3, 4).pipe(filter(n => n % 2 === 0)).subscribe(console.log);
// 2, 4
tap — peeks at values without changing them. Used for debugging or side effects.
of(1, 2, 3).pipe(
tap(v => console.log('saw:', v)),
map(n => n * 2),
).subscribe();The shape matches the array map and filter you already know. The difference is that RxJS operates on arrays in time.
Core operators (2) — combination #
These operators combine Observables, or swap one Observable’s values for the values of another. They’re the most-used operators in real apps.
switchMap — when a new value arrives, cancel the previous inner Observable and switch to the new one. A perfect fit for “cancel the previous API call whenever the search term changes.”
mergeMap — when a new value arrives, run the inner Observables in parallel without canceling. Use it when you need every result.
combineLatest — combines the latest values from multiple Observables into one bundle. Whenever any of them emits, a new bundle flows out.
Search input → API call is the classic example that shows RxJS at its best.
import { Component, inject } from '@angular/core';
import { FormControl, ReactiveFormsModule } from '@angular/forms';
import { toSignal } from '@angular/core/rxjs-interop';
import { debounceTime, distinctUntilChanged, switchMap, startWith } from 'rxjs';
import { UserService } from './user.service';
@Component({
selector: 'app-search',
standalone: true,
imports: [ReactiveFormsModule],
template: `
<input [formControl]="query" placeholder="Type to search" />
@for (user of results(); track user.id) {
<div>{{ user.name }}</div>
}
`,
})
export class SearchComponent {
private userService = inject(UserService);
query = new FormControl('', { nonNullable: true });
results = toSignal(
this.query.valueChanges.pipe(
startWith(''),
debounceTime(300),
distinctUntilChanged(),
switchMap(q => this.userService.search(q)),
),
{ initialValue: [] },
);
}In plain words:
- Each keystroke pushes a new value through
valueChanges. debounceTime(300)waits for 300ms of quiet to coalesce rapid typing.distinctUntilChangedfilters out repeated identical queries.switchMapcalls the API with the new query and automatically cancels the previous call.toSignalconverts the result to a signal and pushes it into the template.
Writing this imperatively means setTimeout, AbortController, and four or five state variables. With operators, it’s a 7-line pipeline.
switchMap vs mergeMap wrong and you’ll get scrambled results or older requests overwriting newer ones. For search/autocomplete where “only the latest matters,” switchMap. When you need to collect every result, mergeMap.Subject and BehaviorSubject #
Until now, the Observables we’ve used were streams someone else built and we just consumed. When you want to push values yourself, that’s Subject. A Subject is both an Observable and an Observer, so you can call next() to emit values.
import { Injectable } from '@angular/core';
import { Subject } from 'rxjs';
@Injectable({ providedIn: 'root' })
export class EventBus {
private events$ = new Subject<string>();
on$ = this.events$.asObservable();
emit(event: string) {
this.events$.next(event);
}
}Other components subscribe to bus.on$ and get notified each time bus.emit('login') is called.
BehaviorSubject adds two things on top.
- An initial value is required.
- It always remembers the current value. A new subscriber immediately gets the most recent value.
It’s a great fit as a state store.
import { BehaviorSubject } from 'rxjs';
@Injectable({ providedIn: 'root' })
export class AuthService {
private user$$ = new BehaviorSubject<User | null>(null);
user$ = this.user$$.asObservable();
login(user: User) { this.user$$.next(user); }
logout() { this.user$$.next(null); }
get current() { return this.user$$.value; }
}With a plain Subject, a late subscriber wouldn’t know the current login state. BehaviorSubject remembers the last value and replays it instantly.
_$$, and expose a read-only Observable via asObservable(). That way nobody can call next() from outside.Combining with HttpClient #
Operators show their value when paired with HttpClient. Let’s wrap search, filtering, and retry into one pipe.
import { Injectable, inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, retry, timer, catchError, of } from 'rxjs';
@Injectable({ providedIn: 'root' })
export class UserService {
private http = inject(HttpClient);
search(q: string): Observable<User[]> {
return this.http.get<User[]>('/api/users', { params: { q } }).pipe(
retry({ count: 2, delay: (_, i) => timer(500 * (i + 1)) }),
catchError(() => of([])),
);
}
}retryretries failed requests up to two times. Thedelaygives you backoff (500ms, 1000ms).- If it still fails,
catchErrorfalls back to an empty array. The UI naturally shows “no results.”
Writing this imperatively would mean nested try/catch and a counter variable. Two operators handle it.
async pipe and toSignal
#
There are two standard ways to consume Observables from a template.
async pipe is the long-time standard. The template subscribes automatically and unsubscribes when the component is destroyed.
@Component({
imports: [AsyncPipe],
template: `
@if (users$ | async; as list) {
@for (u of list; track u.id) { <div>{{ u.name }}</div> }
}
`,
})
export class UserListComponent {
private userService = inject(UserService);
users$ = this.userService.getUsers();
}toSignal is the new standard from Angular 16+. It converts an Observable into a signal, so in the template you just call it like any other signal.
users = toSignal(this.userService.getUsers(), { initialValue: [] });
// template: {{ users() }}
For new code, prefer toSignal. It plays nicely with signal-based change detection and composes naturally with computed. That said, plenty of libraries and example code still use async pipe, so know how to read both.
Signals vs Observables — when to use which #
Back in Basics #1 we said “the era of reaching for RxJS or Signals depending on the situation.” Here’s the short guide.
- Signals: simple component state, synchronous values, values derived via
computed. Overwhelmingly comfortable for holding UI state. - Observables: anything async with time involved — HTTP responses, user input streams, WebSocket, router events. If you need transformation, combination, or retry via operators, that’s Observable territory.
When the line is fuzzy, if the data source is an async stream, treat it as an Observable, and at the rendering boundary convert to a signal with toSignal. The two worlds connect naturally.
Wrapping up #
In this post we covered the basics of RxJS.
- Observable is a stream of values across time. Unlike Promise, it can carry many values, defers execution, and is cancelable
subscribeis where memory leaks live. One-linetakeUntilDestroyedsolves it- Transformation operators:
map,filter,tap - Combination operators:
switchMap,mergeMap,combineLatest— search, parallel, combination Subjectto push values,BehaviorSubjectto keep current state- Combine with HttpClient for search, retry, and fallback in one pipe
- In templates, use
async pipeortoSignal. PrefertoSignalin new code - Simple state → signal, async stream → Observable. Bridge with
toSignalat the boundary
The next post is “Angular Intermediate #4 Lifecycle Hooks.” We go beyond OnInit — input changes, before and after change detection, view init — plus the OnPush strategy and how signals simplify the lifecycle.