메시지 큐·Kafka·NATS 완벽 가이드 | 분산 시스템 메시징 비교

메시지 큐·Kafka·NATS 완벽 가이드 | 분산 시스템 메시징 비교

이 글의 핵심

메시지 큐(RabbitMQ, Redis), Kafka, NATS의 동작 원리와 실전 활용법. 비동기 처리, 이벤트 스트리밍, 고성능 메시징을 실무 예제로 이해하고 적용하는 완벽 가이드.

들어가며: 메시징 시스템의 세 가지 접근법

현대 분산 시스템에서 메시징은 핵심 인프라입니다. 이 글에서는 각기 다른 철학과 강점을 가진 세 가지 메시징 시스템을 비교합니다:

  1. 전통적 메시지 큐 (RabbitMQ, Redis): 작업 큐와 복잡한 라우팅
  2. Kafka: 대용량 이벤트 스트리밍과 데이터 파이프라인
  3. NATS: 초고속 클라우드 네이티브 메시징

이 글에서 배울 내용:

  • 메시지 큐의 개념과 RabbitMQ, Redis Queue 비교
  • Kafka의 아키텍처와 이벤트 스트리밍
  • NATS의 동작 원리와 JetStream
  • 실무 시나리오와 선택 가이드

목차

  1. 메시지 큐(Message Queue) 기초
  2. RabbitMQ vs Redis Queue
  3. Apache Kafka 완벽 가이드
  4. NATS: 클라우드 네이티브 메시징
  5. 메시징 시스템 비교와 선택
  6. 실전 시나리오와 아키텍처
  7. 베스트 프랙티스

1. 메시지 큐(Message Queue) 기초

메시지 큐란?

메시지 큐는 애플리케이션 간에 메시지를 비동기로 전달하는 중간 저장소입니다. 생산자(Producer)가 메시지를 큐에 넣으면, 소비자(Consumer)가 나중에 꺼내서 처리합니다.

flowchart LR
    P1[Producer 1] --> MQ[Message Queue]
    P2[Producer 2] --> MQ
    MQ --> C1[Consumer 1]
    MQ --> C2[Consumer 2]
    MQ --> C3[Consumer 3]

왜 메시지 큐가 필요한가?

시나리오 1: 이메일 발송

문제: 사용자가 회원가입 버튼을 누르면 이메일 발송에 3초가 걸려 응답이 느립니다.

# ❌ 동기 처리 (나쁜 예)
def signup(request):
    user = create_user(request.data)
    send_welcome_email(user.email)  # 3초 대기
    return Response({"message": "가입 완료"})

해결: 메시지 큐에 이메일 작업을 넣고 즉시 응답합니다.

# ✅ 비동기 처리 (좋은 예)
def signup(request):
    user = create_user(request.data)
    queue.publish("email_queue", {
        "to": user.email,
        "template": "welcome"
    })
    return Response({"message": "가입 완료"})  # 즉시 응답

# 별도 워커가 처리
def email_worker():
    while True:
        msg = queue.consume("email_queue")
        send_welcome_email(msg["to"], msg["template"])

시나리오 2: 부하 분산

문제: 주문이 몰리면 서버가 다운됩니다.

해결: 메시지 큐가 버퍼 역할을 하여 워커가 처리할 수 있는 속도로 작업을 분배합니다.

sequenceDiagram
    participant Client
    participant API
    participant Queue
    participant Worker

    Client->>API: 주문 요청 (1000건/초)
    API->>Queue: 메시지 발행
    API->>Client: 즉시 응답
    
    Queue->>Worker: 메시지 전달 (100건/초)
    Worker->>Worker: 주문 처리

메시지 큐의 주요 패턴

1. Work Queue (작업 큐)

여러 워커가 작업을 나눠서 처리합니다.

# Producer
for i in range(100):
    queue.publish("tasks", f"task-{i}")

# Consumer (여러 개 실행)
def worker():
    while True:
        task = queue.consume("tasks")
        process_task(task)

2. Pub/Sub (발행/구독)

하나의 메시지를 여러 구독자가 받습니다.

# Publisher
queue.publish("orders", {"order_id": 123, "amount": 50000})

# Subscriber 1: 재고 시스템
def inventory_subscriber():
    queue.subscribe("orders", lambda msg: update_inventory(msg))

# Subscriber 2: 배송 시스템
def shipping_subscriber():
    queue.subscribe("orders", lambda msg: create_shipment(msg))

2. RabbitMQ vs Redis Queue

RabbitMQ

특징:

  • AMQP 프로토콜 기반
  • 복잡한 라우팅 (Exchange, Binding)
  • 메시지 영속성 보장
  • 클러스터링 지원

사용 예시:

import pika

# 연결
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 큐 선언
channel.queue_declare(queue='tasks', durable=True)

# 메시지 발행
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body='Hello World',
    properties=pika.BasicProperties(
        delivery_mode=2,  # 영속성
    ))

# 메시지 소비
def callback(ch, method, properties, body):
    print(f"Received {body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()

Redis Queue

특징:

  • 인메모리 기반 (빠름)
  • 간단한 API (LPUSH, RPOP)
  • Pub/Sub 지원
  • 메시지 유실 가능성 (영속성 설정 필요)

사용 예시:

import redis

r = redis.Redis(host='localhost', port=6379)

# 메시지 발행
r.lpush('tasks', 'task-1')
r.lpush('tasks', 'task-2')

# 메시지 소비
while True:
    task = r.brpop('tasks', timeout=5)
    if task:
        print(f"Processing {task[1]}")

비교표

항목RabbitMQRedis Queue
속도중간매우 빠름
영속성기본 지원설정 필요
라우팅복잡한 라우팅 가능단순
메모리디스크 사용인메모리
사용 사례복잡한 워크플로우간단한 작업 큐

3. Apache Kafka 완벽 가이드

Kafka란?

Apache Kafka는 분산 이벤트 스트리밍 플랫폼으로, 대용량 데이터를 실시간으로 수집, 저장, 처리합니다.

Kafka vs 메시지 큐

항목KafkaRabbitMQ
목적이벤트 스트리밍작업 큐
메시지 저장디스크 (재생 가능)소비 후 삭제
처리량매우 높음 (수백만 msg/s)중간
순서 보장파티션 내 보장큐 내 보장
사용 사례로그 수집, 이벤트 소싱비동기 작업 처리

Kafka 아키텍처

flowchart TB
    subgraph Producers
        P1[Producer 1]
        P2[Producer 2]
    end
    
    subgraph Kafka_Cluster[Kafka Cluster]
        B1[Broker 1]
        B2[Broker 2]
        B3[Broker 3]
        
        subgraph Topic_orders[Topic: orders]
            P0[Partition 0]
            P1_[Partition 1]
            P2_[Partition 2]
        end
    end
    
    subgraph Consumers
        C1[Consumer 1]
        C2[Consumer 2]
    end
    
    P1 --> B1
    P2 --> B2
    B1 --> P0
    B2 --> P1_
    B3 --> P2_
    P0 --> C1
    P1_ --> C2
    P2_ --> C2

핵심 개념

1. Topic (토픽)

메시지의 카테고리입니다. 예: user-events, order-logs

2. Partition (파티션)

토픽을 여러 파티션으로 나누어 병렬 처리합니다.

Topic: orders
├── Partition 0: [msg1, msg4, msg7]
├── Partition 1: [msg2, msg5, msg8]
└── Partition 2: [msg3, msg6, msg9]

3. Offset (오프셋)

파티션 내 메시지의 위치입니다. 소비자가 어디까지 읽었는지 추적합니다.

Kafka 사용 예시

Producer (Python)

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 메시지 발행
producer.send('orders', {
    'order_id': 123,
    'user_id': 456,
    'amount': 50000
})

producer.flush()

Consumer (Python)

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='order-processor',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    order = message.value
    print(f"Processing order {order['order_id']}")
    process_order(order)

Kafka의 강력한 기능

1. 메시지 재생 (Replay)

# 특정 오프셋부터 다시 읽기
consumer.seek(partition, offset=100)

2. Consumer Group

여러 소비자가 파티션을 나눠서 처리합니다.

Topic: orders (3 partitions)
Consumer Group: processors
├── Consumer 1 → Partition 0
├── Consumer 2 → Partition 1
└── Consumer 3 → Partition 2

3. 데이터 보존

# 7일간 메시지 보존
log.retention.hours=168

4. NATS: 클라우드 네이티브 메시징

NATS란?

NATS(Neural Autonomic Transport System)는 초고속, 경량 메시징 시스템입니다. 클라우드 네이티브 애플리케이션과 마이크로서비스를 위해 설계되었습니다.

NATS의 특징

  1. 초고속: 나노초 단위 지연시간
  2. 경량: 단일 바이너리 (~20MB)
  3. 간단한 API: Pub/Sub 기본 제공
  4. 확장성: 수백만 연결 지원
  5. 클라우드 네이티브: Kubernetes 친화적

NATS 아키텍처

flowchart TB
    subgraph Publishers
        P1[Publisher 1]
        P2[Publisher 2]
    end
    
    subgraph NATS_Server[NATS Server Cluster]
        N1[NATS Node 1]
        N2[NATS Node 2]
        N3[NATS Node 3]
        N1 <--> N2
        N2 <--> N3
        N3 <--> N1
    end
    
    subgraph Subscribers
        S1[Subscriber 1]
        S2[Subscriber 2]
        S3[Subscriber 3]
    end
    
    P1 --> N1
    P2 --> N2
    N1 --> S1
    N2 --> S2
    N3 --> S3

NATS 메시징 패턴

1. Pub/Sub (발행/구독)

import asyncio
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()
    await nc.connect("nats://localhost:4222")
    
    # Subscribe
    async def message_handler(msg):
        subject = msg.subject
        data = msg.data.decode()
        print(f"Received: {subject} - {data}")
    
    await nc.subscribe("orders.*", cb=message_handler)
    
    # Publish
    await nc.publish("orders.new", b"Order #123")
    await nc.publish("orders.updated", b"Order #456")
    
    await asyncio.sleep(1)
    await nc.close()

asyncio.run(main())

2. Request/Reply (요청/응답)

async def request_reply():
    nc = NATS()
    await nc.connect("nats://localhost:4222")
    
    # Responder
    async def handler(msg):
        await nc.publish(msg.reply, b"Response data")
    
    await nc.subscribe("calculate", cb=handler)
    
    # Requester
    response = await nc.request("calculate", b"10+20", timeout=1)
    print(f"Response: {response.data.decode()}")
    
    await nc.close()

3. Queue Groups (부하 분산)

async def queue_groups():
    nc = NATS()
    await nc.connect("nats://localhost:4222")
    
    # 여러 워커가 같은 큐 그룹에 속함
    # 메시지는 그룹 내 한 워커에게만 전달됨
    async def worker(msg):
        print(f"Worker processing: {msg.data.decode()}")
    
    # Worker 1
    await nc.subscribe("tasks", queue="workers", cb=worker)
    # Worker 2
    await nc.subscribe("tasks", queue="workers", cb=worker)
    
    # 메시지 발행 (한 워커만 받음)
    for i in range(10):
        await nc.publish("tasks", f"Task {i}".encode())
    
    await asyncio.sleep(1)
    await nc.close()

JetStream: NATS의 영속성 레이어

JetStream은 NATS에 메시지 영속성, At-Least-Once 전달, 메시지 재생 기능을 추가합니다.

async def jetstream_example():
    nc = NATS()
    await nc.connect("nats://localhost:4222")
    
    # JetStream 컨텍스트
    js = nc.jetstream()
    
    # Stream 생성
    await js.add_stream(name="ORDERS", subjects=["orders.*"])
    
    # 메시지 발행 (영속성)
    ack = await js.publish("orders.new", b"Order #123")
    print(f"Published: {ack.seq}")
    
    # Consumer 생성
    psub = await js.pull_subscribe("orders.*", "order-processor")
    
    # 메시지 소비
    msgs = await psub.fetch(10)
    for msg in msgs:
        print(f"Received: {msg.data.decode()}")
        await msg.ack()
    
    await nc.close()

NATS vs Kafka vs RabbitMQ

항목NATSKafkaRabbitMQ
지연시간나노초밀리초밀리초
처리량매우 높음매우 높음중간
영속성JetStream 필요기본 제공기본 제공
복잡도매우 낮음높음중간
메모리매우 적음많음중간
사용 사례마이크로서비스, IoT로그 수집, 이벤트 소싱작업 큐, 복잡한 라우팅

5. 메시징 시스템 비교와 선택

성능 비교

graph TB
    subgraph Performance[성능 비교]
        A[지연시간]
        B[처리량]
        C[메모리 사용]
    end
    
    subgraph NATS_perf[NATS]
        A1[나노초]
        B1[수백만 msg/s]
        C1[매우 낮음]
    end
    
    subgraph Kafka_perf[Kafka]
        A2[밀리초]
        B2[수백만 msg/s]
        C2[높음]
    end
    
    subgraph RabbitMQ_perf[RabbitMQ]
        A3[밀리초]
        B3[수만 msg/s]
        C3[중간]
    end

선택 가이드

NATS를 선택해야 할 때

적합한 경우:

  • 마이크로서비스 간 통신
  • IoT 디바이스 메시징
  • 실시간 알림 시스템
  • 초저지연이 필요한 경우
  • 클라우드 네이티브 환경

부적합한 경우:

  • 복잡한 메시지 라우팅 필요
  • 대용량 메시지 저장 및 재생
  • 트랜잭션 보장 필요

Kafka를 선택해야 할 때

적합한 경우:

  • 로그 수집 및 분석
  • 이벤트 소싱
  • 데이터 파이프라인
  • 메시지 재생 필요
  • 대용량 데이터 스트리밍

부적합한 경우:

  • 간단한 작업 큐
  • 초저지연 필요
  • 복잡한 운영 회피

RabbitMQ를 선택해야 할 때

적합한 경우:

  • 복잡한 라우팅 규칙
  • 작업 큐
  • RPC 패턴
  • 메시지 우선순위
  • 트랜잭션 보장

부적합한 경우:

  • 초고속 처리 필요
  • 대용량 스트리밍
  • 메시지 재생 필요

6. 실전 시나리오와 아키텍처

시나리오 1: 마이크로서비스 아키텍처 (NATS)

flowchart LR
    API[API Gateway] --> NATS[NATS Server]
    NATS --> Auth[Auth Service]
    NATS --> Order[Order Service]
    NATS --> Payment[Payment Service]
    NATS --> Inventory[Inventory Service]
    NATS --> Notification[Notification Service]

구성:

  • NATS: 마이크로서비스 간 초고속 통신
  • 각 서비스는 독립적으로 메시지 구독
  • Request/Reply로 동기 호출 구현

시나리오 2: 이벤트 소싱 + CQRS (Kafka)

flowchart TB
    Command[Command Handler] --> Kafka[Kafka]
    Kafka --> EventStore[Event Store]
    Kafka --> Read1[Read Model 1]
    Kafka --> Read2[Read Model 2]
    Kafka --> Read3[Read Model 3]
    
    Read1 --> Query[Query Handler]
    Read2 --> Query
    Read3 --> Query

구성:

  • Kafka: 모든 이벤트를 영구 저장
  • 이벤트 재생으로 Read Model 재구축 가능
  • 여러 Read Model을 독립적으로 유지

시나리오 3: 하이브리드 아키텍처

flowchart TB
    User[사용자] --> API[API Server]
    API --> NATS[NATS]
    API --> RabbitMQ[RabbitMQ]
    
    NATS --> Service1[Realtime Service]
    NATS --> Service2[Notification Service]
    
    RabbitMQ --> Worker1[Email Worker]
    RabbitMQ --> Worker2[Image Worker]
    
    Service1 --> Kafka[Kafka]
    Service2 --> Kafka
    Worker1 --> Kafka
    Worker2 --> Kafka
    
    Kafka --> Analytics[Analytics]
    Kafka --> Warehouse[Data Warehouse]

구성:

  • NATS: 실시간 마이크로서비스 통신
  • RabbitMQ: 비동기 작업 큐
  • Kafka: 이벤트 로그 수집 및 분석

7. 베스트 프랙티스

NATS 베스트 프랙티스

1. Subject 네이밍 규칙

# ✅ 계층적 구조 사용
await nc.publish("orders.created", data)
await nc.publish("orders.updated", data)
await nc.publish("orders.cancelled", data)

# ✅ 와일드카드 구독
await nc.subscribe("orders.*", cb=handler)
await nc.subscribe("orders.>", cb=handler)  # 모든 하위 포함

2. 연결 재시도

async def connect_with_retry():
    options = {
        "servers": ["nats://localhost:4222"],
        "max_reconnect_attempts": -1,  # 무한 재시도
        "reconnect_time_wait": 2,      # 2초 대기
    }
    nc = NATS()
    await nc.connect(**options)
    return nc

3. Graceful Shutdown

async def graceful_shutdown(nc):
    await nc.drain()  # 대기 중인 메시지 처리 후 종료
    await nc.close()

Kafka 베스트 프랙티스

1. 적절한 파티션 수

# 처리량 기반 계산
파티션 = 목표 처리량 / 소비자당 처리량

# 예: 1000 msg/s 목표, 소비자당 100 msg/s
파티션 = 1000 / 100 = 10

2. 키 기반 파티셔닝

# 같은 user_id는 같은 파티션으로
producer.send('orders', 
    key=str(user_id).encode('utf-8'),
    value=order_data
)

3. 오프셋 관리

# 수동 커밋으로 정확한 처리 보장
consumer = KafkaConsumer(
    'orders',
    enable_auto_commit=False
)

for message in consumer:
    try:
        process(message.value)
        consumer.commit()  # 처리 후 커밋
    except Exception as e:
        logger.error(f"Failed: {e}")
        # 커밋하지 않음 (재처리)

RabbitMQ 베스트 프랙티스

1. 멱등성 보장

# ✅ 멱등성 있는 처리
def process_order(order_id):
    if redis.exists(f"processed:{order_id}"):
        return  # 이미 처리됨
    
    # 주문 처리
    create_shipment(order_id)
    redis.setex(f"processed:{order_id}", 3600, "1")

2. 재시도 정책

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), 
       wait=wait_exponential(multiplier=1, min=4, max=10))
def send_email(to, subject, body):
    # 이메일 발송 (실패 시 재시도)
    smtp.send(to, subject, body)

3. Dead Letter Queue

def worker():
    while True:
        try:
            msg = queue.consume("tasks")
            process(msg)
        except Exception as e:
            # 실패한 메시지를 DLQ로 이동
            queue.publish("tasks_dlq", msg)
            logger.error(f"Failed: {e}")

정리

핵심 요약

시스템강점주요 사용 사례
NATS초고속, 경량, 간단마이크로서비스, IoT, 실시간 알림
Kafka대용량, 영속성, 재생로그 수집, 이벤트 소싱, 데이터 파이프라인
RabbitMQ복잡한 라우팅, 신뢰성작업 큐, RPC, 트랜잭션
Redis Queue매우 빠름, 간단간단한 작업 큐, 캐시

선택 플로우차트

flowchart TD
    Start[메시징 시스템 선택] --> Q1{초저지연 필요?}
    Q1 -->|Yes| NATS[NATS]
    Q1 -->|No| Q2{대용량 스트리밍?}
    Q2 -->|Yes| Kafka[Kafka]
    Q2 -->|No| Q3{복잡한 라우팅?}
    Q3 -->|Yes| RabbitMQ[RabbitMQ]
    Q3 -->|No| Redis[Redis Queue]

실무 팁

  1. 작게 시작: Redis Queue나 NATS로 시작하여 필요시 확장
  2. 하이브리드 사용: 각 시스템의 강점을 활용한 조합
  3. 모니터링: 메시지 지연, 처리량, 에러율 추적
  4. 백프레셔: 소비자가 처리할 수 있는 속도로 제한
  5. 멱등성: 중복 처리를 고려한 설계

참고 자료

한 줄 요약: NATS로 초고속 마이크로서비스 통신, Kafka로 대용량 이벤트 스트리밍, RabbitMQ/Redis로 작업 큐를 구현하여 확장 가능한 분산 시스템을 구축할 수 있습니다.

---
... 996 lines not shown ... Token usage: 63706/1000000; 936294 remaining Start-Sleep -Seconds 3