RxJS Complete Guide | Reactive Programming with Observables
Key Takeaway
RxJS is a library for reactive programming using Observables. It makes composing asynchronous or callback-based code easier with a powerful operator-based approach.
Introduction
RxJS (Reactive Extensions for JavaScript) models events, timers, HTTP responses, and other asynchronous sources as Observables: streams of values over time that can be transformed, combined, and cancelled with operators.
The Problem
Traditional async handling:
// Callbacks
setTimeout(() => {
getData((data) => {
processData(data, (result) => {
console.log(result); // Callback hell
});
});
}, 1000);
// Promises (better, but limited)
fetch('/api/data')
.then(res => res.json())
.then(data => console.log(data));
// Can't cancel, only handles single values
The Solution
With RxJS:
import { fromEvent, debounceTime, map, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';
// Search as user types
fromEvent(input, 'input').pipe(
debounceTime(300),
map(e => e.target.value),
switchMap(query => ajax.getJSON(`/api/search?q=${encodeURIComponent(query)}`))
).subscribe(results => console.log(results));
Benefits:
- Cancellable
- Composable with operators
- Handles multiple values
- Built-in error handling
Real-World Adoption
RxJS is production-proven across major companies and frameworks:
Enterprise Usage:
- Angular: RxJS is the core of Angular’s reactive architecture, used for HTTP requests, forms, and state management
- Netflix: Leverages RxJS for complex UI state management and real-time data streaming across their video platform
- Microsoft: Reactive Extensions originated at Microsoft as Rx.NET, and RxJS began as its JavaScript port for event-driven architectures
- Google: Employs RxJS in Firebase, Google Cloud Console, and internal tools for managing async operations
Market Position:
- 42+ million weekly npm downloads (April 2026) - one of the most downloaded packages in the JavaScript ecosystem
- 30,000+ GitHub stars - indicating strong community adoption
- Required dependency for every Angular application (Angular has 4M+ weekly downloads)
- Used in 1.5M+ public GitHub repositories
Framework Integration:
- Angular (built-in)
- Vue.js (via @vueuse/rxjs)
- React (via rxjs-hooks or observable-hooks)
- Svelte (Observables satisfy the store contract, so the $ auto-subscription syntax works with them)
Reactive ideas popularized by RxJS also show up in newer framework primitives: signals in Angular, Solid, and Preact, the Vue 3 Composition API, and Svelte 5 runes all build on the same notion of values that change over time.
1. Installation
npm install rxjs
2. Core Concepts
Observable
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
observable.subscribe({
next: (value) => console.log(value),
error: (err) => console.error(err),
complete: () => console.log('Done'),
});
// Output: 1, 2, 3, Done
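To make the subscribe contract concrete, here is a minimal hand-rolled sketch. It is not how RxJS is implemented; TinyObservable is a hypothetical name, and the sketch only assumes the three observer callbacks shown above plus an optional teardown function returned by the producer.

```javascript
// Toy Observable for illustration only; RxJS's real class does much more.
class TinyObservable {
  constructor(producer) {
    this.producer = producer; // called once per subscribe()
  }

  subscribe(observer) {
    let closed = false;
    // Guarded subscriber: nothing is delivered once the stream is closed.
    const subscriber = {
      next: (value) => {
        if (!closed && observer.next) observer.next(value);
      },
      error: (err) => {
        if (closed) return;
        closed = true;
        if (observer.error) observer.error(err);
      },
      complete: () => {
        if (closed) return;
        closed = true;
        if (observer.complete) observer.complete();
      },
    };
    const teardown = this.producer(subscriber);
    return {
      unsubscribe: () => {
        closed = true;
        if (typeof teardown === 'function') teardown();
      },
    };
  }
}

const log = [];
const numbers = new TinyObservable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
  subscriber.next(3); // ignored: the stream has already completed
});
numbers.subscribe({
  next: (value) => log.push(value),
  complete: () => log.push('Done'),
});

// Unsubscribing runs whatever cleanup the producer returned.
let cleanedUp = false;
const resource = new TinyObservable(() => () => { cleanedUp = true; });
resource.subscribe({}).unsubscribe();

console.log(log, cleanedUp);
```

The producer runs again for every subscribe() call, which is why a plain Observable is described as lazy and why cancellation is simply a matter of calling unsubscribe().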
Creating Observables
import { of, from, interval, fromEvent } from 'rxjs';
// Emit values
of(1, 2, 3).subscribe(console.log);
// From array
from([1, 2, 3]).subscribe(console.log);
// From promise
from(fetch('/api/data')).subscribe(console.log);
// Interval
interval(1000).subscribe(count => console.log(count));
// DOM events
fromEvent(button, 'click').subscribe(e => console.log('Clicked'));
3. Operators
map
import { of, map } from 'rxjs';
of(1, 2, 3).pipe(
map(x => x * 2)
).subscribe(console.log);
// Output: 2, 4, 6
filter
import { of, filter } from 'rxjs';
of(1, 2, 3, 4, 5).pipe(
filter(x => x % 2 === 0)
).subscribe(console.log);
// Output: 2, 4
tap (Side Effects)
import { of, tap, map } from 'rxjs';
of(1, 2, 3).pipe(
tap(x => console.log('Before:', x)),
map(x => x * 2),
tap(x => console.log('After:', x))
).subscribe();
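Operators such as map and filter are easier to reason about once you see that each one takes a source and returns a new source. The following sketch models that idea with plain functions; ofValues, pipe, and the simplified "observable is a function taking a next callback" model are assumptions made for illustration, not RxJS APIs.

```javascript
// Toy model: an "observable" is a function that accepts a next-callback.
const ofValues = (...values) => (next) => values.forEach((v) => next(v));

// An operator takes a source and returns a new source.
const map = (project) => (source) => (next) =>
  source((v) => next(project(v)));

const filter = (predicate) => (source) => (next) =>
  source((v) => {
    if (predicate(v)) next(v);
  });

// pipe applies the operators left to right.
const pipe = (source, ...operators) =>
  operators.reduce((current, operator) => operator(current), source);

const doubledEvens = pipe(
  ofValues(1, 2, 3, 4, 5),
  filter((x) => x % 2 === 0),
  map((x) => x * 2)
);

const seen = [];
doubledEvens((v) => seen.push(v)); // nothing runs until this "subscribe" call
console.log(seen); // [4, 8]
```

Because every operator returns a fresh source, the original stream is never mutated and pipelines can be reused or extended freely.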
4. Combination Operators
merge
import { merge, interval, map } from 'rxjs';
const first$ = interval(1000).pipe(map(() => 'First'));
const second$ = interval(1500).pipe(map(() => 'Second'));
merge(first$, second$).subscribe(console.log);
// Output: First, Second, First, First, Second, ...
combineLatest
import { combineLatest, of } from 'rxjs';
const age$ = of(30);
const name$ = of('John');
combineLatest([age$, name$]).subscribe(([age, name]) => {
console.log(`${name} is ${age} years old`);
});
zip
import { zip, of } from 'rxjs';
const age$ = of(30, 31, 32);
const name$ = of('John', 'Jane');
zip(age$, name$).subscribe(([age, name]) => {
console.log(`${name}: ${age}`);
});
// Output: John: 30, Jane: 31
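The difference between combineLatest and zip is mostly about how values are paired. As a rough sketch, the two pairing rules can be replayed over a recorded list of events; combineLatestLike, zipLike, and the event timeline are hypothetical names and data, and the sketch ignores completion and timing.

```javascript
// Each event records which source emitted and what value, in arrival order.
const events = [
  { source: 'age', value: 30 },
  { source: 'name', value: 'John' },
  { source: 'age', value: 31 },
  { source: 'age', value: 32 },
  { source: 'name', value: 'Jane' },
];

// combineLatest-style: once every source has emitted, emit the latest values on each event.
function combineLatestLike(timeline, sources) {
  const latest = new Map();
  const out = [];
  for (const { source, value } of timeline) {
    latest.set(source, value);
    if (latest.size === sources.length) {
      out.push(sources.map((s) => latest.get(s)));
    }
  }
  return out;
}

// zip-style: pair the 1st value of each source, then the 2nd, and so on.
function zipLike(timeline, sources) {
  const queues = new Map(sources.map((s) => [s, []]));
  const out = [];
  for (const { source, value } of timeline) {
    queues.get(source).push(value);
    if (sources.every((s) => queues.get(s).length > 0)) {
      out.push(sources.map((s) => queues.get(s).shift()));
    }
  }
  return out;
}

const combined = combineLatestLike(events, ['age', 'name']);
const zipped = zipLike(events, ['age', 'name']);
console.log(combined); // [[30,'John'],[31,'John'],[32,'John'],[32,'Jane']]
console.log(zipped);   // [[30,'John'],[31,'Jane']]
```

In the zip result the value 32 is never emitted because no third name arrives to pair with it, which matches the zip output shown earlier.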
5. Transformation Operators
switchMap
import { fromEvent, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';
// Cancel previous request when new search happens
fromEvent(input, 'input').pipe(
switchMap(e => ajax.getJSON(`/api/search?q=${encodeURIComponent(e.target.value)}`))
).subscribe(results => console.log(results));
mergeMap
import { of, mergeMap, delay } from 'rxjs';
of(1, 2, 3).pipe(
mergeMap(x => of(x).pipe(delay(1000)))
).subscribe(console.log);
// All emit after 1s (concurrent)
concatMap
import { of, concatMap, delay } from 'rxjs';
of(1, 2, 3).pipe(
concatMap(x => of(x).pipe(delay(1000)))
).subscribe(console.log);
// Emit one by one (sequential)
exhaustMap
import { fromEvent, exhaustMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';
// Ignore clicks while request is pending
fromEvent(button, 'click').pipe(
exhaustMap(() => ajax.getJSON('/api/data'))
).subscribe(console.log);
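The four flattening operators differ only in what they do when a new outer value arrives while an inner request is still running. The sketch below replays that decision over a small timeline. It assumes every inner request delivers exactly one result after a fixed duration; flatten and the request data are hypothetical, and real operators work on live streams rather than arrays.

```javascript
// Each request starts at `start` (ms) and would deliver its result `duration` ms later.
function flatten(strategy, requests, duration) {
  const results = [];
  let busyUntil = 0; // used by 'concat' and 'exhaust'
  requests.forEach((req, i) => {
    const next = requests[i + 1];
    switch (strategy) {
      case 'merge': // run everything concurrently
        results.push({ id: req.id, at: req.start + duration });
        break;
      case 'switch': // a newer request cancels the one still in flight
        if (!next || next.start >= req.start + duration) {
          results.push({ id: req.id, at: req.start + duration });
        }
        break;
      case 'concat': { // queue requests and run them one at a time
        const begin = Math.max(req.start, busyUntil);
        busyUntil = begin + duration;
        results.push({ id: req.id, at: busyUntil });
        break;
      }
      case 'exhaust': // ignore new requests while one is running
        if (req.start >= busyUntil) {
          busyUntil = req.start + duration;
          results.push({ id: req.id, at: busyUntil });
        }
        break;
      default:
        throw new Error('unknown strategy: ' + strategy);
    }
  });
  return results;
}

const requests = [
  { id: 'a', start: 0 },
  { id: 'b', start: 100 },
  { id: 'c', start: 600 },
];
const ids = (strategy) => flatten(strategy, requests, 300).map((r) => r.id + '@' + r.at);

console.log(ids('merge'));   // ['a@300', 'b@400', 'c@900']
console.log(ids('switch'));  // ['b@400', 'c@900']
console.log(ids('concat'));  // ['a@300', 'b@600', 'c@900']
console.log(ids('exhaust')); // ['a@300', 'c@900']
```

Read this way, switchMap suits searches where only the latest answer matters, concatMap suits ordered writes, and exhaustMap suits buttons that should not fire twice.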
6. Time-based Operators
debounceTime
import { fromEvent, debounceTime, map } from 'rxjs';
// Wait 300ms after last keystroke
fromEvent(input, 'input').pipe(
debounceTime(300),
map(e => e.target.value)
).subscribe(console.log);
throttleTime
import { fromEvent, throttleTime } from 'rxjs';
// Emit at most once per 1000ms
fromEvent(button, 'click').pipe(
throttleTime(1000)
).subscribe(() => console.log('Clicked'));
delay
import { of, delay } from 'rxjs';
of('Hello').pipe(
delay(2000)
).subscribe(console.log);
// Output after 2 seconds
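debounceTime and throttleTime are often confused, so it can help to replay both rules over the same timestamps. This is a sketch under simplifying assumptions: debounceLike and throttleLike are hypothetical helpers, the source never completes, and the throttle is leading-edge only (the default behavior of throttleTime).

```javascript
// events: [{ at: msTimestamp, value }] sorted by time.

// Debounce: an event survives only if the source then stays quiet for `wait` ms.
function debounceLike(events, wait) {
  return events
    .filter((e, i) => i === events.length - 1 || events[i + 1].at - e.at >= wait)
    .map((e) => ({ value: e.value, emittedAt: e.at + wait }));
}

// Throttle (leading edge): emit right away, then ignore everything for `wait` ms.
function throttleLike(events, wait) {
  const out = [];
  let openAt = -Infinity;
  for (const e of events) {
    if (e.at >= openAt) {
      out.push({ value: e.value, emittedAt: e.at });
      openAt = e.at + wait;
    }
  }
  return out;
}

const keystrokes = [
  { at: 0, value: 'r' },
  { at: 100, value: 'rx' },
  { at: 250, value: 'rxj' },
  { at: 900, value: 'rxjs' },
];

const debounced = debounceLike(keystrokes, 300);
const throttled = throttleLike(keystrokes, 300);
console.log(debounced); // 'rxj' at 550, 'rxjs' at 1200
console.log(throttled); // 'r' at 0, 'rxjs' at 900
```

Debounce reports the value the user settled on, slightly late; throttle reports the first value immediately and drops the burst that follows.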
7. Error Handling
catchError
import { of, catchError } from 'rxjs';
import { ajax } from 'rxjs/ajax';
ajax.getJSON('/api/data').pipe(
catchError(error => {
console.error('Error:', error);
return of({ error: true }); // Fallback value
})
).subscribe(console.log);
retry
import { ajax } from 'rxjs/ajax';
import { retry } from 'rxjs';
ajax.getJSON('/api/data').pipe(
retry(3) // Retry 3 times on error
).subscribe({
next: console.log,
error: err => console.error('Failed after 3 retries')
});
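retry with delay (replaces retryWhen)
import { ajax } from 'rxjs/ajax';
import { retry } from 'rxjs';
// retryWhen is deprecated as of RxJS 7; retry with a config object expresses
// the same delayed-retry policy and still errors once the retries run out
ajax.getJSON('/api/data').pipe(
retry({ count: 3, delay: 1000 }) // Max 3 retries, 1s apart
).subscribe({
next: console.log,
error: err => console.error('Failed after 3 retries:', err)
});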
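The retry count is easy to misread: retry(3) resubscribes up to three times after the first failure, so the source may run four times in total. The sketch below imitates that counting, and a catchError-style fallback, with plain synchronous functions. withRetry, withFallback, and the flaky tasks are hypothetical stand-ins, not RxJS APIs.

```javascript
// Synchronous stand-in for retry(n): re-run a failing task up to maxRetries extra times.
function withRetry(task, maxRetries) {
  for (let attempt = 0; ; attempt += 1) {
    try {
      return task(attempt);
    } catch (err) {
      if (attempt >= maxRetries) throw err; // retries exhausted: surface the error
    }
  }
}

// catchError-style fallback: turn a final failure into a replacement value.
function withFallback(run, fallback) {
  try {
    return run();
  } catch (err) {
    return fallback(err);
  }
}

let calls = 0;
const flaky = () => {
  calls += 1;
  if (calls < 3) throw new Error('network down');
  return 'payload';
};
const recovered = withRetry(flaky, 3); // fails twice, succeeds on the 3rd attempt

let failedCalls = 0;
const alwaysFails = () => {
  failedCalls += 1;
  throw new Error('still down');
};
const fallbackValue = withFallback(
  () => withRetry(alwaysFails, 3),
  () => ({ error: true })
);

console.log(recovered, calls);          // 'payload' 3
console.log(failedCalls, fallbackValue); // 4 { error: true }
```

Placing the fallback outside the retry mirrors the usual operator order: retry first, then catchError, so the fallback only applies once every attempt has failed.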
8. Subjects
Subject
import { Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe(x => console.log('A:', x));
subject.subscribe(x => console.log('B:', x));
subject.next(1);
subject.next(2);
// Output: A: 1, B: 1, A: 2, B: 2
BehaviorSubject
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // Initial value
subject.subscribe(x => console.log('A:', x)); // Gets 0 immediately
subject.next(1);
subject.next(2);
subject.subscribe(x => console.log('B:', x)); // Gets 2 immediately
ReplaySubject
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(2); // Remember last 2 values
subject.next(1);
subject.next(2);
subject.next(3);
subject.subscribe(x => console.log(x));
// Output: 2, 3 (last 2 values)
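What separates the three subject types is what a late subscriber receives. The following sketch captures just that rule with toy classes; TinySubject and its subclasses are hypothetical names and leave out error, completion, and unsubscription.

```javascript
// Plain subject: multicasts to current subscribers, remembers nothing.
class TinySubject {
  constructor() {
    this.observers = [];
  }
  subscribe(next) {
    this.observers.push(next);
  }
  next(value) {
    this.observers.forEach((notify) => notify(value));
  }
}

// Behavior subject: always holds a current value and hands it to new subscribers.
class TinyBehaviorSubject extends TinySubject {
  constructor(initial) {
    super();
    this.current = initial;
  }
  subscribe(next) {
    next(this.current);
    super.subscribe(next);
  }
  next(value) {
    this.current = value;
    super.next(value);
  }
}

// Replay subject: keeps the last `bufferSize` values for new subscribers.
class TinyReplaySubject extends TinySubject {
  constructor(bufferSize) {
    super();
    this.bufferSize = bufferSize;
    this.buffer = [];
  }
  subscribe(next) {
    this.buffer.forEach((value) => next(value));
    super.subscribe(next);
  }
  next(value) {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) this.buffer.shift();
    super.next(value);
  }
}

// Emit 1, 2, 3, then subscribe late, then emit 4.
function lateSubscriber(subject) {
  const received = [];
  [1, 2, 3].forEach((v) => subject.next(v));
  subject.subscribe((v) => received.push(v));
  subject.next(4);
  return received;
}

const plain = lateSubscriber(new TinySubject());
const behavior = lateSubscriber(new TinyBehaviorSubject(0));
const replay = lateSubscriber(new TinyReplaySubject(2));
console.log(plain, behavior, replay); // [4] [3, 4] [2, 3, 4]
```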
9. Real-World Examples
Autocomplete Search
import { fromEvent, debounceTime, distinctUntilChanged, map, switchMap, catchError, of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
const input = document.querySelector('#search');
fromEvent(input, 'input').pipe(
debounceTime(300),
map(e => e.target.value),
distinctUntilChanged(),
switchMap(query =>
ajax.getJSON(`/api/search?q=${encodeURIComponent(query)}`).pipe(
catchError(() => of([]))
)
)
).subscribe(results => {
displayResults(results);
});
Infinite Scroll
import { fromEvent, map, filter, throttleTime } from 'rxjs';
fromEvent(window, 'scroll').pipe(
throttleTime(200),
map(() => ({
scrollHeight: document.documentElement.scrollHeight,
scrollTop: document.documentElement.scrollTop,
clientHeight: document.documentElement.clientHeight
})),
filter(({ scrollHeight, scrollTop, clientHeight }) =>
scrollHeight - scrollTop - clientHeight < 100
)
).subscribe(() => {
loadMoreData();
});
WebSocket with Auto-Reconnect
import { webSocket } from 'rxjs/webSocket';
import { retry, timer } from 'rxjs';
// Keep the WebSocketSubject itself so next() stays available for sending
const socket$ = webSocket('ws://localhost:8080');
const messages$ = socket$.pipe(
retry({
delay: () => {
console.log('Reconnecting...');
return timer(1000); // Resubscribe (reconnect) after 1s
}
})
);
messages$.subscribe({
next: msg => console.log('Message:', msg),
error: err => console.error('Error:', err),
complete: () => console.log('Complete')
});
// Send message
socket$.next({ type: 'ping' });
Drag and Drop
import { fromEvent, takeUntil, map, switchMap } from 'rxjs';
const target = document.querySelector('#draggable');
const mouseDown$ = fromEvent(target, 'mousedown');
const mouseMove$ = fromEvent(document, 'mousemove');
const mouseUp$ = fromEvent(document, 'mouseup');
mouseDown$.pipe(
switchMap(() => mouseMove$.pipe(
map(e => ({ x: e.clientX, y: e.clientY })),
takeUntil(mouseUp$)
))
).subscribe(pos => {
target.style.left = pos.x + 'px';
target.style.top = pos.y + 'px';
});
10. Best Practices
1. Always Unsubscribe
// Good
const subscription = observable.subscribe(console.log);
// Later...
subscription.unsubscribe();
// Better: use takeUntil
const destroy$ = new Subject();
observable.pipe(
takeUntil(destroy$)
).subscribe(console.log);
// On cleanup
destroy$.next();
destroy$.complete();
2. Use shareReplay for Expensive Operations
import { ajax } from 'rxjs/ajax';
import { shareReplay } from 'rxjs';
const data$ = ajax.getJSON('/api/data').pipe(
shareReplay(1) // Cache last value
);
// Multiple subscribers share same request
data$.subscribe(console.log);
data$.subscribe(console.log);
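The reason this matters is that a cold source does its work once per subscriber. Here is a small sketch of the difference, with a counter standing in for the HTTP request. fetchData and shareLatest are hypothetical names, and the wrapper only approximates shareReplay(1): it handles no errors, completion, or unsubscription.

```javascript
let requestCount = 0;

// Cold source: every subscriber triggers its own "request".
const fetchData = (next) => {
  requestCount += 1;
  next({ requestNumber: requestCount });
};

// shareReplay(1)-like wrapper: run the source once, replay the last value to late subscribers.
function shareLatest(source) {
  const observers = [];
  let started = false;
  let hasValue = false;
  let lastValue;
  return (next) => {
    if (hasValue) next(lastValue);
    observers.push(next);
    if (!started) {
      started = true;
      source((value) => {
        hasValue = true;
        lastValue = value;
        observers.forEach((notify) => notify(value));
      });
    }
  };
}

const received = [];

// Without sharing: two subscribers, two requests.
fetchData((v) => received.push(v));
fetchData((v) => received.push(v));
const coldRequests = requestCount;

// With sharing: two subscribers, one request.
requestCount = 0;
const shared = shareLatest(fetchData);
shared((v) => received.push(v));
shared((v) => received.push(v));
const sharedRequests = requestCount;

console.log(coldRequests, sharedRequests); // 2 1
```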
3. Avoid Nested Subscriptions
// Bad
observable1.subscribe(val1 => {
observable2.subscribe(val2 => {
// Nested!
});
});
// Good
observable1.pipe(
switchMap(val1 => observable2)
).subscribe(val2 => {
// Flat!
});
11. Angular Integration
import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subject, takeUntil } from 'rxjs';
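import { HttpClient } from '@angular/common/http';
// Shape assumed from the template below, which only reads user.name
interface User { name: string; }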
@Component({
selector: 'app-users',
template: `<div *ngFor="let user of users">{{ user.name }}</div>`
})
export class UsersComponent implements OnInit, OnDestroy {
users: User[] = [];
private destroy$ = new Subject<void>();
constructor(private http: HttpClient) {}
ngOnInit() {
this.http.get<User[]>('/api/users').pipe(
takeUntil(this.destroy$)
).subscribe(users => {
this.users = users;
});
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
Summary
RxJS enables powerful reactive programming:
- Observables for async data streams
- Operators for transforming data
- Error handling with retry and catchError
- Cancellation with unsubscribe
- Composition over nested callbacks
Key Takeaways:
- Use switchMap for HTTP requests (cancels previous)
- Use debounceTime for search inputs
- Always unsubscribe or use takeUntil
- Use shareReplay to avoid duplicate requests
- Avoid nested subscriptions
Next Steps:
- Learn JavaScript Async
- Try [TypeScript Generics](/en/blog/typescript-series-04-generics/)
- Build with [Angular](/en/blog/angular-complete-guide/)