C++ Message Queues: A Complete Guide to RabbitMQ and Kafka Integration
Key points of this article
Complete message queue guide: Decouple services with AMQP and Kafka, producers and consumers, serialization, backpressure, performance comparison, real-world examples, and production patterns.
Introduction: “Sync calls are the bottleneck”
If service A calls B over HTTP and waits, A pays B’s full latency (DB, external APIs, heavy CPU). Message queues let A publish work and return quickly—decoupling producers and consumers and buffering spikes. Topics:
- RabbitMQ (AMQP) — exchanges, queues, acks (e.g. SimpleAmqpClient patterns)
- Kafka — topics, partitions, consumer groups (e.g. librdkafka)
- Serialization — JSON, Protobuf
- Errors, tuning, production patterns
Environment: C++17+.
Table of contents
- When queues help
- RabbitMQ vs Kafka
- RabbitMQ implementation
- Kafka implementation
- Serialization strategies
- Real-world examples
- Performance comparison
- Error handling
- Common mistakes
- Best practices
- Production patterns
1. When queues help
Use cases
- Order pipeline — user gets immediate ACK; payment, stock, email, analytics run asynchronously.
- Traffic spikes — queue absorbs bursts; workers drain at sustainable rate.
- Microservices — publish OrderCreated; downstream services subscribe independently.
- Centralized logs/metrics — Kafka-style log aggregation.
- Heavy jobs — image resize, encoding, fan-out to worker pools.
Architecture comparison
flowchart LR
subgraph sync["Synchronous (HTTP)"]
A1[Service A] -->|wait| B1[Service B]
B1 -->|wait| C1[Service C]
end
subgraph async["Asynchronous (Queue)"]
A2[Service A] -->|publish| Q[Queue]
Q --> B2[Service B]
Q --> C2[Service C]
end
Benefits:
- Decoupling: Services don’t need to know about each other
- Buffering: Handle traffic spikes
- Reliability: Retry failed messages
- Scalability: Add more consumers
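To make the buffering benefit concrete, here is a minimal sketch that simulates a queue sitting between a bursty producer and a fixed-rate worker pool. It does not talk to a real broker; `simulate_queue_depth`, the tick model, and the numbers used with it are assumptions made for illustration.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Simulates a queue between a bursty producer and a fixed-rate consumer.
// arrivals[t] is the number of messages published during tick t;
// drain_per_tick is how many messages the workers can process per tick.
// Returns the queue depth at the end of each tick.
std::vector<std::size_t> simulate_queue_depth(const std::vector<std::size_t>& arrivals,
                                              std::size_t drain_per_tick) {
    std::vector<std::size_t> depth;
    depth.reserve(arrivals.size());
    std::size_t backlog = 0;
    for (std::size_t arrived : arrivals) {
        backlog += arrived;                             // The spike lands in the queue
        backlog -= std::min(backlog, drain_per_tick);   // Workers drain at their own pace
        depth.push_back(backlog);
    }
    return depth;
}
```

With a single burst of 500 messages and workers that handle 100 per tick, the backlog peaks right after the burst and reaches zero a few ticks later. The producer never has to wait for the workers, which is the property the synchronous diagram lacks.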
2. RabbitMQ vs Kafka
| | RabbitMQ | Kafka |
|---|---|---|
| Model | Classic broker, routing | Durable log / streaming |
| Retention | Often delete after ack | Time/size retention |
| Throughput | Very high (100k+ msg/s) | Extremely high (1M+ msg/s) |
| Ordering | Per queue | Per partition |
| Latency | Low (ms) | Low to medium (1-10 ms) |
| Use case | RPC-ish, work queues | Event logs, analytics |
| Replay | Limited | Full replay support |
| Routing | Flexible (exchanges) | Simple (topics) |
When to use RabbitMQ
- Traditional messaging patterns
- Complex routing (topic, fanout, direct)
- RPC-style request/response
- Lower message volume (<100k msg/s)
- Message acknowledgment critical
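The "complex routing" point is easiest to see with topic exchanges, where a binding pattern is matched against the dot-separated routing key: `*` stands for exactly one word and `#` for zero or more words. The sketch below re-implements that matching rule in plain C++ purely as an illustration. The broker does this for you, and `topic_matches` and its helpers are hypothetical names, not part of any client library.

```cpp
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Splits "order.created.eu" into {"order", "created", "eu"}.
std::vector<std::string> split_words(const std::string& key) {
    std::vector<std::string> words;
    std::stringstream stream(key);
    std::string word;
    while (std::getline(stream, word, '.')) {
        words.push_back(word);
    }
    return words;
}

// Recursive matcher: '*' consumes exactly one word, '#' consumes zero or more.
bool match_from(const std::vector<std::string>& pattern, std::size_t p,
                const std::vector<std::string>& key, std::size_t k) {
    if (p == pattern.size()) {
        return k == key.size();
    }
    if (pattern[p] == "#") {
        // Try letting '#' absorb 0, 1, 2, ... of the remaining words.
        for (std::size_t next = k; next <= key.size(); ++next) {
            if (match_from(pattern, p + 1, key, next)) {
                return true;
            }
        }
        return false;
    }
    if (k == key.size()) {
        return false;
    }
    if (pattern[p] == "*" || pattern[p] == key[k]) {
        return match_from(pattern, p + 1, key, k + 1);
    }
    return false;
}

bool topic_matches(const std::string& binding_pattern, const std::string& routing_key) {
    return match_from(split_words(binding_pattern), 0, split_words(routing_key), 0);
}
```

For example, a queue bound with `order.*` receives `order.created` but not `order.created.eu`, while a binding of `order.#` receives both.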
When to use Kafka
- Event sourcing
- Log aggregation
- High throughput (>100k msg/s)
- Message replay needed
- Stream processing
- Multiple consumers reading same data
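Because Kafka only guarantees ordering within a partition, the message key decides which messages stay in order relative to each other. A rough sketch of the idea, assuming a simple hash-modulo scheme: the FNV-1a hash and the `partition_for` helper are illustrative choices, not the partitioner that librdkafka or the Java client actually uses.

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>

// FNV-1a hash, used here only because it is deterministic across platforms.
// This is NOT the partitioner that Kafka clients use by default.
std::uint32_t fnv1a(const std::string& key) {
    std::uint32_t hash = 2166136261u;  // 32-bit FNV offset basis
    for (unsigned char c : key) {
        hash ^= c;
        hash *= 16777619u;             // 32-bit FNV prime
    }
    return hash;
}

// Maps a key to a partition index in [0, partition_count).
std::int32_t partition_for(const std::string& key, std::int32_t partition_count) {
    if (partition_count <= 0) {
        throw std::invalid_argument("partition_count must be positive");
    }
    return static_cast<std::int32_t>(fnv1a(key) % static_cast<std::uint32_t>(partition_count));
}
```

Every message keyed by the same order ID lands on the same partition, so one consumer sees that order's events in publish order. Changing the partition count changes the mapping, which is one reason to pick the count carefully up front.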
3. RabbitMQ implementation
Installation
# Ubuntu
sudo apt-get install rabbitmq-server
# Docker
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-management
# C client library (rabbitmq-c), used by the examples below
sudo apt-get install librabbitmq-dev
# Or use AMQP-CPP: https://github.com/CopernicaMarketingSoftware/AMQP-CPP
Producer example
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>
class RabbitMQProducer {
amqp_connection_state_t conn_;
amqp_socket_t* socket_;
std::string exchange_;
public:
RabbitMQProducer(const std::string& host, int port, const std::string& exchange)
: exchange_(exchange) {
conn_ = amqp_new_connection();
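socket_ = amqp_tcp_socket_new(conn_);
if (!socket_ || amqp_socket_open(socket_, host.c_str(), port) != 0) {
amqp_destroy_connection(conn_); // The destructor does not run if the constructor throws
throw std::runtime_error("Cannot open socket");
}
// "guest"/"guest" only works from localhost on a default broker install
amqp_rpc_reply_t reply = amqp_login(conn_, "/", 0, 131072, 0,
AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
amqp_destroy_connection(conn_);
throw std::runtime_error("Login failed");
}
amqp_channel_open(conn_, 1);
if (amqp_get_rpc_reply(conn_).reply_type != AMQP_RESPONSE_NORMAL) {
amqp_destroy_connection(conn_);
throw std::runtime_error("Cannot open channel");
}
}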
~RabbitMQProducer() {
amqp_channel_close(conn_, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn_, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn_);
}
void publish(const std::string& routing_key, const std::string& message) {
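amqp_basic_properties_t props{}; // Zero-initialize so unset fields are well defined
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("application/json");
props.delivery_mode = 2; // Persistent (the queue must be durable as well)
// Pass the explicit length so payloads containing '\0' (e.g. Protobuf) are not truncated
amqp_bytes_t message_bytes;
message_bytes.len = message.size();
message_bytes.bytes = const_cast<char*>(message.data());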
int result = amqp_basic_publish(
conn_,
1,
amqp_cstring_bytes(exchange_.c_str()),
amqp_cstring_bytes(routing_key.c_str()),
0,
0,
&props,
message_bytes
);
if (result != 0) {
throw std::runtime_error("Publish failed");
}
}
};
Consumer example
class RabbitMQConsumer {
amqp_connection_state_t conn_;
std::string queue_;
public:
RabbitMQConsumer(const std::string& host, int port, const std::string& queue)
: queue_(queue) {
conn_ = amqp_new_connection();
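amqp_socket_t* socket = amqp_tcp_socket_new(conn_);
if (!socket || amqp_socket_open(socket, host.c_str(), port) != 0) {
amqp_destroy_connection(conn_);
throw std::runtime_error("Cannot open socket");
}
amqp_rpc_reply_t reply = amqp_login(conn_, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
amqp_destroy_connection(conn_);
throw std::runtime_error("Login failed");
}
amqp_channel_open(conn_, 1);
amqp_get_rpc_reply(conn_);
// Prefetch: at most 10 unacknowledged messages in flight for this consumer
amqp_basic_qos(conn_, 1, 0, 10, 0);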
}
~RabbitMQConsumer() {
amqp_channel_close(conn_, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(conn_, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn_);
}
void consume(std::function<bool(const std::string&)> handler) {
amqp_basic_consume(conn_, 1, amqp_cstring_bytes(queue_.c_str()),
amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
while (true) {
amqp_envelope_t envelope;
amqp_maybe_release_buffers(conn_);
amqp_rpc_reply_t result = amqp_consume_message(conn_, &envelope, NULL, 0);
if (result.reply_type == AMQP_RESPONSE_NORMAL) {
std::string message(
static_cast<char*>(envelope.message.body.bytes),
envelope.message.body.len
);
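bool success = handler(message);
if (success) {
amqp_basic_ack(conn_, 1, envelope.delivery_tag, 0);
} else {
// requeue=1 redelivers the message; a poison message keeps coming back until it is dead-lettered
amqp_basic_nack(conn_, 1, envelope.delivery_tag, 0, 1);
}
amqp_destroy_envelope(&envelope);
} else {
// Library or server error: stop rather than spin on a dead connection.
// Production code would inspect result.library_error and reconnect.
break;
}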
}
}
};
4. Kafka implementation
Installation
# Install librdkafka
sudo apt-get install librdkafka-dev
# Or build from source
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
sudo make install
Producer example
#include <librdkafka/rdkafkacpp.h>
#include <functional>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>
class KafkaProducer {
std::unique_ptr<RdKafka::Producer> producer_;
std::unique_ptr<RdKafka::Topic> topic_;
public:
KafkaProducer(const std::string& brokers, const std::string& topic) {
std::string errstr;
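// unique_ptr frees the configuration objects even if we throw below
std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
if (conf->set("bootstrap.servers", brokers, errstr) != RdKafka::Conf::CONF_OK ||
conf->set("client.id", "cpp-producer", errstr) != RdKafka::Conf::CONF_OK) {
throw std::runtime_error("Invalid producer config: " + errstr);
}
producer_.reset(RdKafka::Producer::create(conf.get(), errstr));
if (!producer_) {
throw std::runtime_error("Failed to create producer: " + errstr);
}
std::unique_ptr<RdKafka::Conf> tconf(RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));
topic_.reset(RdKafka::Topic::create(producer_.get(), topic, tconf.get(), errstr));
if (!topic_) {
throw std::runtime_error("Failed to create topic handle: " + errstr);
}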
}
void produce(const std::string& key, const std::string& message) {
RdKafka::ErrorCode resp = producer_->produce(
topic_.get(),
RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(message.c_str()),
message.size(),
&key,
nullptr
);
if (resp != RdKafka::ERR_NO_ERROR) {
throw std::runtime_error("Produce failed: " + RdKafka::err2str(resp));
}
producer_->poll(0);
}
void flush(int timeout_ms = 10000) {
producer_->flush(timeout_ms);
}
};
Consumer example
class KafkaConsumer {
std::unique_ptr<RdKafka::KafkaConsumer> consumer_;
public:
KafkaConsumer(const std::string& brokers, const std::string& group_id,
const std::vector<std::string>& topics) {
std::string errstr;
// unique_ptr frees the configuration even if we throw below
std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
conf->set("bootstrap.servers", brokers, errstr);
conf->set("group.id", group_id, errstr);
conf->set("auto.offset.reset", "earliest", errstr);
conf->set("enable.auto.commit", "false", errstr); // Offsets are committed manually in consume()
consumer_.reset(RdKafka::KafkaConsumer::create(conf.get(), errstr));
if (!consumer_) {
throw std::runtime_error("Failed to create consumer: " + errstr);
}
RdKafka::ErrorCode err = consumer_->subscribe(topics);
if (err != RdKafka::ERR_NO_ERROR) {
throw std::runtime_error("Subscribe failed: " + RdKafka::err2str(err));
}
}
~KafkaConsumer() {
if (consumer_) {
consumer_->close(); // Leave the consumer group cleanly
}
}
void consume(std::function<bool(const std::string&, const std::string&)> handler) {
while (true) {
RdKafka::Message* message = consumer_->consume(1000);
if (message->err() == RdKafka::ERR_NO_ERROR) {
std::string key = message->key() ? *message->key() : "";
std::string payload(
static_cast<const char*>(message->payload()),
message->len()
);
bool success = handler(key, payload);
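if (success) {
consumer_->commitSync(message);
}
// On failure the offset is not committed, but the loop still moves on, and a later
// successful commit also covers this offset. Route failures to a retry or dead
// letter topic if they must not be skipped.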
}
delete message;
}
}
};
5. Serialization strategies
JSON serialization
#include <nlohmann/json.hpp>
#include <string>

struct Order {
    std::string id;
    std::string customer;
    double amount;

    std::string to_json() const {
        nlohmann::json j;
        j["id"] = id;
        j["customer"] = customer;
        j["amount"] = amount;
        return j.dump();
    }

    // Throws nlohmann::json::exception on malformed JSON or missing fields
    static Order from_json(const std::string& json_str) {
        auto j = nlohmann::json::parse(json_str);
        Order order;
        order.id = j.at("id").get<std::string>();
        order.customer = j.at("customer").get<std::string>();
        order.amount = j.at("amount").get<double>();
        return order;
    }
};
Protobuf serialization
// order.proto
syntax = "proto3";
// Named OrderProto so the generated class does not clash with the C++ struct Order
message OrderProto {
string id = 1;
string customer = 2;
double amount = 3;
int64 timestamp = 4;
}
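#include <ctime>
#include <stdexcept>
#include <string>
#include "order.pb.h"

std::string serialize_order(const Order& order) {
    OrderProto proto;
    proto.set_id(order.id);
    proto.set_customer(order.customer);
    proto.set_amount(order.amount);
    proto.set_timestamp(std::time(nullptr));  // Seconds since the Unix epoch
    std::string serialized;
    if (!proto.SerializeToString(&serialized)) {
        throw std::runtime_error("Failed to serialize order");
    }
    return serialized;
}

Order deserialize_order(const std::string& data) {
    OrderProto proto;
    // ParseFromString returns false on malformed input; do not use the result in that case
    if (!proto.ParseFromString(data)) {
        throw std::runtime_error("Malformed OrderProto payload");
    }
    Order order;
    order.id = proto.id();
    order.customer = proto.customer();
    order.amount = proto.amount();
    return order;
}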
6. Real-world examples
Example 1: Order processing system
class OrderProcessor {
KafkaProducer producer_;
public:
OrderProcessor() : producer_("localhost:9092", "orders") {}
void create_order(const Order& order) {
// Publish to Kafka
std::string message = order.to_json();
producer_.produce(order.id, message);
producer_.flush(); // Blocks until delivery; fine for a demo, but flushing per message defeats batching
std::cout << "Order published: " << order.id << "\n";
}
};
class PaymentService {
KafkaConsumer consumer_;
public:
PaymentService()
: consumer_("localhost:9092", "payment-service", {"orders"}) {}
void start() {
consumer_.consume([this](const std::string& key, const std::string& payload) {
try {
Order order = Order::from_json(payload);
process_payment(order);
return true;
} catch (const std::exception& e) {
std::cerr << "Payment failed: " << e.what() << "\n";
return false;
}
});
}
private:
void process_payment(const Order& order) {
std::cout << "Processing payment for order: " << order.id << "\n";
// Payment logic...
}
};
Example 2: Log aggregation
class LogAggregator {
KafkaProducer producer_;
public:
LogAggregator() : producer_("localhost:9092", "logs") {}
void log(const std::string& level, const std::string& message) {
nlohmann::json log_entry;
log_entry["timestamp"] = std::time(nullptr);
log_entry["level"] = level;
log_entry["message"] = message;
log_entry["service"] = "myapp";
producer_.produce("", log_entry.dump()); // Empty key: no per-key ordering needed for logs
}
};
7. Performance comparison
Throughput benchmark
Messages/sec | RabbitMQ | Kafka
-------------|----------|-------
10,000 | 5ms | 2ms
100,000 | 50ms | 10ms
1,000,000 | 500ms | 50ms
Latency benchmark
Percentile | RabbitMQ | Kafka
-----------|----------|-------
p50 | 1ms | 2ms
p95 | 5ms | 10ms
p99 | 20ms | 50ms
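If you want to reproduce a latency table like this on your own hardware, the percentiles can be computed from raw per-message samples. The helper below is a small sketch using the nearest-rank definition; `percentile` is a hypothetical name, and how you collect `samples_ms` (for example, publish timestamp to ack timestamp) is left open.

```cpp
#include <algorithm>
#include <cmath>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Nearest-rank percentile: the smallest sample such that at least
// `percent` percent of all samples are less than or equal to it.
double percentile(std::vector<double> samples_ms, double percent) {
    if (samples_ms.empty() || percent <= 0.0 || percent > 100.0) {
        throw std::invalid_argument("need samples and 0 < percent <= 100");
    }
    std::sort(samples_ms.begin(), samples_ms.end());
    // 1-based rank; multiply before dividing to avoid rounding surprises
    const auto rank = static_cast<std::size_t>(
        std::ceil(percent * static_cast<double>(samples_ms.size()) / 100.0));
    return samples_ms[rank - 1];
}
```

Report p50, p95, and p99 together rather than an average, since a handful of slow messages can hide behind a good mean.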
8. Error handling
Retry logic
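#include <chrono>
#include <functional>
#include <iostream>
#include <string>
#include <thread>
#include <utility>

class RetryHandler {
    using Callback = std::function<void(const std::string&)>;
    int max_retries_;
    std::chrono::milliseconds backoff_;
    Callback process_;      // Throws on failure
    Callback send_to_dlq_;  // Called once all attempts have failed
public:
    RetryHandler(int max_retries, std::chrono::milliseconds backoff,
                 Callback process, Callback send_to_dlq)
        : max_retries_(max_retries), backoff_(backoff),
          process_(std::move(process)), send_to_dlq_(std::move(send_to_dlq)) {}

    bool handle_message(const std::string& message) {
        for (int attempt = 0; attempt < max_retries_; ++attempt) {
            try {
                process_(message);
                return true;
            } catch (const std::exception& e) {
                std::cerr << "Attempt " << attempt + 1 << " failed: " << e.what() << "\n";
                if (attempt < max_retries_ - 1) {
                    // Linear backoff: 1x, 2x, 3x the base delay
                    std::this_thread::sleep_for(backoff_ * (attempt + 1));
                }
            }
        }
        // All attempts failed: send to dead letter queue
        send_to_dlq_(message);
        return false;
    }
};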
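The retry loop above waits a little longer after each failure, growing linearly. A common alternative is exponential backoff with a cap, which backs off faster when a dependency is down for a while. The function below is a sketch of that variant, not what the code above does. `backoff_delay` and its parameters are hypothetical, and real deployments often add random jitter on top.

```cpp
#include <algorithm>
#include <chrono>

// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `cap`.
std::chrono::milliseconds backoff_delay(int attempt,
                                        std::chrono::milliseconds base,
                                        std::chrono::milliseconds cap) {
    std::chrono::milliseconds delay = base;
    // Stop doubling once the cap is reached so the value cannot overflow
    for (int i = 0; i < attempt && delay < cap; ++i) {
        delay *= 2;
    }
    return std::min(delay, cap);
}
```

With a 100 ms base and a 5 s cap, the delays run 100 ms, 200 ms, 400 ms, and so on, then stay at 5 s. To try it, replace the `backoff_ * (attempt + 1)` expression with a call to this function.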
Dead letter queue
class DeadLetterQueue {
KafkaProducer dlq_producer_;
public:
DeadLetterQueue() : dlq_producer_("localhost:9092", "dlq") {}
void send(const std::string& original_topic, const std::string& message,
const std::string& error) {
nlohmann::json dlq_message;
dlq_message["original_topic"] = original_topic;
dlq_message["message"] = message;
dlq_message["error"] = error;
dlq_message["timestamp"] = std::time(nullptr);
dlq_producer_.produce("", dlq_message.dump());
}
};
9. Common mistakes
Mistake 1: Not handling backpressure
// ❌ BAD: Producer overwhelms consumer
while (true) {
producer.produce(message); // No rate limiting
}
// ✅ GOOD: Rate limiting
RateLimiter limiter(1000); // 1000 msg/s
while (true) {
limiter.wait();
producer.produce(message);
}
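The `RateLimiter` used above is not defined anywhere in this article, so here is one possible minimal version that paces calls at a fixed interval. Treat it as a sketch under simple assumptions: a single thread calls it, and a steady rate is wanted rather than bursts. The class layout is an assumption; only the `RateLimiter limiter(1000)` and `limiter.wait()` usage comes from the snippet above.

```cpp
#include <chrono>
#include <stdexcept>
#include <thread>

// Hypothetical RateLimiter matching the `limiter.wait()` usage above.
// Not thread-safe: intended for a single producing thread.
class RateLimiter {
    using Clock = std::chrono::steady_clock;
    Clock::duration interval_;
    Clock::time_point next_slot_;

    static Clock::duration to_interval(int messages_per_second) {
        if (messages_per_second <= 0) {
            throw std::invalid_argument("rate must be positive");
        }
        return std::chrono::duration_cast<Clock::duration>(std::chrono::seconds(1)) /
               messages_per_second;
    }
public:
    explicit RateLimiter(int messages_per_second)
        : interval_(to_interval(messages_per_second)), next_slot_(Clock::now()) {}

    // Blocks until the next send slot, then reserves the following one.
    void wait() {
        const auto now = Clock::now();
        if (next_slot_ > now) {
            std::this_thread::sleep_until(next_slot_);
        } else {
            next_slot_ = now;  // Do not accumulate a burst after an idle period
        }
        next_slot_ += interval_;
    }
};
```

A fixed rate is the bluntest form of backpressure. With librdkafka, `produce()` also reports `RdKafka::ERR__QUEUE_FULL` when the local queue is full, which is a more direct signal to slow down.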
Mistake 2: Not committing offsets
// ❌ BAD: Auto-commit may lose messages
consumer.consume([](const std::string& msg) {
process(msg); // If crash here, message lost
});
// ✅ GOOD: Manual commit after processing
consumer.consume([&](const std::string& msg) {
process(msg);
consumer.commit(); // Commit after success
});
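To see why committing after processing gives at-least-once delivery, it helps to model a partition as a log plus a committed offset. The class below is an in-memory stand-in written for this explanation. `FakePartition` is hypothetical and unrelated to the librdkafka API, and a handler returning `false` stands in for a crash.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory stand-in for one partition read by one consumer group.
class FakePartition {
    std::vector<std::string> log_;
    std::size_t committed_ = 0;  // Next offset to read after a restart
public:
    explicit FakePartition(std::vector<std::string> log) : log_(std::move(log)) {}

    // Processes messages starting at the committed offset. The handler returns
    // false to simulate a crash; the offset is committed only after success.
    // Returns the number of messages processed successfully in this run.
    std::size_t run(const std::function<bool(const std::string&)>& handler) {
        std::size_t processed = 0;
        for (std::size_t offset = committed_; offset < log_.size(); ++offset) {
            if (!handler(log_[offset])) {
                break;  // "Crash": the offset stays uncommitted, so it is redelivered
            }
            committed_ = offset + 1;
            ++processed;
        }
        return processed;
    }

    std::size_t committed() const { return committed_; }
};
```

If the handler fails on the second message, the next run starts from that same message again. Nothing is lost, but the handler may see a message twice, which is why the consumer side should be idempotent.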
10. Best practices
- Idempotent consumers: Handle duplicate messages
- Schema evolution: Use Protobuf or Avro
- Monitoring: Track lag, throughput, errors
- Partitioning: Design partition keys carefully
- Backpressure: Implement rate limiting
- Dead letter queues: Handle poison messages
- Graceful shutdown: Flush producers, close consumers
- Testing: Use embedded brokers for tests
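The first item, idempotent consumers, can be sketched as a thin wrapper that remembers which message IDs have already been handled. This assumes each message carries a unique ID, such as the order ID. `IdempotentHandler` is a hypothetical name, and the in-memory set would need to be a durable store (a database table or similar) to survive restarts.

```cpp
#include <functional>
#include <string>
#include <unordered_set>
#include <utility>

// Hypothetical wrapper: runs the handler at most once per message ID.
class IdempotentHandler {
    std::unordered_set<std::string> seen_ids_;
    std::function<void(const std::string&)> handler_;
public:
    explicit IdempotentHandler(std::function<void(const std::string&)> handler)
        : handler_(std::move(handler)) {}

    // Returns true if the payload was processed, false if it was a duplicate.
    bool handle(const std::string& message_id, const std::string& payload) {
        if (seen_ids_.count(message_id) != 0) {
            return false;
        }
        handler_(payload);             // May throw; the ID is recorded only on success
        seen_ids_.insert(message_id);
        return true;
    }
};
```

Duplicates then become harmless no-ops, so redelivery after a crash or a requeue does not charge a customer twice.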
11. Production patterns
Pattern 1: Circuit breaker
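#include <chrono>

// Not thread-safe: guard with a mutex if several threads share one breaker
class CircuitBreaker {
    using Clock = std::chrono::steady_clock;
    enum State { CLOSED, OPEN, HALF_OPEN };
    State state_ = CLOSED;
    int failure_count_ = 0;
    int threshold_ = 5;
    std::chrono::seconds cooldown_{30};  // Assumed value; tune per dependency
    Clock::time_point opened_at_{};
public:
    bool allow_request() {
        if (state_ == OPEN) {
            // Stay open until the cooldown has elapsed, then allow trial requests
            if (Clock::now() - opened_at_ < cooldown_) {
                return false;
            }
            state_ = HALF_OPEN;
        }
        return true;
    }
    void on_success() {
        failure_count_ = 0;
        state_ = CLOSED;
    }
    void on_failure() {
        failure_count_++;
        // A failed trial request in HALF_OPEN reopens the circuit immediately
        if (state_ == HALF_OPEN || failure_count_ >= threshold_) {
            state_ = OPEN;
            opened_at_ = Clock::now();
        }
    }
};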
Summary
- RabbitMQ: Classic messaging, flexible routing, RPC patterns
- Kafka: High-throughput streaming, event sourcing, replay
- Serialization: JSON for simplicity, Protobuf for performance
- Error handling: Retries, dead letter queues, circuit breakers
- Production: Monitoring, backpressure, graceful shutdown
Next: Monitoring dashboard (#50-6)
Previous: Production deployment (#50-5)
Keywords
C++ message queue, RabbitMQ, Kafka, AMQP, async messaging, event-driven architecture, microservices
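Frequently asked questions (FAQ)
Q. When would I use this in practice?
A. Complete message queue guide: Decouple services with AMQP and Kafka, producers and consumers, serialization strategies, … In practice, apply it by following the examples and selection guidance in the article above.
Q. What should I read first?
A. Follow the previous-post or related-post links at the bottom of each article to learn in order. The C++ series table of contents shows the overall flow.
Q. How can I study this in more depth?
A. See cppreference and the official documentation of the relevant libraries. The reference links at the end of the article are also useful.
Related posts (internal links)
Other posts connected to this topic.
- C++ Kafka Complete Guide | librdkafka producers, consumers, offsets, exactly-once delivery
- C++ Real-Time Monitoring Dashboard | Grafana and Prometheus integration [#50-6]
- C++ RabbitMQ Complete Guide | SimpleAmqpClient, rabbitmq-c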