C++ RabbitMQ 고급 | 토픽 라우팅·우선순위 큐·Dead Letter·TTL 가이드
이 글의 핵심
RabbitMQ 심화: 토픽 라우팅, 우선순위 큐, Dead Letter Exchange, 메시지 TTL. 실전 문제 시나리오, 완전한 C++ 예제, 흔한 에러 해결, 성능 최적화, 프로덕션 패턴까지 900줄 분량으로 다룹니다.
들어가며: “단순 큐만으로는 부족해요”
기본 큐의 한계
RabbitMQ 기본(#52-7)에서 작업 큐를 구현했습니다. 하지만 실무에서는 다음 문제들이 자주 발생합니다.
시나리오 1: 로그 레벨별로 다른 처리
error, warning, info 로그를 한 큐에 넣으면, 에러 알림 서비스와 일반 로그 수집기가 같은 큐를 소비합니다. 에러만 별도 큐로 보내고 싶은데, 기본 큐는 라우팅이 없습니다.
시나리오 2: VIP 주문이 일반 주문 뒤에 밀림
주문 큐에 VIP와 일반 주문이 섞여 들어옵니다. VIP는 5분 내 처리해야 하는데, 앞에 쌓인 일반 주문 때문에 지연됩니다. “우선순위가 있는 큐가 필요해요.”
시나리오 3: 처리 실패 메시지가 무한 재시도
컨슈머가 잘못된 형식의 메시지를 받아 NAK(requeue)하면, 같은 메시지가 계속 돌아와 컨슈머를 죽입니다. “실패한 메시지만 따로 모아서 나중에 분석하고 싶어요.”
시나리오 4: 오래된 메시지는 버리고 싶음
이벤트성 알림(세일 종료 1시간 전)은 1시간이 지나면 의미가 없습니다. 큐에 오래 쌓여 있던 메시지는 자동으로 폐기하고 싶습니다.
시나리오 5: 여러 서비스가 같은 이벤트를 구독
”주문 생성됨” 이벤트를 재고·결제·알림·분석 서비스가 각각 받아야 합니다. 한 큐에 넣으면 한 서비스만 가져가고, 큐를 여러 개 만들면 프로듀서가 여러 번 발행해야 합니다.
이 글에서 다루는 것:
- 토픽 라우팅: Exchange + routing key로 메시지 분기
- 우선순위 큐: VIP·긴급 메시지 우선 처리
- Dead Letter Exchange (DLX): 실패 메시지 자동 수집
- 메시지 TTL: 만료된 메시지 자동 폐기
- 자주 발생하는 에러와 해결법
- 성능 최적화 및 프로덕션 패턴
요구 환경: C++17 이상, RabbitMQ 3.8+, SimpleAmqpClient 또는 rabbitmq-c
실무 적용 경험: 이 글은 대규모 C++ 프로젝트에서 실제로 겪은 문제와 해결 과정을 바탕으로 작성되었습니다. 책이나 문서에서 다루지 않는 실전 함정과 디버깅 팁을 포함합니다.
목차
- 문제 시나리오와 해결 전략
- Exchange와 라우팅 개념
- 토픽 라우팅 완전 구현
- 우선순위 큐
- Dead Letter Exchange (DLX)
- 메시지 TTL과 큐 TTL
- 자주 발생하는 에러와 해결법
- 성능 최적화 팁
- 프로덕션 패턴
- 구현 체크리스트
1. 문제 시나리오와 해결 전략
실전 문제 → RabbitMQ 고급 기능 매핑
| 문제 | 해결 기능 | 핵심 개념 |
|---|---|---|
| 로그 레벨별 분기 (error/warning/info) | 토픽 Exchange | routing key 패턴 매칭 |
| VIP·긴급 주문 우선 처리 | 우선순위 큐 | x-max-priority |
| 실패 메시지 무한 재시도 | Dead Letter Exchange | x-dead-letter-exchange |
| 오래된 메시지 자동 폐기 | 메시지 TTL | expiration, x-message-ttl |
| 한 이벤트를 여러 서비스가 구독 | Fanout Exchange | 브로드캐스트 |
| 특정 키만 받고 싶음 | Direct Exchange | 정확한 routing key 매칭 |
아키텍처 개요
flowchart TB
subgraph Producer[프로듀서]
P1[로그 error]
P2[로그 warning]
P3[주문 VIP]
P4[주문 일반]
end
subgraph Exchange["Exchange (토픽/다이렉트)"]
EX[logs.orders]
end
subgraph Queues[큐]
Q1[error_queue]
Q2[all_logs_queue]
Q3[priority_queue]
Q4[dlq_dead_letter]
end
subgraph Consumers[컨슈머]
C1[에러 알림]
C2[로그 수집]
C3[주문 처리]
C4[실패 분석]
end
P1 -->|routing: logs.error| EX
P2 -->|routing: logs.warning| EX
P3 -->|priority: 10| EX
P4 -->|priority: 1| EX
EX --> Q1
EX --> Q2
EX --> Q3
Q3 -.->|실패 시| Q4
Q1 --> C1
Q2 --> C2
Q3 --> C3
Q4 --> C4
2. Exchange와 라우팅 개념
Exchange 타입 비교
| 타입 | 동작 | 사용 사례 |
|---|---|---|
| direct | routing_key가 정확히 일치하는 큐로 전달 | 작업 유형별 분기 (email, sms) |
| topic | 패턴 매칭 (* 한 단어, # 0개 이상) | 로그 레벨·카테고리 (logs.error, order.vip) |
| fanout | 모든 바인딩된 큐로 브로드캐스트 | 이벤트 복제 (주문 생성 → N개 서비스) |
| headers | 헤더 키-값 매칭 | 복잡한 라우팅 (선택적 사용) |
기본 Exchange vs 명시적 Exchange
// 기본 Exchange ("") 사용 시: routing_key = 큐 이름
// - 큐가 미리 존재해야 함
channel->BasicPublish("", "my_queue", msg);
// 명시적 Exchange 사용: Exchange → 바인딩된 큐로 라우팅
channel->BasicPublish("logs_topic", "logs.error", msg);
시퀀스: 토픽 라우팅 흐름
sequenceDiagram
participant P as 프로듀서
participant EX as logs_topic (Exchange)
participant Q1 as error_queue
participant Q2 as all_logs_queue
participant C1 as 에러 알림
participant C2 as 로그 수집
P->>EX: BasicPublish("logs.error", body)
EX->>EX: routing_key 매칭
EX->>Q1: logs.error 바인딩 매칭
EX->>Q2: logs.# 바인딩 매칭
Q1->>C1: 메시지 전달
Q2->>C2: 메시지 전달
3. 토픽 라우팅 완전 구현
3.1 토픽 Exchange 선언 (rabbitmq-c)
SimpleAmqpClient는 Exchange 선언 API가 제한적일 수 있어, rabbitmq-c로 토픽 Exchange를 선언하는 예시를 먼저 보겠습니다.
// topic_exchange_setup.cpp - Exchange + 큐 바인딩
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include <iostream>
#include <cstring>
int main() {
amqp_connection_state_t conn = amqp_new_connection();
amqp_socket_t* socket = amqp_tcp_socket_new(conn);
if (amqp_socket_open(socket, "localhost", 5672) != AMQP_STATUS_OK) {
std::cerr << "연결 실패" << std::endl;
return 1;
}
amqp_rpc_reply_t reply = amqp_login(conn, "/", 0, 131072, 0,
AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "로그인 실패" << std::endl;
amqp_destroy_connection(conn);
return 1;
}
amqp_channel_open(conn, 1);
reply = amqp_get_rpc_reply(conn);
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "채널 열기 실패" << std::endl;
amqp_destroy_connection(conn);
return 1;
}
// 1. 토픽 Exchange 선언
amqp_exchange_declare(conn, 1, amqp_cstring_bytes("logs_topic"),
amqp_cstring_bytes("topic"), 0, 1, 0, 0, amqp_empty_table);
reply = amqp_get_rpc_reply(conn);
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "Exchange 선언 실패" << std::endl;
amqp_destroy_connection(conn);
return 1;
}
// 2. 큐 선언
amqp_queue_declare(conn, 1, amqp_cstring_bytes("error_logs"), 0, 1, 0, 0, amqp_empty_table);
amqp_queue_declare(conn, 1, amqp_cstring_bytes("all_logs"), 0, 1, 0, 0, amqp_empty_table);
reply = amqp_get_rpc_reply(conn);
// 3. 바인딩: error_logs <- logs.error
amqp_queue_bind(conn, 1, amqp_cstring_bytes("error_logs"),
amqp_cstring_bytes("logs_topic"),
amqp_cstring_bytes("logs.error"), amqp_empty_table);
// 4. 바인딩: all_logs <- logs.# (모든 로그)
amqp_queue_bind(conn, 1, amqp_cstring_bytes("all_logs"),
amqp_cstring_bytes("logs_topic"),
amqp_cstring_bytes("logs.#"), amqp_empty_table);
reply = amqp_get_rpc_reply(conn);
std::cout << "Exchange + 바인딩 설정 완료" << std::endl;
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
return 0;
}
컴파일:
g++ -std=c++17 topic_exchange_setup.cpp -o topic_setup -lrabbitmq
3.2 SimpleAmqpClient로 토픽 프로듀서
Exchange가 이미 선언되어 있다면, SimpleAmqpClient로 발행만 할 수 있습니다. Exchange 선언은 rabbitmq-c 또는 관리 UI에서 미리 해 두세요.
// topic_producer.cpp
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <iostream>
#include <string>
int main(int argc, char* argv[]) {
auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
// routing_key: logs.error, logs.warning, logs.info 등
std::string level = (argc > 1) ? argv[1] : "info";
std::string routing_key = "logs." + level;
std::string body = (argc > 2) ? argv[2] : "Sample log message";
auto msg = AmqpClient::BasicMessage::Create(body);
msg->DeliveryMode(2);
// Exchange "logs_topic"으로 발행, routing_key로 라우팅
channel->BasicPublish("logs_topic", routing_key, msg);
std::cout << " [x] Sent [" << routing_key << "] " << body << std::endl;
return 0;
}
3.3 토픽 컨슈머 (에러 전용)
// topic_consumer_error.cpp - error_logs 큐만 소비
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <csignal>
#include <iostream>
static volatile sig_atomic_t g_running = 1;
void sig_handler(int) { g_running = 0; }
int main() {
signal(SIGINT, sig_handler);
auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
channel->BasicQos(0, 1, false);
std::string consumer_tag = channel->BasicConsume("error_logs", "", false, false, false);
std::cout << " [*] error_logs 큐 대기 중... Ctrl+C로 종료" << std::endl;
while (g_running) {
AmqpClient::Envelope::ptr_t envelope;
if (channel->BasicConsumeMessage(consumer_tag, envelope, 1000)) {
std::string body = envelope->Message()->Body();
std::cout << " [x] ERROR: " << body << std::endl;
// 에러 알림 (Slack, PagerDuty 등) 발송
channel->BasicAck(envelope);
}
}
channel->BasicCancel(consumer_tag);
return 0;
}
3.4 Fanout Exchange: 브로드캐스트
한 이벤트를 여러 서비스가 각자 받을 때 사용합니다.
// fanout_setup.cpp - rabbitmq-c
// Exchange 선언 (fanout)
amqp_exchange_declare(conn, 1, amqp_cstring_bytes("order_events"),
amqp_cstring_bytes("fanout"), 0, 1, 0, 0, amqp_empty_table);
// 각 서비스별 큐 생성 및 바인딩 (routing_key 불필요)
amqp_queue_declare(conn, 1, amqp_cstring_bytes("inventory_queue"), 0, 1, 0, 0, amqp_empty_table);
amqp_queue_bind(conn, 1, amqp_cstring_bytes("inventory_queue"),
amqp_cstring_bytes("order_events"), amqp_empty_bytes, amqp_empty_table);
amqp_queue_declare(conn, 1, amqp_cstring_bytes("notification_queue"), 0, 1, 0, 0, amqp_empty_table);
amqp_queue_bind(conn, 1, amqp_cstring_bytes("notification_queue"),
amqp_cstring_bytes("order_events"), amqp_empty_bytes, amqp_empty_table);
// fanout_producer.cpp - SimpleAmqpClient
// order_events Exchange로 발행 (routing_key 무시됨)
channel->BasicPublish("order_events", "", msg);
4. 우선순위 큐
4.1 우선순위 큐 선언
우선순위 큐는 x-max-priority 인자로 최대 우선순위 값을 설정합니다. 1~10 권장 (너무 크면 성능 저하).
// priority_queue_setup.cpp - rabbitmq-c
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include <iostream>
int main() {
amqp_connection_state_t conn = amqp_new_connection();
amqp_socket_t* socket = amqp_tcp_socket_new(conn);
amqp_socket_open(socket, "localhost", 5672);
amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
amqp_channel_open(conn, 1);
// x-max-priority: 1~255 (실무에서는 10 권장)
amqp_table_entry_t entries[1];
entries[0].key = amqp_cstring_bytes("x-max-priority");
entries[0].value.kind = AMQP_FIELD_KIND_I32;
entries[0].value.value.i32 = 10;
amqp_table_t args;
args.num_entries = 1;
args.entries = entries;
amqp_queue_declare(conn, 1, amqp_cstring_bytes("priority_orders"),
0, 1, 0, 0, args);
amqp_rpc_reply_t reply = amqp_get_rpc_reply(conn);
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "우선순위 큐 선언 실패" << std::endl;
return 1;
}
std::cout << "우선순위 큐 선언 완료 (max-priority=10)" << std::endl;
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
return 0;
}
4.2 우선순위 메시지 발행
메시지에 priority 속성을 설정합니다. 숫자가 클수록 높은 우선순위.
// priority_producer.cpp - SimpleAmqpClient
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <iostream>
int main() {
auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
// VIP 주문 (우선순위 10)
auto vip_msg = AmqpClient::BasicMessage::Create(R"({"order_id":1,"type":"vip"})");
vip_msg->DeliveryMode(2);
vip_msg->Priority(10); // 최고 우선순위
channel->BasicPublish("", "priority_orders", vip_msg);
std::cout << " [x] VIP 주문 발행 (priority=10)" << std::endl;
// 일반 주문 (우선순위 1)
auto normal_msg = AmqpClient::BasicMessage::Create(R"({"order_id":2,"type":"normal"})");
normal_msg->DeliveryMode(2);
normal_msg->Priority(1);
channel->BasicPublish("", "priority_orders", normal_msg);
std::cout << " [x] 일반 주문 발행 (priority=1)" << std::endl;
return 0;
}
주의: SimpleAmqpClient의 Priority() 지원 여부는 버전에 따라 다릅니다. 지원하지 않으면 rabbitmq-c로 amqp_basic_properties_t.priority를 직접 설정하세요.
4.3 rabbitmq-c로 우선순위 발행
// priority_publish_rabbitmq_c.cpp
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_PRIORITY_FLAG;
props.content_type = amqp_cstring_bytes("application/json");
props.delivery_mode = 2;
props.priority = 10; // VIP
amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes("priority_orders"),
0, 0, &props, amqp_cstring_bytes(body.c_str()));
5. Dead Letter Exchange (DLX)
5.1 DLX 개념
메시지가 다음 경우에 DLX로 전달됩니다:
- 컨슈머가 NAK(requeue=false) 또는 BasicReject(requeue=false)
- 메시지 TTL 만료
- 큐 길이 초과 (x-max-length)
flowchart LR
subgraph Main[메인 큐]
Q1[order_queue]
end
subgraph DLX[Dead Letter]
EX[dlx_exchange]
Q2[dead_letter_queue]
end
Q1 -->|실패/만료/초과| EX
EX --> Q2
5.2 DLX 설정이 있는 큐 선언
// dlx_setup.cpp - rabbitmq-c
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include <iostream>
int main() {
amqp_connection_state_t conn = amqp_new_connection();
amqp_socket_t* socket = amqp_tcp_socket_new(conn);
amqp_socket_open(socket, "localhost", 5672);
amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
amqp_channel_open(conn, 1);
// 1. DLX용 Exchange 선언
amqp_exchange_declare(conn, 1, amqp_cstring_bytes("dlx_exchange"),
amqp_cstring_bytes("direct"), 0, 1, 0, 0, amqp_empty_table);
// 2. Dead Letter 큐 선언 및 바인딩
amqp_queue_declare(conn, 1, amqp_cstring_bytes("dead_letter_queue"), 0, 1, 0, 0, amqp_empty_table);
amqp_queue_bind(conn, 1, amqp_cstring_bytes("dead_letter_queue"),
amqp_cstring_bytes("dlx_exchange"),
amqp_cstring_bytes("dead"), amqp_empty_table);
// 3. 메인 큐: x-dead-letter-exchange, x-dead-letter-routing-key 설정
amqp_table_entry_t entries[2];
entries[0].key = amqp_cstring_bytes("x-dead-letter-exchange");
entries[0].value.kind = AMQP_FIELD_KIND_UTF8;
entries[0].value.value.bytes = amqp_cstring_bytes("dlx_exchange");
entries[1].key = amqp_cstring_bytes("x-dead-letter-routing-key");
entries[1].value.kind = AMQP_FIELD_KIND_UTF8;
entries[1].value.value.bytes = amqp_cstring_bytes("dead");
amqp_table_t args;
args.num_entries = 2;
args.entries = entries;
amqp_queue_declare(conn, 1, amqp_cstring_bytes("order_queue"),
0, 1, 0, 0, args);
std::cout << "DLX 설정 완료: order_queue -> dlx_exchange -> dead_letter_queue" << std::endl;
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
return 0;
}
5.3 실패 시 DLQ로 보내기 (requeue=false)
// dlx_consumer.cpp - 처리 실패 시 DLQ로
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <csignal>
#include <iostream>
#include <stdexcept>
static volatile sig_atomic_t g_running = 1;
void sig_handler(int) { g_running = 0; }
bool process_order(const std::string& body) {
// JSON 파싱, 비즈니스 검증 등
if (body.find("invalid") != std::string::npos) {
throw std::runtime_error("Invalid order format");
}
return true;
}
int main() {
signal(SIGINT, sig_handler);
auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
channel->BasicQos(0, 1, false);
std::string consumer_tag = channel->BasicConsume("order_queue", "", false, false, false);
std::cout << " [*] order_queue 대기 중..." << std::endl;
while (g_running) {
AmqpClient::Envelope::ptr_t envelope;
if (channel->BasicConsumeMessage(consumer_tag, envelope, 1000)) {
std::string body = envelope->Message()->Body();
try {
if (process_order(body)) {
channel->BasicAck(envelope);
std::cout << " [x] 처리 완료: " << body << std::endl;
}
} catch (const std::exception& e) {
std::cerr << " [!!] 처리 실패 (DLQ로 전달): " << e.what() << std::endl;
// requeue=false → Dead Letter Exchange로 전달
channel->BasicReject(envelope->GetDeliveryTag(), false);
}
}
}
channel->BasicCancel(consumer_tag);
return 0;
}
5.4 재시도 횟수 제한 후 DLQ (헤더 활용)
무한 재시도를 막으려면, 메시지 헤더에 재시도 횟수를 넣고, N회 초과 시 requeue=false로 DLQ에 보냅니다.
// retry_with_header.cpp
void handle_with_retry_limit(AmqpClient::Channel::ptr_t channel,
AmqpClient::Envelope::ptr_t envelope) {
std::string body = envelope->Message()->Body();
auto table = envelope->Message()->HeaderTable();
int retry_count = 0;
if (table && table->GetValue("x-retry-count")) {
retry_count = table->GetValue("x-retry-count")->GetInteger();
}
const int max_retries = 3;
try {
process_order(body);
channel->BasicAck(envelope);
} catch (const std::exception& e) {
if (retry_count >= max_retries) {
std::cerr << "최대 재시도 초과, DLQ로 전달" << std::endl;
channel->BasicReject(envelope->GetDeliveryTag(), false);
} else {
// requeue=true로 다시 큐에 넣음 (실무에서는 재시도 전용 큐+TTL 사용 권장)
channel->BasicReject(envelope->GetDeliveryTag(), true);
}
}
}
6. 메시지 TTL과 큐 TTL
6.1 메시지별 TTL (per-message expiration)
발행 시 각 메시지에 expiration 속성을 설정합니다. 밀리초 문자열.
// message_ttl_producer.cpp - SimpleAmqpClient
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <iostream>
int main() {
auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
channel->DeclareQueue("ttl_queue", false, true, false, false);
auto msg = AmqpClient::BasicMessage::Create("1시간 후 만료되는 메시지");
msg->DeliveryMode(2);
msg->Expiration("3600000"); // 3600000 ms = 1시간
channel->BasicPublish("", "ttl_queue", msg);
std::cout << " [x] TTL 1시간 메시지 발행" << std::endl;
return 0;
}
6.2 큐 TTL (x-message-ttl)
큐에 들어오는 모든 메시지에 기본 TTL을 적용합니다.
// queue_ttl_setup.cpp - rabbitmq-c
amqp_table_entry_t entries[1];
entries[0].key = amqp_cstring_bytes("x-message-ttl");
entries[0].value.kind = AMQP_FIELD_KIND_I32;
entries[0].value.value.i32 = 60000; // 60초
amqp_table_t args;
args.num_entries = 1;
args.entries = entries;
amqp_queue_declare(conn, 1, amqp_cstring_bytes("expiring_queue"),
0, 1, 0, 0, args);
주의: 메시지 TTL과 큐 TTL이 둘 다 있으면 더 작은 값이 적용됩니다.
6.3 TTL + DLX: 만료 메시지를 DLQ로
만료된 메시지를 버리지 않고 DLQ로 보내 분석할 수 있습니다.
// ttl_dlx_setup.cpp - 큐 인자
// x-message-ttl: 60000 (60초)
// x-dead-letter-exchange: dlx_exchange
// x-dead-letter-routing-key: expired
amqp_table_entry_t entries[3];
entries[0].key = amqp_cstring_bytes("x-message-ttl");
entries[0].value.kind = AMQP_FIELD_KIND_I32;
entries[0].value.value.i32 = 60000;
entries[1].key = amqp_cstring_bytes("x-dead-letter-exchange");
entries[1].value.kind = AMQP_FIELD_KIND_UTF8;
entries[1].value.value.bytes = amqp_cstring_bytes("dlx_exchange");
entries[2].key = amqp_cstring_bytes("x-dead-letter-routing-key");
entries[2].value.kind = AMQP_FIELD_KIND_UTF8;
entries[2].value.value.bytes = amqp_cstring_bytes("expired");
7. 자주 발생하는 에러와 해결법
에러 1: “NOT_FOUND - no exchange ‘logs_topic’”
증상: BasicPublish("logs_topic", "logs.error", msg) 시 예외
원인: Exchange를 선언하지 않음. 기본 Exchange("")만 자동 존재.
해결법:
# RabbitMQ 관리 UI (http://localhost:15672)에서 수동 생성
# 또는 rabbitmq-c로 amqp_exchange_declare 호출
// ✅ Exchange 선언 후 사용
amqp_exchange_declare(conn, 1, amqp_cstring_bytes("logs_topic"),
amqp_cstring_bytes("topic"), 0, 1, 0, 0, amqp_empty_table);
에러 2: “PRECONDITION_FAILED - inequivalent arg ‘x-max-priority’”
증상: 우선순위 큐 선언 시 기존 큐와 인자 불일치
원인: 이미 x-max-priority 없이 선언된 큐가 있음. RabbitMQ는 큐 인자 변경을 허용하지 않음.
해결법:
# 기존 큐 삭제 후 재선언
rabbitmqctl delete_queue order_queue
// ✅ 새 큐 이름 사용 또는 기존 큐 삭제 후 동일 인자로 선언
에러 3: 우선순위가 동작하지 않음
증상: Priority(10)으로 발행했는데 일반 메시지와 같은 순서로 처리됨
원인:
- 큐에
x-max-priority미설정 - 컨슈머가 이미 메시지를 가져온 상태 (우선순위는 큐 내부 정렬에만 영향)
- SimpleAmqpClient가 Priority 미지원
해결법:
// ✅ 큐 선언 시 x-max-priority 필수
// ✅ rabbitmq-c로 priority 설정 확인
props._flags |= AMQP_BASIC_PRIORITY_FLAG;
props.priority = 10;
에러 4: DLX로 메시지가 안 감
증상: BasicReject(requeue=false) 했는데 dead_letter_queue에 없음
원인:
- 큐 선언 시
x-dead-letter-exchange미설정 requeue=true로 호출 (requeue=true면 DLX로 안 감)- DLX Exchange 또는 dead_letter_queue 미선언
해결법:
// ✅ requeue=false 필수
channel->BasicReject(envelope->GetDeliveryTag(), false);
// ✅ 큐 인자에 x-dead-letter-exchange, x-dead-letter-routing-key 설정
// ✅ dlx_exchange, dead_letter_queue 미리 선언
에러 5: “ACCESS_REFUSED - exchange ‘logs_topic’ in vhost ’/’”
증상: Exchange 이름 오타 또는 다른 vhost
해결법:
// ✅ Exchange 이름 확인 (대소문자 구분)
// ✅ vhost 일치 (기본 "/")
channel->BasicPublish("logs_topic", "logs.error", msg); // logs_topic 정확히
에러 6: 토픽 바인딩 후 메시지가 큐에 안 옴
증상: logs.error 발행했는데 바인딩한 큐에 메시지 없음
원인: routing key 패턴 불일치. logs.#는 logs.로 시작하는 모든 키 매칭. logs.error는 logs.error 또는 logs.#에 매칭.
해결법:
# 패턴 규칙
logs.error → logs.error 바인딩에 매칭
logs.error → logs.# 바인딩에 매칭
logs.a.b → logs.# 에만 매칭 (logs.* 는 한 단계만)
에러 7: TTL이 적용되지 않음
증상: expiration 설정했는데 메시지가 만료되지 않음
원인: 메시지가 컨슈머에게 이미 전달된 상태. TTL은 큐에 대기 중일 때만 적용됩니다. 컨슈머가 가져가면 TTL은 무시됩니다.
해결법: TTL은 큐 대기 시간에만 적용됨을 이해하고, 컨슈머 처리 지연이 크면 prefetch를 줄이거나 워커를 늘립니다.
8. 성능 최적화 팁
팁 1: 토픽 바인딩 수 최소화
바인딩이 많을수록 라우팅 비용이 증가합니다. logs.# 하나로 모든 로그를 받는 것보다, 필요한 패턴만 바인딩하세요.
// ❌ 과도한 바인딩: logs.error, logs.warning, logs.info 각각
// ✅ 필요한 것만: error 알림용 logs.error, 전체 수집용 logs.#
팁 2: 우선순위 범위 제한
x-max-priority를 255로 하면 내부 힙 연산 비용이 커집니다. 1~10으로 제한하는 것이 성능에 유리합니다.
// ✅ 실무 권장
entries[0].value.value.i32 = 10; // 1~10
팁 3: DLQ 모니터링
DLQ에 메시지가 쌓이면 비정상입니다. 알림·대시보드로 모니터링하고, 주기적으로 원인 분석 후 재처리 또는 폐기합니다.
// DLQ 메시지 수 확인 (rabbitmqctl 또는 관리 API)
// rabbitmqctl list_queues name messages
팁 4: TTL 큐와 영구 큐 분리
TTL이 짧은 메시지(알림)와 영구 메시지(주문)를 같은 큐에 넣지 마세요. 큐별로 TTL 정책이 다르므로 분리하는 것이 관리에 유리합니다.
팁 5: Exchange 타입별 처리량
| 타입 | 상대적 처리량 | 비고 |
|---|---|---|
| direct | 높음 | 단순 라우팅 |
| topic | 중간 | 패턴 매칭 오버헤드 |
| fanout | 높음 | 바인딩 수만큼 복제 |
9. 프로덕션 패턴
패턴 1: Exchange·큐 선언 스크립트 분리
C++ 앱에서 매번 Exchange·큐를 선언하지 말고, 배포 스크립트나 설정 툴에서 한 번만 선언합니다.
# setup_rabbitmq.sh
#!/bin/bash
# rabbitmqadmin 또는 HTTP API로 Exchange/Queue 선언
rabbitmqadmin declare exchange name=logs_topic type=topic durable=true
rabbitmqadmin declare queue name=error_logs durable=true
rabbitmqadmin declare binding source=logs_topic destination=error_logs routing_key=logs.error
패턴 2: 재시도 큐 + TTL (지연 재처리)
실패 시 즉시 requeue하지 않고, 재시도 전용 큐에 TTL을 두어 지연 후 재처리합니다.
flowchart LR
Q1[main_queue] -->|실패| EX1[retry_exchange]
EX1 --> Q2[retry_queue]
Q2 -->|TTL 만료| EX2[main_exchange]
EX2 --> Q1
// retry_queue: x-message-ttl=60000 (60초)
// x-dead-letter-exchange=main_exchange
// x-dead-letter-routing-key=retry
// → 60초 후 main_queue로 다시 들어감
패턴 3: 설정 외부화
struct RabbitMQAdvancedConfig {
std::string topic_exchange = "logs_topic";
std::string priority_queue = "priority_orders";
std::string dlx_exchange = "dlx_exchange";
std::string dead_letter_queue = "dead_letter_queue";
int max_priority = 10;
int message_ttl_ms = 3600000;
};
RabbitMQAdvancedConfig loadFromEnv() {
RabbitMQAdvancedConfig c;
if (const char* x = std::getenv("RABBITMQ_TOPIC_EXCHANGE")) c.topic_exchange = x;
if (const char* ttl = std::getenv("RABBITMQ_MSG_TTL_MS")) c.message_ttl_ms = std::stoi(ttl);
return c;
}
패턴 4: DLQ 컨슈머 (분석·알림)
// dlq_consumer.cpp - 실패 메시지 수집 및 알림
void consume_dead_letters(AmqpClient::Channel::ptr_t channel) {
std::string tag = channel->BasicConsume("dead_letter_queue", "", false, false, false);
while (true) {
AmqpClient::Envelope::ptr_t envelope;
if (channel->BasicConsumeMessage(tag, envelope, 1000)) {
std::string body = envelope->Message()->Body();
// 로깅, 메트릭, PagerDuty 알림 등
log_failure(body);
notify_ops(body);
channel->BasicAck(envelope);
}
}
}
패턴 5: Docker Compose (고급 설정)
# docker-compose.yml
version: '3'
services:
rabbitmq:
image: rabbitmq:3.12-management
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: app
RABBITMQ_DEFAULT_PASS: secret
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "check_running"]
interval: 10s
timeout: 5s
retries: 3
app:
build: .
environment:
RABBITMQ_HOST: rabbitmq
RABBITMQ_PORT: 5672
RABBITMQ_USER: app
RABBITMQ_PASSWORD: secret
depends_on:
rabbitmq:
condition: service_healthy
10. 구현 체크리스트
토픽 라우팅
- Exchange 선언 (topic/direct/fanout)
- 큐 선언 및 바인딩 (routing key 패턴 확인)
- 프로듀서: BasicPublish(exchange, routing_key, msg)
- 컨슈머: 각 큐별 소비
우선순위 큐
- 큐에
x-max-priority설정 - 메시지에
priority속성 설정 - SimpleAmqpClient 미지원 시 rabbitmq-c 사용
Dead Letter
- DLX Exchange 선언
- Dead Letter 큐 선언 및 바인딩
- 메인 큐에
x-dead-letter-exchange,x-dead-letter-routing-key - 실패 시 BasicReject(requeue=false)
- DLQ 컨슈머 구현 (모니터링·알림)
메시지 TTL
- 메시지
expiration또는 큐x-message-ttl - TTL + DLX로 만료 메시지 수집 (선택)
에러 처리
- Exchange 미존재 시 선언 또는 에러 처리
- PRECONDITION_FAILED 시 큐 삭제 후 재선언
- DLX 미동작 시 requeue=false 확인
프로덕션
- Exchange·큐 선언 스크립트 분리
- 설정 외부화 (환경 변수)
- DLQ 모니터링·알림
- Health Check
문제 시나리오 해결 요약
| 문제 | RabbitMQ 고급 해결 방법 |
|---|---|
| 로그 레벨별 분기 | 토픽 Exchange + routing key (logs.error, logs.#) |
| VIP·긴급 우선 처리 | 우선순위 큐 (x-max-priority, priority) |
| 실패 메시지 무한 재시도 | DLX + BasicReject(requeue=false) |
| 오래된 메시지 폐기 | 메시지 TTL 또는 큐 x-message-ttl |
| 한 이벤트 N개 서비스 구독 | Fanout Exchange |
| 만료 메시지 분석 | TTL + DLX로 dead_letter_queue 수집 |
정리
| 항목 | 요약 |
|---|---|
| 토픽 라우팅 | Exchange(topic) + routing key 패턴 (*, #) |
| 우선순위 | x-max-priority, 메시지 priority 속성 |
| DLX | x-dead-letter-exchange, requeue=false |
| TTL | expiration(메시지), x-message-ttl(큐) |
| 에러 | NOT_FOUND, PRECONDITION_FAILED, DLX 미동작 |
| 성능 | 바인딩 최소화, 우선순위 범위 제한 |
| 프로덕션 | 선언 스크립트 분리, DLQ 모니터링 |
핵심 원칙:
- Exchange 선언 후 사용 (기본 Exchange 제외)
- 우선순위 큐는 x-max-priority 필수
- DLX는 requeue=false일 때만 동작
- TTL은 큐 대기 중에만 적용
자주 묻는 질문 (FAQ)
Q. 토픽과 다이렉트 중 뭘 써야 하나요?
A. routing key가 고정된 몇 개(예: email, sms)면 direct. 패턴 매칭(예: logs.error, order.vip.urgent)이 필요하면 topic.
Q. 우선순위 큐 성능 영향은?
A. 우선순위 범위가 크면(예: 255) 정렬 비용이 증가합니다. 1~10으로 제한하면 대부분의 실무 요구를 충족하면서 성능 부담이 적습니다.
Q. DLQ 메시지를 다시 메인 큐로 보낼 수 있나요?
A. 수동으로 가능합니다. DLQ 컨슈머가 메시지를 받아 수정 후 메인 큐에 재발행하거나, rabbitmqadmin으로 메시지를 이동할 수 있습니다. 자동 재처리는 재시도 큐 + TTL 패턴을 사용하세요.
Q. SimpleAmqpClient로 Exchange 선언이 안 되는데요?
A. SimpleAmqpClient는 Exchange 선언 API가 제한적입니다. rabbitmq-c로 Exchange·바인딩을 선언하거나, RabbitMQ 관리 UI, rabbitmqadmin, HTTP API로 미리 설정해 두세요.
한 줄 요약: 토픽 라우팅·우선순위 큐·Dead Letter·TTL을 활용하면 실무 RabbitMQ 이벤트 시스템을 안정적으로 구축할 수 있습니다.
다음 글: C++ Kafka 고급(#52-6) — 스트림 처리·트랜잭션
이전 글: C++ RabbitMQ 완벽 가이드(#52-7) — 프로듀서·컨슈머·작업 큐
참고 자료
- RabbitMQ Tutorials — 튜토리얼 4~6 (라우팅·토픽)
- RabbitMQ Dead Letter
- RabbitMQ Priority Queues
- rabbitmq-c GitHub
- SimpleAmqpClient GitHub
관련 글
- C++ 메시지 큐 시스템 | RabbitMQ·Kafka 통합 완벽 가이드 [#50-7]