C++ Message Queues: RabbitMQ and Kafka Integration Complete

Key points

A guide to message queues in C++: decoupling services with RabbitMQ (AMQP) and Kafka, writing producers and consumers, serialization, backpressure, a performance comparison, real-world examples, and production patterns.

Introduction: “Sync calls are the bottleneck”

If service A calls B over HTTP and waits, A pays B’s full latency (DB, external APIs, heavy CPU). Message queues let A publish work and return quickly, which decouples producers from consumers and buffers traffic spikes. This article covers:

  • RabbitMQ (AMQP): exchanges, queues, acks (the examples below use the rabbitmq-c client)
  • Kafka: topics, partitions, consumer groups (the examples below use librdkafka)
  • Serialization: JSON, Protobuf
  • Errors, tuning, production patterns

Environment: C++17+.

Table of contents

  1. When queues help
  2. RabbitMQ vs Kafka
  3. RabbitMQ implementation
  4. Kafka implementation
  5. Serialization strategies
  6. Real-world examples
  7. Performance comparison
  8. Error handling
  9. Common mistakes
  10. Best practices
  11. Production patterns

1. When queues help

Use cases

  1. Order pipeline — user gets immediate ACK; payment, stock, email, analytics run asynchronously.
  2. Traffic spikes — queue absorbs bursts; workers drain at sustainable rate.
  3. Microservices — publish OrderCreated; downstream services subscribe independently.
  4. Centralized logs/metrics — Kafka-style log aggregation.
  5. Heavy jobs — image resize, encoding, fan-out to worker pools.

Architecture comparison

flowchart LR
    subgraph sync["Synchronous (HTTP)"]
        A1[Service A] -->|wait| B1[Service B]
        B1 -->|wait| C1[Service C]
    end
    
    subgraph async["Asynchronous (Queue)"]
        A2[Service A] -->|publish| Q[Queue]
        Q --> B2[Service B]
        Q --> C2[Service C]
    end

Benefits:

  • Decoupling: Services don’t need to know about each other
  • Buffering: Handle traffic spikes
  • Reliability: Retry failed messages
  • Scalability: Add more consumers
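
The decoupling and buffering benefits above can be illustrated in-process, without a broker. The following is a minimal sketch under stated assumptions, not a replacement for RabbitMQ or Kafka: `BoundedQueue` is a hypothetical name and the capacity is an arbitrary choice. A full queue blocks the producer, which is the simplest form of backpressure.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

// Hypothetical in-process stand-in for a broker queue (illustration only).
template <typename T>
class BoundedQueue {
    std::queue<T> items_;
    std::size_t capacity_;
    bool closed_ = false;
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;

public:
    explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

    // Blocks while the queue is full; returns false if the queue was closed.
    bool push(T item) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return closed_ || items_.size() < capacity_; });
        if (closed_) return false;
        items_.push(std::move(item));
        not_empty_.notify_one();
        return true;
    }

    // Blocks while the queue is empty; returns std::nullopt once closed and drained.
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty()) return std::nullopt;
        T item = std::move(items_.front());
        items_.pop();
        not_full_.notify_one();
        return item;
    }

    // Wakes all waiters; consumers drain what is left, then see std::nullopt.
    void close() {
        std::lock_guard<std::mutex> lock(mutex_);
        closed_ = true;
        not_full_.notify_all();
        not_empty_.notify_all();
    }
};
```

A producer thread would call `push()` while one or more worker threads loop on `pop()`; adding workers corresponds to "add more consumers" in the list above.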

2. RabbitMQ vs Kafka

Aspect     | RabbitMQ                 | Kafka
-----------|--------------------------|---------------------------
Model      | Classic broker, routing  | Durable log / streaming
Retention  | Often delete after ack   | Time/size retention
Throughput | Very high (100k+ msg/s)  | Extremely high (1M+ msg/s)
Ordering   | Per queue                | Per partition
Latency    | Low (ms)                 | Low-medium (ms-10ms)
Use case   | RPC-ish, work queues     | Event logs, analytics
Replay     | Limited                  | Full replay support
Routing    | Flexible (exchanges)     | Simple (topics)

When to use RabbitMQ

  • Traditional messaging patterns
  • Complex routing (topic, fanout, direct)
  • RPC-style request/response
  • Lower message volume (<100k msg/s)
  • Message acknowledgment critical
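
To make "complex routing" concrete: an AMQP topic exchange compares a dot-separated routing key against binding patterns in which `*` stands for exactly one word and `#` for zero or more words. The sketch below re-implements that matching rule locally for illustration; the function names are hypothetical, and the broker performs this matching itself, so client code never needs it.

```cpp
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Splits "a.b.c" into {"a", "b", "c"}.
inline std::vector<std::string> split_words(const std::string& s) {
    std::vector<std::string> words;
    std::stringstream stream(s);
    std::string word;
    while (std::getline(stream, word, '.')) {
        words.push_back(word);
    }
    return words;
}

// Matches pattern[p..] against key[k..] word by word.
inline bool match_from(const std::vector<std::string>& pattern, std::size_t p,
                       const std::vector<std::string>& key, std::size_t k) {
    if (p == pattern.size()) return k == key.size();
    if (pattern[p] == "#") {
        // '#' may consume zero or more words: try every possible split.
        for (std::size_t next = k; next <= key.size(); ++next) {
            if (match_from(pattern, p + 1, key, next)) return true;
        }
        return false;
    }
    if (k == key.size()) return false;
    if (pattern[p] == "*" || pattern[p] == key[k]) {
        return match_from(pattern, p + 1, key, k + 1);
    }
    return false;
}

// True if a queue bound with `binding_pattern` would receive `routing_key`.
inline bool topic_matches(const std::string& binding_pattern,
                          const std::string& routing_key) {
    return match_from(split_words(binding_pattern), 0, split_words(routing_key), 0);
}
```

For example, a queue bound with `order.*` receives `order.created` but not `order.created.eu`, while a binding of `order.#` receives both.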

When to use Kafka

  • Event sourcing
  • Log aggregation
  • High throughput (>100k msg/s)
  • Message replay needed
  • Stream processing
  • Multiple consumers reading same data
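
Kafka's per-partition ordering follows from how keys map to partitions: records with the same key land in the same partition and are appended in order. The sketch below models that in memory. `PartitionedLog` is a hypothetical type, `std::hash` stands in for the client's real partitioner hash, and the partition count is assumed to be greater than zero.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

struct Record {
    std::string key;
    std::string value;
};

// Hypothetical in-memory model of a partitioned topic (illustration only).
class PartitionedLog {
    std::vector<std::vector<Record>> partitions_;

public:
    // Assumes num_partitions > 0.
    explicit PartitionedLog(std::size_t num_partitions) : partitions_(num_partitions) {}

    // Same key, same partition: this is what preserves per-key ordering.
    std::size_t partition_for(const std::string& key) const {
        return std::hash<std::string>{}(key) % partitions_.size();
    }

    // Appends to the key's partition and returns the record's offset there.
    std::size_t append(const std::string& key, const std::string& value) {
        auto& partition = partitions_[partition_for(key)];
        partition.push_back(Record{key, value});
        return partition.size() - 1;
    }

    const std::vector<Record>& partition(std::size_t index) const {
        return partitions_.at(index);
    }
};
```

Events for `order-1` always appear in the order they were appended, whereas no ordering is implied between `order-1` and `order-2` if they hash to different partitions.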

3. RabbitMQ implementation

Installation

# Ubuntu
sudo apt-get install rabbitmq-server
# Docker
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management
# C client library (rabbitmq-c), used from C++ in the examples below
sudo apt-get install librabbitmq-dev
# Or use AMQP-CPP: https://github.com/CopernicaMarketingSoftware/AMQP-CPP

Producer example

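#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <stdexcept>
#include <string>

// Publishes persistent messages to one exchange over channel 1 (rabbitmq-c).
class RabbitMQProducer {
    amqp_connection_state_t conn_;
    std::string exchange_;

public:
    RabbitMQProducer(const std::string& host, int port, const std::string& exchange)
        : conn_(amqp_new_connection()), exchange_(exchange) {
        // The socket is owned by the connection and is freed together with it.
        amqp_socket_t* socket = amqp_tcp_socket_new(conn_);
        if (!socket || amqp_socket_open(socket, host.c_str(), port) != AMQP_STATUS_OK) {
            amqp_destroy_connection(conn_);
            throw std::runtime_error("Cannot open socket");
        }

        // Demo credentials; replace them outside local development.
        amqp_rpc_reply_t reply = amqp_login(conn_, "/", 0, 131072, 0,
            AMQP_SASL_METHOD_PLAIN, "guest", "guest");
        if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
            amqp_destroy_connection(conn_);
            throw std::runtime_error("Login failed");
        }

        amqp_channel_open(conn_, 1);
        if (amqp_get_rpc_reply(conn_).reply_type != AMQP_RESPONSE_NORMAL) {
            amqp_destroy_connection(conn_);
            throw std::runtime_error("Cannot open channel");
        }
    }

    // The class owns a raw connection handle, so copying is disabled.
    RabbitMQProducer(const RabbitMQProducer&) = delete;
    RabbitMQProducer& operator=(const RabbitMQProducer&) = delete;

    ~RabbitMQProducer() {
        amqp_channel_close(conn_, 1, AMQP_REPLY_SUCCESS);
        amqp_connection_close(conn_, AMQP_REPLY_SUCCESS);
        amqp_destroy_connection(conn_);
    }

    void publish(const std::string& routing_key, const std::string& message) {
        amqp_basic_properties_t props{};  // zero-initialize the fields we do not set
        props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
        props.content_type = amqp_cstring_bytes("application/json");
        props.delivery_mode = 2;  // Persistent (the target queue must be durable too)

        // Pass an explicit length so payloads containing '\0' (e.g. Protobuf) are not truncated.
        amqp_bytes_t message_bytes;
        message_bytes.len = message.size();
        message_bytes.bytes = const_cast<char*>(message.data());

        int result = amqp_basic_publish(
            conn_,
            1,
            amqp_cstring_bytes(exchange_.c_str()),
            amqp_cstring_bytes(routing_key.c_str()),
            0,  // mandatory
            0,  // immediate
            &props,
            message_bytes
        );

        // Success here means the frame was written, not that the broker stored the message.
        if (result != AMQP_STATUS_OK) {
            throw std::runtime_error("Publish failed");
        }
    }
};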

Consumer example

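#include <functional>

// Consumes from an existing queue on channel 1 with manual acknowledgements.
class RabbitMQConsumer {
    amqp_connection_state_t conn_;
    std::string queue_;

    // Throws if an RPC-style call did not complete normally.
    static void check(const amqp_rpc_reply_t& reply, const char* what) {
        if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
            throw std::runtime_error(what);
        }
    }

public:
    RabbitMQConsumer(const std::string& host, int port, const std::string& queue)
        : conn_(amqp_new_connection()), queue_(queue) {
        try {
            amqp_socket_t* socket = amqp_tcp_socket_new(conn_);
            if (!socket || amqp_socket_open(socket, host.c_str(), port) != AMQP_STATUS_OK) {
                throw std::runtime_error("Cannot open socket");
            }
            check(amqp_login(conn_, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
                             "guest", "guest"), "Login failed");
            amqp_channel_open(conn_, 1);
            check(amqp_get_rpc_reply(conn_), "Cannot open channel");

            // Prefetch: at most 10 unacknowledged messages in flight for this consumer
            amqp_basic_qos(conn_, 1, 0, 10, 0);
            check(amqp_get_rpc_reply(conn_), "Cannot set prefetch");
        } catch (...) {
            amqp_destroy_connection(conn_);  // the destructor does not run if we throw here
            throw;
        }
    }

    RabbitMQConsumer(const RabbitMQConsumer&) = delete;
    RabbitMQConsumer& operator=(const RabbitMQConsumer&) = delete;

    ~RabbitMQConsumer() {
        amqp_channel_close(conn_, 1, AMQP_REPLY_SUCCESS);
        amqp_connection_close(conn_, AMQP_REPLY_SUCCESS);
        amqp_destroy_connection(conn_);
    }

    void consume(std::function<bool(const std::string&)> handler) {
        // no_ack = 0: the broker waits for an explicit ack/nack per message
        amqp_basic_consume(conn_, 1, amqp_cstring_bytes(queue_.c_str()),
            amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
        check(amqp_get_rpc_reply(conn_), "Cannot start consuming");

        while (true) {
            amqp_envelope_t envelope;
            amqp_maybe_release_buffers(conn_);

            amqp_rpc_reply_t result = amqp_consume_message(conn_, &envelope, NULL, 0);
            // Without this check a lost connection would make the loop spin forever.
            check(result, "Consume failed");

            std::string message(
                static_cast<char*>(envelope.message.body.bytes),
                envelope.message.body.len
            );

            if (handler(message)) {
                amqp_basic_ack(conn_, 1, envelope.delivery_tag, 0);
            } else {
                // requeue = 1: a message that always fails is redelivered indefinitely
                amqp_basic_nack(conn_, 1, envelope.delivery_tag, 0, 1);
            }

            amqp_destroy_envelope(&envelope);
        }
    }
};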

4. Kafka implementation

Installation

# Install librdkafka
sudo apt-get install librdkafka-dev
# Or build from source
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
sudo make install

Producer example

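#include <librdkafka/rdkafkacpp.h>
#include <memory>
#include <stdexcept>
#include <string>

class KafkaProducer {
    // Declaration order matters: topic_ is destroyed before producer_.
    std::unique_ptr<RdKafka::Producer> producer_;
    std::unique_ptr<RdKafka::Topic> topic_;

public:
    KafkaProducer(const std::string& brokers, const std::string& topic) {
        std::string errstr;

        // unique_ptr releases the config objects even if we throw below
        std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
        if (conf->set("bootstrap.servers", brokers, errstr) != RdKafka::Conf::CONF_OK ||
            conf->set("client.id", "cpp-producer", errstr) != RdKafka::Conf::CONF_OK) {
            throw std::runtime_error("Invalid producer config: " + errstr);
        }

        producer_.reset(RdKafka::Producer::create(conf.get(), errstr));
        if (!producer_) {
            throw std::runtime_error("Failed to create producer: " + errstr);
        }

        std::unique_ptr<RdKafka::Conf> tconf(RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));
        topic_.reset(RdKafka::Topic::create(producer_.get(), topic, tconf.get(), errstr));
        if (!topic_) {
            throw std::runtime_error("Failed to create topic handle: " + errstr);
        }
    }

    ~KafkaProducer() {
        // Give queued messages a chance to be delivered before the handle is destroyed
        if (producer_) producer_->flush(10000);
    }

    void produce(const std::string& key, const std::string& message) {
        RdKafka::ErrorCode resp = producer_->produce(
            topic_.get(),
            RdKafka::Topic::PARTITION_UA,       // let the partitioner pick, based on the key
            RdKafka::Producer::RK_MSG_COPY,     // librdkafka copies the payload
            const_cast<char*>(message.data()),
            message.size(),
            key.empty() ? nullptr : &key,       // no key: partition is chosen without one
            nullptr
        );

        // ERR__QUEUE_FULL means the local queue is full: poll and retry, or slow down
        if (resp != RdKafka::ERR_NO_ERROR) {
            throw std::runtime_error("Produce failed: " + RdKafka::err2str(resp));
        }

        producer_->poll(0);  // serve delivery callbacks without blocking
    }

    void flush(int timeout_ms = 10000) {
        producer_->flush(timeout_ms);
    }
};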

Consumer example

#include <functional>
#include <iostream>
#include <memory>
#include <vector>

class KafkaConsumer {
    std::unique_ptr<RdKafka::KafkaConsumer> consumer_;

public:
    KafkaConsumer(const std::string& brokers, const std::string& group_id,
                  const std::vector<std::string>& topics) {
        std::string errstr;

        std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
        conf->set("bootstrap.servers", brokers, errstr);
        conf->set("group.id", group_id, errstr);
        conf->set("auto.offset.reset", "earliest", errstr);
        conf->set("enable.auto.commit", "false", errstr);  // commit manually after processing

        consumer_.reset(RdKafka::KafkaConsumer::create(conf.get(), errstr));
        if (!consumer_) {
            throw std::runtime_error("Failed to create consumer: " + errstr);
        }

        RdKafka::ErrorCode err = consumer_->subscribe(topics);
        if (err != RdKafka::ERR_NO_ERROR) {
            throw std::runtime_error("Subscribe failed: " + RdKafka::err2str(err));
        }
    }

    ~KafkaConsumer() {
        if (consumer_) consumer_->close();  // leave the consumer group cleanly
    }

    void consume(std::function<bool(const std::string&, const std::string&)> handler) {
        while (true) {
            std::unique_ptr<RdKafka::Message> message(consumer_->consume(1000));

            if (message->err() == RdKafka::ERR__TIMED_OUT) {
                continue;  // no message arrived within 1000 ms
            }
            if (message->err() != RdKafka::ERR_NO_ERROR) {
                std::cerr << "Consume error: " << message->errstr() << "\n";
                continue;
            }

            std::string key = message->key() ? *message->key() : std::string();
            std::string payload(
                static_cast<const char*>(message->payload()),
                message->len()
            );

            if (handler(key, payload)) {
                consumer_->commitSync(message.get());
            }
            // On failure nothing is committed here, but a later successful commit
            // also covers this offset; route failures to a retry topic or DLQ.
        }
    }
};
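
Both consumer loops above run `while (true)` and therefore never reach their destructors. One common way to allow a clean exit is to poll with a timeout and re-check a stop flag on every iteration. The following is a minimal sketch of that loop shape only: `run_until_stopped` and its `poll` parameter are hypothetical stand-ins for the blocking consume call, not part of librdkafka or rabbitmq-c.

```cpp
#include <atomic>
#include <functional>
#include <optional>
#include <string>

// Runs until `stop` becomes true. `poll` is a hypothetical stand-in for a
// blocking call with a timeout: it returns a message, or std::nullopt when
// the timeout expires without one.
inline int run_until_stopped(const std::atomic<bool>& stop,
                             const std::function<std::optional<std::string>()>& poll,
                             const std::function<void(const std::string&)>& handle) {
    int handled = 0;
    while (!stop.load()) {
        std::optional<std::string> message = poll();
        if (!message) continue;  // timeout: loop around and re-check the flag
        handle(*message);
        ++handled;
    }
    return handled;  // the caller can now close the consumer and flush producers
}
```

In a real service the flag would typically be set from another thread or from a SIGINT/SIGTERM handler, and the poll timeout bounds how long shutdown can take.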

5. Serialization strategies

JSON serialization

#include <nlohmann/json.hpp>
#include <string>
struct Order {
    std::string id;
    std::string customer;
    double amount;
    
    std::string to_json() const {
        nlohmann::json j;
        j["id"] = id;
        j["customer"] = customer;
        j["amount"] = amount;
        return j.dump();
    }
    
    // Throws nlohmann::json exceptions on malformed input or missing fields
    static Order from_json(const std::string& json_str) {
        auto j = nlohmann::json::parse(json_str);
        Order order;
        order.id = j.at("id").get<std::string>();
        order.customer = j.at("customer").get<std::string>();
        order.amount = j.at("amount").get<double>();
        return order;
    }
};

Protobuf serialization

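// order.proto
// The message is named OrderProto so the generated class does not clash with struct Order.
syntax = "proto3";
message OrderProto {
  string id = 1;
  string customer = 2;
  double amount = 3;
  int64 timestamp = 4;
}

// C++ side (order.pb.h is generated by protoc from order.proto)
#include "order.pb.h"
#include <ctime>
#include <stdexcept>
#include <string>
std::string serialize_order(const Order& order) {
    OrderProto proto;
    proto.set_id(order.id);
    proto.set_customer(order.customer);
    proto.set_amount(order.amount);
    proto.set_timestamp(std::time(nullptr));
    
    std::string serialized;
    if (!proto.SerializeToString(&serialized)) {
        throw std::runtime_error("Failed to serialize order");
    }
    return serialized;
}
Order deserialize_order(const std::string& data) {
    OrderProto proto;
    // ParseFromString returns false on malformed input
    if (!proto.ParseFromString(data)) {
        throw std::runtime_error("Failed to parse order");
    }
    
    Order order;
    order.id = proto.id();
    order.customer = proto.customer();
    order.amount = proto.amount();
    return order;
}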

6. Real-world examples

Example 1: Order processing system

class OrderProcessor {
    KafkaProducer producer_;
    
public:
    OrderProcessor() : producer_("localhost:9092", "orders") {}
    
    void create_order(const Order& order) {
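        // Publish to Kafka, keyed by order ID so one order's events stay in one partition
        std::string message = order.to_json();
        producer_.produce(order.id, message);
        // Flushing per message keeps the example simple but waits on every send;
        // batch and flush on shutdown when throughput matters.
        producer_.flush();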
        
        std::cout << "Order published: " << order.id << "\n";
    }
};
class PaymentService {
    KafkaConsumer consumer_;
    
public:
    PaymentService() 
        : consumer_("localhost:9092", "payment-service", {"orders"}) {}
    
    void start() {
        consumer_.consume([this](const std::string& key, const std::string& payload) {
            try {
                Order order = Order::from_json(payload);
                process_payment(order);
                return true;
            } catch (const std::exception& e) {
                std::cerr << "Payment failed: " << e.what() << "\n";
                return false;
            }
        });
    }
    
private:
    void process_payment(const Order& order) {
        std::cout << "Processing payment for order: " << order.id << "\n";
        // Payment logic...
    }
};

Example 2: Log aggregation

class LogAggregator {
    KafkaProducer producer_;
    
public:
    LogAggregator() : producer_("localhost:9092", "logs") {}
    
    void log(const std::string& level, const std::string& message) {
        nlohmann::json log_entry;
        log_entry["timestamp"] = std::time(nullptr);
        log_entry["level"] = level;
        log_entry["message"] = message;
        log_entry["service"] = "myapp";
        
        // Empty key: log lines need no per-key ordering
        producer_.produce("", log_entry.dump());
    }
};

7. Performance comparison

Throughput benchmark

Messages     | RabbitMQ (elapsed) | Kafka (elapsed)
-------------|--------------------|----------------
10,000       | 5ms                | 2ms
100,000      | 50ms               | 10ms
1,000,000    | 500ms              | 50ms

Latency benchmark

Percentile | RabbitMQ | Kafka
-----------|----------|-------
p50        | 1ms      | 2ms
p95        | 5ms      | 10ms
p99        | 20ms     | 50ms
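
Percentile figures like those above are derived from raw latency samples. As a sketch of one way to compute them when reproducing such a benchmark, the function below uses the nearest-rank method. The function name is hypothetical, and the article does not say which method or sample set produced its numbers.

```cpp
#include <algorithm>
#include <cmath>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Nearest-rank percentile: the smallest sample such that at least
// `percent` % of all samples are less than or equal to it.
inline double percentile(std::vector<double> samples_ms, double percent) {
    if (samples_ms.empty() || percent <= 0.0 || percent > 100.0) {
        throw std::invalid_argument("need samples and 0 < percent <= 100");
    }
    std::sort(samples_ms.begin(), samples_ms.end());
    auto rank = static_cast<std::size_t>(
        std::ceil(percent / 100.0 * static_cast<double>(samples_ms.size())));
    return samples_ms[rank - 1];  // rank is 1-based
}
```

With only a handful of samples, p95 and p99 collapse onto the maximum, so tail percentiles are only meaningful with a large sample count.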

8. Error handling

Retry logic

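#include <chrono>
#include <functional>
#include <iostream>
#include <string>
#include <thread>

class RetryHandler {
    using Callback = std::function<void(const std::string&)>;
    int max_retries_;
    std::chrono::milliseconds backoff_;
    Callback process_;      // throws on failure
    Callback send_to_dlq_;  // called once all retries are exhausted
    
public:
    RetryHandler(int max_retries, std::chrono::milliseconds backoff,
                 Callback process, Callback send_to_dlq)
        : max_retries_(max_retries), backoff_(backoff),
          process_(std::move(process)), send_to_dlq_(std::move(send_to_dlq)) {}
    
    bool handle_message(const std::string& message) {
        for (int attempt = 0; attempt < max_retries_; ++attempt) {
            try {
                process_(message);
                return true;
            } catch (const std::exception& e) {
                std::cerr << "Attempt " << attempt + 1 << " failed: " << e.what() << "\n";
                if (attempt < max_retries_ - 1) {
                    // Linear backoff: 1x, 2x, 3x ... the base delay
                    std::this_thread::sleep_for(backoff_ * (attempt + 1));
                }
            }
        }
        
        // Send to dead letter queue
        send_to_dlq_(message);
        return false;
    }
};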
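
The retry handler above waits a linearly growing delay between attempts. A frequently used alternative is capped exponential backoff with random jitter, which spreads out retries from many consumers that fail at the same time. The sketch below shows the arithmetic only; the function names and the choice of "full jitter" (uniform between zero and the computed delay) are assumptions for illustration, not something the article prescribes.

```cpp
#include <algorithm>
#include <chrono>
#include <random>

// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at max_delay.
inline std::chrono::milliseconds backoff_delay(int attempt,
                                               std::chrono::milliseconds base,
                                               std::chrono::milliseconds max_delay) {
    std::chrono::milliseconds delay = base;
    // Stop doubling once the cap is reached, which also avoids overflow.
    for (int i = 0; i < attempt && delay < max_delay; ++i) {
        delay *= 2;
    }
    return std::min(delay, max_delay);
}

// Full jitter: a uniformly random delay in [0, delay].
template <typename Rng>
std::chrono::milliseconds jittered(std::chrono::milliseconds delay, Rng& rng) {
    std::uniform_int_distribution<std::chrono::milliseconds::rep> dist(0, delay.count());
    return std::chrono::milliseconds(dist(rng));
}
```

With a 100 ms base and a 1000 ms cap, the un-jittered delays are 100, 200, 400, 800, and then 1000 ms for every later attempt.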

Dead letter queue

class DeadLetterQueue {
    KafkaProducer dlq_producer_;
    
public:
    DeadLetterQueue() : dlq_producer_("localhost:9092", "dlq") {}
    
    // Wraps the failed message with enough context to diagnose and replay it later
    void send(const std::string& original_topic, const std::string& message,
              const std::string& error) {
        nlohmann::json dlq_message;
        dlq_message["original_topic"] = original_topic;
        dlq_message["message"] = message;
        dlq_message["error"] = error;
        dlq_message["timestamp"] = std::time(nullptr);
        
        dlq_producer_.produce("", dlq_message.dump());
    }
};

9. Common mistakes

Mistake 1: Not handling backpressure

// ❌ BAD: Producer overwhelms consumer
while (true) {
    producer.produce(key, message);  // No rate limiting
}
// ✅ GOOD: Rate limiting
// RateLimiter is a user-defined helper, not part of librdkafka or rabbitmq-c
RateLimiter limiter(1000);  // 1000 msg/s
while (true) {
    limiter.wait();
    producer.produce(key, message);
}
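
The article uses `RateLimiter` without showing it. One possible implementation is a token bucket, sketched below under stated assumptions: the constructor argument is messages per second, the bucket starts full and holds at most one second's worth of tokens, and the optional time parameters exist only to make the class testable. It is not thread-safe.

```cpp
#include <algorithm>
#include <chrono>
#include <thread>

// Hypothetical token-bucket limiter matching the RateLimiter(1000) / wait()
// usage above; the original article does not show its implementation.
class RateLimiter {
public:
    using Clock = std::chrono::steady_clock;

    explicit RateLimiter(double rate_per_sec, Clock::time_point start = Clock::now())
        : rate_per_sec_(rate_per_sec), capacity_(rate_per_sec),
          tokens_(rate_per_sec), last_refill_(start) {}

    // Takes one token if one is available at time `now`; never blocks.
    bool try_acquire(Clock::time_point now = Clock::now()) {
        refill(now);
        if (tokens_ < 1.0) return false;
        tokens_ -= 1.0;
        return true;
    }

    // Blocks until a token is available.
    void wait() {
        while (!try_acquire()) {
            double missing = 1.0 - tokens_;
            std::this_thread::sleep_for(
                std::chrono::duration<double>(missing / rate_per_sec_));
        }
    }

private:
    // Adds tokens for the time elapsed since the last refill, up to capacity.
    void refill(Clock::time_point now) {
        std::chrono::duration<double> elapsed = now - last_refill_;
        tokens_ = std::min(capacity_, tokens_ + elapsed.count() * rate_per_sec_);
        last_refill_ = now;
    }

    double rate_per_sec_;
    double capacity_;
    double tokens_;
    Clock::time_point last_refill_;
};
```

A fixed rate is a coarse form of backpressure because it ignores how fast consumers actually drain. Consumer lag or a bounded queue gives a more direct signal, and librdkafka reports a full local producer queue as `ERR__QUEUE_FULL`.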

Mistake 2: Not committing offsets

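// ❌ BAD: auto-commit advances offsets on a timer, independent of processing
conf->set("enable.auto.commit", "true", errstr);
// If the process crashes mid-handler, the offset may already be committed
// and the message is never redelivered.

// ✅ GOOD: Manual commit after processing
conf->set("enable.auto.commit", "false", errstr);
consumer.consume([](const std::string& key, const std::string& msg) {
    process(msg);  // If this throws, the commit is skipped
    return true;   // KafkaConsumer::consume() above calls commitSync() on true
});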
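
Why commit-after-processing gives at-least-once delivery can be shown with a small in-memory model. The sketch below is an illustration, not Kafka client code: `PartitionReader` is a hypothetical type holding one partition's messages and a single committed offset.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory partition with one committed offset (illustration only).
class PartitionReader {
    std::vector<std::string> log_;
    std::size_t committed_ = 0;  // offset of the next message to deliver after a restart

public:
    explicit PartitionReader(std::vector<std::string> log) : log_(std::move(log)) {}

    std::size_t committed() const { return committed_; }

    // Delivers messages starting at the committed offset. The offset advances
    // only after the handler returns true, so a failure or crash means the
    // same message is delivered again on the next run (at-least-once).
    void run(const std::function<bool(const std::string&)>& handler) {
        for (std::size_t offset = committed_; offset < log_.size(); ++offset) {
            if (!handler(log_[offset])) return;  // stop without committing
            committed_ = offset + 1;
        }
    }
};
```

If the handler fails on the second of three messages, the next `run()` starts again at that second message. Nothing is lost, but the handler can see the same message twice, which is why consumers should be idempotent.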

10. Best practices

  1. Idempotent consumers: Handle duplicate messages
  2. Schema evolution: Use Protobuf or Avro
  3. Monitoring: Track lag, throughput, errors
  4. Partitioning: Design partition keys carefully
  5. Backpressure: Implement rate limiting
  6. Dead letter queues: Handle poison messages
  7. Graceful shutdown: Flush producers, close consumers
  8. Testing: Use embedded brokers for tests
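
The first practice, idempotent consumers, can be sketched as a wrapper that remembers which message IDs it has already processed. This assumes every message carries a unique ID (for example the order ID). `IdempotentHandler` is a hypothetical name, and the in-memory set is for illustration only; a real service would persist processed IDs, ideally in the same transaction as the side effect.

```cpp
#include <functional>
#include <string>
#include <unordered_set>
#include <utility>

// Wraps a handler so that each message ID is processed at most once.
class IdempotentHandler {
    std::unordered_set<std::string> processed_ids_;
    std::function<void(const std::string&)> handler_;

public:
    explicit IdempotentHandler(std::function<void(const std::string&)> handler)
        : handler_(std::move(handler)) {}

    // Returns true if the payload was processed, false if it was a duplicate.
    bool handle(const std::string& message_id, const std::string& payload) {
        if (processed_ids_.count(message_id) != 0) return false;
        handler_(payload);  // may throw; the ID is then not recorded and can be retried
        processed_ids_.insert(message_id);
        return true;
    }
};
```

With this wrapper, a redelivered `order-1` message is skipped instead of charging the customer twice.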

11. Production patterns

Pattern 1: Circuit breaker

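#include <chrono>

// Not thread-safe; guard with a mutex if shared between threads.
class CircuitBreaker {
    using Clock = std::chrono::steady_clock;
    enum State { CLOSED, OPEN, HALF_OPEN };
    State state_ = CLOSED;
    int failure_count_ = 0;
    int threshold_ = 5;
    std::chrono::seconds retry_after_{30};  // time to stay OPEN (30 s is an illustrative value)
    Clock::time_point opened_at_{};
    
public:
    bool allow_request() {
        if (state_ == OPEN) {
            // Check if should try again
            if (Clock::now() - opened_at_ < retry_after_) {
                return false;  // still cooling down
            }
            state_ = HALF_OPEN;  // let trial requests through
        }
        return true;
    }
    
    void on_success() {
        failure_count_ = 0;
        state_ = CLOSED;
    }
    
    void on_failure() {
        failure_count_++;
        // A failed trial request re-opens immediately; otherwise open at the threshold
        if (state_ == HALF_OPEN || failure_count_ >= threshold_) {
            state_ = OPEN;
            opened_at_ = Clock::now();
        }
    }
};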

Summary

  • RabbitMQ: Classic messaging, flexible routing, RPC patterns
  • Kafka: High-throughput streaming, event sourcing, replay
  • Serialization: JSON for simplicity, Protobuf for performance
  • Error handling: Retries, dead letter queues, circuit breakers
  • Production: Monitoring, backpressure, graceful shutdown

Next: Monitoring dashboard (#50-6)
Previous: Production deployment (#50-5)

Keywords

C++ message queue, RabbitMQ, Kafka, AMQP, async messaging, event-driven architecture, microservices



