C++ 메시지 큐 시스템 | RabbitMQ·Kafka 통합 완벽 가이드 [#50-7]
이 글의 핵심
C++ 메시지 큐 시스템에 대한 실전 가이드입니다. RabbitMQ·Kafka 통합 완벽 가이드 [#50-7] 등을 예제와 함께 설명합니다.
들어가며: “동기 호출이 병목이라 서비스가 느려요”
메시지 큐가 필요한 이유
REST API로 서비스 A가 B를 호출하면, B가 응답할 때까지 A가 블로킹됩니다. B가 DB 쿼리·외부 API·무거운 연산으로 5초 걸리면, A도 5초 기다려야 합니다. 사용자가 “주문 완료” 버튼을 눌렀을 때, 결제·재고 차감·이메일 발송·로그 기록을 모두 동기로 처리하면 응답이 10초 넘게 걸릴 수 있습니다. 메시지 큐는 “메시지를 보내고 바로 다음 일을 한다”는 비동기 패턴으로, 느슨한 결합과 부하 분산을 가능하게 합니다.
이 글에서 다루는 것:
- RabbitMQ 프로듀서/컨슈머 구현 (SimpleAmqpClient)
- Kafka 프로듀서/컨슈머 구현 (librdkafka)
- 메시지 직렬화 (JSON, Protobuf)
- 자주 발생하는 에러와 해결법
- 성능 최적화 및 프로덕션 패턴
요구 환경: C++17 이상
이 글을 읽으면:
- RabbitMQ·Kafka 통합 시스템을 구현할 수 있습니다.
- 실전 문제를 해결할 수 있습니다.
- 프로덕션 수준의 코드를 작성할 수 있습니다.
문제 시나리오: 메시지 큐가 필요한 상황
시나리오 1: 주문 처리 지연
이커머스에서 주문이 들어오면 결제·재고·이메일·로그를 순차 처리합니다. 동기 호출이면 하나가 느려도 전체가 지연됩니다. 메시지 큐에 작업을 넣고, 각 워커가 비동기로 처리하면 사용자는 즉시 “주문 접수됨” 응답을 받고, 백엔드는 여유 있게 처리할 수 있습니다.
시나리오 2: 트래픽 급증 시 서비스 다운
블랙 프라이데이에 주문이 100배로 늘면, 동기 API는 DB·외부 API에 과부하를 주고 결국 타임아웃으로 연쇄 장애가 납니다. 메시지 큐는 버퍼 역할을 해, 피크 시 메시지를 쌓아 두고 컨슈머가 처리 가능한 속도로 소비합니다.
시나리오 3: 마이크로서비스 간 통신
서비스 A가 B, C, D를 각각 호출하면, B가 다운되면 A도 실패합니다. 이벤트 기반으로 A가 “주문 생성됨” 이벤트를 큐에 발행하면, B·C·D는 각자 구독해 독립적으로 처리합니다. B가 잠시 다운돼도 메시지는 큐에 남아 복구 후 처리됩니다.
시나리오 4: 로그·메트릭 수집
수십 대 서버에서 로그를 중앙으로 보낼 때, 동기 HTTP는 수집 서버가 병목이 됩니다. Kafka 같은 스트리밍 플랫폼에 로그를 발행하면, 여러 컨슈머가 병렬로 소비해 실시간 분석·저장이 가능합니다.
시나리오 5: 작업 큐 (Task Queue)
이미지 리사이징, 비디오 인코딩처럼 무거운 작업을 워커 풀에 분배할 때, 메시지 큐가 작업을 공정하게 분배하고, 워커 장애 시 다른 워커가 재시도할 수 있게 합니다.
개념을 잡는 비유
이 글의 주제는 여러 부품이 맞물리는 시스템으로 보시면 이해가 빠릅니다. 한 레이어(저장·네트워크·관측)의 선택이 옆 레이어에도 영향을 주므로, 본문에서는 트레이드오프를 숫자와 패턴으로 정리합니다.
목차
1. 시스템 아키텍처
RabbitMQ vs Kafka 비교
| 항목 | RabbitMQ | Kafka |
|---|---|---|
| 모델 | 메시지 브로커 (Push) | 이벤트 스트리밍 (Pull) |
| 메시지 보존 | 컨슈머 ACK 후 삭제 (기본) | 로그 보존 (일정 기간) |
| 처리량 | 수만 msg/s | 수십만~수백만 msg/s |
| 순서 | 큐 단위 보장 | 파티션 단위 보장 |
| 적합 용도 | 작업 큐, RPC, 라우팅 | 로그, 스트리밍, 대용량 이벤트 |
전체 아키텍처
flowchart TB
subgraph Producer["프로듀서 (C++)"]
P1[메시지 생성]
P2[직렬화]
P3[발행]
P1 --> P2 --> P3
end
subgraph RabbitMQ["RabbitMQ"]
R1[Exchange]
R2[Queue]
R1 --> R2
end
subgraph Kafka["Kafka"]
K1[Topic]
K2[Partition 0]
K3[Partition 1]
K1 --> K2
K1 --> K3
end
subgraph Consumer["컨슈머 (C++)"]
C1[구독/폴링]
C2[역직렬화]
C3[처리]
C1 --> C2 --> C3
end
Producer -->|AMQP| RabbitMQ
Producer -->|Kafka Protocol| Kafka
RabbitMQ -->|Push/Pull| Consumer
Kafka -->|Pull| Consumer
시퀀스 다이어그램 (RabbitMQ)
sequenceDiagram
participant P as C++ 프로듀서
participant R as RabbitMQ
participant C as C++ 컨슈머
P->>R: Connection + Channel 생성
P->>R: BasicPublish(exchange, routing_key, body)
R->>R: Exchange → Queue 라우팅
C->>R: BasicConsume(queue)
loop 메시지 처리
R->>C: BasicConsumeMessage() → 메시지 전달
C->>C: 비즈니스 로직 처리
C->>R: BasicAck(delivery_tag)
end
시퀀스 다이어그램 (Kafka)
sequenceDiagram
participant P as C++ 프로듀서
participant K as Kafka Broker
participant C as C++ 컨슈머
P->>K: produce(topic, partition, key, value)
K->>K: 파티션에 append
C->>K: subscribe(topic)
loop poll()
C->>K: fetch(partition, offset)
K-->>C: 메시지 배치 반환
C->>C: 처리 후 commit(offset)
end
2. RabbitMQ 핵심 구현
환경 설정
의존성: SimpleAmqpClient (rabbitmq-c 래퍼)
# vcpkg로 설치
vcpkg install simpleamqpclient
# CMakeLists.txt
find_package(SimpleAmqpClient REQUIRED)
target_link_libraries(myapp SimpleAmqpClient::SimpleAmqpClient)
프로듀서 구현
// rabbitmq_producer.hpp
#pragma once
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <string>
#include <stdexcept>
class RabbitMQProducer {
public:
RabbitMQProducer(const std::string& host = "localhost",
int port = 5672,
const std::string& user = "guest",
const std::string& password = "guest")
: host_(host), port_(port), user_(user), password_(password) {}
void connect() {
channel_ = AmqpClient::Channel::Create(host_, port_, user_, password_);
channel_->DeclareQueue("task_queue", /*passive=*/false,
/*durable=*/true, /*exclusive=*/false,
/*auto_delete=*/false);
}
void publish(const std::string& queue, const std::string& message) {
if (!channel_) throw std::runtime_error("Not connected");
auto msg = AmqpClient::BasicMessage::Create(message);
msg->DeliveryMode(2); // persistent (디스크에 저장)
channel_->BasicPublish("", queue, msg);
}
private:
std::string host_, user_, password_;
int port_;
AmqpClient::Channel::ptr_t channel_;
};
주의점:
DeliveryMode(2): 브로커 재시작 시에도 메시지 유지DeclareQueue의durable=true: 큐 자체도 영구 저장
컨슈머 구현
// rabbitmq_consumer.hpp
#pragma once
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include <string>
#include <functional>
#include <atomic>
class RabbitMQConsumer {
public:
using MessageHandler = std::function<bool(const std::string&)>;
RabbitMQConsumer(const std::string& host = "localhost", int port = 5672,
const std::string& user = "guest",
const std::string& password = "guest")
: host_(host), port_(port), user_(user), password_(password) {}
void connect() {
channel_ = AmqpClient::Channel::Create(host_, port_, user_, password_);
channel_->DeclareQueue("task_queue", false, true, false, false);
// QoS: 한 번에 하나씩 처리 (fair dispatch)
channel_->BasicQos(0, 1, false);
}
void consume(const std::string& queue, MessageHandler handler) {
if (!channel_) throw std::runtime_error("Not connected");
std::string consumer_tag = channel_->BasicConsume(queue, "", true,
false, false);
running_.store(true);
while (running_.load()) {
AmqpClient::Envelope::ptr_t envelope;
if (channel_->BasicConsumeMessage(consumer_tag, envelope, 1000)) {
std::string body = envelope->Message()->Body();
if (handler(body)) {
channel_->BasicAck(envelope);
} else {
channel_->BasicReject(envelope->GetDeliveryTag(), true);
}
}
}
}
void stop() { running_.store(false); }
private:
std::string host_, user_, password_;
int port_;
AmqpClient::Channel::ptr_t channel_;
std::atomic<bool> running_{true};
};
핵심:
BasicQos(0, 1, false): prefetch=1로 한 번에 하나씩만 가져와 워커 간 공정 분배BasicAck: 처리 성공 시 ACK, 실패 시BasicReject(..., true)로 재큐잉
3. Kafka 핵심 구현
환경 설정
의존성: librdkafka
# vcpkg로 설치
vcpkg install rdkafka
# CMakeLists.txt
find_package(RdKafka REQUIRED)
target_link_libraries(myapp RdKafka::rdkafka)
프로듀서 구현
// kafka_producer.hpp
#pragma once
#include <librdkafka/rdkafkacpp.h>
#include <string>
#include <iostream>
class KafkaProducer {
public:
KafkaProducer(const std::string& brokers) {
std::string errstr;
auto conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
conf->set("dr_cb", &dr_cb_, errstr);
producer_.reset(RdKafka::Producer::create(conf, errstr));
if (!producer_) {
throw std::runtime_error("Producer create failed: " + errstr);
}
}
void produce(const std::string& topic, const std::string& key,
const std::string& value) {
RdKafka::ErrorCode err = producer_->produce(
topic, RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(value.data()), value.size(),
key.empty() ? nullptr : key.c_str(), key.size(),
0, nullptr);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "Produce failed: " << RdKafka::err2str(err) << "\n";
return;
}
producer_->poll(0);
}
void flush(int timeout_ms = 10000) {
while (producer_->outq_len() > 0) {
producer_->poll(100);
}
}
private:
struct DeliveryReportCb : public RdKafka::DeliveryReportCb {
void dr_cb(RdKafka::Message& msg) override {
if (msg.err())
std::cerr << "Delivery failed: " << msg.errstr() << "\n";
}
} dr_cb_;
std::unique_ptr<RdKafka::Producer> producer_;
};
컨슈머 구현
// kafka_consumer.hpp
#pragma once
#include <librdkafka/rdkafkacpp.h>
#include <string>
#include <functional>
#include <atomic>
class KafkaConsumer {
public:
using MessageHandler = std::function<bool(const std::string&)>;
KafkaConsumer(const std::string& brokers, const std::string& group_id) {
std::string errstr;
auto conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
conf->set("group.id", group_id, errstr);
conf->set("enable.auto.commit", "false", errstr); // 수동 commit
consumer_.reset(RdKafka::KafkaConsumer::create(conf, errstr));
if (!consumer_) {
throw std::runtime_error("Consumer create failed: " + errstr);
}
}
void subscribe(const std::string& topic) {
std::vector<std::string> topics = {topic};
RdKafka::ErrorCode err = consumer_->subscribe(topics);
if (err) {
throw std::runtime_error("Subscribe failed: " +
RdKafka::err2str(err));
}
}
void consume(MessageHandler handler) {
running_.store(true);
while (running_.load()) {
RdKafka::Message* msg = consumer_->consume(1000);
if (!msg) continue;
if (msg->err()) {
if (msg->err() == RdKafka::ERR__PARTITION_EOF) continue;
std::cerr << "Consume error: " << msg->errstr() << "\n";
delete msg;
continue;
}
std::string payload(static_cast<const char*>(msg->payload()),
msg->len());
if (handler(payload)) {
consumer_->commit(msg);
}
delete msg;
}
}
void stop() { running_.store(false); }
private:
std::unique_ptr<RdKafka::KafkaConsumer> consumer_;
std::atomic<bool> running_{true};
};
4. 완전한 메시지 큐 예제
예제 1: 주문 처리 파이프라인 (RabbitMQ)
주문 생성 → 큐에 발행 → 워커가 결제·재고·이메일 처리
// order_processor.cpp
#include "rabbitmq_producer.hpp"
#include "rabbitmq_consumer.hpp"
#include <nlohmann/json.hpp>
#include <iostream>
using json = nlohmann::json;
int main(int argc, char* argv[]) {
if (argc < 2) {
std::cerr << "Usage: " << argv[0] << " [producer|consumer]\n";
return 1;
}
std::string mode = argv[1];
if (mode == "producer") {
RabbitMQProducer producer("localhost", 5672, "guest", "guest");
producer.connect();
json order = {
{"order_id", "ORD-001"},
{"user_id", "user_123"},
{"amount", 29900},
{"items", {"item_a", "item_b"}}
};
producer.publish("order_queue", order.dump());
std::cout << "Order published\n";
} else if (mode == "consumer") {
RabbitMQConsumer consumer("localhost", 5672, "guest", "guest");
consumer.connect();
consumer.consume("order_queue", {
try {
auto order = json::parse(body);
std::cout << "Processing order: " << order["order_id"] << "\n";
// 결제, 재고 차감, 이메일 발송 등
return true; // 성공 → ACK
} catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << "\n";
return false; // 실패 → NACK, 재큐잉
}
});
}
return 0;
}
예제 2: 로그 수집 파이프라인 (Kafka)
여러 서버에서 로그를 Kafka에 발행, 컨슈머가 Elasticsearch/파일에 저장
// log_collector.cpp
#include "kafka_producer.hpp"
#include "kafka_consumer.hpp"
#include <nlohmann/json.hpp>
#include <chrono>
#include <iostream>
int main(int argc, char* argv[]) {
if (argc < 2) {
std::cerr << "Usage: " << argv[0] << " [producer|consumer] [brokers]\n";
return 1;
}
std::string mode = argv[1];
std::string brokers = argc > 2 ? argv[2] : "localhost:9092";
if (mode == "producer") {
KafkaProducer producer(brokers);
json log = {
{"timestamp", std::time(nullptr)},
{"level", "INFO"},
{"service", "api-server"},
{"message", "Request processed"}
};
producer.produce("app-logs", "api-server", log.dump());
producer.flush();
} else if (mode == "consumer") {
KafkaConsumer consumer(brokers, "log-consumer-group");
consumer.subscribe("app-logs");
consumer.consume( {
auto log = json::parse(body);
std::cout << "[" << log["level"] << "] " << log["message"] << "\n";
return true;
});
}
return 0;
}
예제 3: 메시지 직렬화 (Protobuf)
// order.proto
syntax = "proto3";
package order;
message OrderEvent {
string order_id = 1;
string user_id = 2;
int64 amount = 3;
repeated string items = 4;
}
// protobuf_serialization.cpp
#include "order.pb.h"
#include "rabbitmq_producer.hpp"
#include <iostream>
int main() {
order::OrderEvent evt;
evt.set_order_id("ORD-002");
evt.set_user_id("user_456");
evt.set_amount(15000);
evt.add_items("item_x");
std::string serialized;
evt.SerializeToString(&serialized);
RabbitMQProducer producer("localhost");
producer.connect();
producer.publish("order_queue", serialized);
return 0;
}
예제 4: Docker Compose로 전체 스택 실행
# docker-compose.mq.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3.12-management
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
5. 자주 발생하는 에러와 해결법
문제 1: “Connection refused” — RabbitMQ/Kafka에 연결 실패
증상: Connection refused 또는 ECONNREFUSED
원인:
- 브로커가 실행 중이 아님
- 호스트/포트 오류
- 방화벽 차단
해결법:
# RabbitMQ 상태 확인
docker ps | grep rabbitmq
# 또는
rabbitmqctl status
# Kafka 상태 확인
docker exec -it kafka kafka-broker-api-versions --bootstrap-server localhost:9092
// ✅ 연결 재시도 로직 추가
void connect_with_retry(int max_retries = 5) {
for (int i = 0; i < max_retries; ++i) {
try {
channel_ = AmqpClient::Channel::Create(host_, port_, user_, password_);
return;
} catch (const std::exception& e) {
std::cerr << "Connect failed, retry " << (i+1) << "/" << max_retries << "\n";
std::this_thread::sleep_for(std::chrono::seconds(2));
}
}
throw std::runtime_error("Connection failed after retries");
}
문제 2: “Queue not found” — RabbitMQ 큐 미선언
증상: NOT_FOUND - no queue 'task_queue'
원인: 프로듀서가 발행 전에 큐를 Declare하지 않음
해결법:
// ❌ 나쁜 예: 큐 선언 없이 발행
channel_->BasicPublish("", "task_queue", msg); // 큐 없으면 에러
// ✅ 좋은 예: 발행 전 큐 선언 (멱등)
channel_->DeclareQueue("task_queue", false, true, false, false);
channel_->BasicPublish("", "task_queue", msg);
문제 3: 메시지 유실 — ACK 전 컨슈머 크래시
증상: 메시지가 처리됐는데 큐에서 사라짐 (유실)
원인: BasicConsume의 no_ack=true로 설정하면, 메시지 전달 즉시 삭제됨. 컨슈머 크래시 시 유실.
해결법:
// ❌ 나쁜 예: no_ack=true → 전달 즉시 삭제
channel_->BasicConsume(queue, "", true, false, false); // no_ack=true
// ✅ 좋은 예: no_ack=false, 처리 후 명시적 ACK
channel_->BasicConsume(queue, "", false, false, false); // no_ack=false
// ... 처리 ...
channel_->BasicAck(envelope->GetDeliveryTag());
문제 4: Kafka “Broker: Topic authorization failed”
증상: TopicAuthorizationException 또는 Not authorized to access topic
원인: ACL 설정 또는 SASL 인증 오류
해결법:
// SASL/SSL 설정 (librdkafka)
conf->set("security.protocol", "SASL_PLAINTEXT", errstr);
conf->set("sasl.mechanisms", "PLAIN", errstr);
conf->set("sasl.username", "user", errstr);
conf->set("sasl.password", "pass", errstr);
# Kafka ACL 확인
kafka-acls --bootstrap-server localhost:9092 --list
문제 5: “Consumer group rebalance” — 처리 중 오프셋 커밋 실패
증상: 같은 메시지가 여러 번 처리되거나, 일부 메시지 건너뜀
원인: enable.auto.commit=true이면 폴링 시점에 자동 커밋. 처리 완료 전 리밸런스되면 다른 컨슈머가 같은 메시지를 가져감.
해결법:
// ✅ 수동 커밋: 처리 성공 후에만 commit
conf->set("enable.auto.commit", "false", errstr);
// ...
if (handler(payload)) {
consumer_->commit(msg); // 처리 성공 시에만
}
문제 6: RabbitMQ 메모리 부족 — 메시지 적체
증상: RabbitMQ 메모리 사용량 급증, memory_alarm 발생
원인: 프로듀서가 컨슈머보다 빠르게 발행, 큐에 메시지 적체
해결법:
// 1. 프로듀서 발행 속도 제한 (throttling)
// 2. publisher confirm 사용해 브로커 수용 여부 확인
channel_->ConfirmSelect();
channel_->BasicPublish("", "task_queue", msg);
channel_->WaitForConfirms(); // 브로커가 수신할 때까지 대기
// 3. 큐 max-length 설정 (RabbitMQ 정책)
// rabbitmqctl set_policy max_len "task_queue" '{"max-length":10000}' --apply-to queues
6. 성능 최적화 팁
1. RabbitMQ: 배치 발행 (Batch Publish)
// ❌ 메시지마다 네트워크 왕복
for (const auto& msg : messages) {
channel_->BasicPublish("", queue, msg);
}
// ✅ 배치로 발행 (가능한 경우)
// SimpleAmqpClient는 배치 API가 없음 → 채널 재사용으로 오버헤드 감소
// 또는 publisher confirms를 비동기로 처리해 블로킹 최소화
2. Kafka: 배치 크기 및 압축
conf->set("batch.size", "16384", errstr); // 16KB 배치
conf->set("linger.ms", "5", errstr); // 5ms 대기 후 발행
conf->set("compression.type", "snappy", errstr); // 압축
3. Kafka: 파티션 수 조정
파티션 수 ≥ 컨슈머 수일 때 병렬 처리 극대화
# 토픽 생성 시 파티션 수 지정
kafka-topics --create --topic orders --partitions 8 --replication-factor 1
4. RabbitMQ: Prefetch 조정
// prefetch=1: 공정 분배, 처리 시간 긴 작업에 적합
channel_->BasicQos(0, 1, false);
// prefetch=10: 처리량 우선, 짧은 작업에 적합
channel_->BasicQos(0, 10, false);
5. 메시지 직렬화 최적화
// JSON vs Protobuf: Protobuf가 3~10배 작고 빠름
// 대용량/고빈도 메시지는 Protobuf 권장
7. 프로덕션 패턴
패턴 1: Dead Letter Queue (DLQ)
처리 실패 메시지를 별도 큐로 보내 나중에 수동 처리
# RabbitMQ 정책으로 DLQ 설정 (권장: 앱 재배포 없이 변경 가능)
rabbitmqctl set_policy dlx_policy "task_queue" \
'{"dead-letter-exchange":"dlx","dead-letter-routing-key":"failed"}' \
--apply-to queues
// 또는 큐 선언 시 arguments로 설정 (rabbitmq-c/AMQP-CPP 등)
// x-dead-letter-exchange, x-dead-letter-routing-key를 Table에 포함
패턴 2: 재시도 (Exponential Backoff)
bool process_with_retry(const std::string& body) {
const int max_retries = 5;
for (int i = 0; i < max_retries; ++i) {
try {
return handle_message(body);
} catch (const std::exception& e) {
if (i == max_retries - 1) return false;
std::this_thread::sleep_for(
std::chrono::milliseconds(100 * (1 << i)));
}
}
return false;
}
패턴 3: Idempotent Consumer
같은 메시지가 여러 번 전달돼도 한 번만 처리
std::unordered_set<std::string> processed_ids;
bool handle_order(const std::string& body) {
auto order = json::parse(body);
std::string id = order["order_id"];
if (processed_ids.count(id)) return true; // 이미 처리됨 → ACK
// ... 실제 처리 ...
processed_ids.insert(id);
return true;
}
패턴 4: 헬스 체크
// RabbitMQ 연결 상태 확인
bool is_connected() {
try {
channel_->GetChannelId();
return true;
} catch (...) {
return false;
}
}
패턴 5: 메트릭 수집
// 발행/소비 메트릭 (Prometheus 등)
void record_published(const std::string& queue, size_t size) {
metrics_.counter("mq_messages_published_total", {{"queue", queue}}).inc();
metrics_.histogram("mq_message_size_bytes", buckets_, {{"queue", queue}})
.observe(static_cast<double>(size));
}
8. 구현 체크리스트
환경 설정
- RabbitMQ/Kafka Docker 또는 로컬 설치
- SimpleAmqpClient, librdkafka (vcpkg) 설치
- 연결 정보 (host, port, credentials) 환경 변수화
RabbitMQ
- 큐 선언 (durable=true)
- 메시지 persistent (DeliveryMode=2)
- no_ack=false, 처리 후 BasicAck
- BasicQos(prefetch) 설정
- DLQ 설정 (선택)
Kafka
- enable.auto.commit=false, 수동 commit
- 처리 성공 후에만 commit
- consumer group.id 설정
- 파티션 수 및 replication factor 검토
공통
- 연결 재시도 로직
- 메시지 직렬화 (JSON/Protobuf) 일관성
- 에러 처리 및 로깅
- 메트릭 수집 (발행/소비 수, 지연)
보안
- 프로덕션에서 기본 guest/guest 비밀번호 변경
- TLS/SSL 적용 (필요 시)
- Kafka SASL 인증 (필요 시)
정리
| 항목 | 설명 |
|---|---|
| RabbitMQ | 작업 큐, RPC, 라우팅, Push 모델 |
| Kafka | 로그, 스트리밍, 대용량, Pull 모델 |
| 직렬화 | JSON(간편), Protobuf(성능) |
| 에러 처리 | ACK/NACK, DLQ, 재시도 |
| 프로덕션 | 수동 commit, DLQ, 멱등성, 메트릭 |
핵심 원칙:
- 비동기로 서비스 간 결합도 낮추기
- 메시지 유실 방지: ACK, persistent, 수동 commit
- 멱등성으로 중복 처리 방지
- 모니터링으로 적체·지연 감지
자주 묻는 질문 (FAQ)
Q. RabbitMQ와 Kafka 중 뭘 써야 하나요?
A. 작업 큐, RPC, 복잡한 라우팅이 필요하면 RabbitMQ. 로그 수집, 이벤트 스트리밍, 초당 수십만 메시지가 필요하면 Kafka. 둘 다 쓰는 하이브리드 구성도 많습니다.
Q. C++에서 다른 메시지 큐 라이브러리는?
A. RabbitMQ는 SimpleAmqpClient, AMQP-CPP 등이 있습니다. Kafka는 librdkafka가 사실상 표준입니다. ZeroMQ는 브로커 없이 P2P 메시징에 적합합니다.
Q. 메시지 순서를 보장하려면?
A. RabbitMQ: 단일 큐 + 단일 컨슈머. Kafka: 같은 키는 같은 파티션으로 가므로 파티션 내 순서 보장. 파티션 키를 메시지별로 바꾸면 순서가 깨질 수 있습니다.
Q. 선행으로 읽으면 좋은 글은?
A. 각 글 하단의 이전 글 링크를 따라가면 순서대로 배울 수 있습니다. C++ 시리즈 목차에서 전체 흐름을 확인할 수 있습니다.
한 줄 요약: RabbitMQ·Kafka 통합으로 비동기 메시징 시스템을 구축하고, 작업 큐·이벤트 스트리밍·마이크로서비스 통신을 실전 수준으로 구현할 수 있습니다.
다음 글: [C++ 실전 가이드 #50-8] 캐싱 전략
이전 글: [C++ 실전 가이드 #50-6] 실시간 모니터링 대시보드
관련 글
- C++ RabbitMQ 완벽 가이드 | SimpleAmqpClient·rabbitmq-c
- C++ RabbitMQ 고급 | 토픽 라우팅·우선순위 큐·Dead Letter·TTL 가이드