C++ RabbitMQ 완벽 가이드 | SimpleAmqpClient·rabbitmq-c
이 글의 핵심
C++에서 RabbitMQ 연동: SimpleAmqpClient·rabbitmq-c 설치·연결, 프로듀서·컨슈머·작업 큐·토픽 라우팅·우선순위 큐·DLX·메시지 TTL 실전 코드.
들어가며: C++에서 RabbitMQ를 왜 쓰나요?
실제 겪는 문제 시나리오
시나리오 1: 이메일 발송으로 API 응답 지연
회원가입·주문 완료 시 이메일을 동기적으로 발송합니다. SMTP 서버가 느리면 전체 API가 멈춥니다. “이메일 때문에 10초씩 기다려요.”
시나리오 2: 이미지 리사이징으로 워커 과부하
업로드된 이미지를 여러 워커가 처리합니다. HTTP 폴링으로 작업을 가져오면 빈번한 요청·경쟁 조건이 발생합니다. “어떤 워커가 어떤 작업을 가져갔는지 추적이 안 돼요.”
시나리오 3: 마이크로서비스 간 동기 호출
주문 서비스가 재고·결제·알림 서비스를 순차 HTTP로 호출합니다. 한 서비스 장애 시 전체가 멈추고, 타임아웃 설정이 복잡합니다. “서비스가 늘어날수록 호출 체인이 끊어져요.”
시나리오 4: rabbitmq-c 도입 후 “Connection refused” 에러
RabbitMQ 브로커 주소를 설정했는데 연결이 안 됩니다. localhost:5672 vs 127.0.0.1:5672, 방화벽, Docker 네트워크 등 확인할 것이 많아 막막합니다.
시나리오 5: 메시지 중복 처리 또는 유실
컨슈머가 메시지 처리 중 크래시했습니다. 재시작 후 같은 메시지를 다시 처리하거나, ACK 전에 죽어서 유실됩니다. “주문이 두 번 처리되거나 아예 빠져요.”
시나리오 6: 워커 간 공정 분배 실패
워커 A는 빠르게 처리하고 워커 B는 느린데, 메시지가 균등하게 분배되어 B가 쌓일 때만 처리합니다. “prefetch 없이 하면 한 워커만 바쁘고 나머지는 놀아요.”
시나리오 7: 로그 레벨별 라우팅 필요
error, warn, info 로그를 각각 다른 큐로 보내고 싶은데, if-else로 분기하면 코드가 복잡해집니다. “토픽 라우팅으로 logs.error, logs.# 패턴 구독이 필요해요.”
시나리오 8: VIP 주문 우선 처리
일반 주문보다 긴급 주문을 먼저 처리해야 합니다. “우선순위 큐 없이는 FIFO만 되서 VIP가 뒤로 밀려요.”
시나리오 9: 처리 실패 메시지 수집
재시도 후에도 실패한 메시지를 별도 큐에 모아 분석·알림을 보내고 싶습니다. “Dead Letter Exchange 없이는 reject 시 그냥 버려져요.”
시나리오 10: 오래된 메시지 자동 삭제
세션 토큰·임시 작업처럼 1시간 지나면 의미 없는 메시지를 큐에 쌓아두고 싶지 않습니다. “메시지 TTL로 자동 만료가 필요해요.”
RabbitMQ C++ 클라이언트로 해결:
- 작업 큐: 프로듀서가 메시지를 큐에 넣고, 워커가 가져가 처리. API는 발행만 하고 즉시 반환
- 공정 분배: prefetch=1로 한 번에 하나씩만 가져와 워커 간 공정 분배
- 메시지 유지: persistent 메시지로 브로커 재시작 시에도 유지
- ACK/NAK: 처리 완료 후 ACK, 실패 시 NAK으로 재큐잉
flowchart LR
subgraph Producer["프로듀서 (C++)"]
P1[주문 이벤트]
P2[이메일 작업]
P3[이미지 처리]
end
subgraph RabbitMQ[RabbitMQ]
Q1[task_queue]
Q2[email_queue]
end
subgraph Consumer["컨슈머 (C++)"]
C1[주문 처리]
C2[이메일 발송]
C3[이미지 리사이징]
end
P1 --> Q1
P2 --> Q2
P3 --> Q1
Q1 --> C1
Q1 --> C3
Q2 --> C2
RabbitMQ vs Kafka 비교
| 항목 | RabbitMQ | Kafka |
|---|---|---|
| 모델 | 큐, Exchange | 토픽, 파티션 |
| 메시지 보존 | 소비 후 삭제 (기본) | 보존 기간 동안 유지 |
| 재처리 | 별도 구현 | 오프셋 이동으로 가능 |
| 처리량 | 수만 msg/s | 수백만 msg/s |
| C++ 클라이언트 | rabbitmq-c, SimpleAmqpClient | librdkafka |
이 글에서 다루는 것:
- SimpleAmqpClient·rabbitmq-c 설치 및 환경 설정
- 완전한 프로듀서·컨슈머 C++ 예제 (작업 큐, durable, ACK)
- 토픽 라우팅·우선순위 큐·Dead Letter Exchange·메시지 TTL
- 자주 발생하는 에러와 해결법 (12가지)
- 베스트 프랙티스·성능 최적화·프로덕션 패턴
실무 적용 경험: 이 글은 대규모 C++ 프로젝트에서 실제로 겪은 문제와 해결 과정을 바탕으로 작성되었습니다. 책이나 문서에서 다루지 않는 실전 함정과 디버깅 팁을 포함합니다.
목차
- 환경 설정 및 설치
- 기본 연결 및 Hello World
- 완전한 프로듀서·컨슈머 예제
- 토픽 라우팅·우선순위 큐·DLX·메시지 TTL
- 자주 발생하는 에러와 해결법
- 베스트 프랙티스
- 성능 최적화 팁
- 프로덕션 패턴
- 구현 체크리스트
1. 환경 설정 및 설치
필수 의존성
| 항목 | 버전 | 비고 |
|---|---|---|
| C++ | C++14 이상 | C++17 권장 |
| RabbitMQ | 3.8+ | 브로커 (Docker 권장) |
| SimpleAmqpClient | 2.x | rabbitmq-c C++ 래퍼 |
| rabbitmq-c | 0.14+ | AMQP C 라이브러리 |
| CMake | 3.16+ | find_package 지원 |
RabbitMQ 브로커 실행 (Docker)
# RabbitMQ + 관리 UI (포트 5672 AMQP, 15672 웹)
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 \
rabbitmq:3.12-management
# 기본 계정: guest / guest (로컬만)
# 관리 UI: http://localhost:15672
SimpleAmqpClient 설치 (vcpkg 권장)
# vcpkg로 설치 (rabbitmq-c 자동 의존)
vcpkg install simpleamqpclient
# CMake 사용 시
cmake -B build -DCMAKE_TOOLCHAIN_FILE=[vcpkg]/scripts/buildsystems/vcpkg.cmake
rabbitmq-c 직접 설치 (SimpleAmqpClient 없이)
# Ubuntu/Debian
sudo apt-get install librabbitmq-dev
# macOS (Homebrew)
brew install rabbitmq-c
# 소스 빌드
git clone https://github.com/alanxz/rabbitmq-c.git
cd rabbitmq-c
mkdir build && cd build
cmake .. -DCMAKE_BUILD_TYPE=Release
cmake --build .
sudo cmake --build . --target install
CMakeLists.txt 기본 설정
cmake_minimum_required(VERSION 3.16)
project(rabbitmq_demo LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
find_package(SimpleAmqpClient REQUIRED)
add_executable(rabbitmq_producer producer.cpp)
target_link_libraries(rabbitmq_producer PRIVATE SimpleAmqpClient::SimpleAmqpClient)
add_executable(rabbitmq_consumer consumer.cpp)
target_link_libraries(rabbitmq_consumer PRIVATE SimpleAmqpClient::SimpleAmqpClient)
rabbitmq-c만 사용 시 CMakeLists.txt
find_package(rabbitmq-c REQUIRED)
target_link_libraries(rabbitmq_producer PRIVATE rabbitmq::rabbitmq)
2. 기본 연결 및 Hello World
SimpleAmqpClient로 최소 연결
AmqpClient::Channel::Create()로 호스트·포트·인증 정보를 넘겨 채널을 생성합니다. 채널 하나가 논리적 연결 단위입니다.
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <iostream>
int main() {
try {
// 1. 채널 생성 (연결 + 로그인 자동)
auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
// 2. 큐 선언 (없으면 생성)
channel->DeclareQueue("hello", /*passive=*/false,
/*durable=*/false, /*exclusive=*/false,
/*auto_delete=*/false);
// 3. 메시지 발행
auto msg = AmqpClient::BasicMessage::Create("Hello, RabbitMQ!");
channel->BasicPublish("", "hello", msg);
std::cout << "메시지 발행 완료" << std::endl;
} catch (const std::exception& e) {
std::cerr << "에러: " << e.what() << std::endl;
return 1;
}
return 0;
}
코드 설명:
DeclareQueue("hello", ...): 큐가 없으면 생성, 있으면 기존 사용.passive=true면 존재 여부만 확인BasicPublish("", "hello", msg): 빈 exchange("")는 기본 exchange로, routing_key가 큐 이름과 동일하면 해당 큐로 전달BasicMessage::Create(): 메시지 본문 생성
Hello World: 프로듀서 + 컨슈머
// producer_hello.cpp
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <iostream>
int main() {
auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
channel->DeclareQueue("hello", false, false, false, false);
for (int i = 0; i < 5; ++i) {
std::string body = "Hello " + std::to_string(i);
auto msg = AmqpClient::BasicMessage::Create(body);
channel->BasicPublish("", "hello", msg);
std::cout << "Sent: " << body << std::endl;
}
return 0;
}
// consumer_hello.cpp
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <iostream>
#include <csignal>
static volatile sig_atomic_t g_running = 1;
void sig_handler(int) { g_running = 0; }
int main() {
signal(SIGINT, sig_handler);
auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
channel->DeclareQueue("hello", false, false, false, false);
std::string consumer_tag = channel->BasicConsume("hello", "", /*no_ack=*/true,
/*exclusive=*/false, /*no_local=*/false);
std::cout << "메시지 수신 대기 중... (Ctrl+C로 종료)" << std::endl;
while (g_running) {
AmqpClient::Envelope::ptr_t envelope;
if (channel->BasicConsumeMessage(consumer_tag, envelope, 1000)) {
std::cout << "Received: " << envelope->Message()->Body() << std::endl;
}
}
return 0;
}
주의: no_ack=true면 서버가 메시지를 보내자마자 삭제합니다. 처리 실패 시 유실되므로, 실전에서는 no_ack=false로 ACK를 수동 처리해야 합니다.
rabbitmq-c로 직접 연결 (C API)
SimpleAmqpClient 없이 rabbitmq-c만 사용하는 예시입니다.
// rabbitmq_c_hello.cpp - rabbitmq-c C API
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include <iostream>
#include <cstring>
int main() {
amqp_connection_state_t conn = amqp_new_connection();
amqp_socket_t* socket = amqp_tcp_socket_new(conn);
if (!socket) {
std::cerr << "소켓 생성 실패" << std::endl;
return 1;
}
if (amqp_socket_open(socket, "localhost", 5672) != AMQP_STATUS_OK) {
std::cerr << "연결 실패" << std::endl;
amqp_destroy_connection(conn);
return 1;
}
amqp_rpc_reply_t reply = amqp_login(conn, "/", 0, 131072, 0,
AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "로그인 실패" << std::endl;
amqp_destroy_connection(conn);
return 1;
}
amqp_channel_open(conn, 1);
reply = amqp_get_rpc_reply(conn);
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "채널 열기 실패" << std::endl;
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
return 1;
}
// 큐 선언
amqp_queue_declare_ok_t* r = amqp_queue_declare(conn, 1, amqp_cstring_bytes("hello"),
0, 0, 0, 0, amqp_empty_table);
reply = amqp_get_rpc_reply(conn);
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
std::cerr << "큐 선언 실패" << std::endl;
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
return 1;
}
// 메시지 발행
std::string body = "Hello from rabbitmq-c!";
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 1; // non-persistent
amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes("hello"),
0, 0, &props, amqp_cstring_bytes(body.c_str()));
std::cout << "메시지 발행 완료" << std::endl;
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
return 0;
}
컴파일:
g++ -std=c++17 rabbitmq_c_hello.cpp -o rabbitmq_c_hello -lrabbitmq
3. 완전한 프로듀서·컨슈머 예제
3.1 작업 큐 (Task Queue) — durable + persistent
실전에서는 큐와 메시지를 영구 저장해 브로커 재시작 시에도 유지합니다.
// task_producer.cpp
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <iostream>
#include <string>
int main(int argc, char* argv[]) {
auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
// durable=true: 큐를 디스크에 저장
channel->DeclareQueue("task_queue", false, true, false, false);
std::string message = (argc > 1) ? argv[1] : "Hello World!";
auto msg = AmqpClient::BasicMessage::Create(message);
msg->DeliveryMode(2); // 2 = persistent (디스크에 저장)
channel->BasicPublish("", "task_queue", msg);
std::cout << " [x] Sent '" << message << "'" << std::endl;
return 0;
}
3.2 작업 큐 컨슈머 — prefetch + ACK
// task_consumer.cpp
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <algorithm>
#include <chrono>
#include <csignal>
#include <iostream>
#include <thread>
static volatile sig_atomic_t g_running = 1;
void sig_handler(int) { g_running = 0; }
int main() {
signal(SIGINT, sig_handler);
auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
channel->DeclareQueue("task_queue", false, true, false, false);
// prefetch_count=1: 한 번에 하나씩만 가져옴 → 워커 간 공정 분배
channel->BasicQos(0, 1, false);
std::string consumer_tag = channel->BasicConsume("task_queue", "",
/*no_ack=*/false, // 수동 ACK
false, false);
std::cout << " [*] Waiting for messages. Ctrl+C to exit." << std::endl;
while (g_running) {
AmqpClient::Envelope::ptr_t envelope;
if (channel->BasicConsumeMessage(consumer_tag, envelope, 1000)) {
std::string body = envelope->Message()->Body();
std::cout << " [x] Received '" << body << "'" << std::endl;
// 작업 시뮬레이션 (점 하나당 1초)
size_t count = std::count(body.begin(), body.end(), '.');
std::this_thread::sleep_for(std::chrono::seconds(count > 0 ? count : 1));
std::cout << " [x] Done" << std::endl;
channel->BasicAck(envelope); // 처리 완료 후 ACK
}
}
return 0;
}
핵심:
BasicQos(0, 1, false): prefetch=1로 한 번에 하나씩만 가져와 워커 간 공정 분배no_ack=false: 수동 ACK. 처리 완료 후BasicAck호출BasicAck(envelope): 해당 메시지를 서버에서 삭제. 호출하지 않으면 재연결 시 다시 전달됨
3.3 실패 시 NAK (재큐잉)
// 처리 실패 시 BasicReject로 재큐잉
void process_message(AmqpClient::Channel::ptr_t channel,
AmqpClient::Envelope::ptr_t envelope) {
std::string body = envelope->Message()->Body();
try {
do_work(body);
channel->BasicAck(envelope); // 성공
} catch (const std::exception& e) {
std::cerr << "처리 실패: " << e.what() << std::endl;
// requeue=true: 다시 큐에 넣음. false면 버림 (DLQ로 보내려면 별도 설정)
channel->BasicReject(envelope->GetDeliveryTag(), true);
}
}
3.4 RAII 래퍼 클래스 (재사용 가능)
// rabbitmq_producer.hpp
#pragma once
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <memory>
#include <stdexcept>
#include <string>
class RabbitMQProducer {
public:
RabbitMQProducer(const std::string& host = "localhost",
int port = 5672,
const std::string& user = "guest",
const std::string& password = "guest")
: host_(host), port_(port), user_(user), password_(password) {}
void connect() {
channel_ = AmqpClient::Channel::Create(host_, port_, user_, password_);
}
void declareQueue(const std::string& queue, bool durable = true) {
if (!channel_) throw std::runtime_error("Not connected");
channel_->DeclareQueue(queue, false, durable, false, false);
}
void publish(const std::string& queue, const std::string& message,
bool persistent = true) {
if (!channel_) throw std::runtime_error("Not connected");
auto msg = AmqpClient::BasicMessage::Create(message);
msg->DeliveryMode(persistent ? 2 : 1);
channel_->BasicPublish("", queue, msg);
}
private:
std::string host_, user_, password_;
int port_;
AmqpClient::Channel::ptr_t channel_;
};
// rabbitmq_consumer.hpp
#pragma once
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <atomic>
#include <functional>
#include <string>
class RabbitMQConsumer {
public:
using MessageHandler = std::function<bool(const std::string&)>;
RabbitMQConsumer(const std::string& host = "localhost", int port = 5672,
const std::string& user = "guest",
const std::string& password = "guest")
: host_(host), port_(port), user_(user), password_(password) {}
void connect() {
channel_ = AmqpClient::Channel::Create(host_, port_, user_, password_);
}
void declareQueue(const std::string& queue, bool durable = true) {
if (!channel_) throw std::runtime_error("Not connected");
channel_->DeclareQueue(queue, false, durable, false, false);
}
void setPrefetch(int count) {
if (!channel_) throw std::runtime_error("Not connected");
channel_->BasicQos(0, count, false);
}
void consume(const std::string& queue, MessageHandler handler) {
if (!channel_) throw std::runtime_error("Not connected");
std::string consumer_tag = channel_->BasicConsume(queue, "", false, false, false);
running_.store(true);
while (running_.load()) {
AmqpClient::Envelope::ptr_t envelope;
if (channel_->BasicConsumeMessage(consumer_tag, envelope, 1000)) {
std::string body = envelope->Message()->Body();
if (handler(body)) {
channel_->BasicAck(envelope);
} else {
channel_->BasicReject(envelope->GetDeliveryTag(), true);
}
}
}
}
void stop() { running_.store(false); }
private:
std::string host_, user_, password_;
int port_;
AmqpClient::Channel::ptr_t channel_;
std::atomic<bool> running_{true};
};
3.5 실전 예제: 주문 이벤트 파이프라인
// order_pipeline.cpp
#include "rabbitmq_producer.hpp"
#include "rabbitmq_consumer.hpp"
#include <iostream>
void run_producer() {
RabbitMQProducer producer("localhost", 5672, "guest", "guest");
producer.connect();
producer.declareQueue("orders");
for (int i = 0; i < 10; ++i) {
std::string order = R"({"order_id":)" + std::to_string(i) +
R"(,"user_id":"u1","amount":100})";
producer.publish("orders", order);
std::cout << "Published order " << i << std::endl;
}
}
void run_consumer() {
RabbitMQConsumer consumer("localhost", 5672, "guest", "guest");
consumer.connect();
consumer.declareQueue("orders");
consumer.setPrefetch(1);
consumer.consume("orders", -> bool {
std::cout << "Processing: " << body << std::endl;
// 주문 검증, 재고 차감 등
return true; // 성공 시 ACK
});
}
int main(int argc, char* argv[]) {
if (argc > 1 && std::string(argv[1]) == "producer") {
run_producer();
} else {
run_consumer();
}
return 0;
}
4. 토픽 라우팅·우선순위 큐·Dead Letter·메시지 TTL
4.1 토픽 라우팅 (Topic Exchange)
로그 레벨별·서비스별로 메시지를 라우팅할 때 topic exchange를 사용합니다. *(한 단어), #(여러 단어) 와일드카드로 바인딩합니다.
flowchart LR
subgraph Producer[프로듀서]
P[BasicPublish]
end
subgraph Exchange["logs (topic)"]
E[Exchange]
end
subgraph Queues[큐]
Q1[logs.error]
Q2[logs.*.critical]
Q3[logs.#]
end
P -->|logs.error.critical| E
E --> Q1
E --> Q2
E --> Q3
SimpleAmqpClient로 토픽 라우팅:
// topic_producer.cpp
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <iostream>
int main(int argc, char* argv[]) {
auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
// topic 타입 exchange 선언
channel->DeclareExchange("logs_topic", "topic", false, false, false);
std::string routing_key = (argc > 1) ? argv[1] : "app.info.default";
std::string message = (argc > 2) ? argv[2] : "Hello from topic!";
auto msg = AmqpClient::BasicMessage::Create(message);
msg->DeliveryMode(2);
channel->BasicPublish("logs_topic", routing_key, msg);
std::cout << " [x] Sent '" << routing_key << "':" << message << std::endl;
return 0;
}
// topic_consumer.cpp - 특정 패턴만 구독
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <iostream>
#include <csignal>
static volatile sig_atomic_t g_running = 1;
void sig_handler(int) { g_running = 0; }
int main(int argc, char* argv[]) {
std::string binding_key = (argc > 1) ? argv[1] : "logs.#"; // 모든 로그
signal(SIGINT, sig_handler);
auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
channel->DeclareExchange("logs_topic", "topic", false, false, false);
// 익명 큐 생성 (exclusive, 연결 끊기면 삭제)
std::string queue = channel->DeclareQueue("", false, false, true, true);
channel->BindQueue(queue, "logs_topic", binding_key);
std::string consumer_tag = channel->BasicConsume(queue, "", false, false, false);
std::cout << " [*] Waiting for " << binding_key << ". Ctrl+C to exit." << std::endl;
while (g_running) {
AmqpClient::Envelope::ptr_t envelope;
if (channel->BasicConsumeMessage(consumer_tag, envelope, 1000)) {
std::cout << " [x] " << envelope->RoutingKey() << ": "
<< envelope->Message()->Body() << std::endl;
channel->BasicAck(envelope);
}
}
return 0;
}
rabbitmq-c로 토픽 exchange:
// topic_rabbitmq_c.cpp
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_framing.h>
#include <iostream>
int main() {
amqp_connection_state_t conn = amqp_new_connection();
amqp_socket_t* socket = amqp_tcp_socket_new(conn);
amqp_socket_open(socket, "localhost", 5672);
amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
amqp_channel_open(conn, 1);
// topic exchange 선언
amqp_exchange_declare(conn, 1, amqp_cstring_bytes("logs_topic"),
amqp_cstring_bytes("topic"), 0, 0, 0, 0, amqp_empty_table);
amqp_get_rpc_reply(conn);
// 메시지 발행
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
props.delivery_mode = 2;
amqp_basic_publish(conn, 1, amqp_cstring_bytes("logs_topic"),
amqp_cstring_bytes("app.error.critical"),
0, 0, &props, amqp_cstring_bytes("Critical error!"));
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
return 0;
}
4.2 우선순위 큐 (Priority Queue)
긴급 주문·VIP 고객 메시지를 먼저 처리하려면 x-max-priority로 우선순위 큐를 선언합니다. rabbitmq-c로 큐 인자를 전달해야 합니다.
// priority_queue_rabbitmq_c.cpp - x-max-priority (0~10)
amqp_table_entry_t entries[1];
entries[0].key = amqp_cstring_bytes("x-max-priority");
entries[0].value.kind = AMQP_FIELD_KIND_I32;
entries[0].value.value.i32 = 10;
amqp_table_t args = {.num_entries = 1, .entries = entries};
amqp_queue_declare(conn, 1, amqp_cstring_bytes("priority_queue"), 0, 1, 0, 0, args);
// 우선순위 9 메시지 발행 (높을수록 먼저 처리)
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_PRIORITY_FLAG;
props.delivery_mode = 2;
props.priority = 9;
amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes("priority_queue"),
0, 0, &props, amqp_cstring_bytes("Urgent task!"));
주의: SimpleAmqpClient는 DeclareQueue에 인자를 넘기는 오버로드가 제한적일 수 있어, 우선순위 큐·DLX·TTL은 rabbitmq-c로 구현하는 것이 안전합니다.
4.3 Dead Letter Exchange (DLX)
처리 실패·만료·거부된 메시지를 별도 큐로 보내 재처리·분석할 때 사용합니다.
flowchart TB
subgraph Main[메인 큐]
Q1[orders]
end
subgraph DLX[Dead Letter]
E[dlx_exchange]
Q2[orders.dlq]
end
Q1 -->|reject/expire/maxlen| E
E --> Q2
// dlx_rabbitmq_c.cpp - DLX용 exchange + 메인 큐 인자
amqp_exchange_declare(conn, 1, amqp_cstring_bytes("dlx_exchange"),
amqp_cstring_bytes("direct"), 0, 1, 0, 0, amqp_empty_table);
amqp_queue_declare(conn, 1, amqp_cstring_bytes("orders.dlq"), 0, 1, 0, 0, amqp_empty_table);
amqp_queue_bind(conn, 1, amqp_cstring_bytes("orders.dlq"),
amqp_cstring_bytes("dlx_exchange"), amqp_cstring_bytes("dlq"), amqp_empty_table);
amqp_table_entry_t entries[2];
entries[0].key = amqp_cstring_bytes("x-dead-letter-exchange");
entries[0].value.kind = AMQP_FIELD_KIND_UTF8;
entries[0].value.value.bytes = amqp_cstring_bytes("dlx_exchange");
entries[1].key = amqp_cstring_bytes("x-dead-letter-routing-key");
entries[1].value.kind = AMQP_FIELD_KIND_UTF8;
entries[1].value.value.bytes = amqp_cstring_bytes("dlq");
amqp_table_t args = {.num_entries = 2, .entries = entries};
amqp_queue_declare(conn, 1, amqp_cstring_bytes("orders"), 0, 1, 0, 0, args);
DLX로 가는 경우: BasicReject(requeue=false), 메시지 TTL 만료, 큐 x-max-length 초과.
4.4 메시지 TTL (Time-To-Live)
큐 단위 또는 메시지 단위로 TTL을 설정해 오래된 메시지를 자동 삭제합니다.
// ttl_rabbitmq_c.cpp - x-message-ttl: 60000ms (60초)
amqp_table_entry_t entries[1];
entries[0].key = amqp_cstring_bytes("x-message-ttl");
entries[0].value.kind = AMQP_FIELD_KIND_I32;
entries[0].value.value.i32 = 60000;
amqp_table_t args = {.num_entries = 1, .entries = entries};
amqp_queue_declare(conn, 1, amqp_cstring_bytes("ttl_queue"), 0, 1, 0, 0, args);
메시지별 TTL (SimpleAmqpClient):
// 메시지 단위 TTL - Expiration 헤더 (밀리초 문자열)
auto msg = AmqpClient::BasicMessage::Create("Short-lived");
msg->Expiration("5000"); // 5초 후 만료
channel->BasicPublish("", "queue", msg);
5. 자주 발생하는 에러와 해결법
에러 1: “Connection refused” / “Connection failed”
증상: Channel::Create() 호출 시 예외 또는 “Connection refused”
원인:
- RabbitMQ 브로커가 실행 중이 아님
- 잘못된 호스트/포트
- 방화벽 차단
- Docker 환경에서
localhostvshost.docker.internal혼동
해결법:
# RabbitMQ 실행 확인
docker ps | grep rabbitmq
# Docker 컨테이너에서 호스트 RabbitMQ 접속 시
# host.docker.internal:5672 (macOS/Windows)
# 172.17.0.1:5672 (Linux)
// ✅ 연결 전 브로커 확인
try {
auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
} catch (const AmqpClient::ConnectionClosedException& e) {
std::cerr << "연결 실패: " << e.what() << std::endl;
// 재시도 또는 폴백
}
에러 2: “Queue not found” / “NOT_FOUND”
증상: BasicPublish 또는 BasicConsume 시 큐를 찾을 수 없음
원인: 큐를 선언하지 않고 사용. 기본 exchange("")는 routing_key가 큐 이름과 같아야 하며, 큐가 미리 존재해야 합니다.
해결법:
// ❌ 잘못된 코드: 큐 선언 없이 발행
channel->BasicPublish("", "my_queue", msg); // my_queue가 없으면 에러
// ✅ 올바른 코드: 선언 후 사용
channel->DeclareQueue("my_queue", false, true, false, false);
channel->BasicPublish("", "my_queue", msg);
에러 3: 메시지 유실 (브로커 재시작 시)
증상: RabbitMQ 재시작 후 큐에 있던 메시지가 사라짐
원인: 큐가 durable=false이거나, 메시지가 DeliveryMode(1)(non-persistent)
해결법:
// ✅ durable 큐 + persistent 메시지
channel->DeclareQueue("task_queue", false, true, false, false); // durable=true
auto msg = AmqpClient::BasicMessage::Create(body);
msg->DeliveryMode(2); // 2 = persistent
channel->BasicPublish("", "task_queue", msg);
에러 4: 메시지 중복 처리
증상: 컨슈머 재시작 후 같은 메시지를 다시 처리
원인: 처리 완료 전에 ACK를 보내거나, 처리 중 크래시 후 재시작 시 서버가 미확인 메시지를 다시 전달
해결법:
// ✅ 처리 완료 후에만 ACK
void handle_message(AmqpClient::Envelope::ptr_t envelope) {
std::string body = envelope->Message()->Body();
process(body); // 비즈니스 로직
channel->BasicAck(envelope); // 처리 성공 후 ACK
}
// 멱등성: 처리 로직이 중복 호출되어도 안전하게 (예: DB upsert)
에러 5: “PRECONDITION_FAILED” — 큐 선언 불일치
증상: DeclareQueue 시 “PRECONDITION_FAILED” 에러
원인: 이미 존재하는 큐와 다른 옵션(durable, exclusive 등)으로 선언 시도
해결법:
// 기존 큐와 동일한 옵션으로 선언해야 함
// 또는 passive=true로 존재 확인만
channel->DeclareQueue("task_queue", true, false, false, false); // passive: 존재만 확인
에러 6: “ACCESS_REFUSED” — 인증 실패
증상: “ACCESS_REFUSED - Login was refused”
원인: 잘못된 사용자명/비밀번호. guest는 기본적으로 localhost에서만 접속 가능
해결법:
# RabbitMQ에서 새 사용자 생성
rabbitmqctl add_user myuser mypassword
rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"
// ✅ 올바른 인증 정보
auto channel = AmqpClient::Channel::Create("localhost", 5672, "myuser", "mypassword");
에러 7: 워커 간 공정 분배 실패
증상: 한 워커만 바쁘고 나머지는 놀음
원인: BasicQos(prefetch)를 설정하지 않음. 기본적으로 RabbitMQ는 모든 메시지를 한 번에 컨슈머에게 전달
해결법:
// ✅ prefetch=1로 한 번에 하나씩만 가져와 공정 분배
channel->BasicQos(0, 1, false);
에러 8: SimpleAmqpClient 링크 에러 (dyld, .so)
증상: dyld: Library not loaded 또는 undefined symbol
원인: 라이브러리 경로가 링커에 전달되지 않음
해결법:
# macOS에서 rpath 추가
c++ -std=c++17 main.cpp -Wl,-rpath,/usr/local/lib \
-lSimpleAmqpClient -lrabbitmq -o app
# Linux: LD_LIBRARY_PATH
export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
./app
에러 9: “Connection closed” — 장시간 유지 연결 끊김
증상: 한동안 사용하지 않은 채널로 메시지 발행 시 실패
원인: 브로커의 heartbeat 타임아웃 또는 네트워크 불안정
해결법:
// ✅ 재연결 로직
AmqpClient::Channel::ptr_t get_channel() {
static AmqpClient::Channel::ptr_t ch;
try {
if (!ch || !ch->CheckIsConnected()) {
ch = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
}
} catch (...) {
ch.reset();
throw;
}
return ch;
}
에러 10: 토픽 exchange “NOT_FOUND”
증상: BasicPublish로 topic exchange에 발행 시 “NOT_FOUND”
원인: exchange를 선언하지 않음. 기본 exchange("")가 아닌 named exchange는 반드시 DeclareExchange로 선언해야 합니다.
해결법:
// ✅ exchange 선언 후 발행
channel->DeclareExchange("logs_topic", "topic", false, false, false);
channel->BasicPublish("logs_topic", "app.error", msg);
에러 11: 우선순위 큐 “PRECONDITION_FAILED”
증상: x-max-priority 큐 선언 시 에러
원인: 이미 존재하는 큐에 다른 x-max-priority 값으로 재선언. RabbitMQ 3.8+ 필요.
해결법:
# 기존 큐 삭제 후 재선언 (개발 환경)
rabbitmqctl delete_queue priority_queue
에러 12: DLX 무한 루프
증상: DLQ에서 reject 시 다시 메인 큐로 돌아감
원인: DLQ에도 같은 DLX를 설정해 reject 시 메인 큐로 재전달
해결법: DLQ에는 DLX를 설정하지 않거나, 별도 “final” DLQ로 이중 전달 구조를 사용합니다.
6. 베스트 프랙티스
BP 1: 멱등성 설계
메시지가 중복 전달되어도 안전하게 처리합니다.
// ✅ 멱등성: order_id로 중복 체크
bool process_order(const std::string& body) {
auto order_id = parse_order_id(body);
if (db->order_already_processed(order_id)) {
return true; // 이미 처리됨 → ACK
}
db->process_and_mark(order_id, body);
return true;
}
BP 2: 메시지 크기 제한
너무 큰 메시지는 네트워크·메모리 부담을 줍니다. RabbitMQ 기본 최대 프레임 약 128KB.
constexpr size_t MAX_MSG_SIZE = 64 * 1024; // 64KB
if (message.size() > MAX_MSG_SIZE) {
// 대용량은 외부 스토리지 URL만 전달
message = R"({"ref":"s3://bucket/large-file.json"})";
}
BP 3: Correlation ID로 추적
분산 환경에서 요청-응답을 연결할 때 correlation_id 헤더를 사용합니다.
auto msg = AmqpClient::BasicMessage::Create(body);
msg->CorrelationId("req-" + std::to_string(request_id));
channel->BasicPublish("", "orders", msg);
BP 4: 큐·Exchange 네이밍 규칙
권장: {서비스}.{도메인}.{용도}
예: order.payment.events, notification.email.tasks
BP 5: 연결 실패 시 지수 백오프
int retry_delay_ms = 100;
for (int attempt = 0; attempt < 5; ++attempt) {
try {
channel_ = AmqpClient::Channel::Create(host_, port_, user_, password_);
break;
} catch (const std::exception& e) {
if (attempt == 4) throw;
std::this_thread::sleep_for(std::chrono::milliseconds(retry_delay_ms));
retry_delay_ms *= 2;
}
}
7. 성능 최적화 팁
팁 1: 채널 재사용
매 메시지마다 새 채널을 만들지 마세요. 채널은 스레드당 하나 또는 공유 풀로 재사용합니다.
// ❌ 나쁜 예
void send_message(const std::string& msg) {
auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
channel->BasicPublish("", "queue", AmqpClient::BasicMessage::Create(msg));
}
// ✅ 좋은 예: 채널 재사용
class Producer {
AmqpClient::Channel::ptr_t channel_;
public:
Producer() {
channel_ = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
channel_->DeclareQueue("queue", false, true, false, false);
}
void send(const std::string& msg) {
channel_->BasicPublish("", "queue", AmqpClient::BasicMessage::Create(msg));
}
};
팁 2: Prefetch 조정
처리량이 높을 때 prefetch를 늘리면 왕복 횟수를 줄일 수 있습니다. 하지만 너무 크면 한 워커에 메시지가 몰립니다.
// 처리 속도가 빠른 작업: prefetch 증가
channel->BasicQos(0, 10, false); // 한 번에 10개까지
// 처리 속도가 느린 작업: prefetch=1 유지
channel->BasicQos(0, 1, false);
팁 3: Publisher Confirms (신뢰성)
SimpleAmqpClient는 기본적으로 publisher confirm을 지원하지 않습니다. rabbitmq-c로 직접 구현하거나, 메시지 유실이 치명적이지 않다면 persistent 메시지로 충분합니다.
팁 4: 메시지 크기 최적화
큰 메시지는 압축 후 발행하는 것을 고려하세요.
#include <zlib.h>
// 압축 후 Base64 또는 바이너리로 발행
팁 5: 연결 풀 (다중 스레드)
여러 스레드가 동시에 발행할 때, 채널은 스레드 안전하지 않을 수 있습니다. 스레드당 채널 또는 락으로 보호합니다.
class ChannelPool {
std::vector<AmqpClient::Channel::ptr_t> channels_;
std::mutex mtx_;
size_t index_ = 0;
public:
AmqpClient::Channel::ptr_t get() {
std::lock_guard<std::mutex> lock(mtx_);
auto& ch = channels_[index_ % channels_.size()];
index_++;
return ch;
}
};
성능 비교 (참고)
| 방식 | 초당 메시지 (대략) | 비고 |
|---|---|---|
| 채널 매번 생성 | 수백 | 비권장 |
| 채널 재사용 | 수만 | 권장 |
| prefetch=1 | 공정 분배 | 작업 큐 기본 |
| prefetch=10 | 처리량 증가 | 빠른 작업에 적합 |
8. 프로덕션 패턴
패턴 1: Graceful Shutdown
static std::atomic<bool> g_running{true};
void sig_handler(int) {
g_running = false;
}
int main() {
signal(SIGINT, sig_handler);
signal(SIGTERM, sig_handler);
auto channel = AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
std::string consumer_tag = channel->BasicConsume("queue", "", false, false, false);
while (g_running) {
AmqpClient::Envelope::ptr_t envelope;
if (channel->BasicConsumeMessage(consumer_tag, envelope, 1000)) {
process(envelope);
channel->BasicAck(envelope);
}
}
channel->BasicCancel(consumer_tag);
return 0;
}
패턴 2: 설정 외부화
struct RabbitMQConfig {
std::string host = "localhost";
int port = 5672;
std::string user = "guest";
std::string password = "guest";
};
RabbitMQConfig loadFromEnv() {
RabbitMQConfig c;
if (const char* h = std::getenv("RABBITMQ_HOST")) c.host = h;
if (const char* p = std::getenv("RABBITMQ_PORT")) c.port = std::stoi(p);
if (const char* u = std::getenv("RABBITMQ_USER")) c.user = u;
if (const char* pw = std::getenv("RABBITMQ_PASSWORD")) c.password = pw;
return c;
}
패턴 3: Health Check
bool check_rabbitmq(const std::string& host, int port) {
try {
auto channel = AmqpClient::Channel::Create(host, port, "guest", "guest");
return channel && channel->CheckIsConnected();
} catch (...) {
return false;
}
}
패턴 4: 재시도 (지수 백오프)
template<typename Func>
auto withRetry(Func&& f, int maxRetries = 3) -> decltype(f()) {
for (int i = 0; i < maxRetries; ++i) {
try {
return f();
} catch (const std::exception& e) {
if (i == maxRetries - 1) throw;
auto delay = std::chrono::milliseconds(100 * (1 << i));
std::this_thread::sleep_for(delay);
}
}
throw std::runtime_error("Unreachable");
}
// 사용
auto channel = withRetry([&]() {
return AmqpClient::Channel::Create("localhost", 5672, "guest", "guest");
});
패턴 5: 로깅·메트릭
template<typename Func>
auto withTiming(const char* op, Func&& f) -> decltype(f()) {
auto start = std::chrono::steady_clock::now();
auto result = f();
auto dur = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start).count();
LOG(INFO) << "RabbitMQ " << op << " took " << dur << "ms";
return result;
}
패턴 6: Docker Compose 연동
# docker-compose.yml
version: '3'
services:
app:
build: .
environment:
RABBITMQ_HOST: rabbitmq
RABBITMQ_PORT: 5672
depends_on:
- rabbitmq
rabbitmq:
image: rabbitmq:3.12-management
ports:
- "5672:5672"
- "15672:15672"
9. 구현 체크리스트
환경 설정
- RabbitMQ 브로커 실행 확인 (
docker ps또는rabbitmqctl status) - SimpleAmqpClient 또는 rabbitmq-c 설치 (vcpkg, apt, Homebrew)
- CMake 또는 컴파일러 연동
연결
-
Channel::Create또는 rabbitmq-camqp_login성공 확인 - 큐
DeclareQueue선언 후 사용 - 인증 정보 (guest는 localhost만)
프로듀서
-
DeliveryMode(2)(persistent) 설정 (필요 시) - 큐
durable=true선언 (필요 시) - 채널 재사용 (매 메시지마다 새 채널 금지)
컨슈머
-
BasicQos(prefetch) 설정 -
no_ack=false시BasicAck호출 - 처리 실패 시
BasicReject(requeue 여부 결정) - Graceful Shutdown (
BasicCancel)
에러 처리
-
ConnectionClosedException등 예외 처리 - 재연결·재시도 로직 (필요 시)
- PRECONDITION_FAILED 시 큐 옵션 확인
프로덕션
- 설정 외부화 (환경 변수)
- Health Check
- 로깅·메트릭
문제 시나리오 해결 요약
| 문제 | RabbitMQ C++ 해결 방법 |
|---|---|
| API 응답 지연 | 메시지 발행 후 즉시 반환, 워커가 비동기 처리 |
| 워커 공정 분배 | BasicQos(prefetch=1) |
| 메시지 유실 | durable 큐 + persistent 메시지 |
| 메시지 중복 | 처리 완료 후 ACK, 멱등성 설계 |
| Connection refused | 브로커 확인, 호스트/포트, Docker 네트워크 |
| 큐 없음 | DeclareQueue 선언 후 사용 |
| 로그 레벨별 라우팅 | topic exchange + 바인딩 패턴 (logs.error, logs.#) |
| VIP 우선 처리 | 우선순위 큐 (x-max-priority, priority 헤더) |
| 실패 메시지 수집 | Dead Letter Exchange (x-dead-letter-exchange) |
| 오래된 메시지 삭제 | 메시지 TTL (x-message-ttl, Expiration) |
정리
| 항목 | 요약 |
|---|---|
| 라이브러리 | SimpleAmqpClient (C++), rabbitmq-c (C) |
| 연결 | Channel::Create(host, port, user, password) |
| 프로듀서 | DeclareQueue → BasicPublish, DeliveryMode(2) |
| 컨슈머 | BasicQos → BasicConsume → BasicConsumeMessage → BasicAck |
| 에러 | Connection refused, Queue not found, PRECONDITION_FAILED |
| 성능 | 채널 재사용, prefetch 조정 |
| 프로덕션 | Graceful Shutdown, 설정 외부화, 재시도 |
핵심 원칙:
- 큐 선언 후 사용
- durable + persistent로 메시지 유실 방지
- prefetch=1로 워커 간 공정 분배
- 처리 완료 후 ACK로 신뢰성 확보
자주 묻는 질문 (FAQ)
Q. RabbitMQ와 Kafka 중 어떤 것을 써야 하나요?
A. 작업 큐, RPC, 복잡한 라우팅이 필요하면 RabbitMQ. 로그 수집, 이벤트 스트리밍, 초당 수십만 메시지가 필요하면 Kafka. 둘 다 쓰는 하이브리드 구성도 많습니다.
Q. SimpleAmqpClient vs rabbitmq-c vs AMQP-CPP?
A. SimpleAmqpClient는 rabbitmq-c 기반 C++ 래퍼로 사용이 쉽습니다. rabbitmq-c는 C API로 더 세밀한 제어가 가능합니다. AMQP-CPP는 C++ 네이티브 비동기 라이브러리로, 이벤트 루프와 통합할 때 유용합니다.
Q. 메시지 순서가 보장되나요?
A. 단일 큐 + 단일 컨슈머에서는 순서가 보장됩니다. 여러 컨슈머가 같은 큐를 소비하면 메시지가 분산되어 순서가 보장되지 않습니다. 순서가 중요하면 단일 컨슈머 또는 Kafka 파티션을 고려하세요.
Q. Publisher Confirms는 어떻게 하나요?
A. SimpleAmqpClient는 publisher confirm API를 직접 노출하지 않습니다. rabbitmq-c의 amqp_confirm_select를 사용하거나, AMQP-CPP로 전환해야 합니다. 대부분의 경우 persistent 메시지로 충분합니다.
한 줄 요약: SimpleAmqpClient·rabbitmq-c로 C++에서 RabbitMQ 작업 큐를 구현하고, prefetch·ACK·프로덕션 패턴으로 안정적으로 운영할 수 있습니다.
다음 글: RabbitMQ 고급: 토픽 라우팅·우선순위·Dead Letter(#52-8)
이전 글: C++ Kafka 완벽 가이드(#52-5)
참고 자료
- RabbitMQ 공식 문서
- SimpleAmqpClient GitHub
- rabbitmq-c GitHub
- AMQP-CPP GitHub
- 메시지 큐 개요(#50-7) — RabbitMQ vs Kafka
관련 글
- C++ RabbitMQ 고급 | 토픽 라우팅·우선순위 큐·Dead Letter·TTL 가이드
- C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·오프셋·정확히 한 번 전달
- C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·콜백·트랜잭션·스트리밍 파이프라인