RxJS 완벽 가이드 | 반응형 프로그래밍·Observable·Operators·실전 활용
이 글의 핵심
RxJS로 반응형 프로그래밍을 구현하는 완벽 가이드. Observable, Operators, Subject, Scheduler까지 실전 예제로 정리. RxJS·Reactive Programming·Observable 중심으로 설명합니다.
한참 전에, 검색창에 타이핑할 때마다 API를 때리다가 서버에 혼났던 적이 있어요. input에 addEventListener를 몇 겹 쌓고, debounce는 손으로 짜고, 이전 요청을 취소할 생각은 못 하고… 그때 “아, 이건 시간에 따라 흐름이 있다”는 걸 머리로는 알았는데 코드로는 계속 if문 덩어리로만 썼죠. 그게 반응형 프로그래밍이 처음과 만나는 지점이에요. 이벤트·시간·네트워크가 겹칠수록 “지금 무슨 시퀀스가 돌고 있지?”를 선언해 두고 싶어지는 거죠.
솔직히 말하면, 비동기면 Promise로 충분할 때 많아요. 한 번만 resolve되고, 취소가 없고, 흐름이 직선이면 async/await가 제일 읽기 쉽습니다. RxJS는 그 다음 단계예요. 여러 값이 이어질 수 있고, 중간에 끊을 수 있고, 여러 스트림을 섞을 수 있을 때 이 도구를 꺼내면 이득이 큽니다. “모든 API 호출에 Observable”로 가면 팀이 미칩니다. 딱 닿는 경계—검색 자동완성, WebSocket, 폼·라우트 이벤트가 뒤엉킨 화면—에 두는 게 맞아요.
RxJS는 그걸 Observable이라는 스트림과, map·filter 같은 오퍼레이터로 이어 붙이는 방식으로 풀어요. of랑 from으로 출발지를 만들고, interval·timer는 시간, fromEvent는 DOM이나 Node 이벤트를 그대로 스트림으로 바꿉니다.
import { of, from } from 'rxjs';
// of: 값으로 생성
const source$ = of(1, 2, 3, 4, 5);
source$.subscribe((value) => console.log(value));
// from: Array, Promise, Iterable로 생성
const array$ = from([1, 2, 3]);
const promise$ = from(fetch('/api/users'));
import { interval, timer } from 'rxjs';
// 1초마다 emit
const interval$ = interval(1000);
interval$.subscribe((value) => console.log(value));
// 3초 후 한 번만 emit
const timer$ = timer(3000);
timer$.subscribe(() => console.log('Timer!'));
import { fromEvent } from 'rxjs';
const button = document.getElementById('btn');
const click$ = fromEvent(button, 'click');
click$.subscribe(() => console.log('Clicked!'));
중간에 값을 갈고 닦는 건 pipe 안에서 해요. 짝수만 골라서 열 곱한다든지, 검색어를 debounce하고 직전 값과 다를 때만 넘긴다든지.
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
const source$ = of(1, 2, 3, 4, 5);
source$
.pipe(
filter((x) => x % 2 === 0),
map((x) => x * 10)
)
.subscribe((value) => console.log(value));
// 20, 40
import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged, map } from 'rxjs/operators';
const input = document.getElementById('search');
const search$ = fromEvent(input, 'input');
search$
.pipe(
map((event) => (event.target as HTMLInputElement).value),
debounceTime(500),
distinctUntilChanged()
)
.subscribe((value) => {
console.log('Search:', value);
});
요청이 겹칠 때는 switchMap이 제일 잘 익숙해져요. 새 입력이 오면 안쪽 subscription을 갈아끼우니까 이전 HTTP가 살아 있으면 경쟁 상태(레이스)가 덜해요. 반대로 다 병렬로 때리고 싶으면 mergeMap, 줄 세우고 싶으면 concatMap 쪽 느낌이에요.
import { fromEvent, of } from 'rxjs';
import { switchMap, mergeMap, concatMap } from 'rxjs/operators';
const button = document.getElementById('btn');
const click$ = fromEvent(button, 'click');
// switchMap: 이전 요청 취소
click$
.pipe(
switchMap(() => fetch('/api/users').then((r) => r.json()))
)
.subscribe((users) => console.log(users));
// mergeMap: 병렬 실행
click$
.pipe(
mergeMap(() => fetch('/api/users').then((r) => r.json()))
)
.subscribe((users) => console.log(users));
// concatMap: 순차 실행
click$
.pipe(
concatMap(() => fetch('/api/users').then((r) => r.json()))
)
.subscribe((users) => console.log(users));
Subject는 “내가 next로 밀어 넣는 스트림”이에요. 싱크대에서 물 뿌리듯이 여리 여리 구독자한테 뿌릴 수 있죠. BehaviorSubject는 마지막 값을 들고 있어서 “지금 상태가 뭔지”를 대표로 쓰기 좋고, ReplaySubject는 늦게 온 애한테도 최근 n개를 되감아 보여줘요.
import { Subject } from 'rxjs';
const subject$ = new Subject<number>();
subject$.subscribe((value) => console.log('A:', value));
subject$.subscribe((value) => console.log('B:', value));
subject$.next(1);
subject$.next(2);
// A: 1, B: 1, A: 2, B: 2
import { BehaviorSubject } from 'rxjs';
const subject$ = new BehaviorSubject<number>(0);
subject$.subscribe((value) => console.log('A:', value));
// A: 0
subject$.next(1);
subject$.next(2);
subject$.subscribe((value) => console.log('B:', value));
// B: 2
import { ReplaySubject } from 'rxjs';
const subject$ = new ReplaySubject<number>(2);
subject$.next(1);
subject$.next(2);
subject$.next(3);
subject$.subscribe((value) => console.log('A:', value));
// A: 2, A: 3
에러는 스트림 안에서 catchError로 흡수하거나, retry로 몇 번 더 굴려보다가 실패 시 fallback 스트림으로 돌릴 수 있어요. 다만 retry를 POST에 막 쓰면 멱등 아닌 요청이 두 번 갈 수 있으니, 운 정책은 꼭 같이 봐야 합니다.
import { of, throwError } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';
const source$ = throwError(() => new Error('Error!'));
source$
.pipe(
retry(3),
catchError((error) => {
console.error('Error:', error);
return of('Fallback value');
})
)
.subscribe((value) => console.log(value));
검색 자동완성은 위 조합이 그대로 “이야기 한 편”이에요. 타이핑 → 값 뽑기 → debounce → 이전 쿼리랑 다를 때만 → switchMap으로 요청. 빈 쿼리면 빈 배열. 옛날에 if문으로 엮던 걸 그림으로 펼쳐 놓은 느낌이죠.
import { fromEvent, of } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap, map } from 'rxjs/operators';
const input = document.getElementById('search') as HTMLInputElement;
const results = document.getElementById('results');
const search$ = fromEvent(input, 'input').pipe(
map((event) => (event.target as HTMLInputElement).value),
debounceTime(500),
distinctUntilChanged(),
switchMap((query) => {
if (!query) return of([]);
return fetch(`/api/search?q=${query}`).then((r) => r.json());
})
);
search$.subscribe((items) => {
results.innerHTML = items.map((item) => `<li>${item.name}</li>`).join('');
});
Angular와는 원래 잘 맞아요. 컴포넌트가 죽을 때 async 파이프나 takeUntilDestroyed로 구독을 정리해 주는 흐름이 있어서, “스트림 쌓다가 누수”를 줄이기 좋죠.
import { Component } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
import { Subject } from 'rxjs';
@Component({
selector: 'app-search',
template: `
<input (input)="search$.next($event.target.value)" />
<ul>
<li *ngFor="let user of users$ | async">{{ user.name }}</li>
</ul>
`,
})
export class SearchComponent {
search$ = new Subject<string>();
users$ = this.search$.pipe(
debounceTime(500),
distinctUntilChanged(),
switchMap((query) => this.http.get(`/api/users?q=${query}`))
);
constructor(private http: HttpClient) {}
}
Cold Observable은 구독할 때마다 안쪽이 다시 돌아가요. 그래서 똑같은 걸 컴포넌트 둘이 각각 구독하면 요청이 두 번 나갈 수 있어요. 그럴 땐 상위에서 한 번 구독해서 내려주거나, share / shareReplay로 “한 소스, 여럿이 share” 쪽을 생각합니다. shareReplay는 최근 n개를 물고 있으니까 설정 한 번 가져온 뒤 여기저기 쓰는 패턴엔 잘 맞는데, refCount를 어떻게 잡느냐에 따라 메모리 이야기가 갈려요.
스케줄러(언제 큐에 탈지)는 Zone이랑 같이 쓸 때 순서 꼬이면 테스트만 죽는 경우가 많아요. UI는 가능하면 rAF 쪽, 무거운 건 쪼개서 돌리는 식.
switchMap이 안쪽을 정리해 준다고 해도, 직접 건 타이머·리스너는 finalize로 정리해 줘야 해요. HTTP는 AbortSignal이랑 엮으면 “이제 이건 버려”가 더 선명해집니다.
import { fromEvent, switchMap, EMPTY } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { catchError, debounceTime, map } from 'rxjs/operators';
const input = document.getElementById('q') as HTMLInputElement;
fromEvent(input, 'input').pipe(
map((e) => (e.target as HTMLInputElement).value),
debounceTime(300),
switchMap((q) =>
q.trim() === ''
? EMPTY
: ajax.getJSON(`/api/search?q=${encodeURIComponent(q)}`).pipe(
catchError(() => EMPTY)
)
)
).subscribe(render);
막막할 때는 이렇게 보면 돼요. 아무것도 안 온다—Cold인데 subscribe를 안 했거나 EMPTY/NEVER를 탔는지. 요청이 두 번—Cold를 둘이 따로 먹었는지, share가 필요한지. 메모리 경고—interval이나 DOM이 destroy될 때 안 끊겼는지, Angular면 takeUntilDestroyed 쪽. switch 썼는데도 옛 응답—Promise를 그냥 await만 하고 스트림으로 안 감쌌는지. 테스트만 터진다—가상 시간이 없어서. 표로 정리하던 걸 머릿속에서 이렇게 흘리면 됩니다.
Promise는 한 번, Observable은 0~무한, 끊을 수 있어서 그림이 달라요. 학습 곡선은 있어요. 대신 시간과 이벤트가 겹치는 UI에 한 번 익혀 두면, 나중에 Angular 말고 다른 데서도 “아 이건 흐름이네”가 빨리 보이기 시작해요. React 쪽에선 그냥 rxjs만 설치해 써도 되는데, 팀이 hooks 중심이면 “왜 이 프로젝트만 우주기지냐”는 질문은 받을 수 있어요. 그때는 정말 스트림이 필요한지, 아니면 Promise로 끊을 수 있는지 먼저 합의하는 게 좋고요.
나머지로 비슷한 맥락은 Angular 완벽 가이드, Redux 완벽 가이드, TypeScript 쪽이랑 같이 보면 돼요. 배포하실 때 git add → git commit → git push 하고 npm run deploy 순서만 헷갈리지 마세요.
RxJS, Reactive Programming, Observable, Operators, TypeScript, Angular, Frontend 쪽 키워드로 찾아오시면 이 글도 같이 잡힐 거예요.