Angular Intermediate #3: RxJS Basics — Observable and Operators


In Basics #7, while covering HttpClient, we briefly met Observable. We brushed past it with “the result comes as an Observable, so just subscribe or convert to a signal with toSignal” and moved on. But RxJS underpins much of Angular: HttpClient, reactive forms, and the router all expose Observables. In this post we take the deeper look at RxJS we’ve been putting off.

What is an Observable #

An Observable is a stream of values laid out across time. Not just one value — many values can flow through it in sequence, and they only flow while someone is subscribed.

The contrast with Promise makes the difference clear.

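|                  | Promise                   | Observable                             |
|------------------|---------------------------|----------------------------------------|
| Number of values | Exactly one               | Zero or more (many possible)           |
| When it runs     | Immediately (at creation) | Lazily (at subscription)               |
| Cancellation     | Not possible              | Cancel by unsubscribing                |
| Operators        | .then chaining            | pipe() with many composable operators  |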

If a value comes once and that’s it, a Promise is enough. For data that flows over time, such as user input events or WebSocket messages, an Observable is the more natural fit. An HttpClient GET request also emits only once and then completes, but Angular standardized on Observable across the board for consistency.

src/app/observable-demo.ts
import { Observable } from 'rxjs';

const numbers$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

numbers$.subscribe({
  next: v => console.log('value:', v),
  complete: () => console.log('done'),
});
// value: 1
// value: 2
// value: 3
// done

Nothing happens until subscribe is called. Only at that moment does the function body run and values start flowing. Think of Observable as “a deferred pipeline of values”.
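
To make the "deferred" part concrete, here is a minimal sketch that contrasts it with a Promise. The names log, eager, and lazy$ are illustrative only. It records the order in which things run: the Promise executor fires at creation, while the Observable's producer runs once per subscribe call.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// A Promise executor runs immediately, even if nobody ever calls .then().
const eager = new Promise<number>(resolve => {
  log.push('promise executor ran');
  resolve(1);
});
void eager; // created only to show eagerness; never awaited

// An Observable's producer function does not run at creation time.
const lazy$ = new Observable<number>(subscriber => {
  log.push('observable producer ran');
  subscriber.next(1);
  subscriber.complete();
});

log.push('before subscribe');
lazy$.subscribe(); // producer runs now
lazy$.subscribe(); // and runs again for the second subscriber

console.log(log);
// ['promise executor ran', 'before subscribe',
//  'observable producer ran', 'observable producer ran']
```

Note that each subscriber gets its own run of the producer, which is why subscribing twice to an HttpClient Observable sends two requests.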

Note
You’ll often see the convention of suffixing variable names with $. It’s a marker that says “this is an Observable.” Not enforced, but widely used in the Angular community.

subscribe and unsubscribe — where memory leaks live #

An Observable lives as long as it’s subscribed. If a component disappears but the subscription isn’t released, the stream stays alive and creates a memory leak. That’s the issue we mentioned briefly in Basics #7.

If you want to clean it up by hand, call unsubscribe() on the Subscription returned by subscribe.

bad — managing it yourself
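import { Component, OnDestroy, OnInit } from '@angular/core';
import { Subscription, interval } from 'rxjs';

@Component({ /* ... */ })
export class ClockComponent implements OnInit, OnDestroy {
  private sub?: Subscription;

  ngOnInit() {
    this.sub = interval(1000).subscribe(v => console.log(v));
  }

  ngOnDestroy() {
    // Without this call, the interval keeps ticking after the component is gone.
    this.sub?.unsubscribe();
  }
}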
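
What unsubscribe() actually does can be sketched in plain RxJS, outside of any component. In this illustrative example (resource$ and events are made-up names), the function returned from the producer is the teardown logic, and it runs when the subscription is released.

```typescript
import { Observable } from 'rxjs';

const events: string[] = [];

// The function returned from the producer is the teardown logic.
const resource$ = new Observable<number>(subscriber => {
  events.push('resource opened');
  subscriber.next(1);
  return () => {
    events.push('resource released');
  };
});

const sub = resource$.subscribe(v => events.push(`value ${v}`));
events.push(`closed before unsubscribe: ${sub.closed}`);

sub.unsubscribe(); // runs the teardown and marks the subscription closed
events.push(`closed after unsubscribe: ${sub.closed}`);

console.log(events);
// ['resource opened', 'value 1', 'closed before unsubscribe: false',
//  'resource released', 'closed after unsubscribe: true']
```

If unsubscribe() is never called on a stream that does not complete by itself, the teardown never runs. That is the leak described above.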

The problem is that tracking a Subscription variable and wiring up ngOnDestroy for every component gets messy fast. takeUntilDestroyed, introduced in Angular 16, finishes the job in one line.

src/app/clock.component.ts
import { Component, inject, DestroyRef } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { interval } from 'rxjs';

@Component({ /* ... */ })
export class ClockComponent {
  private destroyRef = inject(DestroyRef);

  constructor() {
    interval(1000)
      .pipe(takeUntilDestroyed(this.destroyRef))
      .subscribe(v => console.log(v));
  }
}

When the component is destroyed, takeUntilDestroyed cuts the stream automatically. No ngOnDestroy, no extra variable.
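
Conceptually, this is close to the long-standing takeUntil pattern driven by a notifier that fires on destroy. The sketch below uses plain RxJS so it runs without Angular. Here destroyed$ is a hypothetical stand-in for the component's destruction, not an Angular API, and ticks$ replaces interval so the example stays synchronous.

```typescript
import { Subject, takeUntil } from 'rxjs';

// Hypothetical stand-in for "the component was destroyed".
const destroyed$ = new Subject<void>();
// Manual source so we control exactly when values arrive.
const ticks$ = new Subject<number>();

const seen: number[] = [];
let completed = false;

ticks$.pipe(takeUntil(destroyed$)).subscribe({
  next: v => seen.push(v),
  complete: () => {
    completed = true;
  },
});

ticks$.next(1);
ticks$.next(2);
destroyed$.next(); // the notifier fires: the stream completes and unsubscribes
ticks$.next(3);    // ignored, nobody is listening any more

console.log(seen, completed); // [1, 2] true
```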

Tip
In new code, if you have to call subscribe directly, reflexively pair it with takeUntilDestroyed. The most common source of memory leaks just disappears.

Core operators (1) — transformation #

Operators are processing steps you slot into the Observable’s flow. You line them up inside pipe(). Start with these three.

map — turns a value into another value.

of(1, 2, 3).pipe(map(n => n * 10)).subscribe(console.log);
// 10, 20, 30

filter — only lets matching values through.

of(1, 2, 3, 4).pipe(filter(n => n % 2 === 0)).subscribe(console.log);
// 2, 4

tap — peeks at values without changing them. Used for debugging or side effects.

of(1, 2, 3).pipe(
  tap(v => console.log('saw:', v)),
  map(n => n * 2),
).subscribe();

The shape matches the array map and filter you already know. The difference is that an array holds all of its values at once, while RxJS applies the same operations to values as they arrive over time.
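
As a small illustration, the three operators can be chained in a single pipe. The arrays seen and out are only there to record what each stage observed.

```typescript
import { filter, map, of, tap } from 'rxjs';

const seen: number[] = [];
const out: number[] = [];

of(1, 2, 3, 4, 5, 6)
  .pipe(
    tap(n => seen.push(n)),   // side effect only; the value passes through unchanged
    filter(n => n % 2 === 0), // keep even numbers
    map(n => n * 10),         // transform what is left
  )
  .subscribe(v => out.push(v));

console.log(seen); // [1, 2, 3, 4, 5, 6]
console.log(out);  // [20, 40, 60]
```

Order matters: tap sits before filter here, so it sees every value, including the ones that are dropped later.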

Core operators (2) — combination #

These operators combine Observables, or swap one Observable’s values for the values of another. They’re the most-used operators in real apps.

switchMap — when a new value arrives, cancel the previous inner Observable and switch to the new one. A perfect fit for “cancel the previous API call whenever the search term changes.”

mergeMap — when a new value arrives, run the inner Observables in parallel without canceling. Use it when you need every result.

combineLatest — combines the latest values from multiple Observables into one bundle. Whenever any of them emits, a new bundle flows out.
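
The difference between the three is easier to see with streams you control by hand. In the sketch below, every name is illustrative: the Subjects in responses play the role of slow API calls that we "resolve" manually, with the response for the older query arriving last.

```typescript
import { BehaviorSubject, Subject, combineLatest, mergeMap, switchMap } from 'rxjs';

const query$ = new Subject<string>();

// Fake "requests": one manually controlled response stream per query.
const responses: Record<string, Subject<string>> = {
  a: new Subject<string>(),
  ab: new Subject<string>(),
};

const switched: string[] = [];
const merged: string[] = [];

query$.pipe(switchMap(q => responses[q])).subscribe(r => switched.push(r));
query$.pipe(mergeMap(q => responses[q])).subscribe(r => merged.push(r));

query$.next('a');
query$.next('ab'); // switchMap drops its subscription to the 'a' response here

responses['ab'].next('result for ab');
responses['a'].next('result for a'); // stale response arrives last

console.log(switched); // ['result for ab']
console.log(merged);   // ['result for ab', 'result for a']

// combineLatest: emits a bundle whenever any input emits,
// once every input has produced at least one value.
const page$ = new BehaviorSubject(1);
const sort$ = new BehaviorSubject('name');
const bundles: [number, string][] = [];

combineLatest([page$, sort$]).subscribe(b => bundles.push(b));
page$.next(2);

console.log(bundles); // [[1, 'name'], [2, 'name']]
```

With mergeMap the stale 'a' result lands after the fresh one, which is exactly the overwrite problem a search box must avoid.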

Search input → API call is the classic example that shows RxJS at its best.

src/app/search.component.ts
import { Component, inject } from '@angular/core';
import { FormControl, ReactiveFormsModule } from '@angular/forms';
import { toSignal } from '@angular/core/rxjs-interop';
import { debounceTime, distinctUntilChanged, switchMap, startWith } from 'rxjs';
import { UserService } from './user.service';

@Component({
  selector: 'app-search',
  standalone: true,
  imports: [ReactiveFormsModule],
  template: `
    <input [formControl]="query" placeholder="Type to search" />
    @for (user of results(); track user.id) {
      <div>{{ user.name }}</div>
    }
  `,
})
export class SearchComponent {
  private userService = inject(UserService);
  query = new FormControl('', { nonNullable: true });

  results = toSignal(
    this.query.valueChanges.pipe(
      startWith(''),
      debounceTime(300),
      distinctUntilChanged(),
      switchMap(q => this.userService.search(q)),
    ),
    { initialValue: [] },
  );
}

In plain words:

  1. Each keystroke pushes a new value through valueChanges.
  2. startWith('') seeds the stream with an empty query, because valueChanges emits nothing until the user types.
  3. debounceTime(300) waits for 300ms of quiet to coalesce rapid typing.
  4. distinctUntilChanged filters out repeated identical queries.
  5. switchMap calls the API with the new query and automatically cancels the previous call.
  6. toSignal converts the result to a signal and pushes it into the template.
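
One detail in this pipeline is easy to misread: distinctUntilChanged compares each value only with the one immediately before it, so a query that comes back later is let through again. A minimal sketch with made-up query strings:

```typescript
import { distinctUntilChanged, of } from 'rxjs';

const passed: string[] = [];

// Only consecutive duplicates are dropped.
of('a', 'a', 'ab', 'ab', 'a')
  .pipe(distinctUntilChanged())
  .subscribe(q => passed.push(q));

console.log(passed); // ['a', 'ab', 'a']
```

For a search box that is the desired behavior: typing "ab", deleting the "b", and pausing should search for "a" again.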

Writing this imperatively means setTimeout, AbortController, and four or five state variables. With operators, it’s one short pipe of four operators.

Note
Pick switchMap vs mergeMap wrong and you’ll get scrambled results or older requests overwriting newer ones. For search/autocomplete where “only the latest matters,” switchMap. When you need to collect every result, mergeMap.

Subject and BehaviorSubject #

Until now, the Observables we’ve used were streams someone else built and we just consumed. When you want to push values yourself, that’s Subject. A Subject is both an Observable and an Observer, so you can call next() to emit values.

src/app/event-bus.ts
import { Injectable } from '@angular/core';
import { Subject } from 'rxjs';

@Injectable({ providedIn: 'root' })
export class EventBus {
  private events$ = new Subject<string>();
  on$ = this.events$.asObservable();

  emit(event: string) {
    this.events$.next(event);
  }
}

Other components subscribe to bus.on$ and get notified each time bus.emit('login') is called.

BehaviorSubject adds two things on top.

  1. An initial value is required.
  2. It always remembers the current value. A new subscriber immediately gets the most recent value.

It’s a great fit as a state store.

src/app/auth.service.ts
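import { Injectable } from '@angular/core';
import { BehaviorSubject } from 'rxjs';
import { User } from './user.model'; // hypothetical path to the app's User type

@Injectable({ providedIn: 'root' })
export class AuthService {
  // Private: only this service may push new values.
  private user$$ = new BehaviorSubject<User | null>(null);
  // Public, read-only view for consumers.
  user$ = this.user$$.asObservable();

  login(user: User) { this.user$$.next(user); }
  logout()          { this.user$$.next(null); }
  get current()     { return this.user$$.value; }
}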

With a plain Subject, a late subscriber wouldn’t know the current login state. BehaviorSubject remembers the last value and replays it instantly.
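
The late-subscriber difference can be checked directly. In this illustrative sketch (the user names and the 'anonymous' initial value are made up), both subjects receive 'alice' before anyone subscribes.

```typescript
import { BehaviorSubject, Subject } from 'rxjs';

const plain$ = new Subject<string>();
const state$ = new BehaviorSubject<string>('anonymous');

// Values emitted before anyone is listening.
plain$.next('alice');
state$.next('alice');

const fromPlain: string[] = [];
const fromState: string[] = [];

// Late subscribers.
plain$.subscribe(v => fromPlain.push(v));
state$.subscribe(v => fromState.push(v)); // immediately receives 'alice'

plain$.next('bob');
state$.next('bob');

console.log(fromPlain);    // ['bob']
console.log(fromState);    // ['alice', 'bob']
console.log(state$.value); // 'bob'
```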

Tip
Don’t expose a Subject variable directly. Keep it private, marked with a convention such as the $$ suffix (user$$ above), and expose a read-only Observable via asObservable(). That way nobody can call next() from outside.

Combining with HttpClient #

Operators show their value when paired with HttpClient. Let’s wrap a search request, retry, and a fallback into one pipe.

src/app/user.service.ts
import { Injectable, inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, retry, timer, catchError, of } from 'rxjs';
import { User } from './user.model'; // hypothetical path to the app's User type

@Injectable({ providedIn: 'root' })
export class UserService {
  private http = inject(HttpClient);

  search(q: string): Observable<User[]> {
    return this.http.get<User[]>('/api/users', { params: { q } }).pipe(
      // retryCount starts at 1, so the waits are 500ms, then 1000ms.
      retry({ count: 2, delay: (_err, retryCount) => timer(500 * retryCount) }),
      // After the final failure, fall back to an empty list.
      catchError(() => of([])),
    );
  }
}

  • retry retries failed requests up to two times. The delay gives you backoff (500ms, 1000ms).
  • If it still fails, catchError falls back to an empty array. The UI naturally shows “no results.”

Writing this imperatively would mean nested try/catch and a counter variable. Two operators handle it.
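
To see how retry and catchError interact without a real backend, here is a sketch with a fake request that fails a configurable number of times. The fakeRequest helper and its data are hypothetical, and the backoff delay is left out so the example stays synchronous.

```typescript
import { catchError, defer, of, retry, throwError } from 'rxjs';

// Hypothetical fake request: fails `failures` times, then succeeds.
function fakeRequest(failures: number) {
  let attempts = 0;
  const source$ = defer(() => {
    attempts += 1;
    return attempts <= failures
      ? throwError(() => new Error(`attempt ${attempts} failed`))
      : of(['alice']);
  });
  return { source$, attempts: () => attempts };
}

// Same shape as search(): two retries, then an empty-array fallback.
function withRetry(failures: number) {
  const request = fakeRequest(failures);
  let result: string[] = ['(nothing yet)'];
  request.source$
    .pipe(retry(2), catchError(() => of([] as string[])))
    .subscribe(v => (result = v));
  return { result, attempts: request.attempts() };
}

console.log(withRetry(2)); // { result: ['alice'], attempts: 3 }
console.log(withRetry(5)); // { result: [], attempts: 3 }
```

Either way the source is subscribed at most three times: the original attempt plus two retries. Only after the last failure does catchError take over.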

async pipe and toSignal #

There are two standard ways to consume Observables from a template.

async pipe is the long-time standard. The template subscribes automatically and unsubscribes when the component is destroyed.

async pipe
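import { Component, inject } from '@angular/core';
import { AsyncPipe } from '@angular/common';
import { UserService } from './user.service';

@Component({
  imports: [AsyncPipe],
  template: `
    @if (users$ | async; as list) {
      @for (u of list; track u.id) { <div>{{ u.name }}</div> }
    }
  `,
})
export class UserListComponent {
  private userService = inject(UserService);
  // getUsers() is assumed to return Observable<User[]>; it is not shown in this post.
  users$ = this.userService.getUsers();
}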

toSignal is the new standard from Angular 16+. It converts an Observable into a signal, so in the template you just call it like any other signal.

toSignal
users = toSignal(this.userService.getUsers(), { initialValue: [] });
// template: {{ users() }}

For new code, prefer toSignal. It plays nicely with signal-based change detection and composes naturally with computed. That said, plenty of libraries and example code still use async pipe, so know how to read both.

Signals vs Observables — when to use which #

Back in Basics #1 we said this is “the era of reaching for RxJS or Signals depending on the situation.” Here’s the short guide.

  • Signals: simple component state, synchronous values, values derived via computed. By far the more comfortable choice for holding UI state.
  • Observables: anything async with time involved — HTTP responses, user input streams, WebSocket, router events. If you need transformation, combination, or retry via operators, that’s Observable territory.

When the line is fuzzy, use this rule: if the data source is an async stream, model it as an Observable, then convert it to a signal with toSignal at the rendering boundary. The two worlds connect naturally.

Wrapping up #

In this post we covered the basics of RxJS.

  • Observable is a stream of values across time. Unlike Promise, it can carry many values, defers execution, and is cancelable
  • subscribe is where memory leaks live. One-line takeUntilDestroyed solves it
  • Transformation operators: map, filter, tap
  • Combination operators: switchMap for search, mergeMap for parallel work, combineLatest for bundling the latest values
  • Subject to push values, BehaviorSubject to keep current state
  • Combine with HttpClient for search, retry, and fallback in one pipe
  • In templates, use async pipe or toSignal. Prefer toSignal in new code
  • Simple state → signal, async stream → Observable. Bridge with toSignal at the boundary

The next post is “Angular Intermediate #4 Lifecycle Hooks.” We go beyond OnInit — input changes, before and after change detection, view init — plus the OnPush strategy and how signals simplify the lifecycle.
