RxJS Complete Guide | Reactive Programming with Observables

Key Points

RxJS is a library for reactive programming using Observables. It makes composing asynchronous or callback-based code easier with a powerful operator-based approach.

Introduction

RxJS (Reactive Extensions for JavaScript) models asynchronous sources such as DOM events, timers, and HTTP requests as Observables, which can then be transformed and combined with operators.

The Problem

Traditional async handling:

// Callbacks
setTimeout(() => {
  getData((data) => {
    processData(data, (result) => {
      console.log(result); // Callback hell
    });
  });
}, 1000);

// Promises (better, but limited)
fetch('/api/data')
  .then(res => res.json())
  .then(data => console.log(data));
// A Promise has no built-in cancellation and resolves with only a single value

The Solution

With RxJS:

import { fromEvent, debounceTime, map, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

// Search as the user types (assumes `input` is an <input> element)
fromEvent(input, 'input').pipe(
  debounceTime(300),
  map(e => e.target.value),
  switchMap(query => ajax.getJSON(`/api/search?q=${query}`))
).subscribe(results => console.log(results));

Benefits:

  • Cancellable
  • Composable with operators
  • Handles multiple values
  • Built-in error handling

1. Installation

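npm install rxjs

Examples that import operators directly from 'rxjs' require RxJS 7.2 or later; on earlier versions, import them from 'rxjs/operators'.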

2. Core Concepts

Observable

import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

observable.subscribe({
  next: (value) => console.log(value),
  error: (err) => console.error(err),
  complete: () => console.log('Done'),
});
// Output: 1, 2, 3, Done
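
To make the subscribe/next/complete contract and the "cancellable" claim more concrete, here is a small dependency-free sketch of the idea. It is a toy model, not how RxJS is implemented, and the names ToyObserver, ToyObservable, and tick are invented for illustration. It shows that the producer only runs on subscribe, can emit many values, and stops delivering once unsubscribe runs its teardown.

```typescript
// Toy model only: ToyObserver, ToyObservable, and tick are hypothetical names,
// and this is not RxJS's internal implementation.
interface ToyObserver<T> {
  next: (value: T) => void;
  complete: () => void;
}

class ToyObservable<T> {
  // The producer is stored, not run: nothing happens until subscribe() is called.
  constructor(private producer: (observer: ToyObserver<T>) => () => void) {}

  subscribe(observer: ToyObserver<T>): { unsubscribe: () => void } {
    let closed = false;
    let tornDown = false;
    // Wrap the observer so that nothing is delivered after completion or unsubscribe.
    const teardown = this.producer({
      next: (value) => { if (!closed) observer.next(value); },
      complete: () => {
        if (!closed) { closed = true; observer.complete(); }
      },
    });
    return {
      unsubscribe: () => {
        closed = true;
        if (!tornDown) { tornDown = true; teardown(); }
      },
    };
  }
}

// A manual event source standing in for a timer or DOM event.
let listeners: Array<(n: number) => void> = [];
let counter = 0;
const tick = (): void => {
  counter += 1;
  listeners.forEach((listener) => listener(counter));
};

const ticks$ = new ToyObservable<number>((observer) => {
  const listener = (n: number): void => observer.next(n);
  listeners.push(listener);
  // Teardown: remove the listener so the source stops feeding this subscriber.
  return () => { listeners = listeners.filter((l) => l !== listener); };
});

const received: number[] = [];
tick(); // emits 1, but nobody is subscribed yet
const subscription = ticks$.subscribe({
  next: (n) => { received.push(n); },
  complete: () => {},
});
tick(); // emits 2
tick(); // emits 3
subscription.unsubscribe();
tick(); // emits 4, but the listener has been removed
// received is now [2, 3]
```

A Promise has no equivalent of the teardown step, which is why the search example earlier can drop stale requests while a plain fetch chain cannot.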

Creating Observables

import { of, from, interval, fromEvent } from 'rxjs';

// Emit values
of(1, 2, 3).subscribe(console.log);

// From array
from([1, 2, 3]).subscribe(console.log);

// From promise
from(fetch('/api/data')).subscribe(console.log);

// Interval
interval(1000).subscribe(count => console.log(count));

// DOM events
fromEvent(button, 'click').subscribe(e => console.log('Clicked'));

3. Operators

map

import { of, map } from 'rxjs';

of(1, 2, 3).pipe(
  map(x => x * 2)
).subscribe(console.log);
// Output: 2, 4, 6

filter

import { of, filter } from 'rxjs';

of(1, 2, 3, 4, 5).pipe(
  filter(x => x % 2 === 0)
).subscribe(console.log);
// Output: 2, 4

tap (Side Effects)

import { of, tap, map } from 'rxjs';

of(1, 2, 3).pipe(
  tap(x => console.log('Before:', x)),
  map(x => x * 2),
  tap(x => console.log('After:', x))
).subscribe();
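
It can help to see why operators compose so easily. In the simplified sketch below, an operator is just a function that takes a source and returns a new source, and a pipe helper applies them in order. This is a plain TypeScript model rather than RxJS code; Source, Operator, fromArray, mapOp, filterOp, tapOp, and pipeAll are hypothetical names.

```typescript
// Simplified model: a "source" is a function that pushes values into a callback.
// None of these names are RxJS APIs.
type Source<T> = (emit: (value: T) => void) => void;
type Operator<A, B> = (source: Source<A>) => Source<B>;

const fromArray = <T>(items: T[]): Source<T> =>
  (emit) => items.forEach((item) => emit(item));

// Each operator wraps the upstream source and returns a new one.
const mapOp = <A, B>(fn: (value: A) => B): Operator<A, B> =>
  (source) => (emit) => source((value) => emit(fn(value)));

const filterOp = <A>(keep: (value: A) => boolean): Operator<A, A> =>
  (source) => (emit) => source((value) => { if (keep(value)) emit(value); });

// Like tap: run a side effect, then pass the value through unchanged.
const tapOp = <A>(effect: (value: A) => void): Operator<A, A> =>
  (source) => (emit) => source((value) => { effect(value); emit(value); });

// Applies operators left to right (kept to a single value type for brevity).
function pipeAll<T>(source: Source<T>, ...operators: Operator<T, T>[]): Source<T> {
  return operators.reduce((current, op) => op(current), source);
}

const seen: number[] = [];
const output: number[] = [];

const doubledEvens = pipeAll(
  fromArray([1, 2, 3, 4, 5]),
  filterOp<number>((x) => x % 2 === 0),
  tapOp<number>((x) => { seen.push(x); }),
  mapOp<number, number>((x) => x * 2)
);

// Nothing has run yet; calling the source plays the role of subscribe().
doubledEvens((value) => { output.push(value); });
// seen is [2, 4] and output is [4, 8]
```

Because every operator has the same shape, any of them can be inserted, removed, or reordered without touching the others.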

4. Combination Operators

merge

import { merge, interval, map } from 'rxjs';

const first$ = interval(1000).pipe(map(() => 'First'));
const second$ = interval(1500).pipe(map(() => 'Second'));

merge(first$, second$).subscribe(console.log);
// Output: First, Second, First, First, Second, ...

combineLatest

import { combineLatest, of } from 'rxjs';

const age$ = of(30);
const name$ = of('John');

combineLatest([age$, name$]).subscribe(([age, name]) => {
  console.log(`${name} is ${age} years old`);
});

zip

import { zip, of } from 'rxjs';

const age$ = of(30, 31, 32);
const name$ = of('John', 'Jane');

zip(age$, name$).subscribe(([age, name]) => {
  console.log(`${name}: ${age}`);
});
// Output: John: 30, Jane: 31

5. Transformation Operators

switchMap

import { fromEvent, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

// Cancel previous request when new search happens
fromEvent(input, 'input').pipe(
  switchMap(e => ajax.getJSON(`/api/search?q=${e.target.value}`))
).subscribe(results => console.log(results));

mergeMap

import { of, mergeMap, delay } from 'rxjs';

of(1, 2, 3).pipe(
  mergeMap(x => of(x).pipe(delay(1000)))
).subscribe(console.log);
// All emit after 1s (concurrent)

concatMap

import { of, concatMap, delay } from 'rxjs';

of(1, 2, 3).pipe(
  concatMap(x => of(x).pipe(delay(1000)))
).subscribe(console.log);
// Emit one by one (sequential)

exhaustMap

import { fromEvent, exhaustMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

// Ignore clicks while request is pending
fromEvent(button, 'click').pipe(
  exhaustMap(() => ajax.getJSON('/api/data'))
).subscribe(console.log);
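
The four flattening operators differ only in what they do when a new source value arrives while an earlier inner Observable is still running. The sketch below models that timing in plain TypeScript without RxJS, assuming each inner Observable emits exactly once when it finishes (like an HTTP request) and that triggers are sorted by time. Trigger, Strategy, and simulate are names made up for this illustration.

```typescript
// Timing model only; it does not use or reimplement RxJS.
interface Trigger {
  id: string;
  at: number; // arrival time in ms
}

type Strategy = "mergeMap" | "switchMap" | "concatMap" | "exhaustMap";

// Returns "id@time" for every result that would reach the subscriber.
function simulate(strategy: Strategy, triggers: Trigger[], duration: number): string[] {
  const delivered: string[] = [];
  let busyUntil = 0; // when the running (or queued) work finishes

  triggers.forEach((t, i) => {
    const next = triggers[i + 1];
    switch (strategy) {
      case "mergeMap":
        // Everything runs concurrently; nothing is dropped.
        delivered.push(`${t.id}@${t.at + duration}`);
        break;
      case "switchMap": {
        // Dropped if a newer trigger arrives before this one finishes
        // (ties are treated as "finished first" in this model).
        const end = t.at + duration;
        if (!next || next.at >= end) delivered.push(`${t.id}@${end}`);
        break;
      }
      case "concatMap": {
        // Queued: starts only after all earlier work is done.
        const start = Math.max(t.at, busyUntil);
        busyUntil = start + duration;
        delivered.push(`${t.id}@${busyUntil}`);
        break;
      }
      case "exhaustMap":
        // Ignored if something is still running.
        if (t.at >= busyUntil) {
          busyUntil = t.at + duration;
          delivered.push(`${t.id}@${busyUntil}`);
        }
        break;
    }
  });
  return delivered;
}

// Three triggers, each starting a 200ms request.
const triggers: Trigger[] = [
  { id: "A", at: 0 },
  { id: "B", at: 100 },
  { id: "C", at: 250 },
];

const mergeResult = simulate("mergeMap", triggers, 200);     // ["A@200", "B@300", "C@450"]
const switchResult = simulate("switchMap", triggers, 200);   // ["C@450"]
const concatResult = simulate("concatMap", triggers, 200);   // ["A@200", "B@400", "C@600"]
const exhaustResult = simulate("exhaustMap", triggers, 200); // ["A@200", "C@450"]
```

Read this way, the choice comes down to intent: keep everything (mergeMap), keep only the latest (switchMap), keep everything in order (concatMap), or keep the first and ignore the rest until it finishes (exhaustMap).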

6. Time-based Operators

debounceTime

import { fromEvent, debounceTime, map } from 'rxjs';

// Wait 300ms after last keystroke
fromEvent(input, 'input').pipe(
  debounceTime(300),
  map(e => e.target.value)
).subscribe(console.log);

throttleTime

import { fromEvent, throttleTime } from 'rxjs';

// Emit at most once per 1000ms
fromEvent(button, 'click').pipe(
  throttleTime(1000)
).subscribe(() => console.log('Clicked'));
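
debounceTime and throttleTime are easy to confuse, so here is a rough model of which events each one lets through, given a list of event timestamps. It is plain TypeScript rather than RxJS, it assumes throttleTime's default behaviour (emit the leading event, ignore the rest of the window), and it ignores the fact that debounceTime delivers a surviving event only after the quiet period has elapsed. debounceModel and throttleModel are hypothetical names.

```typescript
// Model only: returns the timestamps of the events that survive each operator.

// Debounce: an event survives if no other event follows within `waitMs`.
function debounceModel(times: number[], waitMs: number): number[] {
  return times.filter((t, i) => {
    const isLast = i === times.length - 1;
    return isLast || times[i + 1] - t >= waitMs;
  });
}

// Throttle (leading edge): an event survives if the previous window has closed,
// and it then opens a new window of `windowMs`.
function throttleModel(times: number[], windowMs: number): number[] {
  const emitted: number[] = [];
  let openAt = Number.NEGATIVE_INFINITY;
  times.forEach((t) => {
    if (t >= openAt) {
      emitted.push(t);
      openAt = t + windowMs;
    }
  });
  return emitted;
}

// Two bursts of activity followed by a single late event.
const eventTimes = [0, 100, 250, 1050, 1150, 2500];

const debounced = debounceModel(eventTimes, 300);  // [250, 1150, 2500]
const throttled = throttleModel(eventTimes, 1000); // [0, 1050, 2500]
```

Debouncing keeps the last event of each burst, which suits search inputs; throttling keeps the first and then rate-limits, which suits clicks and scroll handlers.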

delay

import { of, delay } from 'rxjs';

of('Hello').pipe(
  delay(2000)
).subscribe(console.log);
// Output after 2 seconds

7. Error Handling

catchError

import { of, catchError } from 'rxjs';
import { ajax } from 'rxjs/ajax';

ajax.getJSON('/api/data').pipe(
  catchError(error => {
    console.error('Error:', error);
    return of({ error: true }); // Fallback value
  })
).subscribe(console.log);

retry

import { ajax } from 'rxjs/ajax';
import { retry } from 'rxjs';

ajax.getJSON('/api/data').pipe(
  retry(3) // Resubscribe up to 3 times before passing the error on
).subscribe({
  next: console.log,
  error: err => console.error('Failed after 3 retries:', err)
});

retryWhen

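import { ajax } from 'rxjs/ajax';
import { retryWhen, delay, take } from 'rxjs';

// Note: retryWhen is deprecated in RxJS 7; recent 7.x releases offer
// retry({ count: 3, delay: 1000 }) as the replacement.
ajax.getJSON('/api/data').pipe(
  retryWhen(errors => errors.pipe(
    delay(1000), // Wait 1s before each retry
    take(3) // Stop after the third error; the stream then completes instead of erroring
  ))
).subscribe(console.log);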

8. Subjects

Subject

import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe(x => console.log('A:', x));
subject.subscribe(x => console.log('B:', x));

subject.next(1);
subject.next(2);
// Output: A: 1, B: 1, A: 2, B: 2

BehaviorSubject

import { BehaviorSubject } from 'rxjs';

const subject = new BehaviorSubject(0); // Initial value

subject.subscribe(x => console.log('A:', x)); // Gets 0 immediately

subject.next(1);
subject.next(2);

subject.subscribe(x => console.log('B:', x)); // Gets 2 immediately

ReplaySubject

import { ReplaySubject } from 'rxjs';

const subject = new ReplaySubject(2); // Remember last 2 values

subject.next(1);
subject.next(2);
subject.next(3);

subject.subscribe(x => console.log(x));
// Output: 2, 3 (last 2 values)
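
The three subject types differ mainly in how much history a late subscriber receives. The sketch below is a toy multicast model in plain TypeScript, not the RxJS classes: with a buffer size of 0 it behaves like a plain Subject, and with a larger buffer it behaves like ReplaySubject. A BehaviorSubject is roughly the buffer-size-1 case seeded with an initial value. ToyReplaySubject is a hypothetical name.

```typescript
// Toy model only; this is not the RxJS ReplaySubject implementation.
class ToyReplaySubject<T> {
  private listeners: Array<(value: T) => void> = [];
  private buffer: T[] = [];

  // bufferSize 0 means late subscribers get nothing replayed (plain Subject behaviour).
  constructor(private bufferSize: number) {}

  next(value: T): void {
    // Remember the value, trimming the buffer to the configured size.
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) this.buffer.shift();
    // Multicast: every current listener receives the same value.
    this.listeners.slice().forEach((listener) => listener(value));
  }

  subscribe(listener: (value: T) => void): () => void {
    // Replay buffered values first, then register for live ones.
    this.buffer.forEach((value) => listener(value));
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }
}

const plain = new ToyReplaySubject<number>(0);
const replay2 = new ToyReplaySubject<number>(2);
const fromPlain: number[] = [];
const fromReplay: number[] = [];

// Emit before anyone subscribes.
[1, 2, 3].forEach((n) => {
  plain.next(n);
  replay2.next(n);
});

plain.subscribe((n) => { fromPlain.push(n); });
replay2.subscribe((n) => { fromReplay.push(n); });
// fromPlain is [] and fromReplay is [2, 3]

plain.next(4);
replay2.next(4);
// fromPlain is [4] and fromReplay is [2, 3, 4]
```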

9. Real-World Examples

Search Autocomplete

import { fromEvent, debounceTime, distinctUntilChanged, map, switchMap, catchError, of } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const input = document.querySelector('#search');

fromEvent(input, 'input').pipe(
  debounceTime(300),
  map(e => e.target.value),
  distinctUntilChanged(),
  switchMap(query => 
    ajax.getJSON(`/api/search?q=${query}`).pipe(
      catchError(() => of([]))
    )
  )
).subscribe(results => {
  displayResults(results);
});

Infinite Scroll

import { fromEvent, map, filter, throttleTime } from 'rxjs';

fromEvent(window, 'scroll').pipe(
  throttleTime(200),
  map(() => ({
    scrollHeight: document.documentElement.scrollHeight,
    scrollTop: document.documentElement.scrollTop,
    clientHeight: document.documentElement.clientHeight
  })),
  filter(({ scrollHeight, scrollTop, clientHeight }) => 
    scrollHeight - scrollTop - clientHeight < 100
  )
).subscribe(() => {
  loadMoreData();
});

WebSocket with Auto-Reconnect

import { webSocket } from 'rxjs/webSocket';
import { retryWhen, delay, tap } from 'rxjs';

// Keep a reference to the WebSocketSubject: pipe() returns a plain Observable without next()
const socket$ = webSocket('ws://localhost:8080');

const messages$ = socket$.pipe(
  retryWhen(errors => errors.pipe(
    tap(() => console.log('Reconnecting...')),
    delay(1000) // Resubscribe (and reconnect) 1s after each error
  ))
);

messages$.subscribe({
  next: msg => console.log('Message:', msg),
  error: err => console.error('Error:', err),
  complete: () => console.log('Complete')
});

// Send message
socket$.next({ type: 'ping' });

Drag and Drop

import { fromEvent, switchMap, takeUntil, map } from 'rxjs';

const target = document.querySelector('#draggable');

const mouseDown$ = fromEvent(target, 'mousedown');
const mouseMove$ = fromEvent(document, 'mousemove');
const mouseUp$ = fromEvent(document, 'mouseup');

mouseDown$.pipe(
  switchMap(() => mouseMove$.pipe(
    map(e => ({ x: e.clientX, y: e.clientY })),
    takeUntil(mouseUp$)
  ))
).subscribe(pos => {
  target.style.left = pos.x + 'px';
  target.style.top = pos.y + 'px';
});

10. Best Practices

1. Always Unsubscribe

// Good
const subscription = observable.subscribe(console.log);
// Later...
subscription.unsubscribe();

// Better: use takeUntil (import { Subject, takeUntil } from 'rxjs')
const destroy$ = new Subject<void>();

observable.pipe(
  takeUntil(destroy$)
).subscribe(console.log);

// On cleanup
destroy$.next();
destroy$.complete();

2. Use shareReplay for Expensive Operations

import { ajax } from 'rxjs/ajax';
import { shareReplay } from 'rxjs';

const data$ = ajax.getJSON('/api/data').pipe(
  shareReplay(1) // Cache last value
);

// Multiple subscribers share same request
data$.subscribe(console.log);
data$.subscribe(console.log);

3. Avoid Nested Subscriptions

// Bad
observable1.subscribe(val1 => {
  observable2.subscribe(val2 => {
    // Nested!
  });
});

// Good
observable1.pipe(
  switchMap(val1 => observable2)
).subscribe(val2 => {
  // Flat!
});

11. Angular Integration

import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subject, takeUntil } from 'rxjs';
import { HttpClient } from '@angular/common/http';

// Minimal shape assumed for this example
interface User {
  name: string;
}

@Component({
  selector: 'app-users',
  template: `<div *ngFor="let user of users">{{ user.name }}</div>`
})
export class UsersComponent implements OnInit, OnDestroy {
  users: User[] = [];
  private destroy$ = new Subject<void>();
  
  constructor(private http: HttpClient) {}
  
  ngOnInit() {
    this.http.get<User[]>('/api/users').pipe(
      takeUntil(this.destroy$)
    ).subscribe(users => {
      this.users = users;
    });
  }
  
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

Summary

RxJS enables powerful reactive programming:

  • Observables for async data streams
  • Operators for transforming data
  • Error handling with retry and catchError
  • Cancellation with unsubscribe
  • Composition over nested callbacks

Key Takeaways:

  1. Use switchMap for requests where only the latest result matters, such as search (it cancels the previous request)
  2. Use debounceTime for search inputs
  3. Always unsubscribe or use takeUntil
  4. Use shareReplay to avoid duplicate requests
  5. Avoid nested subscriptions

Next Steps:

  • Learn JavaScript Async
  • Try TypeScript Generics
  • Build with Angular
