본문으로 건너뛰기
Previous
Next
RxJS 완벽 가이드 | 반응형 프로그래밍·Observable·Operators·실전 활용

RxJS 완벽 가이드 | 반응형 프로그래밍·Observable·Operators·실전 활용

RxJS 완벽 가이드 | 반응형 프로그래밍·Observable·Operators·실전 활용

이 글의 핵심

RxJS로 반응형 프로그래밍을 구현하는 완벽 가이드. Observable, Operators, Subject, Scheduler까지 실전 예제로 정리. RxJS·Reactive Programming·Observable 중심으로 설명합니다.

한참 전에, 검색창에 타이핑할 때마다 API를 때리다가 서버에 혼났던 적이 있어요. inputaddEventListener를 몇 겹 쌓고, debounce는 손으로 짜고, 이전 요청을 취소할 생각은 못 하고… 그때 “아, 이건 시간에 따라 흐름이 있다”는 걸 머리로는 알았는데 코드로는 계속 if문 덩어리로만 썼죠. 그게 반응형 프로그래밍이 처음과 만나는 지점이에요. 이벤트·시간·네트워크가 겹칠수록 “지금 무슨 시퀀스가 돌고 있지?”를 선언해 두고 싶어지는 거죠.

솔직히 말하면, 비동기면 Promise로 충분할 때 많아요. 한 번만 resolve되고, 취소가 없고, 흐름이 직선이면 async/await가 제일 읽기 쉽습니다. RxJS는 그 다음 단계예요. 여러 값이 이어질 수 있고, 중간에 끊을 수 있고, 여러 스트림을 섞을 수 있을 때 이 도구를 꺼내면 이득이 큽니다. “모든 API 호출에 Observable”로 가면 팀이 미칩니다. 딱 닿는 경계—검색 자동완성, WebSocket, 폼·라우트 이벤트가 뒤엉킨 화면—에 두는 게 맞아요.

RxJS는 그걸 Observable이라는 스트림과, map·filter 같은 오퍼레이터로 이어 붙이는 방식으로 풀어요. offrom으로 출발지를 만들고, interval·timer는 시간, fromEvent는 DOM이나 Node 이벤트를 그대로 스트림으로 바꿉니다.

import { of, from } from 'rxjs';
// of: 값으로 생성
const source$ = of(1, 2, 3, 4, 5);
source$.subscribe((value) => console.log(value));
// from: Array, Promise, Iterable로 생성
const array$ = from([1, 2, 3]);
const promise$ = from(fetch('/api/users'));
import { interval, timer } from 'rxjs';
// 1초마다 emit
const interval$ = interval(1000);
interval$.subscribe((value) => console.log(value));
// 3초 후 한 번만 emit
const timer$ = timer(3000);
timer$.subscribe(() => console.log('Timer!'));
import { fromEvent } from 'rxjs';
const button = document.getElementById('btn');
const click$ = fromEvent(button, 'click');
click$.subscribe(() => console.log('Clicked!'));

중간에 값을 갈고 닦는 건 pipe 안에서 해요. 짝수만 골라서 열 곱한다든지, 검색어를 debounce하고 직전 값과 다를 때만 넘긴다든지.

import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
const source$ = of(1, 2, 3, 4, 5);
source$
  .pipe(
    filter((x) => x % 2 === 0),
    map((x) => x * 10)
  )
  .subscribe((value) => console.log(value));
// 20, 40
import { fromEvent } from 'rxjs';
import { debounceTime, distinctUntilChanged, map } from 'rxjs/operators';
const input = document.getElementById('search');
const search$ = fromEvent(input, 'input');
search$
  .pipe(
    map((event) => (event.target as HTMLInputElement).value),
    debounceTime(500),
    distinctUntilChanged()
  )
  .subscribe((value) => {
    console.log('Search:', value);
  });

요청이 겹칠 때는 switchMap이 제일 잘 익숙해져요. 새 입력이 오면 안쪽 subscription을 갈아끼우니까 이전 HTTP가 살아 있으면 경쟁 상태(레이스)가 덜해요. 반대로 다 병렬로 때리고 싶으면 mergeMap, 줄 세우고 싶으면 concatMap 쪽 느낌이에요.

import { fromEvent, of } from 'rxjs';
import { switchMap, mergeMap, concatMap } from 'rxjs/operators';
const button = document.getElementById('btn');
const click$ = fromEvent(button, 'click');
// switchMap: 이전 요청 취소
click$
  .pipe(
    switchMap(() => fetch('/api/users').then((r) => r.json()))
  )
  .subscribe((users) => console.log(users));
// mergeMap: 병렬 실행
click$
  .pipe(
    mergeMap(() => fetch('/api/users').then((r) => r.json()))
  )
  .subscribe((users) => console.log(users));
// concatMap: 순차 실행
click$
  .pipe(
    concatMap(() => fetch('/api/users').then((r) => r.json()))
  )
  .subscribe((users) => console.log(users));

Subject는 “내가 next로 밀어 넣는 스트림”이에요. 싱크대에서 물 뿌리듯이 여리 여리 구독자한테 뿌릴 수 있죠. BehaviorSubject는 마지막 값을 들고 있어서 “지금 상태가 뭔지”를 대표로 쓰기 좋고, ReplaySubject는 늦게 온 애한테도 최근 n개를 되감아 보여줘요.

import { Subject } from 'rxjs';
const subject$ = new Subject<number>();
subject$.subscribe((value) => console.log('A:', value));
subject$.subscribe((value) => console.log('B:', value));
subject$.next(1);
subject$.next(2);
// A: 1, B: 1, A: 2, B: 2
import { BehaviorSubject } from 'rxjs';
const subject$ = new BehaviorSubject<number>(0);
subject$.subscribe((value) => console.log('A:', value));
// A: 0
subject$.next(1);
subject$.next(2);
subject$.subscribe((value) => console.log('B:', value));
// B: 2
import { ReplaySubject } from 'rxjs';
const subject$ = new ReplaySubject<number>(2);
subject$.next(1);
subject$.next(2);
subject$.next(3);
subject$.subscribe((value) => console.log('A:', value));
// A: 2, A: 3

에러는 스트림 안에서 catchError로 흡수하거나, retry로 몇 번 더 굴려보다가 실패 시 fallback 스트림으로 돌릴 수 있어요. 다만 retry를 POST에 막 쓰면 멱등 아닌 요청이 두 번 갈 수 있으니, 운 정책은 꼭 같이 봐야 합니다.

import { of, throwError } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';
const source$ = throwError(() => new Error('Error!'));
source$
  .pipe(
    retry(3),
    catchError((error) => {
      console.error('Error:', error);
      return of('Fallback value');
    })
  )
  .subscribe((value) => console.log(value));

검색 자동완성은 위 조합이 그대로 “이야기 한 편”이에요. 타이핑 → 값 뽑기 → debounce → 이전 쿼리랑 다를 때만 → switchMap으로 요청. 빈 쿼리면 빈 배열. 옛날에 if문으로 엮던 걸 그림으로 펼쳐 놓은 느낌이죠.

import { fromEvent, of } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap, map } from 'rxjs/operators';
const input = document.getElementById('search') as HTMLInputElement;
const results = document.getElementById('results');
const search$ = fromEvent(input, 'input').pipe(
  map((event) => (event.target as HTMLInputElement).value),
  debounceTime(500),
  distinctUntilChanged(),
  switchMap((query) => {
    if (!query) return of([]);
    return fetch(`/api/search?q=${query}`).then((r) => r.json());
  })
);
search$.subscribe((items) => {
  results.innerHTML = items.map((item) => `<li>${item.name}</li>`).join('');
});

Angular와는 원래 잘 맞아요. 컴포넌트가 죽을 때 async 파이프나 takeUntilDestroyed로 구독을 정리해 주는 흐름이 있어서, “스트림 쌓다가 누수”를 줄이기 좋죠.

import { Component } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
import { Subject } from 'rxjs';
@Component({
  selector: 'app-search',
  template: `
    <input (input)="search$.next($event.target.value)" />
    <ul>
      <li *ngFor="let user of users$ | async">{{ user.name }}</li>
    </ul>
  `,
})
export class SearchComponent {
  search$ = new Subject<string>();
  users$ = this.search$.pipe(
    debounceTime(500),
    distinctUntilChanged(),
    switchMap((query) => this.http.get(`/api/users?q=${query}`))
  );
  constructor(private http: HttpClient) {}
}

Cold Observable은 구독할 때마다 안쪽이 다시 돌아가요. 그래서 똑같은 걸 컴포넌트 둘이 각각 구독하면 요청이 두 번 나갈 수 있어요. 그럴 땐 상위에서 한 번 구독해서 내려주거나, share / shareReplay로 “한 소스, 여럿이 share” 쪽을 생각합니다. shareReplay는 최근 n개를 물고 있으니까 설정 한 번 가져온 뒤 여기저기 쓰는 패턴엔 잘 맞는데, refCount를 어떻게 잡느냐에 따라 메모리 이야기가 갈려요.

스케줄러(언제 큐에 탈지)는 Zone이랑 같이 쓸 때 순서 꼬이면 테스트만 죽는 경우가 많아요. UI는 가능하면 rAF 쪽, 무거운 건 쪼개서 돌리는 식.

switchMap이 안쪽을 정리해 준다고 해도, 직접 건 타이머·리스너finalize로 정리해 줘야 해요. HTTP는 AbortSignal이랑 엮으면 “이제 이건 버려”가 더 선명해집니다.

import { fromEvent, switchMap, EMPTY } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { catchError, debounceTime, map } from 'rxjs/operators';

const input = document.getElementById('q') as HTMLInputElement;

fromEvent(input, 'input').pipe(
  map((e) => (e.target as HTMLInputElement).value),
  debounceTime(300),
  switchMap((q) =>
    q.trim() === ''
      ? EMPTY
      : ajax.getJSON(`/api/search?q=${encodeURIComponent(q)}`).pipe(
          catchError(() => EMPTY)
        )
  )
).subscribe(render);

막막할 때는 이렇게 보면 돼요. 아무것도 안 온다—Cold인데 subscribe를 안 했거나 EMPTY/NEVER를 탔는지. 요청이 두 번—Cold를 둘이 따로 먹었는지, share가 필요한지. 메모리 경고interval이나 DOM이 destroy될 때 안 끊겼는지, Angular면 takeUntilDestroyed 쪽. switch 썼는데도 옛 응답—Promise를 그냥 await만 하고 스트림으로 안 감쌌는지. 테스트만 터진다—가상 시간이 없어서. 표로 정리하던 걸 머릿속에서 이렇게 흘리면 됩니다.

Promise는 한 번, Observable은 0~무한, 끊을 수 있어서 그림이 달라요. 학습 곡선은 있어요. 대신 시간과 이벤트가 겹치는 UI에 한 번 익혀 두면, 나중에 Angular 말고 다른 데서도 “아 이건 흐름이네”가 빨리 보이기 시작해요. React 쪽에선 그냥 rxjs만 설치해 써도 되는데, 팀이 hooks 중심이면 “왜 이 프로젝트만 우주기지냐”는 질문은 받을 수 있어요. 그때는 정말 스트림이 필요한지, 아니면 Promise로 끊을 수 있는지 먼저 합의하는 게 좋고요.

나머지로 비슷한 맥락은 Angular 완벽 가이드, Redux 완벽 가이드, TypeScript 쪽이랑 같이 보면 돼요. 배포하실 때 git addgit commitgit push 하고 npm run deploy 순서만 헷갈리지 마세요.

RxJS, Reactive Programming, Observable, Operators, TypeScript, Angular, Frontend 쪽 키워드로 찾아오시면 이 글도 같이 잡힐 거예요.