본문으로 건너뛰기
Previous
Next
RabbitMQ 완벽 가이드 | 메시지 큐·Exchange

RabbitMQ 완벽 가이드 | 메시지 큐·Exchange

RabbitMQ 완벽 가이드 | 메시지 큐·Exchange

이 글의 핵심

RabbitMQ 완벽 가이드에 대해 정리한 개발 블로그 글입니다. RabbitMQ로 메시지 큐 시스템을 구축하는 완벽 가이드입니다. Exchange, Queue, Binding, Routing, Worker Pattern, 고가용성까지 실전 예제로 정리했습니다. > 실무 경험 공유:… 개념과 예제 코드를 단계적으로 다루며, 실무·학습에 참고할 수 있도록 구성했습니다. 관련 키워드:…

이 글의 핵심

RabbitMQ로 메시지 큐 시스템을 구축하는 완벽 가이드입니다. Exchange, Queue, Binding, Routing, Worker Pattern, 고가용성까지 실전 예제로 정리했습니다.

실무 경험 공유: 마이크로서비스 간 통신을 RabbitMQ로 구축하면서, 시스템 안정성을 크게 향상시키고 비동기 처리로 응답 속도를 3배 개선한 경험을 공유합니다.

들어가며: “서비스 간 통신이 복잡해요”

실무 문제 시나리오

시나리오 1: API 호출이 실패하면 데이터가 유실돼요

동기 호출은 실패 시 재시도가 어렵습니다. RabbitMQ는 메시지를 보장합니다. 시나리오 2: 느린 작업이 전체를 막아요

이메일 전송이 5초 걸립니다. RabbitMQ로 비동기 처리합니다. 시나리오 3: 서비스 간 결합도가 높아요

직접 호출하면 의존성이 생깁니다. RabbitMQ로 분리합니다.

1. RabbitMQ란?

핵심 특징

RabbitMQ는 메시지 브로커입니다. 주요 개념:

  • Producer: 메시지 발행
  • Exchange: 메시지 라우팅
  • Queue: 메시지 저장
  • Consumer: 메시지 소비
  • Binding: Exchange와 Queue 연결 사용 사례:
  • 비동기 작업 처리
  • 마이크로서비스 통신
  • 이벤트 기반 아키텍처

2. 설치

Docker

docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management

Management UI: http://localhost:15672 (guest/guest)

3. Node.js 클라이언트

설치

npm install amqplib

Producer (발행자)

// producer.ts
import amqp from 'amqplib';
async function sendMessage() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  const queue = 'tasks';
  const message = JSON.stringify({
    type: 'send_email',
    to: '[email protected]',
    subject: 'Hello',
  });
  await channel.assertQueue(queue, { durable: true });
  channel.sendToQueue(queue, Buffer.from(message), {
    persistent: true,
  });
  console.log('Sent:', message);
  setTimeout(() => {
    connection.close();
  }, 500);
}
sendMessage();

Consumer (소비자)

// consumer.ts
import amqp from 'amqplib';
async function consumeMessages() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  const queue = 'tasks';
  await channel.assertQueue(queue, { durable: true });
  channel.prefetch(1);  // 한 번에 하나씩 처리
  console.log('Waiting for messages...');
  channel.consume(queue, async (msg) => {
    if (msg) {
      const content = JSON.parse(msg.content.toString());
      console.log('Received:', content);
      // 작업 처리
      await processTask(content);
      // ACK (처리 완료)
      channel.ack(msg);
    }
  });
}
async function processTask(task: any) {
  console.log('Processing:', task);
  await new Promise(resolve => setTimeout(resolve, 1000));
  console.log('Done');
}
consumeMessages();

4. Exchange 타입

Direct Exchange

// Producer
await channel.assertExchange('logs', 'direct', { durable: true });
channel.publish('logs', 'error', Buffer.from('Error log'));
channel.publish('logs', 'info', Buffer.from('Info log'));
// Consumer (error만 받음)
await channel.assertExchange('logs', 'direct', { durable: true });
const queue = await channel.assertQueue('', { exclusive: true });
await channel.bindQueue(queue.queue, 'logs', 'error');
channel.consume(queue.queue, (msg) => {
  if (msg) {
    console.log('Error:', msg.content.toString());
    channel.ack(msg);
  }
});

Fanout Exchange (브로드캐스트)

// Producer
await channel.assertExchange('notifications', 'fanout', { durable: true });
channel.publish('notifications', '', Buffer.from('New notification'));
// Consumer 1
const queue1 = await channel.assertQueue('', { exclusive: true });
await channel.bindQueue(queue1.queue, 'notifications', '');
// Consumer 2
const queue2 = await channel.assertQueue('', { exclusive: true });
await channel.bindQueue(queue2.queue, 'notifications', '');

Topic Exchange (패턴 매칭)

// Producer
await channel.assertExchange('events', 'topic', { durable: true });
channel.publish('events', 'user.created', Buffer.from('User created'));
channel.publish('events', 'user.deleted', Buffer.from('User deleted'));
channel.publish('events', 'order.created', Buffer.from('Order created'));
// Consumer (user.* 패턴)
const queue = await channel.assertQueue('', { exclusive: true });
await channel.bindQueue(queue.queue, 'events', 'user.*');

4.1 AMQP 0-9-1 내부: 채널·프레임·흐름 제어

RabbitMQ 클라이언트가 amqp.connect 이후 createChannel을 호출하면, TCP 하나 위에 가상 연결인 채널이 다수 열립니다. 메시지는 프레임(frame) 단위로 오가며, publish는 기본적으로 비동기입니다. 그래서 publish 직후 바로 connection.close()를 호출하면 버퍼에 남은 프레임이 유실될 수 있어, 짧은 지연이나 Publisher Confirm 패턴이 필요합니다.

흐름 제어(prefetch) 는 QoS 프레임으로 브로커에 “이 컨슈머에게는 미확인 메시지를 최대 n개까지”라고 알립니다. prefetch(1)은 공정한 작업 분배에 유리하지만, 메시지 처리가 매우 짧으면 스루풋이 떨어질 수 있습니다. 반대로 prefetch를 크게 잡으면 한 워커에 긴 작업이 몰려 다른 큐 소비가 지연될 수 있으므로, 작업 길이 분포를 보고 조정합니다.


4.2 프로덕션 패턴: DLX, TTL, Quorum Queue

죽은 편지 큐(Dead Letter Exchange, DLX) 는 처리 실패·거부·TTL 초과 메시지를 별도 큐로 모아 운영자가 재처리·분석할 수 있게 합니다. basic.nack(requeue=false)만으로는 “어디로 갔는지”가 불명확할 수 있어, DLX + DLQ는 관측 가능성을 크게 높입니다.

RabbitMQ 3.8 이후 권장되는 Quorum Queue복제된 Raft 기반 큐로, 클러스터에서 가용성·내구성 트레이드오프가 클래식 미러링과 다릅니다. 트랜잭션 메일·결제 알림처럼 절대 유실되면 안 되는 워크로드는 정책(클래식 durable + 미러 / 쿼럼)을 명시적으로 선택하고, 메시지 크기·TTL을 제한해 브로커 디스크를 보호합니다.

// DLX/DLQ 최소 예: 큐 선언 시 deadLetterExchange 지정 (브로커·정책에 맞게 조정)
await channel.assertExchange('tasks.dlx', 'direct', { durable: true });
await channel.assertQueue('tasks.dlq', { durable: true });
await channel.bindQueue('tasks.dlq', 'tasks.dlx', 'failed');

await channel.assertQueue('tasks', {
  durable: true,
  arguments: {
    'x-dead-letter-exchange': 'tasks.dlx',
    'x-dead-letter-routing-key': 'failed',
    'x-message-ttl': 3600000, // 1h — 운영 정책에 맞게
  },
});

5. Worker Pattern

여러 Worker

// worker.ts
import amqp from 'amqplib';
async function startWorker(workerId: number) {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  const queue = 'tasks';
  await channel.assertQueue(queue, { durable: true });
  channel.prefetch(1);
  console.log(`Worker ${workerId} started`);
  channel.consume(queue, async (msg) => {
    if (msg) {
      const task = JSON.parse(msg.content.toString());
      console.log(`Worker ${workerId} processing:`, task);
      await processTask(task);
      channel.ack(msg);
    }
  });
}
// 3개의 Worker 시작
for (let i = 1; i <= 3; i++) {
  startWorker(i);
}

6. 실전 예제: 이메일 전송 시스템

Producer (API 서버)

// api/send-email.ts
import express from 'express';
import amqp from 'amqplib';
const app = express();
app.use(express.json());
let channel: amqp.Channel;
async function setupRabbitMQ() {
  const connection = await amqp.connect('amqp://localhost');
  channel = await connection.createChannel();
  await channel.assertQueue('emails', { durable: true });
}
app.post('/api/send-email', async (req, res) => {
  const { to, subject, body } = req.body;
  channel.sendToQueue(
    'emails',
    Buffer.from(JSON.stringify({ to, subject, body })),
    { persistent: true }
  );
  res.json({ message: 'Email queued' });
});
setupRabbitMQ().then(() => {
  app.listen(3000, () => console.log('API server running'));
});

Consumer (Email Worker)

// workers/email-worker.ts
import amqp from 'amqplib';
import nodemailer from 'nodemailer';
const transporter = nodemailer.createTransport({
  host: 'smtp.gmail.com',
  port: 587,
  auth: {
    user: process.env.EMAIL_USER,
    pass: process.env.EMAIL_PASS,
  },
});
async function startEmailWorker() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  await channel.assertQueue('emails', { durable: true });
  channel.prefetch(1);
  console.log('Email worker started');
  channel.consume('emails', async (msg) => {
    if (msg) {
      const { to, subject, body } = JSON.parse(msg.content.toString());
      try {
        await transporter.sendMail({ to, subject, text: body });
        console.log('Email sent to:', to);
        channel.ack(msg);
      } catch (error) {
        console.error('Failed to send email:', error);
        channel.nack(msg, false, true);  // 재시도
      }
    }
  });
}
startEmailWorker();

7. 고가용성

클러스터

# 노드 1
docker run -d --hostname rabbit1 --name rabbit1 \
  -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_ERLANG_COOKIE='secret' \
  rabbitmq:3-management
# 노드 2
docker run -d --hostname rabbit2 --name rabbit2 \
  -p 5673:5672 -p 15673:15672 \
  -e RABBITMQ_ERLANG_COOKIE='secret' \
  --link rabbit1:rabbit1 \
  rabbitmq:3-management
# 클러스터 조인
docker exec -it rabbit2 rabbitmqctl stop_app
docker exec -it rabbit2 rabbitmqctl join_cluster rabbit@rabbit1
docker exec -it rabbit2 rabbitmqctl start_app

8. 모니터링

Management Plugin

# 플러그인 활성화
rabbitmq-plugins enable rabbitmq_management
# API로 확인
curl -u guest:guest http://localhost:15672/api/overview

취업·면접과 연결하기

Exchange·Queue·라우팅·워커 패턴은 메시지 큐·마이크로서비스 면접과 잘 맞습니다. 기술 면접 완벽 대비 가이드와, 과제·알고 병행은 코딩 테스트 완벽 대비 가이드를 함께 보세요.


9. 트러블슈팅

증상흔한 원인대응
메시지가 소비되지 않음noAck: true로 consume했거나, prefetch로 막힘ack/nack 경로 확인, channel.checkQueue로 backlog
중복 처리네트워크 끊김 후 재전달, at-least-once 특성멱등 키·아웃박스 패턴
브로커 메모리 급증메시지가 쌓이고 소비 속도 < 생산 속도소비자 스케일아웃, TTL·max-length 정책
“RESOURCE_LOCKED” 등미러링·쿼럼 정책 불일치클러스터 버전·정책 문서 점검

정리 및 체크리스트

핵심 요약

  • RabbitMQ: 메시지 브로커
  • Exchange: 메시지 라우팅
  • Queue: 메시지 저장
  • Worker Pattern: 병렬 처리
  • 고가용성: 클러스터 지원
  • AMQP: 표준 프로토콜

프로덕션 체크리스트

  • RabbitMQ 설치
  • Exchange 및 Queue 설계
  • Producer 구현
  • Consumer 구현
  • 에러 처리 및 재시도
  • 클러스터 구성
  • 모니터링 설정

같이 보면 좋은 글


이 글에서 다루는 키워드

RabbitMQ, Message Queue, AMQP, Microservices, Async, Backend

자주 묻는 질문 (FAQ)

Q. RabbitMQ vs Kafka, 어떤 게 나은가요?

A. RabbitMQ는 메시지 브로커입니다. Kafka는 이벤트 스트리밍 플랫폼입니다. 작업 큐는 RabbitMQ, 로그 수집은 Kafka를 권장합니다.

Q. Redis Pub/Sub vs RabbitMQ, 차이가 뭔가요?

A. Redis는 메시지를 저장하지 않습니다. RabbitMQ는 메시지를 보장합니다. 중요한 메시지는 RabbitMQ를 사용하세요.

Q. 메시지 순서가 보장되나요?

A. 단일 Queue에서는 순서가 보장됩니다. 여러 Consumer가 있으면 순서가 보장되지 않습니다.

Q. 프로덕션에서 사용해도 되나요?

A. 네, Instagram, Reddit 등 많은 기업에서 사용합니다.

심화 부록: 구현·운영 관점

이 부록은 앞선 본문에서 다룬 주제(「RabbitMQ 완벽 가이드 | 메시지 큐·Exchange·Queue·Routing·Worker Pattern」)를 구현·런타임·운영 관점에서 다시 압축합니다. 도메인별 세부 구현은 글마다 다르지만, 입력 검증 → 핵심 연산 → 부작용(I/O·네트워크·동시성) → 관측의 흐름으로 장애를 나누면 원인 추적이 빨라집니다.

내부 동작과 핵심 메커니즘

flowchart TD
  A[입력·요청·이벤트] --> B[파싱·검증·디코딩]
  B --> C[핵심 연산·상태 전이]
  C --> D[부작용: I/O·네트워크·동시성]
  D --> E[결과·관측·저장]
sequenceDiagram
  participant C as 클라이언트/호출자
  participant B as 경계(런타임·게이트웨이·프로세스)
  participant D as 의존성(API·DB·큐·파일)
  C->>B: 요청/이벤트
  B->>D: 조회·쓰기·RPC
  D-->>B: 지연·부분 실패·재시도 가능
  B-->>C: 응답 또는 오류(코드·상관 ID)
  • 불변 조건(Invariant): 버퍼 경계, 프로토콜 상태, 트랜잭션 격리, FD 상한 등 단계별로 문장으로 적어 두면 디버깅 비용이 줄어듭니다.
  • 결정성: 순수 층과 시간·네트워크·스케줄에 의존하는 층을 분리해야 테스트와 장애 분석이 쉬워집니다.
  • 경계 비용: 직렬화, 인코딩, syscall 횟수, 락 경합, 할당·GC, 캐시 미스를 의심 목록에 둡니다.
  • 백프레셔: 생산자가 소비자보다 빠를 때 버퍼·큐·스트림에서 속도를 줄이는 신호를 어디에 둘지 정의합니다.

프로덕션 운영 패턴

영역운영 관점 질문
관측성요청 단위 상관 ID, 에러율·지연 p95/p99, 의존성 타임아웃·재시도가 대시보드에 보이는가
안전성입력 검증·권한·비밀·감사 로그가 코드 경로마다 일관적인가
신뢰성재시도는 멱등 연산에만 적용되는가, 서킷 브레이커·백오프·DLQ가 있는가
성능캐시·배치 크기·커넥션 풀·인덱스·백프레셔가 데이터 규모에 맞는가
배포롤백 룬북, 카나리/블루그린, 마이그레이션·피처 플래그가 문서화되어 있는가
용량피크 트래픽·디스크·FD·스레드 풀 상한을 주기적으로 검증하는가

스테이징은 데이터 양·네트워크 RTT·동시성을 프로덕션에 가깝게 맞출수록 재현율이 올라갑니다.

확장 예시: 엔드투엔드 미니 시나리오

앞선 본문 주제(「RabbitMQ 완벽 가이드 | 메시지 큐·Exchange·Queue·Routing·Worker Pattern」)를 배포·운영 흐름에 맞춰 옮긴 체크리스트입니다. 도메인에 맞게 단계 이름만 바꿔 적용할 수 있습니다.

  1. 입력 계약 고정: 스키마·버전·최대 페이로드·타임아웃·에러 코드를 경계에 둔다.
  2. 핵심 경로 계측: 요청 ID, 단계별 지연, 외부 호출 결과 코드를 로그·메트릭·트레이스에서 한 흐름으로 본다.
  3. 실패 주입: 의존성 타임아웃·5xx·부분 데이터·락 대기를 스테이징에서 재현한다.
  4. 호환·롤백: 설정/마이그레이션/클라이언트 버전을 되돌릴 수 있는지 확인한다.
  5. 부하 후 검증: 피크 대비 p95/p99, 에러율, 리소스 상한, 알림 임계값을 점검한다.
handle(request):
  ctx = newCorrelationId()
  validated = validateSchema(request)
  authorize(validated, ctx)
  result = domainCore(validated)
  persistOrEmit(result, idempotentKey)
  recordMetrics(ctx, latency, outcome)
  return result

문제 해결(Troubleshooting)

증상가능 원인조치
간헐적 실패레이스, 타임아웃, 외부 의존성, DNS최소 재현 스크립트, 분산 트레이스·로그 상관관계, 재시도·서킷 설정 점검
성능 저하N+1, 동기 I/O, 락 경합, 과도한 직렬화, 캐시 미스프로파일러·APM으로 핫스팟 확인 후 한 가지씩 제거
메모리 증가캐시 무제한, 구독/리스너 누수, 대용량 버퍼, 커넥션 미반납상한·TTL·힙/FD 스냅샷 비교
빌드·배포만 실패환경 변수, 권한, 플랫폼 차이, lockfileCI 로그와 로컬 diff, 런타임·이미지 버전 핀
설정 불일치프로필·시크릿·기본값, 리전스키마 검증된 설정 단일 소스와 배포 매트릭스 표준화
데이터 불일치비멱등 재시도, 부분 쓰기, 캐시 무효화 누락멱등 키·아웃박스·트랜잭션 경계 재검토

권장 순서: (1) 최소 재현 (2) 최근 변경 범위 축소 (3) 환경·의존성 차이 (4) 관측으로 가설 검증 (5) 수정 후 회귀·부하 테스트.

배포 전에는 git addgit commitgit pushnpm run deploy 순서를 권장합니다.