Programación reactiva con RxJS

rxjs

RxJS es una librería que implementa ReactiveX en javascript, permitiendo hacer uso de la programación reactiva en esta plataforma.

Gracias al uso de observables basados en eventos se facilita en gran medida la programación asíncrona, ofreciendo grandes ventajas respecto al uso de callback y Promise.

El funcionamiento básico de los observables se basa en un objecto llamado subject que contiene una lista de dependencias llamadas observers a las que notifica cuando se produce algún cambio, llamando a uno de sus métodos, de forma que reaccionen a ese cambio.

Los observables generan flujos de datos que se generan valores en diferentes momentos, y los observers subscritos a esos cambios reaccionan a los datos que se envían.

Estos son los conceptos básicos utilizados para este patrón:

  • Observable: el objecto a cuyos cambios de estado se quieren subscribir otros objetos.
  • Observer: el objeto que desea ser notificado cuando se producen cambios en el estado de algún observable.
  • Subscription: se produce cuando los observers se conectan con los observables para recibir los valores que estos emiten. Estas subscripciones se pueden cancelar posteriormente.
  • Subject: actúa como observer y observable a la vez. Puede emitir valores y subscribirse a observables.
  • Operadores: permiten realizar operaciones con los datos enviados por el observer o el subject antes de ser enviados al observable. Se pueden encadenar.

En este ejemplo se crea un observable mediante el operador create.

import { Observable } from 'rxjs';

const observable = Observable.create((observer) => {
  observer.next('Hola');
  observer.next('mundo');
});

const subscripcion = observable.subscribe({
  next: x => console.log('Dato emitido: ' + x),
  error: err => console.error('Error: ' + err),
  complete: () => console.log('Terminado'),
});

El método subscribe del observable puede contener un único parámetro, que sería la función que se ejecuta cada vez que el observable emite un valor, o como en este ejemplo, contener tres funciones, la primera para cuando se emite un valor, la segunda en caso de error y la última para cuando se emite el último valor, en caso de no haberse producido ningún error antes

En este ejemplo se crea el observable a partir de un evento. input$ representa el flujo de datos que proviene de cualquier interacción que haga el usuario con elementoHtml.

const elementoHtml = document.querySelector('input[type=text]');

const input$ = Rx.Observable.fromEvent(elemento, 'input');

const subscripcion = input$.subscribe({
  next: event => console.log(`Has escrito: ${event.target.value}`),
  error: err => console.log(`Error: ${err}`),
  complete: () => console.log(`Completado`),
});

Mediante Subject, además de crearse la subscripción, es desde el propio Subject desde donde se emiten los valores.

let subject = new Rx.Subject();

var subscription = subject.subscribe(
    x => console.log('onNext: ' + x),
    e => console.log('onError: ' + e),
    () => console.log('onCompleted'));

subject.onNext(1);
// => onNext: 1

subject.onNext(2);
// => onNext: 2

subject.onCompleted();
// => onCompleted

subscription.dispose();

Existen varias versiones de Subject que modifican ligeramente su comportamiento habitual:

  • AsyncSubject: tan solo emite el último valor producido desde el observable en el momento en que este se ha completado.
  • BehaviorSubject: comienza emitiendo el último valor que se ha producido. Al crearlo se le puede pasar un valor por defecto, que será el que se emita al crear una subscripción en caso de no haberse emitido aún ningún valor.
  • ReplaySubject: cuando se crea una subscripción a este tipo de subject se emiten todos los valores que se han producido hasta el momento, incluyendo los que se produjeron antes de crearse la subscripción

Operadores

Una de las herramientas más importantes con las que cuenta RxJS es el uso de operadores.

Existen distintos tipos de operadores entre los que destacan los siguientes:

  • Creación: se utilizan para crear observables a partir de otros elementos, como pueden ser arrays, eventos o promesas (from,fromEvent, of).
  • Filtro: de entre los valores emitidos por los observables, filtran los que cumplen determinadas características y descartan el resto (debounceTime, distinctUntilChanged, filter, take, takeUntil).
  • Transformación: se encargan de modificar los datos transmitidos a traves del flujo de datos producido desde los observables (bufferTime, concatMap, map, mergeMap, scan, switchMap).
  • Gestión de errores: gestionan el comportamiento de la aplicación cuando se produce algún error. Permiten reintentar la operación que ha producido el error (catchError, retry, retryWhen).
  • Combinación: se utilizan para juntar la información emitida desde distintos observables (combineLatest, concat, merge, startWith , withLatestFrom, zip).
  • Utilidades: proporcionan utilidades básicas a los observables (tap, delay, toPromise).

Estos son alguno de los operadores más comunes:

  • create: se utiliza principalmente para pruebas. Se crea un observable pasando como parámetro una función con un observer.
  • from: crea un observable a partir de un arrays, una cadenas, Promises o iterables.
  • of: convierte los argumentos que se pasan a la función en valores que emite el observable creado y emite una notificación de completado al terminar.
  • interval: crea un observable que emite valores numéricos según el intervalo que se indique al crearlo.
  • timer: similar a interval, añade un primer parámetro que indica la pausa inicial antes de emitir un número. Si contiene un segundo parámetro, este se utiliza como interval, para generar valores periódicamente.
  • toPromise: convierte un obserbable en una Promise.
  • combineLatest: toma como parámetros distintos observables y emite datos cada vez que uno de ellos genera datos nuevos, combinando estos últimos datos con los más recientes del resto de observables que contiene.
  • concat: según el orden en que se pasen los observables como parámetros, los ejecuta en orden, no pasando al siguiente hasta que no se ha completado el actual.
  • merge: permite juntar dos flujos de datos en uno, ejecutándose simultáneamente.
  • zip: espera a que todos los observables que lo componen emitan algún valor antes de generar un array combinandolos todos.
  • filter: emite solo los valores que cumple una cierta condición.
  • take: emite un número máximo de valores antes de completar el observable.
  • skip: se salta el número indicado de los primero valores generados por el observable.
  • delay: retrasa la emisión de valores durante el tiempo indicado.
  • debounceTime: descarta los valores emitidos entre un cierto intervalo de tiempo. Si se le pasa por parámetro 1 segundo, no emitirá más de un valor por segundo.
  • bufferTime: almacena todos los valores generados y los emite como array en el periodo indicado en el parámetro.
  • map: permite aplicar cambios a cada uno de los valores emitidos.
  • mergeMap: combina dos observables de forma que cada vez que el interno emite un valor, lo combina con el valor del observable externo.
  • switchMap: combina dos observables reiniciando el observable interno cada vez que se produce un nuevo valor del externo.
  • catchError: captura los errores del observable.
  • retry: reintenta ejecutar el observable el número de veces indicado. Muy útil cuando es un error que puede ser temporal, como al ejecutar una petición ajax que puede fallar por falta de conexión.
  • tap: permite efectuar acciones sobre los datos emitidos sin transformarlos.

Los operadores se pueden encadenar mediante el uso de la función pipe() de la siguiente forma:

import { filter, map } from 'rxjs/operators';

const numeros = of(1, 2, 3, 4, 5)
  .pipe(
    filter(n => n < 3),
    map(n => n * 2)
  );

numeros.subscribe(x => console.log(x));

Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *