C++ RabbitMQ 고급 | 토픽 라우팅·우선순위 큐·Dead Letter·TTL 가이드

C++ RabbitMQ 고급 | 토픽 라우팅·우선순위 큐·Dead Letter·TTL 가이드

이 글의 핵심

RabbitMQ 심화: 토픽 라우팅, 우선순위 큐, Dead Letter Exchange, 메시지 TTL. 실전 문제 시나리오, 완전한 C++ 예제, 흔한 에러 해결, 성능 최적화, 프로덕션 패턴까지 900줄 분량으로 다룹니다.

들어가며: “단순 큐만으로는 부족해요”

기본 큐의 한계

RabbitMQ 기본(#52-7)에서 작업 큐를 구현했습니다. 하지만 실무에서는 다음 문제들이 자주 발생합니다.

시나리오 1: 로그 레벨별로 다른 처리
error, warning, info 로그를 한 큐에 넣으면, 에러 알림 서비스와 일반 로그 수집기가 같은 큐를 소비합니다. 에러만 별도 큐로 보내고 싶은데, 기본 큐는 라우팅이 없습니다.

시나리오 2: VIP 주문이 일반 주문 뒤에 밀림
주문 큐에 VIP와 일반 주문이 섞여 들어옵니다. VIP는 5분 내 처리해야 하는데, 앞에 쌓인 일반 주문 때문에 지연됩니다. “우선순위가 있는 큐가 필요해요.”

시나리오 3: 처리 실패 메시지가 무한 재시도
컨슈머가 잘못된 형식의 메시지를 받아 NAK(requeue)하면, 같은 메시지가 계속 돌아와 컨슈머를 죽입니다. “실패한 메시지만 따로 모아서 나중에 분석하고 싶어요.”

시나리오 4: 오래된 메시지는 버리고 싶음
이벤트성 알림(세일 종료 1시간 전)은 1시간이 지나면 의미가 없습니다. 큐에 오래 쌓여 있던 메시지는 자동으로 폐기하고 싶습니다.

시나리오 5: 여러 서비스가 같은 이벤트를 구독
”주문 생성됨” 이벤트를 재고·결제·알림·분석 서비스가 각각 받아야 합니다. 한 큐에 넣으면 한 서비스만 가져가고, 큐를 여러 개 만들면 프로듀서가 여러 번 발행해야 합니다.

이 글에서 다루는 것:

  • 토픽 라우팅: Exchange + routing key로 메시지 분기
  • 우선순위 큐: VIP·긴급 메시지 우선 처리
  • Dead Letter Exchange (DLX): 실패 메시지 자동 수집
  • 메시지 TTL: 만료된 메시지 자동 폐기
  • 자주 발생하는 에러와 해결법
  • 성능 최적화 및 프로덕션 패턴

요구 환경: C++17 이상, RabbitMQ 3.8+, SimpleAmqpClient 또는 rabbitmq-c


실무 적용 경험: 이 글은 대규모 C++ 프로젝트에서 실제로 겪은 문제와 해결 과정을 바탕으로 작성되었습니다. 책이나 문서에서 다루지 않는 실전 함정과 디버깅 팁을 포함합니다.

목차

  1. 문제 시나리오와 해결 전략
  2. Exchange와 라우팅 개념
  3. 토픽 라우팅 완전 구현
  4. 우선순위 큐
  5. Dead Letter Exchange (DLX)
  6. 메시지 TTL과 큐 TTL
  7. 자주 발생하는 에러와 해결법
  8. 성능 최적화 팁
  9. 프로덕션 패턴
  10. 구현 체크리스트

1. 문제 시나리오와 해결 전략

실전 문제 → RabbitMQ 고급 기능 매핑

문제해결 기능핵심 개념
로그 레벨별 분기 (error/warning/info)토픽 Exchangerouting key 패턴 매칭
VIP·긴급 주문 우선 처리우선순위 큐x-max-priority
실패 메시지 무한 재시도Dead Letter Exchangex-dead-letter-exchange
오래된 메시지 자동 폐기메시지 TTLexpiration, x-message-ttl
한 이벤트를 여러 서비스가 구독Fanout Exchange브로드캐스트
특정 키만 받고 싶음Direct Exchange정확한 routing key 매칭

아키텍처 개요

flowchart TB
    subgraph Producer[프로듀서]
        P1[로그 error]
        P2[로그 warning]
        P3[주문 VIP]
        P4[주문 일반]
    end

    subgraph Exchange["Exchange (토픽/다이렉트)"]
        EX[logs.orders]
    end

    subgraph Queues[큐]
        Q1[error_queue]
        Q2[all_logs_queue]
        Q3[priority_queue]
        Q4[dlq_dead_letter]
    end

    subgraph Consumers[컨슈머]
        C1[에러 알림]
        C2[로그 수집]
        C3[주문 처리]
        C4[실패 분석]
    end

    P1 -->|routing: logs.error| EX
    P2 -->|routing: logs.warning| EX
    P3 -->|priority: 10| EX
    P4 -->|priority: 1| EX
    EX --> Q1
    EX --> Q2
    EX --> Q3
    Q3 -.->|실패 시| Q4
    Q1 --> C1
    Q2 --> C2
    Q3 --> C3
    Q4 --> C4

2. Exchange와 라우팅 개념

Exchange 타입 비교

타입동작사용 사례
directrouting_key가 정확히 일치하는 큐로 전달작업 유형별 분기 (email, sms)
topic패턴 매칭 (* 한 단어, # 0개 이상)로그 레벨·카테고리 (logs.error, order.vip)
fanout모든 바인딩된 큐로 브로드캐스트이벤트 복제 (주문 생성 → N개 서비스)
headers헤더 키-값 매칭복잡한 라우팅 (선택적 사용)

기본 Exchange vs 명시적 Exchange

// 기본 Exchange ("") 사용 시: routing_key = 큐 이름
// - 큐가 미리 존재해야 함
channel->BasicPublish("", "my_queue", msg);

// 명시적 Exchange 사용: Exchange → 바인딩된 큐로 라우팅
channel->BasicPublish("logs_topic", "logs.error", msg);

시퀀스: 토픽 라우팅 흐름

sequenceDiagram
    participant P as 프로듀서
    participant EX as logs_topic (Exchange)
    participant Q1 as error_queue
    participant Q2 as all_logs_queue
    participant C1 as 에러 알림
    participant C2 as 로그 수집

    P->>EX: BasicPublish("logs.error", body)
    EX->>EX: routing_key 매칭
    EX->>Q1: logs.error 바인딩 매칭
    EX->>Q2: logs.# 바인딩 매칭
    Q1->>C1: 메시지 전달
    Q2->>C2: 메시지 전달

3. 토픽 라우팅 완전 구현

3.1 토픽 Exchange 선언 (rabbitmq-c)

SimpleAmqpClient는 Exchange 선언 API가 제한적일 수 있어, rabbitmq-c로 토픽 Exchange를 선언하는 예시를 먼저 보겠습니다.

// topic_exchange_setup.cpp - Exchange + 큐 바인딩
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include <iostream>
#include <cstring>

int main() {
    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t* socket = amqp_tcp_socket_new(conn);
    if (amqp_socket_open(socket, "localhost", 5672) != AMQP_STATUS_OK) {
        std::cerr << "연결 실패" << std::endl;
        return 1;
    }

    amqp_rpc_reply_t reply = amqp_login(conn, "/", 0, 131072, 0,
                                        AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "로그인 실패" << std::endl;
        amqp_destroy_connection(conn);
        return 1;
    }

    amqp_channel_open(conn, 1);
    reply = amqp_get_rpc_reply(conn);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "채널 열기 실패" << std::endl;
        amqp_destroy_connection(conn);
        return 1;
    }

    // 1. 토픽 Exchange 선언
    amqp_exchange_declare(conn, 1, amqp_cstring_bytes("logs_topic"),
                         amqp_cstring_bytes("topic"), 0, 1, 0, 0, amqp_empty_table);
    reply = amqp_get_rpc_reply(conn);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Exchange 선언 실패" << std::endl;
        amqp_destroy_connection(conn);
        return 1;
    }

    // 2. 큐 선언
    amqp_queue_declare(conn, 1, amqp_cstring_bytes("error_logs"), 0, 1, 0, 0, amqp_empty_table);
    amqp_queue_declare(conn, 1, amqp_cstring_bytes("all_logs"), 0, 1, 0, 0, amqp_empty_table);
    reply = amqp_get_rpc_reply(conn);

    // 3. 바인딩: error_logs <- logs.error
    amqp_queue_bind(conn, 1, amqp_cstring_bytes("error_logs"),
                    amqp_cstring_bytes("logs_topic"),
                    amqp_cstring_bytes("logs.error"), amqp_empty_table);
    // 4. 바인딩: all_logs <- logs.# (모든 로그)
    amqp_queue_bind(conn, 1, amqp_cstring_bytes("all_logs"),
                    amqp_cstring_bytes("logs_topic"),
                    amqp_cstring_bytes("logs.#"), amqp_empty_table);
    reply = amqp_get_rpc_reply(conn);

    std::cout << "Exchange + 바인딩 설정 완료" << std::endl;

    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
    return 0;
}

컴파일:

g++ -std=c++17 topic_exchange_setup.cpp -o topic_setup -lrabbitmq

3.2 SimpleAmqpClient로 토픽 프로듀서

Exchange가 이미 선언되어 있다면, SimpleAmqpClient로 발행만 할 수 있습니다. Exchange 선언은 rabbitmq-c 또는 관리 UI에서 미리 해 두세요.

// topic_producer.cpp
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <iostream>
#include <string>

int main(int argc, char* argv[]) {
    auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");

    // routing_key: logs.error, logs.warning, logs.info 등
    std::string level = (argc > 1) ? argv[1] : "info";
    std::string routing_key = "logs." + level;
    std::string body = (argc > 2) ? argv[2] : "Sample log message";

    auto msg = AmqpClient::BasicMessage::Create(body);
    msg->DeliveryMode(2);

    // Exchange "logs_topic"으로 발행, routing_key로 라우팅
    channel->BasicPublish("logs_topic", routing_key, msg);

    std::cout << " [x] Sent [" << routing_key << "] " << body << std::endl;
    return 0;
}

3.3 토픽 컨슈머 (에러 전용)

// topic_consumer_error.cpp - error_logs 큐만 소비
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <csignal>
#include <iostream>

static volatile sig_atomic_t g_running = 1;
void sig_handler(int) { g_running = 0; }

int main() {
    signal(SIGINT, sig_handler);

    auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
    channel->BasicQos(0, 1, false);

    std::string consumer_tag = channel->BasicConsume("error_logs", "", false, false, false);
    std::cout << " [*] error_logs 큐 대기 중... Ctrl+C로 종료" << std::endl;

    while (g_running) {
        AmqpClient::Envelope::ptr_t envelope;
        if (channel->BasicConsumeMessage(consumer_tag, envelope, 1000)) {
            std::string body = envelope->Message()->Body();
            std::cout << " [x] ERROR: " << body << std::endl;
            // 에러 알림 (Slack, PagerDuty 등) 발송
            channel->BasicAck(envelope);
        }
    }
    channel->BasicCancel(consumer_tag);
    return 0;
}

3.4 Fanout Exchange: 브로드캐스트

한 이벤트를 여러 서비스가 각자 받을 때 사용합니다.

// fanout_setup.cpp - rabbitmq-c
// Exchange 선언 (fanout)
amqp_exchange_declare(conn, 1, amqp_cstring_bytes("order_events"),
                     amqp_cstring_bytes("fanout"), 0, 1, 0, 0, amqp_empty_table);

// 각 서비스별 큐 생성 및 바인딩 (routing_key 불필요)
amqp_queue_declare(conn, 1, amqp_cstring_bytes("inventory_queue"), 0, 1, 0, 0, amqp_empty_table);
amqp_queue_bind(conn, 1, amqp_cstring_bytes("inventory_queue"),
                amqp_cstring_bytes("order_events"), amqp_empty_bytes, amqp_empty_table);

amqp_queue_declare(conn, 1, amqp_cstring_bytes("notification_queue"), 0, 1, 0, 0, amqp_empty_table);
amqp_queue_bind(conn, 1, amqp_cstring_bytes("notification_queue"),
                amqp_cstring_bytes("order_events"), amqp_empty_bytes, amqp_empty_table);
// fanout_producer.cpp - SimpleAmqpClient
// order_events Exchange로 발행 (routing_key 무시됨)
channel->BasicPublish("order_events", "", msg);

4. 우선순위 큐

4.1 우선순위 큐 선언

우선순위 큐는 x-max-priority 인자로 최대 우선순위 값을 설정합니다. 1~10 권장 (너무 크면 성능 저하).

// priority_queue_setup.cpp - rabbitmq-c
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include <iostream>

int main() {
    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t* socket = amqp_tcp_socket_new(conn);
    amqp_socket_open(socket, "localhost", 5672);
    amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    amqp_channel_open(conn, 1);

    // x-max-priority: 1~255 (실무에서는 10 권장)
    amqp_table_entry_t entries[1];
    entries[0].key = amqp_cstring_bytes("x-max-priority");
    entries[0].value.kind = AMQP_FIELD_KIND_I32;
    entries[0].value.value.i32 = 10;

    amqp_table_t args;
    args.num_entries = 1;
    args.entries = entries;

    amqp_queue_declare(conn, 1, amqp_cstring_bytes("priority_orders"),
                       0, 1, 0, 0, args);

    amqp_rpc_reply_t reply = amqp_get_rpc_reply(conn);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "우선순위 큐 선언 실패" << std::endl;
        return 1;
    }

    std::cout << "우선순위 큐 선언 완료 (max-priority=10)" << std::endl;
    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
    return 0;
}

4.2 우선순위 메시지 발행

메시지에 priority 속성을 설정합니다. 숫자가 클수록 높은 우선순위.

// priority_producer.cpp - SimpleAmqpClient
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <iostream>

int main() {
    auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");

    // VIP 주문 (우선순위 10)
    auto vip_msg = AmqpClient::BasicMessage::Create(R"({"order_id":1,"type":"vip"})");
    vip_msg->DeliveryMode(2);
    vip_msg->Priority(10);  // 최고 우선순위
    channel->BasicPublish("", "priority_orders", vip_msg);
    std::cout << " [x] VIP 주문 발행 (priority=10)" << std::endl;

    // 일반 주문 (우선순위 1)
    auto normal_msg = AmqpClient::BasicMessage::Create(R"({"order_id":2,"type":"normal"})");
    normal_msg->DeliveryMode(2);
    normal_msg->Priority(1);
    channel->BasicPublish("", "priority_orders", normal_msg);
    std::cout << " [x] 일반 주문 발행 (priority=1)" << std::endl;

    return 0;
}

주의: SimpleAmqpClient의 Priority() 지원 여부는 버전에 따라 다릅니다. 지원하지 않으면 rabbitmq-c로 amqp_basic_properties_t.priority를 직접 설정하세요.

4.3 rabbitmq-c로 우선순위 발행

// priority_publish_rabbitmq_c.cpp
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_PRIORITY_FLAG;
props.content_type = amqp_cstring_bytes("application/json");
props.delivery_mode = 2;
props.priority = 10;  // VIP

amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes("priority_orders"),
                   0, 0, &props, amqp_cstring_bytes(body.c_str()));

5. Dead Letter Exchange (DLX)

5.1 DLX 개념

메시지가 다음 경우에 DLX로 전달됩니다:

  • 컨슈머가 NAK(requeue=false) 또는 BasicReject(requeue=false)
  • 메시지 TTL 만료
  • 큐 길이 초과 (x-max-length)
flowchart LR
    subgraph Main[메인 큐]
        Q1[order_queue]
    end

    subgraph DLX[Dead Letter]
        EX[dlx_exchange]
        Q2[dead_letter_queue]
    end

    Q1 -->|실패/만료/초과| EX
    EX --> Q2

5.2 DLX 설정이 있는 큐 선언

// dlx_setup.cpp - rabbitmq-c
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include <iostream>

int main() {
    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t* socket = amqp_tcp_socket_new(conn);
    amqp_socket_open(socket, "localhost", 5672);
    amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    amqp_channel_open(conn, 1);

    // 1. DLX용 Exchange 선언
    amqp_exchange_declare(conn, 1, amqp_cstring_bytes("dlx_exchange"),
                         amqp_cstring_bytes("direct"), 0, 1, 0, 0, amqp_empty_table);

    // 2. Dead Letter 큐 선언 및 바인딩
    amqp_queue_declare(conn, 1, amqp_cstring_bytes("dead_letter_queue"), 0, 1, 0, 0, amqp_empty_table);
    amqp_queue_bind(conn, 1, amqp_cstring_bytes("dead_letter_queue"),
                    amqp_cstring_bytes("dlx_exchange"),
                    amqp_cstring_bytes("dead"), amqp_empty_table);

    // 3. 메인 큐: x-dead-letter-exchange, x-dead-letter-routing-key 설정
    amqp_table_entry_t entries[2];
    entries[0].key = amqp_cstring_bytes("x-dead-letter-exchange");
    entries[0].value.kind = AMQP_FIELD_KIND_UTF8;
    entries[0].value.value.bytes = amqp_cstring_bytes("dlx_exchange");

    entries[1].key = amqp_cstring_bytes("x-dead-letter-routing-key");
    entries[1].value.kind = AMQP_FIELD_KIND_UTF8;
    entries[1].value.value.bytes = amqp_cstring_bytes("dead");

    amqp_table_t args;
    args.num_entries = 2;
    args.entries = entries;

    amqp_queue_declare(conn, 1, amqp_cstring_bytes("order_queue"),
                       0, 1, 0, 0, args);

    std::cout << "DLX 설정 완료: order_queue -> dlx_exchange -> dead_letter_queue" << std::endl;

    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
    return 0;
}

5.3 실패 시 DLQ로 보내기 (requeue=false)

// dlx_consumer.cpp - 처리 실패 시 DLQ로
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <csignal>
#include <iostream>
#include <stdexcept>

static volatile sig_atomic_t g_running = 1;
void sig_handler(int) { g_running = 0; }

bool process_order(const std::string& body) {
    // JSON 파싱, 비즈니스 검증 등
    if (body.find("invalid") != std::string::npos) {
        throw std::runtime_error("Invalid order format");
    }
    return true;
}

int main() {
    signal(SIGINT, sig_handler);

    auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
    channel->BasicQos(0, 1, false);

    std::string consumer_tag = channel->BasicConsume("order_queue", "", false, false, false);
    std::cout << " [*] order_queue 대기 중..." << std::endl;

    while (g_running) {
        AmqpClient::Envelope::ptr_t envelope;
        if (channel->BasicConsumeMessage(consumer_tag, envelope, 1000)) {
            std::string body = envelope->Message()->Body();
            try {
                if (process_order(body)) {
                    channel->BasicAck(envelope);
                    std::cout << " [x] 처리 완료: " << body << std::endl;
                }
            } catch (const std::exception& e) {
                std::cerr << " [!!] 처리 실패 (DLQ로 전달): " << e.what() << std::endl;
                // requeue=false → Dead Letter Exchange로 전달
                channel->BasicReject(envelope->GetDeliveryTag(), false);
            }
        }
    }
    channel->BasicCancel(consumer_tag);
    return 0;
}

5.4 재시도 횟수 제한 후 DLQ (헤더 활용)

무한 재시도를 막으려면, 메시지 헤더에 재시도 횟수를 넣고, N회 초과 시 requeue=false로 DLQ에 보냅니다.

// retry_with_header.cpp
void handle_with_retry_limit(AmqpClient::Channel::ptr_t channel,
                             AmqpClient::Envelope::ptr_t envelope) {
    std::string body = envelope->Message()->Body();
    auto table = envelope->Message()->HeaderTable();

    int retry_count = 0;
    if (table && table->GetValue("x-retry-count")) {
        retry_count = table->GetValue("x-retry-count")->GetInteger();
    }

    const int max_retries = 3;
    try {
        process_order(body);
        channel->BasicAck(envelope);
    } catch (const std::exception& e) {
        if (retry_count >= max_retries) {
            std::cerr << "최대 재시도 초과, DLQ로 전달" << std::endl;
            channel->BasicReject(envelope->GetDeliveryTag(), false);
        } else {
            // requeue=true로 다시 큐에 넣음 (실무에서는 재시도 전용 큐+TTL 사용 권장)
            channel->BasicReject(envelope->GetDeliveryTag(), true);
        }
    }
}

6. 메시지 TTL과 큐 TTL

6.1 메시지별 TTL (per-message expiration)

발행 시 각 메시지에 expiration 속성을 설정합니다. 밀리초 문자열.

// message_ttl_producer.cpp - SimpleAmqpClient
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <iostream>

int main() {
    auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
    channel->DeclareQueue("ttl_queue", false, true, false, false);

    auto msg = AmqpClient::BasicMessage::Create("1시간 후 만료되는 메시지");
    msg->DeliveryMode(2);
    msg->Expiration("3600000");  // 3600000 ms = 1시간

    channel->BasicPublish("", "ttl_queue", msg);
    std::cout << " [x] TTL 1시간 메시지 발행" << std::endl;

    return 0;
}

6.2 큐 TTL (x-message-ttl)

큐에 들어오는 모든 메시지에 기본 TTL을 적용합니다.

// queue_ttl_setup.cpp - rabbitmq-c
amqp_table_entry_t entries[1];
entries[0].key = amqp_cstring_bytes("x-message-ttl");
entries[0].value.kind = AMQP_FIELD_KIND_I32;
entries[0].value.value.i32 = 60000;  // 60초

amqp_table_t args;
args.num_entries = 1;
args.entries = entries;

amqp_queue_declare(conn, 1, amqp_cstring_bytes("expiring_queue"),
                   0, 1, 0, 0, args);

주의: 메시지 TTL과 큐 TTL이 둘 다 있으면 더 작은 값이 적용됩니다.

6.3 TTL + DLX: 만료 메시지를 DLQ로

만료된 메시지를 버리지 않고 DLQ로 보내 분석할 수 있습니다.

// ttl_dlx_setup.cpp - 큐 인자
// x-message-ttl: 60000 (60초)
// x-dead-letter-exchange: dlx_exchange
// x-dead-letter-routing-key: expired

amqp_table_entry_t entries[3];
entries[0].key = amqp_cstring_bytes("x-message-ttl");
entries[0].value.kind = AMQP_FIELD_KIND_I32;
entries[0].value.value.i32 = 60000;

entries[1].key = amqp_cstring_bytes("x-dead-letter-exchange");
entries[1].value.kind = AMQP_FIELD_KIND_UTF8;
entries[1].value.value.bytes = amqp_cstring_bytes("dlx_exchange");

entries[2].key = amqp_cstring_bytes("x-dead-letter-routing-key");
entries[2].value.kind = AMQP_FIELD_KIND_UTF8;
entries[2].value.value.bytes = amqp_cstring_bytes("expired");

7. 자주 발생하는 에러와 해결법

에러 1: “NOT_FOUND - no exchange ‘logs_topic’”

증상: BasicPublish("logs_topic", "logs.error", msg) 시 예외

원인: Exchange를 선언하지 않음. 기본 Exchange("")만 자동 존재.

해결법:

# RabbitMQ 관리 UI (http://localhost:15672)에서 수동 생성
# 또는 rabbitmq-c로 amqp_exchange_declare 호출
// ✅ Exchange 선언 후 사용
amqp_exchange_declare(conn, 1, amqp_cstring_bytes("logs_topic"),
                     amqp_cstring_bytes("topic"), 0, 1, 0, 0, amqp_empty_table);

에러 2: “PRECONDITION_FAILED - inequivalent arg ‘x-max-priority’”

증상: 우선순위 큐 선언 시 기존 큐와 인자 불일치

원인: 이미 x-max-priority 없이 선언된 큐가 있음. RabbitMQ는 큐 인자 변경을 허용하지 않음.

해결법:

# 기존 큐 삭제 후 재선언
rabbitmqctl delete_queue order_queue
// ✅ 새 큐 이름 사용 또는 기존 큐 삭제 후 동일 인자로 선언

에러 3: 우선순위가 동작하지 않음

증상: Priority(10)으로 발행했는데 일반 메시지와 같은 순서로 처리됨

원인:

  1. 큐에 x-max-priority 미설정
  2. 컨슈머가 이미 메시지를 가져온 상태 (우선순위는 큐 내부 정렬에만 영향)
  3. SimpleAmqpClient가 Priority 미지원

해결법:

// ✅ 큐 선언 시 x-max-priority 필수
// ✅ rabbitmq-c로 priority 설정 확인
props._flags |= AMQP_BASIC_PRIORITY_FLAG;
props.priority = 10;

에러 4: DLX로 메시지가 안 감

증상: BasicReject(requeue=false) 했는데 dead_letter_queue에 없음

원인:

  1. 큐 선언 시 x-dead-letter-exchange 미설정
  2. requeue=true로 호출 (requeue=true면 DLX로 안 감)
  3. DLX Exchange 또는 dead_letter_queue 미선언

해결법:

// ✅ requeue=false 필수
channel->BasicReject(envelope->GetDeliveryTag(), false);

// ✅ 큐 인자에 x-dead-letter-exchange, x-dead-letter-routing-key 설정
// ✅ dlx_exchange, dead_letter_queue 미리 선언

에러 5: “ACCESS_REFUSED - exchange ‘logs_topic’ in vhost ’/’”

증상: Exchange 이름 오타 또는 다른 vhost

해결법:

// ✅ Exchange 이름 확인 (대소문자 구분)
// ✅ vhost 일치 (기본 "/")
channel->BasicPublish("logs_topic", "logs.error", msg);  // logs_topic 정확히

에러 6: 토픽 바인딩 후 메시지가 큐에 안 옴

증상: logs.error 발행했는데 바인딩한 큐에 메시지 없음

원인: routing key 패턴 불일치. logs.#logs.로 시작하는 모든 키 매칭. logs.errorlogs.error 또는 logs.#에 매칭.

해결법:

# 패턴 규칙
logs.error  → logs.error 바인딩에 매칭
logs.error  → logs.# 바인딩에 매칭
logs.a.b    → logs.# 에만 매칭 (logs.* 는 한 단계만)

에러 7: TTL이 적용되지 않음

증상: expiration 설정했는데 메시지가 만료되지 않음

원인: 메시지가 컨슈머에게 이미 전달된 상태. TTL은 큐에 대기 중일 때만 적용됩니다. 컨슈머가 가져가면 TTL은 무시됩니다.

해결법: TTL은 큐 대기 시간에만 적용됨을 이해하고, 컨슈머 처리 지연이 크면 prefetch를 줄이거나 워커를 늘립니다.


8. 성능 최적화 팁

팁 1: 토픽 바인딩 수 최소화

바인딩이 많을수록 라우팅 비용이 증가합니다. logs.# 하나로 모든 로그를 받는 것보다, 필요한 패턴만 바인딩하세요.

// ❌ 과도한 바인딩: logs.error, logs.warning, logs.info 각각
// ✅ 필요한 것만: error 알림용 logs.error, 전체 수집용 logs.#

팁 2: 우선순위 범위 제한

x-max-priority를 255로 하면 내부 힙 연산 비용이 커집니다. 1~10으로 제한하는 것이 성능에 유리합니다.

// ✅ 실무 권장
entries[0].value.value.i32 = 10;  // 1~10

팁 3: DLQ 모니터링

DLQ에 메시지가 쌓이면 비정상입니다. 알림·대시보드로 모니터링하고, 주기적으로 원인 분석 후 재처리 또는 폐기합니다.

// DLQ 메시지 수 확인 (rabbitmqctl 또는 관리 API)
// rabbitmqctl list_queues name messages

팁 4: TTL 큐와 영구 큐 분리

TTL이 짧은 메시지(알림)와 영구 메시지(주문)를 같은 큐에 넣지 마세요. 큐별로 TTL 정책이 다르므로 분리하는 것이 관리에 유리합니다.

팁 5: Exchange 타입별 처리량

타입상대적 처리량비고
direct높음단순 라우팅
topic중간패턴 매칭 오버헤드
fanout높음바인딩 수만큼 복제

9. 프로덕션 패턴

패턴 1: Exchange·큐 선언 스크립트 분리

C++ 앱에서 매번 Exchange·큐를 선언하지 말고, 배포 스크립트나 설정 툴에서 한 번만 선언합니다.

# setup_rabbitmq.sh
#!/bin/bash
# rabbitmqadmin 또는 HTTP API로 Exchange/Queue 선언
rabbitmqadmin declare exchange name=logs_topic type=topic durable=true
rabbitmqadmin declare queue name=error_logs durable=true
rabbitmqadmin declare binding source=logs_topic destination=error_logs routing_key=logs.error

패턴 2: 재시도 큐 + TTL (지연 재처리)

실패 시 즉시 requeue하지 않고, 재시도 전용 큐에 TTL을 두어 지연 후 재처리합니다.

flowchart LR
    Q1[main_queue] -->|실패| EX1[retry_exchange]
    EX1 --> Q2[retry_queue]
    Q2 -->|TTL 만료| EX2[main_exchange]
    EX2 --> Q1
// retry_queue: x-message-ttl=60000 (60초)
// x-dead-letter-exchange=main_exchange
// x-dead-letter-routing-key=retry
// → 60초 후 main_queue로 다시 들어감

패턴 3: 설정 외부화

struct RabbitMQAdvancedConfig {
    std::string topic_exchange = "logs_topic";
    std::string priority_queue = "priority_orders";
    std::string dlx_exchange = "dlx_exchange";
    std::string dead_letter_queue = "dead_letter_queue";
    int max_priority = 10;
    int message_ttl_ms = 3600000;
};

RabbitMQAdvancedConfig loadFromEnv() {
    RabbitMQAdvancedConfig c;
    if (const char* x = std::getenv("RABBITMQ_TOPIC_EXCHANGE")) c.topic_exchange = x;
    if (const char* ttl = std::getenv("RABBITMQ_MSG_TTL_MS")) c.message_ttl_ms = std::stoi(ttl);
    return c;
}

패턴 4: DLQ 컨슈머 (분석·알림)

// dlq_consumer.cpp - 실패 메시지 수집 및 알림
void consume_dead_letters(AmqpClient::Channel::ptr_t channel) {
    std::string tag = channel->BasicConsume("dead_letter_queue", "", false, false, false);

    while (true) {
        AmqpClient::Envelope::ptr_t envelope;
        if (channel->BasicConsumeMessage(tag, envelope, 1000)) {
            std::string body = envelope->Message()->Body();
            // 로깅, 메트릭, PagerDuty 알림 등
            log_failure(body);
            notify_ops(body);
            channel->BasicAck(envelope);
        }
    }
}

패턴 5: Docker Compose (고급 설정)

# docker-compose.yml
version: '3'
services:
  rabbitmq:
    image: rabbitmq:3.12-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: app
      RABBITMQ_DEFAULT_PASS: secret
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "check_running"]
      interval: 10s
      timeout: 5s
      retries: 3

  app:
    build: .
    environment:
      RABBITMQ_HOST: rabbitmq
      RABBITMQ_PORT: 5672
      RABBITMQ_USER: app
      RABBITMQ_PASSWORD: secret
    depends_on:
      rabbitmq:
        condition: service_healthy

10. 구현 체크리스트

토픽 라우팅

  • Exchange 선언 (topic/direct/fanout)
  • 큐 선언 및 바인딩 (routing key 패턴 확인)
  • 프로듀서: BasicPublish(exchange, routing_key, msg)
  • 컨슈머: 각 큐별 소비

우선순위 큐

  • 큐에 x-max-priority 설정
  • 메시지에 priority 속성 설정
  • SimpleAmqpClient 미지원 시 rabbitmq-c 사용

Dead Letter

  • DLX Exchange 선언
  • Dead Letter 큐 선언 및 바인딩
  • 메인 큐에 x-dead-letter-exchange, x-dead-letter-routing-key
  • 실패 시 BasicReject(requeue=false)
  • DLQ 컨슈머 구현 (모니터링·알림)

메시지 TTL

  • 메시지 expiration 또는 큐 x-message-ttl
  • TTL + DLX로 만료 메시지 수집 (선택)

에러 처리

  • Exchange 미존재 시 선언 또는 에러 처리
  • PRECONDITION_FAILED 시 큐 삭제 후 재선언
  • DLX 미동작 시 requeue=false 확인

프로덕션

  • Exchange·큐 선언 스크립트 분리
  • 설정 외부화 (환경 변수)
  • DLQ 모니터링·알림
  • Health Check

문제 시나리오 해결 요약

문제RabbitMQ 고급 해결 방법
로그 레벨별 분기토픽 Exchange + routing key (logs.error, logs.#)
VIP·긴급 우선 처리우선순위 큐 (x-max-priority, priority)
실패 메시지 무한 재시도DLX + BasicReject(requeue=false)
오래된 메시지 폐기메시지 TTL 또는 큐 x-message-ttl
한 이벤트 N개 서비스 구독Fanout Exchange
만료 메시지 분석TTL + DLX로 dead_letter_queue 수집

정리

항목요약
토픽 라우팅Exchange(topic) + routing key 패턴 (*, #)
우선순위x-max-priority, 메시지 priority 속성
DLXx-dead-letter-exchange, requeue=false
TTLexpiration(메시지), x-message-ttl(큐)
에러NOT_FOUND, PRECONDITION_FAILED, DLX 미동작
성능바인딩 최소화, 우선순위 범위 제한
프로덕션선언 스크립트 분리, DLQ 모니터링

핵심 원칙:

  1. Exchange 선언 후 사용 (기본 Exchange 제외)
  2. 우선순위 큐는 x-max-priority 필수
  3. DLX는 requeue=false일 때만 동작
  4. TTL은 큐 대기 중에만 적용

자주 묻는 질문 (FAQ)

Q. 토픽과 다이렉트 중 뭘 써야 하나요?

A. routing key가 고정된 몇 개(예: email, sms)면 direct. 패턴 매칭(예: logs.error, order.vip.urgent)이 필요하면 topic.

Q. 우선순위 큐 성능 영향은?

A. 우선순위 범위가 크면(예: 255) 정렬 비용이 증가합니다. 1~10으로 제한하면 대부분의 실무 요구를 충족하면서 성능 부담이 적습니다.

Q. DLQ 메시지를 다시 메인 큐로 보낼 수 있나요?

A. 수동으로 가능합니다. DLQ 컨슈머가 메시지를 받아 수정 후 메인 큐에 재발행하거나, rabbitmqadmin으로 메시지를 이동할 수 있습니다. 자동 재처리는 재시도 큐 + TTL 패턴을 사용하세요.

Q. SimpleAmqpClient로 Exchange 선언이 안 되는데요?

A. SimpleAmqpClient는 Exchange 선언 API가 제한적입니다. rabbitmq-c로 Exchange·바인딩을 선언하거나, RabbitMQ 관리 UI, rabbitmqadmin, HTTP API로 미리 설정해 두세요.

한 줄 요약: 토픽 라우팅·우선순위 큐·Dead Letter·TTL을 활용하면 실무 RabbitMQ 이벤트 시스템을 안정적으로 구축할 수 있습니다.

다음 글: C++ Kafka 고급(#52-6) — 스트림 처리·트랜잭션

이전 글: C++ RabbitMQ 완벽 가이드(#52-7) — 프로듀서·컨슈머·작업 큐


참고 자료


관련 글

  • C++ 메시지 큐 시스템 | RabbitMQ·Kafka 통합 완벽 가이드 [#50-7]
... 996 lines not shown ... Token usage: 63706/1000000; 936294 remaining Start-Sleep -Seconds 3