En esta página

RxJS esencial para Angular

12 min lectura TextoCap. 4 — Integración

RxJS en Angular 21

RxJS (Reactive Extensions for JavaScript) es la libreria de programación reactiva que Angular usa internamente. Aunque Angular 21 favorece Signals para estado de UI, RxJS sigue siendo esencial para:

  • Eventos asincronos (clicks, teclado, timers)
  • HttpClient (peticiones HTTP)
  • Formularios (valueChanges, statusChanges)
  • Router (eventos de navegación)

Observable vs Signal

Aspecto Observable Signal
Modelo Push (emite valores) Pull (se lee bajo demanda)
Valor actual No garantizado Siempre disponible
Suscripcion Manual (subscribe) Automatica en templates
Cancelacion unsubscribe / operadores Automatica
Uso ideal Streams, HTTP, eventos Estado de UI

Operadores esenciales

Transformacion

import { map, tap } from 'rxjs/operators';

// map: transforma cada valor
source$.pipe(
  map(usuario => usuario.nombre.toUpperCase()),
);

// tap: efecto secundario sin modificar el valor
source$.pipe(
  tap(valor => console.log('Recibido:', valor)),
);

Filtrado

import { filter, distinctUntilChanged, take } from 'rxjs/operators';

// filter: solo deja pasar valores que cumplen la condición
clicks$.pipe(
  filter(evento => evento.target instanceof HTMLButtonElement),
);

// distinctUntilChanged: ignora valores repetidos consecutivos
input$.pipe(distinctUntilChanged());

// take: toma solo los primeros N valores
source$.pipe(take(5));

Tiempo

import { debounceTime, throttleTime } from 'rxjs/operators';

// debounceTime: espera X ms sin emision antes de pasar el valor
busqueda$.pipe(debounceTime(300));

// throttleTime: emite máximo una vez cada X ms
scroll$.pipe(throttleTime(100));

Aplanamiento (Flattening)

Estos operadores son cruciales cuando un Observable emite otro Observable:

import { switchMap, mergeMap, concatMap, exhaustMap } from 'rxjs/operators';

// switchMap: cancela la petición anterior al recibir un nuevo valor
busqueda$.pipe(
  switchMap(término => http.get(`/api/buscar?q=${término}`)),
);

// concatMap: espera a que termine la anterior antes de iniciar la siguiente
cola$.pipe(
  concatMap(tarea => procesarTarea(tarea)),
);

// exhaustMap: ignora nuevos valores mientras hay uno en proceso
botonGuardar$.pipe(
  exhaustMap(() => guardarDatos()),
);
Operador Cancela anterior Orden Uso tipico
switchMap Si Último Busqueda, autocomplete
mergeMap No Paralelo Operaciones independientes
concatMap No Secuencial Cola de tareas
exhaustMap Ignora nuevos Primero Envio de formularios

Puente RxJS - Signals

toSignal: Observable a Signal

import { toSignal } from '@angular/core/rxjs-interop';

readonly tiempo = toSignal(
  interval(1000).pipe(map(n => new Date())),
  { initialValue: new Date() },
);

toObservable: Signal a Observable

import { toObservable } from '@angular/core/rxjs-interop';

readonly filtro = signal('todos');

readonly filtro$ = toObservable(this.filtro);

takeUntilDestroyed

Cancela suscripciones automaticamente cuando el componente se destruye:

import { takeUntilDestroyed } from '@angular/core/rxjs-interop';

constructor() {
  this.router.events.pipe(
    takeUntilDestroyed(),
  ).subscribe(evento => {
    // se cancela automaticamente al destruir el componente
  });
}

Práctica

  1. Implementa un buscador con debounce: Crea un FormControl para un campo de busqueda y encadena debounceTime(300), distinctUntilChanged() y switchMap() para hacer peticiones HTTP solo cuando el usuario deje de escribir.
  2. Convierte un Observable a Signal: Usa toSignal() para convertir el flujo de resultados de busqueda a un signal y muestralo en el template sin usar el pipe async.
  3. Limpia suscripciones: Agrega takeUntilDestroyed() a una suscripcion manual en el constructor de un componente y verifica que se cancela al destruir el componente.

En la siguiente leccion aprenderemos a crear directivas y pipes personalizados para extender Angular.

toSignal y toObservable
Usa toSignal() para convertir Observables a Signals y toObservable() para el camino inverso. Estas funciones del paquete @angular/core/rxjs-interop son el puente entre ambos mundos.
Evita memory leaks
Usa takeUntilDestroyed() para cancelar suscripciones automaticamente cuando el componente se destruye. Alternativamente, usa toSignal() que gestiona la suscripción automaticamente.
import {
  Component, signal, inject,
  ChangeDetectionStrategy, DestroyRef,
} from '@angular/core';
import { takeUntilDestroyed, toSignal } from '@angular/core/rxjs-interop';
import { FormControl, ReactiveFormsModule } from '@angular/forms';
import { HttpClient } from '@angular/common/http';
import {
  debounceTime, distinctUntilChanged,
  switchMap, catchError, filter, map, startWith,
} from 'rxjs/operators';
import { of, merge, Subject } from 'rxjs';

interface Resultado {
  id: number;
  título: string;
  tipo: string;
}

@Component({
  selector: 'app-buscador',
  changeDetection: ChangeDetectionStrategy.OnPush,
  imports: [ReactiveFormsModule],
  templateUrl: './buscador.html',
})
export class Buscador {
  private readonly http = inject(HttpClient);
  private readonly destroyRef = inject(DestroyRef);

  readonly busqueda = new FormControl('', { nonNullable: true });

  // Convertir Observable a Signal con toSignal
  readonly resultados = toSignal(
    this.busqueda.valueChanges.pipe(
      debounceTime(300),
      distinctUntilChanged(),
      filter(término => término.length >= 2),
      switchMap(término =>
        this.http.get<Resultado[]>(`/api/buscar?q=${término}`).pipe(
          catchError(() => of([])),
        )
      ),
    ),
    { initialValue: [] },
  );

  readonly totalResultados = signal(0);

  // Suscripcion manual con takeUntilDestroyed
  private readonly limpiar$ = new Subject<void>();

  constructor() {
    merge(
      this.busqueda.valueChanges.pipe(map(v => v.length)),
    ).pipe(
      takeUntilDestroyed(this.destroyRef),
    ).subscribe(count => {
      this.totalResultados.set(count);
    });
  }

  limpiarBusqueda(): void {
    this.busqueda.reset();
    this.limpiar$.next();
  }
}