Essential RxJS for Angular
RxJS in Angular 21
RxJS (Reactive Extensions for JavaScript) is the reactive programming library that Angular uses internally. Although Angular 21 favors Signals for UI state, RxJS remains essential for:
- Asynchronous events (clicks, keyboard, timers)
- HttpClient (HTTP requests)
- Forms (valueChanges, statusChanges)
- Router (navigation events)
Observable vs Signal
| Aspect | Observable | Signal |
|---|---|---|
| Model | Push (emits values) | Pull (read on demand) |
| Current value | Not guaranteed | Always available |
| Subscription | Manual (subscribe) | Automatic in templates |
| Cancellation | unsubscribe / operators | Automatic |
| Ideal use | Streams, HTTP, events | UI state |
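To make the push and pull rows of the table concrete, the following dependency-free sketch uses two hand-rolled classes, `MiniObservable` and `MiniSignal`. Both names are hypothetical stand-ins invented for this illustration, not the RxJS or Angular implementations, and `MiniObservable` behaves more like an RxJS Subject (no laziness, completion, or error handling).

```typescript
// Hand-rolled stand-ins for illustration only; not the RxJS or Angular implementations.
type Listener<T> = (value: T) => void;

// Push: the producer decides when values are delivered to subscribers.
class MiniObservable<T> {
  private listeners = new Set<Listener<T>>();

  // Returns a function that cancels the subscription.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  emit(value: T): void {
    this.listeners.forEach(listener => listener(value));
  }
}

// Pull: the consumer reads the current value whenever it needs it.
class MiniSignal<T> {
  private value: T;

  constructor(initial: T) {
    this.value = initial;
  }

  get(): T {
    return this.value;
  }

  set(value: T): void {
    this.value = value;
  }
}

const clicks = new MiniObservable<number>();
const received: number[] = [];

clicks.emit(1); // nobody is subscribed yet, so this value is lost
const unsubscribe = clicks.subscribe(value => received.push(value));
clicks.emit(2);
clicks.emit(3);
unsubscribe();
clicks.emit(4); // ignored: the subscription was cancelled

const counter = new MiniSignal(0);
counter.set(5);
counter.set(6);
const current = counter.get(); // a late reader still sees the latest value

console.log(received); // [2, 3]
console.log(current);  // 6
```

The late subscriber misses the first emission ("Current value: Not guaranteed"), while the signal can be read at any time and always holds its latest value.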
Essential operators
Transformation
import { map, tap } from 'rxjs';
// map: transforms each value
source$.pipe(
  map(user => user.name.toUpperCase()),
);
// tap: runs a side effect without modifying the value
source$.pipe(
  tap(value => console.log('Received:', value)),
);
Filtering
import { filter, distinctUntilChanged, take } from 'rxjs';
// filter: only passes values that meet the condition
clicks$.pipe(
  filter(event => event.target instanceof HTMLButtonElement),
);
// distinctUntilChanged: ignores consecutive repeated values
input$.pipe(distinctUntilChanged());
// take: takes only the first N values, then completes
source$.pipe(take(5));
Time
import { debounceTime, throttleTime } from 'rxjs';
// debounceTime: emits the latest value only after X ms pass with no new emission
search$.pipe(debounceTime(300));
// throttleTime: emits at most once every X ms
scroll$.pipe(throttleTime(100));
Flattening
These operators are crucial when each emitted value is mapped to another (inner) Observable, such as an HTTP request:
import { switchMap, mergeMap, concatMap, exhaustMap } from 'rxjs';
// switchMap: cancels the previous request when a new value arrives
search$.pipe(
  // Passing the term via params lets HttpClient URL-encode it
  switchMap(term => http.get('/api/search', { params: { q: term } })),
);
// concatMap: waits for the previous one to finish before starting the next
queue$.pipe(
  concatMap(task => processTask(task)),
);
// exhaustMap: ignores new values while one is in progress
saveButton$.pipe(
  exhaustMap(() => saveData()),
);
| Operator | Cancels previous | Order | Typical use |
|---|---|---|---|
| switchMap | Yes | Latest | Search, autocomplete |
| mergeMap | No | Parallel | Independent operations |
| concatMap | No | Sequential | Task queue |
| exhaustMap | Ignores new | First | Form submission |
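The differences in the table can be checked with a small timeline model. The sketch below does not use RxJS; `simulate`, `Task`, and `Emission` are hypothetical names, and the model assumes each inner Observable emits exactly once and then completes, that tasks are sorted by start time, and that no two events share a timestamp.

```typescript
// A simplified timeline model; names and timings are illustrative assumptions.
type Strategy = 'switchMap' | 'mergeMap' | 'concatMap' | 'exhaustMap';

interface Task {
  id: string;
  start: number;    // time (ms) at which the outer Observable emits
  duration: number; // time (ms) the inner Observable needs to emit and complete
}

interface Emission {
  id: string;
  at: number; // time (ms) at which the result reaches the subscriber
}

function simulate(strategy: Strategy, tasks: Task[]): Emission[] {
  const out: Emission[] = [];
  let busyUntil = 0; // when the currently running inner Observable finishes

  tasks.forEach((task, i) => {
    const next = tasks[i + 1];
    switch (strategy) {
      case 'mergeMap':
        // Every inner Observable runs; nothing waits or is cancelled.
        out.push({ id: task.id, at: task.start + task.duration });
        break;
      case 'switchMap':
        // Dropped if a newer outer value arrives before this one finishes.
        if (!next || next.start > task.start + task.duration) {
          out.push({ id: task.id, at: task.start + task.duration });
        }
        break;
      case 'concatMap': {
        // Starts only after the previous inner Observable has finished.
        const begin = Math.max(task.start, busyUntil);
        busyUntil = begin + task.duration;
        out.push({ id: task.id, at: busyUntil });
        break;
      }
      case 'exhaustMap':
        // Ignored while an earlier inner Observable is still running.
        if (task.start >= busyUntil) {
          busyUntil = task.start + task.duration;
          out.push({ id: task.id, at: busyUntil });
        }
        break;
    }
  });

  // Results are reported in the order they reach the subscriber.
  return out.sort((a, b) => a.at - b.at);
}

const tasks: Task[] = [
  { id: 'a', start: 0, duration: 300 },
  { id: 'b', start: 100, duration: 300 },
  { id: 'c', start: 500, duration: 100 },
];
const ids = (s: Strategy) => simulate(s, tasks).map(e => `${e.id}@${e.at}`);

console.log(ids('switchMap'));  // ['b@400', 'c@600']
console.log(ids('mergeMap'));   // ['a@300', 'b@400', 'c@600']
console.log(ids('concatMap'));  // ['a@300', 'b@600', 'c@700']
console.log(ids('exhaustMap')); // ['a@300', 'c@600']
```

With the same three inputs, switchMap drops the stale result `a`, exhaustMap drops the late arrival `b`, and concatMap keeps everything at the cost of delaying `b` and `c`.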
RxJS - Signals bridge
toSignal: Observable to Signal
import { toSignal } from '@angular/core/rxjs-interop';
import { interval, map } from 'rxjs';
readonly time = toSignal(
  interval(1000).pipe(map(() => new Date())),
  { initialValue: new Date() },
);
toObservable: Signal to Observable
import { signal } from '@angular/core';
import { toObservable } from '@angular/core/rxjs-interop';
readonly filter = signal('all');
readonly filter$ = toObservable(this.filter);
takeUntilDestroyed
Automatically cancels subscriptions when the component is destroyed:
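import { inject } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { Router } from '@angular/router';
private readonly router = inject(Router);
constructor() {
  this.router.events.pipe(
    // No argument needed here: the constructor is an injection context
    takeUntilDestroyed(),
  ).subscribe(event => {
    // automatically canceled when the component is destroyed
  });
}
Practice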
- Build a search with debounce: Create a FormControl for a search field and chain debounceTime(300), distinctUntilChanged(), and switchMap() to make HTTP requests only when the user stops typing.
- Convert an Observable to a Signal: Use toSignal() to convert the search results stream into a signal and display it in the template without using the async pipe.
- Clean up subscriptions: Add takeUntilDestroyed() to a manual subscription in a component's constructor and verify it is canceled when the component is destroyed.
In the next lesson, we will learn how to create custom directives and pipes to extend Angular.
toSignal and toObservable
Use toSignal() to convert Observables to Signals and toObservable() for the reverse. Both functions are exported from @angular/core/rxjs-interop and bridge the two worlds.
Avoid memory leaks
Use takeUntilDestroyed() to automatically cancel subscriptions when the component is destroyed. Alternatively, use toSignal(), which manages the subscription automatically.
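The idea behind this kind of cleanup can be sketched without Angular: tie the unsubscribe function to a destroy hook so that it runs when the owner goes away. `MiniDestroyRef`, `MiniSubject`, and `subscribeUntilDestroyed` are hypothetical stand-ins written for this example; they only mimic the general shape of DestroyRef.onDestroy and a Subject, not Angular's actual implementation.

```typescript
// Hypothetical stand-ins; Angular's DestroyRef and RxJS's Subject are far richer.
class MiniDestroyRef {
  private callbacks: Array<() => void> = [];

  onDestroy(callback: () => void): void {
    this.callbacks.push(callback);
  }

  destroy(): void {
    this.callbacks.forEach(callback => callback());
    this.callbacks = [];
  }
}

class MiniSubject<T> {
  private listeners = new Set<(value: T) => void>();

  // Returns a function that cancels the subscription.
  subscribe(listener: (value: T) => void): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  next(value: T): void {
    this.listeners.forEach(listener => listener(value));
  }

  get listenerCount(): number {
    return this.listeners.size;
  }
}

// Subscribes and registers the teardown with the destroy hook in one step.
function subscribeUntilDestroyed<T>(
  source: MiniSubject<T>,
  destroyRef: MiniDestroyRef,
  listener: (value: T) => void,
): void {
  const unsubscribe = source.subscribe(listener);
  destroyRef.onDestroy(unsubscribe);
}

const events = new MiniSubject<string>();
const destroyRef = new MiniDestroyRef();
const seen: string[] = [];

subscribeUntilDestroyed(events, destroyRef, event => seen.push(event));

events.next('NavigationStart');
destroyRef.destroy();         // the component goes away
events.next('NavigationEnd'); // no listener is left to receive this

console.log(seen);                 // ['NavigationStart']
console.log(events.listenerCount); // 0
```

Without the onDestroy registration, the listener would stay in the set after the component is gone, which is the leak this tip warns about.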
import {
  Component, signal, inject,
  ChangeDetectionStrategy, DestroyRef,
} from '@angular/core';
import { takeUntilDestroyed, toSignal } from '@angular/core/rxjs-interop';
import { FormControl, ReactiveFormsModule } from '@angular/forms';
import { HttpClient } from '@angular/common/http';
import {
  debounceTime, distinctUntilChanged,
  switchMap, catchError, filter, map,
  of, merge, Subject,
} from 'rxjs';
interface SearchResult {
  id: number;
  title: string;
  type: string;
}
@Component({
  selector: 'app-search',
  changeDetection: ChangeDetectionStrategy.OnPush,
  imports: [ReactiveFormsModule],
  templateUrl: './search.html',
})
export class Search {
  private readonly http = inject(HttpClient);
  private readonly destroyRef = inject(DestroyRef);
  readonly searchInput = new FormControl('', { nonNullable: true });
  // Convert Observable to Signal with toSignal
  readonly results = toSignal(
    this.searchInput.valueChanges.pipe(
      debounceTime(300),
      distinctUntilChanged(),
      filter(term => term.length >= 2),
      switchMap(term =>
        // Passing the term via params lets HttpClient URL-encode it
        this.http.get<SearchResult[]>('/api/search', { params: { q: term } }).pipe(
          // Handle errors on the inner request so the search stream keeps running
          catchError(() => of([])),
        )
      ),
    ),
    { initialValue: [] },
  );
  // Tracks the length of the current search term
  readonly totalResults = signal(0);
  // Emits whenever the search is cleared
  private readonly clear$ = new Subject<void>();
  constructor() {
    // Manual subscription with takeUntilDestroyed
    merge(
      this.searchInput.valueChanges.pipe(map(v => v.length)),
      this.clear$.pipe(map(() => 0)),
    ).pipe(
      takeUntilDestroyed(this.destroyRef),
    ).subscribe(count => {
      this.totalResults.set(count);
    });
  }
  clearSearch(): void {
    this.searchInput.reset();
    this.clear$.next();
  }
}