Angular Advanced #4 RxJS in depth — custom operators and Schedulers
In Intermediate #3 RxJS basics we covered the structure of Observables, basic operators like map/filter/switchMap, and Subject/BehaviorSubject. If that’s half of RxJS, this post is the other half. The real shape of higher-order Observables, the four *Map operators that decide concurrency strategy, custom operators you build yourself, controlling time with Schedulers, and marble testing — these are the points where RxJS becomes a real asset in practice.
What is a higher-order Observable #
What happens if an Observable emits another Observable as a value? That’s a higher-order Observable. The shape Observable<Observable<T>>. It looks unfamiliar at first — “why would I need this?” — but we already create them every day.
import { fromEvent, map, Observable } from 'rxjs';
import { ajax } from 'rxjs/ajax';
// click$ is Observable<Event>
const click$ = fromEvent(document, 'click');
// Each click produces an 'ajax' Observable.
// Result type: Observable<Observable<Response>>
const requests$$: Observable<Observable<unknown>> = click$.pipe(
map(() => ajax('/api/me')),
);If you subscribe to requests$$ directly, what comes out is not the response data but the response Observable itself. What we want is to unwrap the inner Observable and get the actual value out. That’s called flattening.
There are four flattening strategies.
mergeAll— Subscribe to every incoming inner Observable concurrently. Use when the order of results doesn’t matter.switchAll— When a new inner Observable arrives, immediately cancel the previous one and watch only the new one.concatAll— One at a time, in order; the next starts only after the previous finishes.exhaustAll— If an inner Observable is in progress, ignore newcomers.
In practice you always pair map + *All, so the shorthand we use every day is mergeMap/switchMap/concatMap/exhaustMap. That is — switchMap is the shorthand for map(...) + switchAll.
concatMap vs mergeMap vs switchMap vs exhaustMap
#
The names are similar, so they’re confusing until you internalize them. Splitting by how they handle concurrency makes it clear.
| operator | When a new value arrives | Pattern |
|---|---|---|
concatMap | Queue and process in order | Order-sensitive work (save queue, transactions) |
mergeMap | Run in parallel, all at once | Order-agnostic parallel requests (bulk uploads) |
switchMap | Cancel the previous, take the new | Search autocomplete, route param changes |
exhaustMap | Ignore new ones while in progress | Form submit double-click guard, login button |
exhaustMap in particular cleanly solves a situation you meet more often than you’d think. When the user clicks the login button twice quickly, the second click should just be discarded.
import { Component, inject } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { Subject, exhaustMap } from 'rxjs';
import { AuthService } from './auth.service';
@Component({ /* ... */ })
export class LoginComponent {
private auth = inject(AuthService);
private submit$ = new Subject<Credentials>();
constructor() {
this.submit$
.pipe(
exhaustMap(creds => this.auth.login(creds)),
takeUntilDestroyed(),
)
.subscribe(user => { /* routing, etc. */ });
}
onSubmit(creds: Credentials) {
this.submit$.next(creds);
}
}With mergeMap, two login requests would fly. With switchMap, the first request would get canceled right before the response — awkward UX. exhaustMap says “I’m already sending one, so you’re ignored.”
switchMap. everything must be sent → mergeMap. order matters → concatMap. block duplicate firings → exhaustMap. When you’re stuck, these four lines cover 90%.Building custom operators #
If you find yourself repeating the same pipeline fragment across multiple components, it’s time to extract a function. An RxJS operator is, in the end, just a function that takes Observable<T> and returns Observable<R>. The type name is exactly that — OperatorFunction<T, R>.
Suppose you keep writing the same search pattern (debounce → distinct → trim → length filter).
import { OperatorFunction, pipe } from 'rxjs';
import { debounceTime, distinctUntilChanged, map, filter } from 'rxjs';
export function searchInput(
debounceMs = 300,
minLength = 2,
): OperatorFunction<string, string> {
return pipe(
debounceTime(debounceMs),
map(q => q.trim()),
distinctUntilChanged(),
filter(q => q.length >= minLength),
);
}pipe() is a function-composition tool. It takes operators and bundles them into a single operator. The call sites get simpler.
results = toSignal(
this.query.valueChanges.pipe(
searchInput(300, 2),
switchMap(q => this.userService.search(q)),
),
{ initialValue: [] },
);Type inference flows through. The signature string → string is explicit, so in the next step switchMap, q is naturally typed as string. As fewer team members have deep RxJS knowledge, named operators like this noticeably improve readability.
Scheduler — controlling time in async #
Almost every RxJS operator can take a Scheduler as its second argument. A Scheduler is an object that decides “in which queue and when to run this work.” If you don’t specify one, RxJS schedules synchronously or via microtasks on its own.
The five Schedulers you’ll commonly use:
asyncScheduler— Based onsetTimeout. Macrotasks.asapScheduler— Based onPromise.resolve(). Microtasks.queueScheduler— Synchronous, FIFO queue. Flattens recursion.animationFrameScheduler— Based onrequestAnimationFrame. For flows tied to 60fps.VirtualTimeScheduler— For tests. Runs time virtually.
A practical example. Drawing 10,000 records all at once would freeze the main thread, so you want frame-by-frame progressive rendering.
import { from, observeOn, animationFrameScheduler, bufferCount } from 'rxjs';
export function renderInChunks<T>(items: T[], chunkSize = 100) {
return from(items).pipe(
bufferCount(chunkSize),
observeOn(animationFrameScheduler),
);
}bufferCount groups them in batches of 100, and observeOn shifts each batch’s emit onto an animation frame. The UI fills smoothly and the main thread stays free.
Error handling patterns #
In Basics we briefly covered only catchError. In practice a retry policy almost always comes along. Old code reached for retryWhen, but since RxJS 7 the standard form is retry({ count, delay }). retryWhen is essentially deprecated.
import { Injectable, inject } from '@angular/core';
import { HttpClient, HttpErrorResponse } from '@angular/common/http';
import { Observable, retry, timer, catchError, throwError } from 'rxjs';
@Injectable({ providedIn: 'root' })
export class ApiService {
private http = inject(HttpClient);
get<T>(url: string): Observable<T> {
return this.http.get<T>(url).pipe(
retry({
count: 3,
delay: (err: HttpErrorResponse, attempt) => {
// 4xx isn't worth retrying — give up immediately
if (err.status >= 400 && err.status < 500) {
return throwError(() => err);
}
// 5xx and network errors get exponential backoff
return timer(2 ** attempt * 500);
},
resetOnSuccess: true,
}),
catchError(err => {
// final fallback after every retry fails
console.error('API failed', err);
return throwError(() => err);
}),
);
}
}The point is the delay callback. Use this pattern when you want different retry behavior depending on the error. resetOnSuccess: true resets the counter on a single success, so it’s safe even on long-lived subscriptions.
Multicasting — emit once, receive in many places #
By default an Observable is cold. Every subscribe re-runs from the start. For an HTTP request — if two components subscribe to the same data, the request fires twice. If that’s not what you want, multicasting is the fix.
shareReplay is the most common tool.
import { Injectable, inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { shareReplay } from 'rxjs';
@Injectable({ providedIn: 'root' })
export class ConfigService {
private http = inject(HttpClient);
config$ = this.http.get<AppConfig>('/api/config').pipe(
shareReplay({ bufferSize: 1, refCount: true }),
);
}bufferSize: 1— Remembers the last 1 value and replays it to late subscribers immediately.refCount: true— When subscribers drop to zero, cleans up the inner subscription. Without this, a stream that has come alive once stays in memory forever.
If you turn refCount off (the default), the Observable, once started, never stops. Almost always keep refCount: true. Turn it off only when that’s the intent.
share() is the lighter sibling of shareReplay. It doesn’t remember values; it shares only emissions from the time of sharing onward. It fits event streams.
Subject variants — ReplaySubject and AsyncSubject
#
In Intermediate we covered only Subject and BehaviorSubject; there are two more variants.
ReplaySubject — Remembers the last N values. If BehaviorSubject remembers 1, ReplaySubject remembers N. No initial value required.
const log$ = new ReplaySubject<string>(5);
log$.next('a');
log$.next('b');
log$.next('c');
// new subscribers receive a, b, c
log$.subscribe(console.log);It fits session logs, the latest N notifications, chat history, and so on.
AsyncSubject — Emits only the last value at the moment of complete. Before completion, it emits nothing. It suits tasks where only one final result matters — say, the result of a heavy computation — but those cases are usually solved with a Promise or a single-shot Observable, so it’s the rarest of the four. Knowing it exists is enough.
Marble testing — virtual time #
Async flows are hard to test. Truly waiting for setTimeout makes tests slow, and timing-related bugs are even hard to reproduce. With TestScheduler, you can run time virtually. The RxJS scene uses it together with marble diagram syntax.
import { TestScheduler } from 'rxjs/testing';
import { searchInput } from './search-input';
describe('searchInput', () => {
let scheduler: TestScheduler;
beforeEach(() => {
scheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
});
it('passes trim,distinct,minLength after debounce', () => {
scheduler.run(({ cold, expectObservable }) => {
// ' a ' (0ms) → 'ab' (10ms) → 'ab' (20ms) → 'ab ' (40ms)
const input$ = cold('a-b-b--c|', { a: ' a ', b: 'ab', c: 'ab ' });
const expected = '----------- 290ms b|';
expectObservable(input$.pipe(searchInput(300, 2))).toBe(expected, { b: 'ab' });
});
});
});- is one frame (one virtual ms), letters are emissions, | is complete. You can verify debounce behavior without truly waiting 300ms. The syntax feels foreign at first, but working through one complete test file gets you comfortable quickly.
cold is an Observable that flows from the moment of subscription, while hot is one that’s already flowing. For cold streams like an HTTP response, use cold; for hot streams like mouse events or WebSocket, use hot.
Common pitfalls #
A few pitfalls you’ll encounter over time.
1. subscribe inside subscribe — The pattern of writing as nested callbacks what should be one operator chain. Almost always flattens with switchMap or mergeMap.
// Bad
this.userId$.subscribe(id => {
this.api.getUser(id).subscribe(user => {
this.user = user;
});
});
// Good
this.user$ = this.userId$.pipe(switchMap(id => this.api.getUser(id)));2. Race inside switchMap — switchMap cancels the previous inner Observable, but it does not cancel the previous effect’s side effects. If something inside writes to a cache or touches global state, the side effect of a canceled flow can stick around.
3. cold/hot confusion — http.get() is cold, so each subscribe fires a fresh request. Subscribe in two places and the call goes twice. Without knowing this, you spend a long time wondering “why is the API called twice?” Make it hot with shareReplay({ bufferSize: 1, refCount: true }), or subscribe once in a service and republish via a BehaviorSubject.
4. Missing takeUntilDestroyed — Forgetting takeUntilDestroyed where you call subscribe directly is a memory leak. Pin a line about it on the code-review checklist.
subscribe. Templates use async pipe or toSignal for auto-subscribe/unsubscribe; in component classes, you bridge to signals with effect or toObservable. If subscribe keeps growing, suspect “is there an operator I should be using?” once.Wrap-up #
In this post we covered RxJS in depth.
- Higher-order Observables and four flattenings —
mergeAll/switchAll/concatAll/exhaustAll - Concurrency decision rule:
concatMap(order) /mergeMap(parallel) /switchMap(latest) /exhaustMap(ignore duplicates) - A custom operator is a function that returns an
OperatorFunction<T, R>. Compose withpipe - Control time with Schedulers —
animationFrameSchedulerfor progressive rendering,VirtualTimeSchedulerfor tests retry({ count, delay })is the standard.retryWhenis no longer used- Multicasting is
shareReplay({ bufferSize, refCount: true }). Almost always turnrefCounton ReplaySubjectfor the last N values;AsyncSubjectis rarely used- Virtual-time testing with
TestSchedulerand marble syntax - Pitfalls: nested subscribe, switchMap race, cold/hot confusion, missing
takeUntilDestroyed
The next post is “Angular Advanced #5 NgRx introduction”. We’ll cover global state beyond the component tree — Store, Action, Reducer, Selector, Effects — together with the question “do I really need NgRx?”