본문으로 건너뛰기
Previous
Next
C++ RabbitMQ 고급 | 토픽 라우팅·우선순위 큐·Dead Letter·TTL 가이드

C++ RabbitMQ 고급 | 토픽 라우팅·우선순위 큐·Dead Letter·TTL 가이드

C++ RabbitMQ 고급 | 토픽 라우팅·우선순위 큐·Dead Letter·TTL 가이드

이 글의 핵심

RabbitMQ 심화: 토픽 라우팅, 우선순위 큐, Dead Letter Exchange, 메시지 TTL. 실전 문제 시나리오, 완전한 C++ 예제, 흔한 에러 해결, 성능 최적화, 프로덕션 패턴까지 900줄 분량으로 다룹니다.

들어가며: “단순 큐만으로는 부족해요”

기본 큐의 한계

RabbitMQ 기본(#52-7)에서 작업 큐를 구현했습니다. 하지만 실무에서는 다음 문제들이 자주 발생합니다. 시나리오 1: 로그 레벨별로 다른 처리

error, warning, info 로그를 한 큐에 넣으면, 에러 알림 서비스와 일반 로그 수집기가 같은 큐를 소비합니다. 에러만 별도 큐로 보내고 싶은데, 기본 큐는 라우팅이 없습니다. 시나리오 2: VIP 주문이 일반 주문 뒤에 밀림

주문 큐에 VIP와 일반 주문이 섞여 들어옵니다. VIP는 5분 내 처리해야 하는데, 앞에 쌓인 일반 주문 때문에 지연됩니다. “우선순위가 있는 큐가 필요해요.” 시나리오 3: 처리 실패 메시지가 무한 재시도

컨슈머가 잘못된 형식의 메시지를 받아 NAK(requeue)하면, 같은 메시지가 계속 돌아와 컨슈머를 죽입니다. “실패한 메시지만 따로 모아서 나중에 분석하고 싶어요.” 시나리오 4: 오래된 메시지는 버리고 싶음

이벤트성 알림(세일 종료 1시간 전)은 1시간이 지나면 의미가 없습니다. 큐에 오래 쌓여 있던 메시지는 자동으로 폐기하고 싶습니다. 시나리오 5: 여러 서비스가 같은 이벤트를 구독

“주문 생성됨” 이벤트를 재고·결제·알림·분석 서비스가 각각 받아야 합니다. 한 큐에 넣으면 한 서비스만 가져가고, 큐를 여러 개 만들면 프로듀서가 여러 번 발행해야 합니다. 이 글에서 다루는 것:

  • 토픽 라우팅: Exchange + routing key로 메시지 분기
  • 우선순위 큐: VIP·긴급 메시지 우선 처리
  • Dead Letter Exchange (DLX): 실패 메시지 자동 수집
  • 메시지 TTL: 만료된 메시지 자동 폐기
  • 자주 발생하는 에러와 해결법
  • 성능 최적화 및 프로덕션 패턴 요구 환경: C++17 이상, RabbitMQ 3.8+, SimpleAmqpClient 또는 rabbitmq-c

실무 적용 경험: 이 글은 대규모 C++ 프로젝트에서 실제로 겪은 문제와 해결 과정을 바탕으로 작성되었습니다. 책이나 문서에서 다루지 않는 실전 함정과 디버깅 팁을 포함합니다.

1. 문제 시나리오와 해결 전략

실전 문제 → RabbitMQ 고급 기능 매핑

문제해결 기능핵심 개념
로그 레벨별 분기 (error/warning/info)토픽 Exchangerouting key 패턴 매칭
VIP·긴급 주문 우선 처리우선순위 큐x-max-priority
실패 메시지 무한 재시도Dead Letter Exchangex-dead-letter-exchange
오래된 메시지 자동 폐기메시지 TTLexpiration, x-message-ttl
한 이벤트를 여러 서비스가 구독Fanout Exchange브로드캐스트
특정 키만 받고 싶음Direct Exchange정확한 routing key 매칭

아키텍처 개요

flowchart TB
    subgraph Producer[프로듀서]
        P1[로그 error]
        P2[로그 warning]
        P3[주문 VIP]
        P4[주문 일반]
    end
    subgraph Exchange["Exchange (토픽/다이렉트)"]
        EX[logs.orders]
    end
    subgraph Queues[큐]
        Q1[error_queue]
        Q2[all_logs_queue]
        Q3[priority_queue]
        Q4[dlq_dead_letter]
    end
    subgraph Consumers[컨슈머]
        C1[에러 알림]
        C2[로그 수집]
        C3[주문 처리]
        C4[실패 분석]
    end
    P1 -->|routing: logs.error| EX
    P2 -->|routing: logs.warning| EX
    P3 -->|priority: 10| EX
    P4 -->|priority: 1| EX
    EX --> Q1
    EX --> Q2
    EX --> Q3
    Q3 -.->|실패 시| Q4
    Q1 --> C1
    Q2 --> C2
    Q3 --> C3
    Q4 --> C4

2. Exchange와 라우팅 개념

Exchange 타입 비교

타입동작사용 사례
directrouting_key가 정확히 일치하는 큐로 전달작업 유형별 분기 (email, sms)
topic패턴 매칭 (* 한 단어, # 0개 이상)로그 레벨·카테고리 (logs.error, order.vip)
fanout모든 바인딩된 큐로 브로드캐스트이벤트 복제 (주문 생성 → N개 서비스)
headers헤더 키-값 매칭복잡한 라우팅 (선택적 사용)

기본 Exchange vs 명시적 Exchange

// 기본 Exchange ("") 사용 시: routing_key = 큐 이름
// - 큐가 미리 존재해야 함
channel->BasicPublish("", "my_queue", msg);
// 명시적 Exchange 사용: Exchange → 바인딩된 큐로 라우팅
channel->BasicPublish("logs_topic", "logs.error", msg);

시퀀스: 토픽 라우팅 흐름

sequenceDiagram
    participant P as 프로듀서
    participant EX as logs_topic (Exchange)
    participant Q1 as error_queue
    participant Q2 as all_logs_queue
    participant C1 as 에러 알림
    participant C2 as 로그 수집
    P->>EX: BasicPublish("logs.error", body)
    EX->>EX: routing_key 매칭
    EX->>Q1: logs.error 바인딩 매칭
    EX->>Q2: logs.# 바인딩 매칭
    Q1->>C1: 메시지 전달
    Q2->>C2: 메시지 전달

3. 토픽 라우팅 완전 구현

3.1 토픽 Exchange 선언 (rabbitmq-c)

SimpleAmqpClient는 Exchange 선언 API가 제한적일 수 있어, rabbitmq-c로 토픽 Exchange를 선언하는 예시를 먼저 보겠습니다.

// topic_exchange_setup.cpp - Exchange + 큐 바인딩
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include <iostream>
#include <cstring>
int main() {
    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t* socket = amqp_tcp_socket_new(conn);
    if (amqp_socket_open(socket, "localhost", 5672) != AMQP_STATUS_OK) {
        std::cerr << "연결 실패" << std::endl;
        return 1;
    }
    amqp_rpc_reply_t reply = amqp_login(conn, "/", 0, 131072, 0,
                                        AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "로그인 실패" << std::endl;
        amqp_destroy_connection(conn);
        return 1;
    }
    amqp_channel_open(conn, 1);
    reply = amqp_get_rpc_reply(conn);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "채널 열기 실패" << std::endl;
        amqp_destroy_connection(conn);
        return 1;
    }
    // 1. 토픽 Exchange 선언
    amqp_exchange_declare(conn, 1, amqp_cstring_bytes("logs_topic"),
                         amqp_cstring_bytes("topic"), 0, 1, 0, 0, amqp_empty_table);
    reply = amqp_get_rpc_reply(conn);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Exchange 선언 실패" << std::endl;
        amqp_destroy_connection(conn);
        return 1;
    }
    // 2. 큐 선언
    amqp_queue_declare(conn, 1, amqp_cstring_bytes("error_logs"), 0, 1, 0, 0, amqp_empty_table);
    amqp_queue_declare(conn, 1, amqp_cstring_bytes("all_logs"), 0, 1, 0, 0, amqp_empty_table);
    reply = amqp_get_rpc_reply(conn);
    // 3. 바인딩: error_logs <- logs.error
    amqp_queue_bind(conn, 1, amqp_cstring_bytes("error_logs"),
                    amqp_cstring_bytes("logs_topic"),
                    amqp_cstring_bytes("logs.error"), amqp_empty_table);
    // 4. 바인딩: all_logs <- logs.# (모든 로그)
    amqp_queue_bind(conn, 1, amqp_cstring_bytes("all_logs"),
                    amqp_cstring_bytes("logs_topic"),
                    amqp_cstring_bytes("logs.#"), amqp_empty_table);
    reply = amqp_get_rpc_reply(conn);
    std::cout << "Exchange + 바인딩 설정 완료" << std::endl;
    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
    return 0;
}

컴파일:

g++ -std=c++17 topic_exchange_setup.cpp -o topic_setup -lrabbitmq

3.2 SimpleAmqpClient로 토픽 프로듀서

Exchange가 이미 선언되어 있다면, SimpleAmqpClient로 발행만 할 수 있습니다. Exchange 선언은 rabbitmq-c 또는 관리 UI에서 미리 해 두세요.

// topic_producer.cpp
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <iostream>
#include <string>
int main(int argc, char* argv[]) {
    auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
    // routing_key: logs.error, logs.warning, logs.info 등
    std::string level = (argc > 1) ? argv[1] : "info";
    std::string routing_key = "logs." + level;
    std::string body = (argc > 2) ? argv[2] : "Sample log message";
    auto msg = AmqpClient::BasicMessage::Create(body);
    msg->DeliveryMode(2);
    // Exchange "logs_topic"으로 발행, routing_key로 라우팅
    channel->BasicPublish("logs_topic", routing_key, msg);
    std::cout << " [x] Sent [" << routing_key << "] " << body << std::endl;
    return 0;
}

3.3 토픽 컨슈머 (에러 전용)

// topic_consumer_error.cpp - error_logs 큐만 소비
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <csignal>
#include <iostream>
static volatile sig_atomic_t g_running = 1;
void sig_handler(int) { g_running = 0; }
int main() {
    signal(SIGINT, sig_handler);
    auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
    channel->BasicQos(0, 1, false);
    std::string consumer_tag = channel->BasicConsume("error_logs", "", false, false, false);
    std::cout << " [*] error_logs 큐 대기 중....Ctrl+C로 종료" << std::endl;
    while (g_running) {
        AmqpClient::Envelope::ptr_t envelope;
        if (channel->BasicConsumeMessage(consumer_tag, envelope, 1000)) {
            std::string body = envelope->Message()->Body();
            std::cout << " [x] ERROR: " << body << std::endl;
            // 에러 알림 (Slack, PagerDuty 등) 발송
            channel->BasicAck(envelope);
        }
    }
    channel->BasicCancel(consumer_tag);
    return 0;
}

3.4 Fanout Exchange: 브로드캐스트

한 이벤트를 여러 서비스가 각자 받을 때 사용합니다.

// fanout_setup.cpp - rabbitmq-c
// Exchange 선언 (fanout)
amqp_exchange_declare(conn, 1, amqp_cstring_bytes("order_events"),
                     amqp_cstring_bytes("fanout"), 0, 1, 0, 0, amqp_empty_table);
// 각 서비스별 큐 생성 및 바인딩 (routing_key 불필요)
amqp_queue_declare(conn, 1, amqp_cstring_bytes("inventory_queue"), 0, 1, 0, 0, amqp_empty_table);
amqp_queue_bind(conn, 1, amqp_cstring_bytes("inventory_queue"),
                amqp_cstring_bytes("order_events"), amqp_empty_bytes, amqp_empty_table);
amqp_queue_declare(conn, 1, amqp_cstring_bytes("notification_queue"), 0, 1, 0, 0, amqp_empty_table);
amqp_queue_bind(conn, 1, amqp_cstring_bytes("notification_queue"),
                amqp_cstring_bytes("order_events"), amqp_empty_bytes, amqp_empty_table);
// fanout_producer.cpp - SimpleAmqpClient
// order_events Exchange로 발행 (routing_key 무시됨)
channel->BasicPublish("order_events", "", msg);

4. 우선순위 큐

4.1 우선순위 큐 선언

우선순위 큐는 x-max-priority 인자로 최대 우선순위 값을 설정합니다. 1~10 권장 (너무 크면 성능 저하).

// priority_queue_setup.cpp - rabbitmq-c
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include <iostream>
int main() {
    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t* socket = amqp_tcp_socket_new(conn);
    amqp_socket_open(socket, "localhost", 5672);
    amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    amqp_channel_open(conn, 1);
    // x-max-priority: 1~255 (실무에서는 10 권장)
    amqp_table_entry_t entries[1];
    entries[0].key = amqp_cstring_bytes("x-max-priority");
    entries[0].value.kind = AMQP_FIELD_KIND_I32;
    entries[0].value.value.i32 = 10;
    amqp_table_t args;
    args.num_entries = 1;
    args.entries = entries;
    amqp_queue_declare(conn, 1, amqp_cstring_bytes("priority_orders"),
                       0, 1, 0, 0, args);
    amqp_rpc_reply_t reply = amqp_get_rpc_reply(conn);
    if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "우선순위 큐 선언 실패" << std::endl;
        return 1;
    }
    std::cout << "우선순위 큐 선언 완료 (max-priority=10)" << std::endl;
    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
    return 0;
}

4.2 우선순위 메시지 발행

메시지에 priority 속성을 설정합니다. 숫자가 클수록 높은 우선순위.

// priority_producer.cpp - SimpleAmqpClient
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <iostream>
int main() {
    auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
    // VIP 주문 (우선순위 10)
    auto vip_msg = AmqpClient::BasicMessage::Create(R"({"order_id":1,"type":"vip"})");
    vip_msg->DeliveryMode(2);
    vip_msg->Priority(10);  // 최고 우선순위
    channel->BasicPublish("", "priority_orders", vip_msg);
    std::cout << " [x] VIP 주문 발행 (priority=10)" << std::endl;
    // 일반 주문 (우선순위 1)
    auto normal_msg = AmqpClient::BasicMessage::Create(R"({"order_id":2,"type":"normal"})");
    normal_msg->DeliveryMode(2);
    normal_msg->Priority(1);
    channel->BasicPublish("", "priority_orders", normal_msg);
    std::cout << " [x] 일반 주문 발행 (priority=1)" << std::endl;
    return 0;
}

주의: SimpleAmqpClient의 Priority() 지원 여부는 버전에 따라 다릅니다. 지원하지 않으면 rabbitmq-c로 amqp_basic_properties_t.priority를 직접 설정하세요.

4.3 rabbitmq-c로 우선순위 발행

// priority_publish_rabbitmq_c.cpp
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_PRIORITY_FLAG;
props.content_type = amqp_cstring_bytes("application/json");
props.delivery_mode = 2;
props.priority = 10;  // VIP
amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes("priority_orders"),
                   0, 0, &props, amqp_cstring_bytes(body.c_str()));

5. Dead Letter Exchange (DLX)

5.1 DLX 개념

메시지가 다음 경우에 DLX로 전달됩니다:

  • 컨슈머가 NAK(requeue=false) 또는 BasicReject(requeue=false)
  • 메시지 TTL 만료
  • 큐 길이 초과 (x-max-length)
flowchart LR
    subgraph Main[메인 큐]
        Q1[order_queue]
    end
    subgraph DLX[Dead Letter]
        EX[dlx_exchange]
        Q2[dead_letter_queue]
    end
    Q1 -->|실패/만료/초과| EX
    EX --> Q2

5.2 DLX 설정이 있는 큐 선언

// dlx_setup.cpp - rabbitmq-c
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include <iostream>
int main() {
    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t* socket = amqp_tcp_socket_new(conn);
    amqp_socket_open(socket, "localhost", 5672);
    amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    amqp_channel_open(conn, 1);
    // 1. DLX용 Exchange 선언
    amqp_exchange_declare(conn, 1, amqp_cstring_bytes("dlx_exchange"),
                         amqp_cstring_bytes("direct"), 0, 1, 0, 0, amqp_empty_table);
    // 2. Dead Letter 큐 선언 및 바인딩
    amqp_queue_declare(conn, 1, amqp_cstring_bytes("dead_letter_queue"), 0, 1, 0, 0, amqp_empty_table);
    amqp_queue_bind(conn, 1, amqp_cstring_bytes("dead_letter_queue"),
                    amqp_cstring_bytes("dlx_exchange"),
                    amqp_cstring_bytes("dead"), amqp_empty_table);
    // 3. 메인 큐: x-dead-letter-exchange, x-dead-letter-routing-key 설정
    amqp_table_entry_t entries[2];
    entries[0].key = amqp_cstring_bytes("x-dead-letter-exchange");
    entries[0].value.kind = AMQP_FIELD_KIND_UTF8;
    entries[0].value.value.bytes = amqp_cstring_bytes("dlx_exchange");
    entries[1].key = amqp_cstring_bytes("x-dead-letter-routing-key");
    entries[1].value.kind = AMQP_FIELD_KIND_UTF8;
    entries[1].value.value.bytes = amqp_cstring_bytes("dead");
    amqp_table_t args;
    args.num_entries = 2;
    args.entries = entries;
    amqp_queue_declare(conn, 1, amqp_cstring_bytes("order_queue"),
                       0, 1, 0, 0, args);
    std::cout << "DLX 설정 완료: order_queue -> dlx_exchange -> dead_letter_queue" << std::endl;
    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);
    return 0;
}

5.3 실패 시 DLQ로 보내기 (requeue=false)

// dlx_consumer.cpp - 처리 실패 시 DLQ로
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <csignal>
#include <iostream>
#include <stdexcept>
static volatile sig_atomic_t g_running = 1;
void sig_handler(int) { g_running = 0; }
bool process_order(const std::string& body) {
    // JSON 파싱, 비즈니스 검증 등
    if (body.find("invalid") != std::string::npos) {
        throw std::runtime_error("Invalid order format");
    }
    return true;
}
int main() {
    signal(SIGINT, sig_handler);
    auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
    channel->BasicQos(0, 1, false);
    std::string consumer_tag = channel->BasicConsume("order_queue", "", false, false, false);
    std::cout << " [*] order_queue 대기 중..." << std::endl;
    while (g_running) {
        AmqpClient::Envelope::ptr_t envelope;
        if (channel->BasicConsumeMessage(consumer_tag, envelope, 1000)) {
            std::string body = envelope->Message()->Body();
            try {
                if (process_order(body)) {
                    channel->BasicAck(envelope);
                    std::cout << " [x] 처리 완료: " << body << std::endl;
                }
            } catch (const std::exception& e) {
                std::cerr << " [!!] 처리 실패 (DLQ로 전달): " << e.what() << std::endl;
                // requeue=false → Dead Letter Exchange로 전달
                channel->BasicReject(envelope->GetDeliveryTag(), false);
            }
        }
    }
    channel->BasicCancel(consumer_tag);
    return 0;
}

5.4 재시도 횟수 제한 후 DLQ (헤더 활용)

무한 재시도를 막으려면, 메시지 헤더에 재시도 횟수를 넣고, N회 초과 시 requeue=false로 DLQ에 보냅니다.

// retry_with_header.cpp
void handle_with_retry_limit(AmqpClient::Channel::ptr_t channel,
                             AmqpClient::Envelope::ptr_t envelope) {
    std::string body = envelope->Message()->Body();
    auto table = envelope->Message()->HeaderTable();
    int retry_count = 0;
    if (table && table->GetValue("x-retry-count")) {
        retry_count = table->GetValue("x-retry-count")->GetInteger();
    }
    const int max_retries = 3;
    try {
        process_order(body);
        channel->BasicAck(envelope);
    } catch (const std::exception& e) {
        if (retry_count >= max_retries) {
            std::cerr << "최대 재시도 초과, DLQ로 전달" << std::endl;
            channel->BasicReject(envelope->GetDeliveryTag(), false);
        } else {
            // requeue=true로 다시 큐에 넣음 (실무에서는 재시도 전용 큐+TTL 사용 권장)
            channel->BasicReject(envelope->GetDeliveryTag(), true);
        }
    }
}

6. 메시지 TTL과 큐 TTL

6.1 메시지별 TTL (per-message expiration)

발행 시 각 메시지에 expiration 속성을 설정합니다. 밀리초 문자열.

// message_ttl_producer.cpp - SimpleAmqpClient
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <iostream>
int main() {
    auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
    channel->DeclareQueue("ttl_queue", false, true, false, false);
    auto msg = AmqpClient::BasicMessage::Create("1시간 후 만료되는 메시지");
    msg->DeliveryMode(2);
    msg->Expiration("3600000");  // 3600000 ms = 1시간
    channel->BasicPublish("", "ttl_queue", msg);
    std::cout << " [x] TTL 1시간 메시지 발행" << std::endl;
    return 0;
}

6.2 큐 TTL (x-message-ttl)

큐에 들어오는 모든 메시지에 기본 TTL을 적용합니다.

// queue_ttl_setup.cpp - rabbitmq-c
amqp_table_entry_t entries[1];
entries[0].key = amqp_cstring_bytes("x-message-ttl");
entries[0].value.kind = AMQP_FIELD_KIND_I32;
entries[0].value.value.i32 = 60000;  // 60초
amqp_table_t args;
args.num_entries = 1;
args.entries = entries;
amqp_queue_declare(conn, 1, amqp_cstring_bytes("expiring_queue"),
                   0, 1, 0, 0, args);

주의: 메시지 TTL과 큐 TTL이 둘 다 있으면 더 작은 값이 적용됩니다.

6.3 TTL + DLX: 만료 메시지를 DLQ로

만료된 메시지를 버리지 않고 DLQ로 보내 분석할 수 있습니다.

// ttl_dlx_setup.cpp - 큐 인자
// x-message-ttl: 60000 (60초)
// x-dead-letter-exchange: dlx_exchange
// x-dead-letter-routing-key: expired
amqp_table_entry_t entries[3];
entries[0].key = amqp_cstring_bytes("x-message-ttl");
entries[0].value.kind = AMQP_FIELD_KIND_I32;
entries[0].value.value.i32 = 60000;
entries[1].key = amqp_cstring_bytes("x-dead-letter-exchange");
entries[1].value.kind = AMQP_FIELD_KIND_UTF8;
entries[1].value.value.bytes = amqp_cstring_bytes("dlx_exchange");
entries[2].key = amqp_cstring_bytes("x-dead-letter-routing-key");
entries[2].value.kind = AMQP_FIELD_KIND_UTF8;
entries[2].value.value.bytes = amqp_cstring_bytes("expired");

7. 자주 발생하는 에러와 해결법

에러 1: “NOT_FOUND - no exchange ‘logs_topic’”

증상: BasicPublish("logs_topic", "logs.error", msg) 시 예외 원인: Exchange를 선언하지 않음. 기본 Exchange("")만 자동 존재. 해결법:

# RabbitMQ 관리 UI (http://localhost:15672)에서 수동 생성
# 또는 rabbitmq-c로 amqp_exchange_declare 호출
// ✅ Exchange 선언 후 사용
amqp_exchange_declare(conn, 1, amqp_cstring_bytes("logs_topic"),
                     amqp_cstring_bytes("topic"), 0, 1, 0, 0, amqp_empty_table);

에러 2: “PRECONDITION_FAILED - inequivalent arg ‘x-max-priority’”

증상: 우선순위 큐 선언 시 기존 큐와 인자 불일치 원인: 이미 x-max-priority 없이 선언된 큐가 있음. RabbitMQ는 큐 인자 변경을 허용하지 않음. 해결법:

# 기존 큐 삭제 후 재선언
rabbitmqctl delete_queue order_queue
// ✅ 새 큐 이름 사용 또는 기존 큐 삭제 후 동일 인자로 선언

에러 3: 우선순위가 동작하지 않음

증상: Priority(10)으로 발행했는데 일반 메시지와 같은 순서로 처리됨 원인:

  1. 큐에 x-max-priority 미설정
  2. 컨슈머가 이미 메시지를 가져온 상태 (우선순위는 큐 내부 정렬에만 영향)
  3. SimpleAmqpClient가 Priority 미지원 해결법:
// ✅ 큐 선언 시 x-max-priority 필수
// ✅ rabbitmq-c로 priority 설정 확인
props._flags |= AMQP_BASIC_PRIORITY_FLAG;
props.priority = 10;

에러 4: DLX로 메시지가 안 감

증상: BasicReject(requeue=false) 했는데 dead_letter_queue에 없음 원인:

  1. 큐 선언 시 x-dead-letter-exchange 미설정
  2. requeue=true로 호출 (requeue=true면 DLX로 안 감)
  3. DLX Exchange 또는 dead_letter_queue 미선언 해결법:
// ✅ requeue=false 필수
channel->BasicReject(envelope->GetDeliveryTag(), false);
// ✅ 큐 인자에 x-dead-letter-exchange, x-dead-letter-routing-key 설정
// ✅ dlx_exchange, dead_letter_queue 미리 선언

에러 5: “ACCESS_REFUSED - exchange ‘logs_topic’ in vhost ’/’”

증상: Exchange 이름 오타 또는 다른 vhost 해결법:

// ✅ Exchange 이름 확인 (대소문자 구분)
// ✅ vhost 일치 (기본 "/")
channel->BasicPublish("logs_topic", "logs.error", msg);  // logs_topic 정확히

에러 6: 토픽 바인딩 후 메시지가 큐에 안 옴

증상: logs.error 발행했는데 바인딩한 큐에 메시지 없음 원인: routing key 패턴 불일치. logs.#logs.로 시작하는 모든 키 매칭. logs.errorlogs.error 또는 logs.#에 매칭. 해결법:

# 패턴 규칙
logs.error  → logs.error 바인딩에 매칭
logs.error  → logs.# 바인딩에 매칭
logs.a.b    → logs.# 에만 매칭 (logs.* 는 한 단계만)

에러 7: TTL이 적용되지 않음

증상: expiration 설정했는데 메시지가 만료되지 않음 원인: 메시지가 컨슈머에게 이미 전달된 상태. TTL은 큐에 대기 중일 때만 적용됩니다. 컨슈머가 가져가면 TTL은 무시됩니다. 해결법: TTL은 큐 대기 시간에만 적용됨을 이해하고, 컨슈머 처리 지연이 크면 prefetch를 줄이거나 워커를 늘립니다.

8. 성능 최적화 팁

팁 1: 토픽 바인딩 수 최소화

바인딩이 많을수록 라우팅 비용이 증가합니다. logs.# 하나로 모든 로그를 받는 것보다, 필요한 패턴만 바인딩하세요.

// ❌ 과도한 바인딩: logs.error, logs.warning, logs.info 각각
// ✅ 필요한 것만: error 알림용 logs.error, 전체 수집용 logs.#

팁 2: 우선순위 범위 제한

x-max-priority를 255로 하면 내부 힙 연산 비용이 커집니다. 1~10으로 제한하는 것이 성능에 유리합니다.

// ✅ 실무 권장
entries[0].value.value.i32 = 10;  // 1~10

팁 3: DLQ 모니터링

DLQ에 메시지가 쌓이면 비정상입니다. 알림·대시보드로 모니터링하고, 주기적으로 원인 분석 후 재처리 또는 폐기합니다.

// DLQ 메시지 수 확인 (rabbitmqctl 또는 관리 API)
// rabbitmqctl list_queues name messages

팁 4: TTL 큐와 영구 큐 분리

TTL이 짧은 메시지(알림)와 영구 메시지(주문)를 같은 큐에 넣지 마세요. 큐별로 TTL 정책이 다르므로 분리하는 것이 관리에 유리합니다.

팁 5: Exchange 타입별 처리량

타입상대적 처리량비고
direct높음단순 라우팅
topic중간패턴 매칭 오버헤드
fanout높음바인딩 수만큼 복제

9. 프로덕션 패턴

패턴 1: Exchange·큐 선언 스크립트 분리

C++ 앱에서 매번 Exchange·큐를 선언하지 말고, 배포 스크립트나 설정 툴에서 한 번만 선언합니다.

# setup_rabbitmq.sh
#!/bin/bash
# rabbitmqadmin 또는 HTTP API로 Exchange/Queue 선언
rabbitmqadmin declare exchange name=logs_topic type=topic durable=true
rabbitmqadmin declare queue name=error_logs durable=true
rabbitmqadmin declare binding source=logs_topic destination=error_logs routing_key=logs.error

패턴 2: 재시도 큐 + TTL (지연 재처리)

실패 시 즉시 requeue하지 않고, 재시도 전용 큐에 TTL을 두어 지연 후 재처리합니다.

flowchart LR
    Q1[main_queue] -->|실패| EX1[retry_exchange]
    EX1 --> Q2[retry_queue]
    Q2 -->|TTL 만료| EX2[main_exchange]
    EX2 --> Q1
// retry_queue: x-message-ttl=60000 (60초)
// x-dead-letter-exchange=main_exchange
// x-dead-letter-routing-key=retry
// → 60초 후 main_queue로 다시 들어감

패턴 3: 설정 외부화

struct RabbitMQAdvancedConfig {
    std::string topic_exchange = "logs_topic";
    std::string priority_queue = "priority_orders";
    std::string dlx_exchange = "dlx_exchange";
    std::string dead_letter_queue = "dead_letter_queue";
    int max_priority = 10;
    int message_ttl_ms = 3600000;
};
RabbitMQAdvancedConfig loadFromEnv() {
    RabbitMQAdvancedConfig c;
    if (const char* x = std::getenv("RABBITMQ_TOPIC_EXCHANGE")) c.topic_exchange = x;
    if (const char* ttl = std::getenv("RABBITMQ_MSG_TTL_MS")) c.message_ttl_ms = std::stoi(ttl);
    return c;
}

패턴 4: DLQ 컨슈머 (분석·알림)

// dlq_consumer.cpp - 실패 메시지 수집 및 알림
void consume_dead_letters(AmqpClient::Channel::ptr_t channel) {
    std::string tag = channel->BasicConsume("dead_letter_queue", "", false, false, false);
    while (true) {
        AmqpClient::Envelope::ptr_t envelope;
        if (channel->BasicConsumeMessage(tag, envelope, 1000)) {
            std::string body = envelope->Message()->Body();
            // 로깅, 메트릭, PagerDuty 알림 등
            log_failure(body);
            notify_ops(body);
            channel->BasicAck(envelope);
        }
    }
}

패턴 5: Docker Compose (고급 설정)

# docker-compose.yml
version: '3'
services:
  rabbitmq:
    image: rabbitmq:3.12-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: app
      RABBITMQ_DEFAULT_PASS: secret
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "check_running"]
      interval: 10s
      timeout: 5s
      retries: 3
  app:
    build: .
    environment:
      RABBITMQ_HOST: rabbitmq
      RABBITMQ_PORT: 5672
      RABBITMQ_USER: app
      RABBITMQ_PASSWORD: secret
    depends_on:
      rabbitmq:
        condition: service_healthy

10. 구현 체크리스트

토픽 라우팅

  • Exchange 선언 (topic/direct/fanout)
  • 큐 선언 및 바인딩 (routing key 패턴 확인)
  • 프로듀서: BasicPublish(exchange, routing_key, msg)
  • 컨슈머: 각 큐별 소비

우선순위 큐

  • 큐에 x-max-priority 설정
  • 메시지에 priority 속성 설정
  • SimpleAmqpClient 미지원 시 rabbitmq-c 사용

Dead Letter

  • DLX Exchange 선언
  • Dead Letter 큐 선언 및 바인딩
  • 메인 큐에 x-dead-letter-exchange, x-dead-letter-routing-key
  • 실패 시 BasicReject(requeue=false)
  • DLQ 컨슈머 구현 (모니터링·알림)

메시지 TTL

  • 메시지 expiration 또는 큐 x-message-ttl
  • TTL + DLX로 만료 메시지 수집 (선택)

에러 처리

  • Exchange 미존재 시 선언 또는 에러 처리
  • PRECONDITION_FAILED 시 큐 삭제 후 재선언
  • DLX 미동작 시 requeue=false 확인

프로덕션

  • Exchange·큐 선언 스크립트 분리
  • 설정 외부화 (환경 변수)
  • DLQ 모니터링·알림
  • Health Check

문제 시나리오 해결 요약

문제RabbitMQ 고급 해결 방법
로그 레벨별 분기토픽 Exchange + routing key (logs.error, logs.#)
VIP·긴급 우선 처리우선순위 큐 (x-max-priority, priority)
실패 메시지 무한 재시도DLX + BasicReject(requeue=false)
오래된 메시지 폐기메시지 TTL 또는 큐 x-message-ttl
한 이벤트 N개 서비스 구독Fanout Exchange
만료 메시지 분석TTL + DLX로 dead_letter_queue 수집

정리

항목요약
토픽 라우팅Exchange(topic) + routing key 패턴 (*, #)
우선순위x-max-priority, 메시지 priority 속성
DLXx-dead-letter-exchange, requeue=false
TTLexpiration(메시지), x-message-ttl(큐)
에러NOT_FOUND, PRECONDITION_FAILED, DLX 미동작
성능바인딩 최소화, 우선순위 범위 제한
프로덕션선언 스크립트 분리, DLQ 모니터링
핵심 원칙:
  1. Exchange 선언 후 사용 (기본 Exchange 제외)
  2. 우선순위 큐는 x-max-priority 필수
  3. DLX는 requeue=false일 때만 동작
  4. TTL은 큐 대기 중에만 적용

자주 묻는 질문 (FAQ)

Q. 토픽과 다이렉트 중 뭘 써야 하나요?

A. routing key가 고정된 몇 개(예: email, sms)면 direct. 패턴 매칭(예: logs.error, order.vip.urgent)이 필요하면 topic.

Q. 우선순위 큐 성능 영향은?

A. 우선순위 범위가 크면(예: 255) 정렬 비용이 증가합니다. 1~10으로 제한하면 대부분의 실무 요구를 충족하면서 성능 부담이 적습니다.

Q. DLQ 메시지를 다시 메인 큐로 보낼 수 있나요?

A. 수동으로 가능합니다. DLQ 컨슈머가 메시지를 받아 수정 후 메인 큐에 재발행하거나, rabbitmqadmin으로 메시지를 이동할 수 있습니다. 자동 재처리는 재시도 큐 + TTL 패턴을 사용하세요.

Q. SimpleAmqpClient로 Exchange 선언이 안 되는데요?

A. SimpleAmqpClient는 Exchange 선언 API가 제한적입니다. rabbitmq-c로 Exchange·바인딩을 선언하거나, RabbitMQ 관리 UI, rabbitmqadmin, HTTP API로 미리 설정해 두세요. 한 줄 요약: 토픽 라우팅·우선순위 큐·Dead Letter·TTL을 활용하면 실무 RabbitMQ 이벤트 시스템을 안정적으로 구축할 수 있습니다. 다음 글: C++ Kafka 고급(#52-6) — 스트림 처리·트랜잭션 이전 글: C++ RabbitMQ 완벽 가이드(#52-7) — 프로듀서·컨슈머·작업 큐

참고 자료


관련 글

심화 부록: 구현·운영 관점

이 부록은 앞선 본문에서 다룬 주제(「C++ RabbitMQ 고급 | 토픽 라우팅·우선순위 큐·Dead Letter·TTL 가이드」)를 구현·런타임·운영 관점에서 다시 압축합니다. 도메인별 세부 구현은 글마다 다르지만, 입력 검증 → 핵심 연산 → 부작용(I/O·네트워크·동시성) → 관측의 흐름으로 장애를 나누면 원인 추적이 빨라집니다.

내부 동작과 핵심 메커니즘

flowchart TD
  A[입력·요청·이벤트] --> B[파싱·검증·디코딩]
  B --> C[핵심 연산·상태 전이]
  C --> D[부작용: I/O·네트워크·동시성]
  D --> E[결과·관측·저장]
sequenceDiagram
  participant C as 클라이언트/호출자
  participant B as 경계(런타임·게이트웨이·프로세스)
  participant D as 의존성(API·DB·큐·파일)
  C->>B: 요청/이벤트
  B->>D: 조회·쓰기·RPC
  D-->>B: 지연·부분 실패·재시도 가능
  B-->>C: 응답 또는 오류(코드·상관 ID)
  • 불변 조건(Invariant): 버퍼 경계, 프로토콜 상태, 트랜잭션 격리, FD 상한 등 단계별로 문장으로 적어 두면 디버깅 비용이 줄어듭니다.
  • 결정성: 순수 층과 시간·네트워크·스케줄에 의존하는 층을 분리해야 테스트와 장애 분석이 쉬워집니다.
  • 경계 비용: 직렬화, 인코딩, syscall 횟수, 락 경합, 할당·GC, 캐시 미스를 의심 목록에 둡니다.
  • 백프레셔: 생산자가 소비자보다 빠를 때 버퍼·큐·스트림에서 속도를 줄이는 신호를 어디에 둘지 정의합니다.

프로덕션 운영 패턴

영역운영 관점 질문
관측성요청 단위 상관 ID, 에러율·지연 p95/p99, 의존성 타임아웃·재시도가 대시보드에 보이는가
안전성입력 검증·권한·비밀·감사 로그가 코드 경로마다 일관적인가
신뢰성재시도는 멱등 연산에만 적용되는가, 서킷 브레이커·백오프·DLQ가 있는가
성능캐시·배치 크기·커넥션 풀·인덱스·백프레셔가 데이터 규모에 맞는가
배포롤백 룬북, 카나리/블루그린, 마이그레이션·피처 플래그가 문서화되어 있는가
용량피크 트래픽·디스크·FD·스레드 풀 상한을 주기적으로 검증하는가

스테이징은 데이터 양·네트워크 RTT·동시성을 프로덕션에 가깝게 맞출수록 재현율이 올라갑니다.

확장 예시: 엔드투엔드 미니 시나리오

앞선 본문 주제(「C++ RabbitMQ 고급 | 토픽 라우팅·우선순위 큐·Dead Letter·TTL 가이드」)를 배포·운영 흐름에 맞춰 옮긴 체크리스트입니다. 도메인에 맞게 단계 이름만 바꿔 적용할 수 있습니다.

  1. 입력 계약 고정: 스키마·버전·최대 페이로드·타임아웃·에러 코드를 경계에 둔다.
  2. 핵심 경로 계측: 요청 ID, 단계별 지연, 외부 호출 결과 코드를 로그·메트릭·트레이스에서 한 흐름으로 본다.
  3. 실패 주입: 의존성 타임아웃·5xx·부분 데이터·락 대기를 스테이징에서 재현한다.
  4. 호환·롤백: 설정/마이그레이션/클라이언트 버전을 되돌릴 수 있는지 확인한다.
  5. 부하 후 검증: 피크 대비 p95/p99, 에러율, 리소스 상한, 알림 임계값을 점검한다.
handle(request):
  ctx = newCorrelationId()
  validated = validateSchema(request)
  authorize(validated, ctx)
  result = domainCore(validated)
  persistOrEmit(result, idempotentKey)
  recordMetrics(ctx, latency, outcome)
  return result

문제 해결(Troubleshooting)

증상가능 원인조치
간헐적 실패레이스, 타임아웃, 외부 의존성, DNS최소 재현 스크립트, 분산 트레이스·로그 상관관계, 재시도·서킷 설정 점검
성능 저하N+1, 동기 I/O, 락 경합, 과도한 직렬화, 캐시 미스프로파일러·APM으로 핫스팟 확인 후 한 가지씩 제거
메모리 증가캐시 무제한, 구독/리스너 누수, 대용량 버퍼, 커넥션 미반납상한·TTL·힙/FD 스냅샷 비교
빌드·배포만 실패환경 변수, 권한, 플랫폼 차이, lockfileCI 로그와 로컬 diff, 런타임·이미지 버전 핀
설정 불일치프로필·시크릿·기본값, 리전스키마 검증된 설정 단일 소스와 배포 매트릭스 표준화
데이터 불일치비멱등 재시도, 부분 쓰기, 캐시 무효화 누락멱등 키·아웃박스·트랜잭션 경계 재검토

권장 순서: (1) 최소 재현 (2) 최근 변경 범위 축소 (3) 환경·의존성 차이 (4) 관측으로 가설 검증 (5) 수정 후 회귀·부하 테스트.

배포 전에는 git addgit commitgit pushnpm run deploy 순서를 권장합니다.


같이 보면 좋은 글 (내부 링크)

이 주제와 연결되는 다른 글입니다.


이 글에서 다루는 키워드 (관련 검색어)

C++, RabbitMQ, 메시지큐, 라우팅, AMQP, Dead Letter, 토픽 등으로 검색하시면 이 글이 도움이 됩니다.