본문으로 건너뛰기
Previous
Next
RxJS Complete Guide | Reactive Programming with Observables

RxJS Complete Guide | Reactive Programming with Observables

RxJS Complete Guide | Reactive Programming with Observables

이 글의 핵심

RxJS is a library for reactive programming using Observables. It makes composing asynchronous or callback-based code easier with a powerful operator-based approach.

Introduction

RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using Observables, making it easier to compose asynchronous or callback-based code.

The Problem

Traditional async handling:

// Callbacks
setTimeout(() => {
  getData((data) => {
    processData(data, (result) => {
      console.log(result); // Callback hell
    });
  });
}, 1000);

// Promises (better, but limited)
fetch('/api/data')
  .then(res => res.json())
  .then(data => console.log(data));
// Can't cancel, only handles single values

The Solution

With RxJS:

import { fromEvent, debounceTime, map, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

// Search as user types
fromEvent(input, 'input').pipe(
  debounceTime(300),
  map(e => e.target.value),
  switchMap(query => ajax.getJSON(`/api/search?q=${query}`))
).subscribe(results => console.log(results));

Benefits:

  • Cancellable
  • Composable with operators
  • Handles multiple values
  • Built-in error handling

Real-World Adoption

RxJS is production-proven across major companies and frameworks:

Enterprise Usage:

  • Angular: RxJS is the core of Angular’s reactive architecture, used for HTTP requests, forms, and state management
  • Netflix: Leverages RxJS for complex UI state management and real-time data streaming across their video platform
  • Microsoft: Uses RxJS extensively in Visual Studio Code, Azure Portal, and other products for event-driven architectures
  • Google: Employs RxJS in Firebase, Google Cloud Console, and internal tools for managing async operations

Market Position:

  • 42+ million weekly npm downloads (April 2026) - one of the most downloaded packages in the JavaScript ecosystem
  • 30,000+ GitHub stars - indicating strong community adoption
  • Required dependency for every Angular application (Angular has 4M+ weekly downloads)
  • Used in 1.5M+ public GitHub repositories

Framework Integration:

  • Angular (built-in)
  • Vue.js (via @vueuse/rxjs)
  • React (via rxjs-hooks, @legendapp/state)
  • Svelte (via svelte-rxjs)

RxJS patterns are now influencing modern frameworks - React Signals, Vue 3 Composition API, and Svelte 5 runes all draw inspiration from reactive programming concepts pioneered by RxJS.

1. Installation

npm install rxjs

2. Core Concepts

Observable

import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

observable.subscribe({
  next: (value) => console.log(value),
  error: (err) => console.error(err),
  complete: () => console.log('Done'),
});
// Output: 1, 2, 3, Done

Creating Observables

import { of, from, interval, fromEvent } from 'rxjs';

// Emit values
of(1, 2, 3).subscribe(console.log);

// From array
from([1, 2, 3]).subscribe(console.log);

// From promise
from(fetch('/api/data')).subscribe(console.log);

// Interval
interval(1000).subscribe(count => console.log(count));

// DOM events
fromEvent(button, 'click').subscribe(e => console.log('Clicked'));

3. Operators

map

import { of, map } from 'rxjs';

of(1, 2, 3).pipe(
  map(x => x * 2)
).subscribe(console.log);
// Output: 2, 4, 6

filter

import { of, filter } from 'rxjs';

of(1, 2, 3, 4, 5).pipe(
  filter(x => x % 2 === 0)
).subscribe(console.log);
// Output: 2, 4

tap (Side Effects)

import { of, tap, map } from 'rxjs';

of(1, 2, 3).pipe(
  tap(x => console.log('Before:', x)),
  map(x => x * 2),
  tap(x => console.log('After:', x))
).subscribe();

4. Combination Operators

merge

import { merge, interval } from 'rxjs';
import { map } from 'rxjs/operators';

const first$ = interval(1000).pipe(map(() => 'First'));
const second$ = interval(1500).pipe(map(() => 'Second'));

merge(first$, second$).subscribe(console.log);
// Output: First, Second, First, First, Second, ...

combineLatest

import { combineLatest, of } from 'rxjs';

const age$ = of(30);
const name$ = of('John');

combineLatest([age$, name$]).subscribe(([age, name]) => {
  console.log(`${name} is ${age} years old`);
});

zip

import { zip, of } from 'rxjs';

const age$ = of(30, 31, 32);
const name$ = of('John', 'Jane');

zip(age$, name$).subscribe(([age, name]) => {
  console.log(`${name}: ${age}`);
});
// Output: John: 30, Jane: 31

5. Transformation Operators

switchMap

import { fromEvent, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

// Cancel previous request when new search happens
fromEvent(input, 'input').pipe(
  switchMap(e => ajax.getJSON(`/api/search?q=${e.target.value}`))
).subscribe(results => console.log(results));

mergeMap

import { of, mergeMap, delay } from 'rxjs';

of(1, 2, 3).pipe(
  mergeMap(x => of(x).pipe(delay(1000)))
).subscribe(console.log);
// All emit after 1s (concurrent)

concatMap

import { of, concatMap, delay } from 'rxjs';

of(1, 2, 3).pipe(
  concatMap(x => of(x).pipe(delay(1000)))
).subscribe(console.log);
// Emit one by one (sequential)

exhaustMap

import { fromEvent, exhaustMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

// Ignore clicks while request is pending
fromEvent(button, 'click').pipe(
  exhaustMap(() => ajax.getJSON('/api/data'))
).subscribe(console.log);

6. Time-based Operators

debounceTime

import { fromEvent, debounceTime, map } from 'rxjs';

// Wait 300ms after last keystroke
fromEvent(input, 'input').pipe(
  debounceTime(300),
  map(e => e.target.value)
).subscribe(console.log);

throttleTime

import { fromEvent, throttleTime } from 'rxjs';

// Emit at most once per 1000ms
fromEvent(button, 'click').pipe(
  throttleTime(1000)
).subscribe(() => console.log('Clicked'));

delay

import { of, delay } from 'rxjs';

of('Hello').pipe(
  delay(2000)
).subscribe(console.log);
// Output after 2 seconds

7. Error Handling

catchError

import { of, throwError, catchError } from 'rxjs';
import { ajax } from 'rxjs/ajax';

ajax.getJSON('/api/data').pipe(
  catchError(error => {
    console.error('Error:', error);
    return of({ error: true }); // Fallback value
  })
).subscribe(console.log);

retry

import { ajax } from 'rxjs/ajax';
import { retry } from 'rxjs/operators';

ajax.getJSON('/api/data').pipe(
  retry(3) // Retry 3 times on error
).subscribe({
  next: console.log,
  error: err => console.error('Failed after 3 retries')
});

retryWhen

import { ajax } from 'rxjs/ajax';
import { retryWhen, delay, take } from 'rxjs/operators';

ajax.getJSON('/api/data').pipe(
  retryWhen(errors => errors.pipe(
    delay(1000), // Wait 1s between retries
    take(3) // Max 3 retries
  ))
).subscribe(console.log);

8. Subjects

Subject

import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe(x => console.log('A:', x));
subject.subscribe(x => console.log('B:', x));

subject.next(1);
subject.next(2);
// Output: A: 1, B: 1, A: 2, B: 2

BehaviorSubject

import { BehaviorSubject } from 'rxjs';

const subject = new BehaviorSubject(0); // Initial value

subject.subscribe(x => console.log('A:', x)); // Gets 0 immediately

subject.next(1);
subject.next(2);

subject.subscribe(x => console.log('B:', x)); // Gets 2 immediately

ReplaySubject

import { ReplaySubject } from 'rxjs';

const subject = new ReplaySubject(2); // Remember last 2 values

subject.next(1);
subject.next(2);
subject.next(3);

subject.subscribe(x => console.log(x));
// Output: 2, 3 (last 2 values)

9. Real-World Examples

import { fromEvent, debounceTime, distinctUntilChanged, switchMap, catchError } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const input = document.querySelector('#search');

fromEvent(input, 'input').pipe(
  debounceTime(300),
  map(e => e.target.value),
  distinctUntilChanged(),
  switchMap(query => 
    ajax.getJSON(`/api/search?q=${query}`).pipe(
      catchError(() => of([]))
    )
  )
).subscribe(results => {
  displayResults(results);
});

Infinite Scroll

import { fromEvent, map, filter, throttleTime } from 'rxjs';

fromEvent(window, 'scroll').pipe(
  throttleTime(200),
  map(() => ({
    scrollHeight: document.documentElement.scrollHeight,
    scrollTop: document.documentElement.scrollTop,
    clientHeight: document.documentElement.clientHeight
  })),
  filter(({ scrollHeight, scrollTop, clientHeight }) => 
    scrollHeight - scrollTop - clientHeight < 100
  )
).subscribe(() => {
  loadMoreData();
});

WebSocket with Auto-Reconnect

import { webSocket } from 'rxjs/webSocket';
import { retryWhen, delay, tap } from 'rxjs/operators';

const socket$ = webSocket('ws://localhost:8080').pipe(
  retryWhen(errors => errors.pipe(
    tap(() => console.log('Reconnecting...')),
    delay(1000)
  ))
);

socket$.subscribe(
  msg => console.log('Message:', msg),
  err => console.error('Error:', err),
  () => console.log('Complete')
);

// Send message
socket$.next({ type: 'ping' });

Drag and Drop

import { fromEvent, takeUntil, map } from 'rxjs';

const target = document.querySelector('#draggable');

const mouseDown$ = fromEvent(target, 'mousedown');
const mouseMove$ = fromEvent(document, 'mousemove');
const mouseUp$ = fromEvent(document, 'mouseup');

mouseDown$.pipe(
  switchMap(() => mouseMove$.pipe(
    map(e => ({ x: e.clientX, y: e.clientY })),
    takeUntil(mouseUp$)
  ))
).subscribe(pos => {
  target.style.left = pos.x + 'px';
  target.style.top = pos.y + 'px';
});

10. Best Practices

1. Always Unsubscribe

// Good
const subscription = observable.subscribe(console.log);
// Later...
subscription.unsubscribe();

// Better: use takeUntil
const destroy$ = new Subject();

observable.pipe(
  takeUntil(destroy$)
).subscribe(console.log);

// On cleanup
destroy$.next();
destroy$.complete();

2. Use shareReplay for Expensive Operations

import { ajax } from 'rxjs/ajax';
import { shareReplay } from 'rxjs/operators';

const data$ = ajax.getJSON('/api/data').pipe(
  shareReplay(1) // Cache last value
);

// Multiple subscribers share same request
data$.subscribe(console.log);
data$.subscribe(console.log);

3. Avoid Nested Subscriptions

// Bad
observable1.subscribe(val1 => {
  observable2.subscribe(val2 => {
    // Nested!
  });
});

// Good
observable1.pipe(
  switchMap(val1 => observable2)
).subscribe(val2 => {
  // Flat!
});

11. Angular Integration

import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subject, takeUntil } from 'rxjs';
import { HttpClient } from '@angular/common/http';

@Component({
  selector: 'app-users',
  template: `<div *ngFor="let user of users">{{ user.name }}</div>`
})
export class UsersComponent implements OnInit, OnDestroy {
  users: User[] = [];
  private destroy$ = new Subject<void>();
  
  constructor(private http: HttpClient) {}
  
  ngOnInit() {
    this.http.get<User[]>('/api/users').pipe(
      takeUntil(this.destroy$)
    ).subscribe(users => {
      this.users = users;
    });
  }
  
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

Summary

RxJS enables powerful reactive programming:

  • Observables for async data streams
  • Operators for transforming data
  • Error handling with retry and catchError
  • Cancellation with unsubscribe
  • Composition over nested callbacks

Key Takeaways:

  1. Use switchMap for HTTP requests (cancels previous)
  2. Use debounceTime for search inputs
  3. Always unsubscribe or use takeUntil
  4. Use shareReplay to avoid duplicate requests
  5. Avoid nested subscriptions

Next Steps:

  • Learn JavaScript Async
  • Try [TypeScript Generics](/en/blog/typescript-series-04-generics/
  • Build with [Angular](/en/blog/angular-complete-guide/

Resources:


자주 묻는 질문 (FAQ)

Q. 이 내용을 실무에서 언제 쓰나요?

A. Complete RxJS guide for reactive programming in JavaScript. Learn Observables, operators, error handling, and async patt… 실무에서는 위 본문의 예제와 선택 가이드를 참고해 적용하면 됩니다.

Q. 선행으로 읽으면 좋은 글은?

A. 각 글 하단의 이전 글 또는 관련 글 링크를 따라가면 순서대로 배울 수 있습니다. C++ 시리즈 목차에서 전체 흐름을 확인할 수 있습니다.

Q. 더 깊이 공부하려면?

A. cppreference와 해당 라이브러리 공식 문서를 참고하세요. 글 말미의 참고 자료 링크도 활용하면 좋습니다.


같이 보면 좋은 글 (내부 링크)

이 주제와 연결되는 다른 글입니다.

  • [Axios Complete Guide](/en/blog/axios-complete-guide/
  • [Alpine.js Complete Guide | Lightweight JavaScript Framework](/en/blog/alpine-js-complete-guide/
  • [Express.js Complete Guide | Fast Node.js Web Framework](/en/blog/express-complete-guide/

이 글에서 다루는 키워드 (관련 검색어)

RxJS, JavaScript, Reactive Programming, Observables, Async, TypeScript 등으로 검색하시면 이 글이 도움이 됩니다.