Essential RxJS for Angular

12 min read | Text | Ch. 4: Integration

RxJS in Angular 21

RxJS (Reactive Extensions for JavaScript) is the reactive programming library that Angular uses internally. Although Angular 21 favors Signals for UI state, RxJS remains essential for:

  • Asynchronous events (clicks, keyboard, timers)
  • HttpClient (HTTP requests)
  • Forms (valueChanges, statusChanges)
  • Router (navigation events)

Observable vs Signal

| Aspect | Observable | Signal |
| --- | --- | --- |
| Model | Push (emits values) | Pull (read on demand) |
| Current value | Not guaranteed | Always available |
| Subscription | Manual (subscribe) | Automatic in templates |
| Cancellation | unsubscribe / operators | Automatic |
| Ideal use | Streams, HTTP, events | UI state |
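
To make the push/pull distinction in the comparison concrete, here is a minimal, dependency-free sketch. `createStream` and `createValue` are hypothetical helpers written only for this illustration. They are not how RxJS Observables or Angular Signals are implemented; they only model the two delivery styles.

```typescript
// Hypothetical helpers for illustration only (not RxJS or Angular internals).
type Listener<T> = (value: T) => void;

// Push model: the producer decides when a value is delivered.
function createStream<T>() {
  const listeners = new Set<Listener<T>>();
  return {
    subscribe(listener: Listener<T>): () => void {
      listeners.add(listener);
      // The returned function plays the role of unsubscribe()
      return () => { listeners.delete(listener); };
    },
    emit(value: T): void {
      listeners.forEach(listener => listener(value));
    },
  };
}

// Pull model: the consumer reads the current value whenever it wants.
function createValue<T>(initial: T) {
  let current = initial;
  return {
    get: (): T => current,
    set: (next: T): void => { current = next; },
  };
}

const received: number[] = [];
const stream = createStream<number>();

stream.emit(1); // nobody is subscribed yet, so this value is lost
const unsubscribe = stream.subscribe(value => { received.push(value); });
stream.emit(2); // pushed to the subscriber
unsubscribe();
stream.emit(3); // not delivered after unsubscribing

const count = createValue(0);
count.set(5);
const readLater = count.get(); // the current value is always readable

console.log(received);  // [2]
console.log(readLater); // 5
```

The stream has no "current value" for a late subscriber, while the pull-style holder can be read at any time, which is why the table recommends Signals for UI state.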

Essential operators

Transformation

import { map, tap } from 'rxjs/operators';

// map: transforms each value
source$.pipe(
  map(user => user.name.toUpperCase()),
);

// tap: side effect without modifying the value
source$.pipe(
  tap(value => console.log('Received:', value)),
);
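
As a runnable sketch of the two operators together, the snippet below assumes a hypothetical `User` shape and a small in-memory `users` array standing in for `source$`. `from()` emits synchronously, so the arrays are filled as soon as `subscribe()` returns.

```typescript
import { from } from 'rxjs';
import { map, tap } from 'rxjs/operators';

// Hypothetical data standing in for source$
interface User { name: string; }
const users: User[] = [{ name: 'ada' }, { name: 'grace' }];

const seen: string[] = [];   // filled by tap (side effect)
const names: string[] = [];  // filled by the subscriber

from(users).pipe(
  // tap observes each value and passes it on unchanged
  tap(user => { seen.push(user.name); }),
  // map replaces each value with the result of the function
  map(user => user.name.toUpperCase()),
).subscribe(name => { names.push(name); });

console.log(seen);  // ['ada', 'grace']
console.log(names); // ['ADA', 'GRACE']
```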

Filtering

import { filter, distinctUntilChanged, take } from 'rxjs/operators';

// filter: only passes values that meet the condition
clicks$.pipe(
  filter(event => event.target instanceof HTMLButtonElement),
);

// distinctUntilChanged: ignores consecutive repeated values
input$.pipe(distinctUntilChanged());

// take: takes only the first N values
source$.pipe(take(5));
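
The three filtering operators can also be chained. The following sketch uses an arbitrary list of numbers (an assumption made for the example) to show how each step narrows the stream.

```typescript
import { from } from 'rxjs';
import { distinctUntilChanged, filter, take } from 'rxjs/operators';

const picked: number[] = [];

from([1, 1, 2, 2, 3, 4, 5, 6]).pipe(
  distinctUntilChanged(),      // 1, 2, 3, 4, 5, 6 (consecutive repeats dropped)
  filter(n => n % 2 === 0),    // 2, 4, 6 (only even numbers pass)
  take(2),                     // 2, 4 (then the stream completes)
).subscribe(n => { picked.push(n); });

console.log(picked); // [2, 4]
```

Because `take(2)` completes the stream after the second value, the source is unsubscribed early and `6` is never processed by the subscriber.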

Time

import { debounceTime, throttleTime } from 'rxjs/operators';

// debounceTime: emits the latest value only after X ms pass with no new emission
search$.pipe(debounceTime(300));

// throttleTime: emits at most once every X ms
scroll$.pipe(throttleTime(100));
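
The difference between the two is easier to see with concrete timings. This sketch simulates four keystrokes at 0, 100, 200 and 900 ms (made-up values). It passes RxJS's `VirtualTimeScheduler` to the operators so the example runs instantly and deterministically; that is a choice for the demo only, and application code would normally omit the scheduler argument.

```typescript
import { merge, timer, VirtualTimeScheduler } from 'rxjs';
import { debounceTime, map, throttleTime } from 'rxjs/operators';

// Virtual clock: nothing runs until flush() is called
const scheduler = new VirtualTimeScheduler();

// Hypothetical helper: emits `text` once at virtual time `ms`
const typedAt = (ms: number, text: string) =>
  timer(ms, scheduler).pipe(map(() => text));

const search$ = merge(
  typedAt(0, 'a'),
  typedAt(100, 'an'),
  typedAt(200, 'ang'),
  typedAt(900, 'angu'),
);

const debounced: string[] = [];
const throttled: string[] = [];

// debounceTime waits for 300 ms of silence before emitting the latest value
search$.pipe(debounceTime(300, scheduler)).subscribe(v => { debounced.push(v); });

// throttleTime emits the first value, then ignores the rest for 300 ms
search$.pipe(throttleTime(300, scheduler)).subscribe(v => { throttled.push(v); });

scheduler.flush(); // run all scheduled virtual-time work

console.log(debounced); // ['ang', 'angu']
console.log(throttled); // ['a', 'angu']
```

`debounceTime` favors the value at the end of a burst (good for search boxes), while `throttleTime` with its default settings favors the value at the start (good for scroll or resize handlers).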

Flattening

These operators matter when each emitted value is mapped to an inner Observable (for example, an HTTP request) and the results must be flattened back into a single stream:

import { switchMap, mergeMap, concatMap, exhaustMap } from 'rxjs/operators';

// switchMap: cancels the previous request when a new value arrives
search$.pipe(
  switchMap(term => http.get(`/api/search?q=${term}`)),
);

// concatMap: waits for the previous one to finish before starting the next
queue$.pipe(
  concatMap(task => processTask(task)),
);

// exhaustMap: ignores new values while one is in progress
saveButton$.pipe(
  exhaustMap(() => saveData()),
);

| Operator | Cancels previous | Order | Typical use |
| --- | --- | --- | --- |
| switchMap | Yes | Latest | Search, autocomplete |
| mergeMap | No | Parallel | Independent operations |
| concatMap | No | Sequential | Task queue |
| exhaustMap | Ignores new | First | Form submission |
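
To compare all four operators side by side, the sketch below feeds the same three simulated jobs through each one. The `Job` shape, the timings, and the helpers `jobAt`, `run` and `collect` are assumptions made for this example. A `VirtualTimeScheduler` replaces real timers so the result is deterministic; real code would typically return an HTTP call from the projection instead.

```typescript
import { merge, timer, Observable, VirtualTimeScheduler } from 'rxjs';
import { concatMap, exhaustMap, map, mergeMap, switchMap } from 'rxjs/operators';

interface Job { id: number; ms: number; } // ms = how long the job takes

const scheduler = new VirtualTimeScheduler();

// Emits one job at virtual time `at`
const jobAt = (at: number, id: number, ms: number): Observable<Job> =>
  timer(at, scheduler).pipe(map(() => ({ id, ms })));

// Jobs arrive at 0, 10 and 20 ms; the first one is the slowest
const jobs$ = merge(jobAt(0, 1, 30), jobAt(10, 2, 15), jobAt(20, 3, 2));

// Inner Observable: emits the job id once the job "finishes"
const run = (job: Job): Observable<number> =>
  timer(job.ms, scheduler).pipe(map(() => job.id));

// Subscribes and records the order in which job ids come out
function collect(flattened$: Observable<number>): number[] {
  const finished: number[] = [];
  flattened$.subscribe(id => { finished.push(id); });
  return finished;
}

const withSwitchMap = collect(jobs$.pipe(switchMap(run)));
const withMergeMap = collect(jobs$.pipe(mergeMap(run)));
const withConcatMap = collect(jobs$.pipe(concatMap(run)));
const withExhaustMap = collect(jobs$.pipe(exhaustMap(run)));

scheduler.flush();

console.log(withSwitchMap);  // [3]       only the latest job survives
console.log(withMergeMap);   // [3, 2, 1] all run in parallel, fastest first
console.log(withConcatMap);  // [1, 2, 3] one at a time, in arrival order
console.log(withExhaustMap); // [1]       jobs 2 and 3 arrive while 1 is busy
```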

RxJS - Signals bridge

toSignal: Observable to Signal

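import { toSignal } from '@angular/core/rxjs-interop';
import { interval } from 'rxjs';
import { map } from 'rxjs/operators';

// Class field: toSignal() must be called in an injection context
readonly time = toSignal(
  interval(1000).pipe(map(() => new Date())),
  { initialValue: new Date() },
);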

toObservable: Signal to Observable

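import { signal } from '@angular/core';
import { toObservable } from '@angular/core/rxjs-interop';

readonly filter = signal('all');

// Emits the signal's current value and its subsequent changes
readonly filter$ = toObservable(this.filter);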

takeUntilDestroyed

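Automatically cancels a subscription when the component is destroyed. Called without arguments, it must run in an injection context such as a constructor or field initializer; elsewhere, pass a DestroyRef explicitly: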

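import { inject } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { Router } from '@angular/router';

private readonly router = inject(Router);

constructor() {
  this.router.events.pipe(
    takeUntilDestroyed(),
  ).subscribe(event => {
    // automatically canceled when the component is destroyed
  });
}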
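
Outside Angular, the same idea can be approximated with the plain RxJS `takeUntil` operator and a notifier. In this sketch the `destroyed$` subject is a hypothetical stand-in for the component's destruction and `events$` stands in for `router.events`; it illustrates the behavior, not Angular's actual wiring.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const destroyed$ = new Subject<void>();  // stand-in for "component destroyed"
const events$ = new Subject<string>();   // stand-in for router.events

const handled: string[] = [];

events$.pipe(
  // Completes the subscription as soon as destroyed$ emits
  takeUntil(destroyed$),
).subscribe(event => { handled.push(event); });

events$.next('NavigationStart');
events$.next('NavigationEnd');

destroyed$.next();                       // simulate the component being destroyed

events$.next('NavigationStart');         // ignored: the subscription is gone

console.log(handled); // ['NavigationStart', 'NavigationEnd']
```

`takeUntilDestroyed()` saves you from creating and triggering such a notifier by hand in every component.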

Practice

  1. Build a search with debounce: Create a FormControl for a search field and chain debounceTime(300), distinctUntilChanged(), and switchMap() to make HTTP requests only when the user stops typing.
  2. Convert an Observable to a Signal: Use toSignal() to convert the search results stream into a signal and display it in the template without using the async pipe.
  3. Clean up subscriptions: Add takeUntilDestroyed() to a manual subscription in a component's constructor and verify it is canceled when the component is destroyed.

In the next lesson, we will learn how to create custom directives and pipes to extend Angular.

toSignal and toObservable
Use toSignal() to convert Observables to Signals and toObservable() for the reverse. Both functions come from the @angular/core/rxjs-interop entry point and bridge the two worlds.
Avoid memory leaks
Use takeUntilDestroyed() to automatically cancel subscriptions when the component is destroyed. Alternatively, use toSignal(), which manages the subscription automatically.

import {
  Component, signal, inject,
  ChangeDetectionStrategy, DestroyRef,
} from '@angular/core';
import { takeUntilDestroyed, toSignal } from '@angular/core/rxjs-interop';
import { FormControl, ReactiveFormsModule } from '@angular/forms';
import { HttpClient } from '@angular/common/http';
import {
  debounceTime, distinctUntilChanged,
  switchMap, catchError, filter, map,
} from 'rxjs/operators';
import { of, merge, Subject } from 'rxjs';

interface SearchResult {
  id: number;
  title: string;
  type: string;
}

@Component({
  selector: 'app-search',
  changeDetection: ChangeDetectionStrategy.OnPush,
  imports: [ReactiveFormsModule],
  templateUrl: './search.html',
})
export class Search {
  private readonly http = inject(HttpClient);
  private readonly destroyRef = inject(DestroyRef);

  readonly searchInput = new FormControl('', { nonNullable: true });

  // Convert Observable to Signal with toSignal
  readonly results = toSignal(
    this.searchInput.valueChanges.pipe(
      debounceTime(300),
      distinctUntilChanged(),
      filter(term => term.length >= 2),
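      switchMap(term =>
        // Passing q through params lets HttpClient URL-encode the term
        this.http.get<SearchResult[]>('/api/search', { params: { q: term } }).pipe(
          // On error, fall back to an empty list so the outer stream stays alive
          catchError(() => of<SearchResult[]>([])),
        )
      ),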
    ),
    { initialValue: [] },
  );

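  // Updated by the manual subscription below; note that it mirrors the
  // length of the typed term, not the number of results returned
  readonly totalResults = signal(0);

  // Manual subscription with takeUntilDestroyed
  // clear$ emits whenever clearSearch() runs
  private readonly clear$ = new Subject<void>();

  constructor() {
    merge(
      // Length of the current search term on every change
      this.searchInput.valueChanges.pipe(map(v => v.length)),
      // Reset the counter as soon as the search is cleared
      this.clear$.pipe(map(() => 0)),
    ).pipe(
      // The explicit DestroyRef is optional inside a constructor
      takeUntilDestroyed(this.destroyRef),
    ).subscribe(count => {
      this.totalResults.set(count);
    });
  }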

  clearSearch(): void {
    this.searchInput.reset();
    this.clear$.next();
  }
}