본문으로 건너뛰기
Previous
Next
C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·오프셋·정확히 한 번 전달

C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·오프셋·정확히 한 번 전달

C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·오프셋·정확히 한 번 전달

이 글의 핵심

C++에서 Apache Kafka 연동: librdkafka로 프로듀서·컨슈머 그룹·오프셋 관리 구현. Connection timeout·메시지 유실·리밸런싱 등 흔한 에러 해결, 배치·압축 성능 최적화, 프로덕션 패턴까지 900줄 분량으로 다룹니다.

들어가며: C++에서 Kafka를 왜 쓰나요?

실제 겪는 문제 시나리오

시나리오 1: 로그 수집 시 DB 부하 폭증

여러 마이크로서비스에서 로그를 중앙 DB에 직접 INSERT합니다. 트래픽이 늘면 DB 연결 수가 폭증하고, 로그 쓰기 지연으로 애플리케이션 응답이 느려집니다. “로그 때문에 메인 서비스가 죽어요.” 시나리오 2: 주문 이벤트를 여러 서비스에 전달

주문 완료 시 재고 차감, 포인트 적립, 알림 발송 등 여러 서비스에 이벤트를 보내야 합니다. HTTP로 순차 호출하면 지연이 누적되고, 한 서비스 장애 시 전체가 멈춥니다. “어떤 서비스가 느리면 전체 주문 처리가 막혀요.” 시나리오 3: 실시간 분석 파이프라인

클릭 스트림, 센서 데이터를 실시간으로 수집해 분석 엔진에 전달해야 합니다. 폴링 방식은 지연이 크고, 직접 DB 조회는 부하가 큽니다. “1초 단위로 집계해야 하는데 10초씩 밀려요.” 시나리오 4: librdkafka 도입 후 “Connection refused” 에러

Kafka 브로커 주소를 설정했는데 연결이 안 됩니다. localhost:9092 vs 127.0.0.1:9092, 방화벽, Docker 네트워크 등 확인할 것이 많아 막막합니다. 시나리오 5: 컨슈머 재시작 시 메시지 중복 처리

컨슈머가 메시지 처리 중 크래시했습니다. 재시작 후 같은 메시지를 다시 읽어 중복 처리됩니다. “주문이 두 번 차감돼요.” 시나리오 6: 파티션 리밸런싱 시 처리 중단

컨슈머 그룹에 새 인스턴스를 추가했더니 기존 컨슈머가 담당하던 파티션이 재할당됩니다. 리밸런싱 중 메시지 처리 순서가 꼬이거나 유실될 수 있습니다. Kafka로 해결:

  • 이벤트 스트리밍: 프로듀서가 메시지를 토픽에 발행, 컨슈머가 구독. DB에 직접 쓰지 않아 메인 서비스 부하 감소
  • 비동기·디커플링: 프로듀서는 발행만 하고, 여러 컨슈머가 각자 처리. 한 서비스 장애가 다른 서비스에 영향 없음
  • 오프셋 관리: 컨슈머 그룹이 읽은 위치를 커밋해, 재시작 시 이어서 처리
  • 파티션·스케일아웃: 파티션 수만큼 컨슈머를 늘려 처리량 확장
flowchart LR
  subgraph Producer["프로듀서 (C++)"]
    P1[앱 로그]
    P2[주문 이벤트]
    P3[센서 데이터]
  end
  subgraph Kafka[Apache Kafka]
    T1[logs 토픽]
    T2[orders 토픽]
    T3[events 토픽]
  end
  subgraph Consumer["컨슈머 (C++)"]
    C1[로그 저장]
    C2[재고 차감]
    C3[실시간 분석]
  end
  P1 --> T1
  P2 --> T2
  P3 --> T3
  T1 --> C1
  T2 --> C2
  T3 --> C3

Kafka 프로듀서-컨슈머 흐름

sequenceDiagram
  participant P as 프로듀서
  participant B as 브로커
  participant C as 컨슈머
  P->>B: produce(topic, key, value)
  B->>B: 파티션에 저장
  B->>P: dr_cb (delivery report)
  C->>B: subscribe(topic)
  B->>C: 파티션 할당
  loop consume
    C->>B: poll() → 메시지 수신
    C->>C: 비즈니스 로직 처리
    C->>B: commit() 오프셋
  end

RabbitMQ vs Kafka 비교

항목RabbitMQKafka
모델큐, Exchange토픽, 파티션
메시지 보존소비 후 삭제 (기본)보존 기간 동안 유지
재처리별도 구현오프셋 이동으로 가능
처리량수만 msg/s수백만 msg/s
C++ 클라이언트rabbitmq-clibrdkafka
이 글에서 다루는 것:
  • librdkafka 설치 및 CMake 연동
  • 완전한 프로듀서·컨슈머 C++ 예제
  • 자주 발생하는 에러와 해결법
  • 성능 최적화 (배치, 압축, 파티션)
  • 프로덕션 배포 패턴

실무 적용 경험: 이 글은 대규모 C++ 프로젝트에서 실제로 겪은 문제와 해결 과정을 바탕으로 작성되었습니다. 책이나 문서에서 다루지 않는 실전 함정과 디버깅 팁을 포함합니다.

1. 환경 설정

필수 의존성

항목버전비고
C++C++14 이상C++17 권장
librdkafka2.0+vcpkg, Homebrew, 또는 소스 빌드
Apache Kafka2.8+브로커 (Docker 권장)
CMake3.16+FindPackage 지원

Kafka 브로커 실행 (Docker)

# 단일 브로커 (개발용)
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_CFG_NODE_ID=0 \
  -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  apache/kafka:3.6
# 토픽 생성
docker exec kafka kafka-topics --create --topic app-logs --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092

librdkafka 설치

# vcpkg (권장)
vcpkg install rdkafka
# macOS (Homebrew)
brew install librdkafka
# Ubuntu/Debian
sudo apt-get install librdkafka-dev

CMakeLists.txt 기본 설정

cmake_minimum_required(VERSION 3.16)
project(kafka_example LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
find_package(RdKafka CONFIG REQUIRED)
add_executable(kafka_producer producer.cpp)
target_link_libraries(kafka_producer PRIVATE RdKafka::rdkafka)
add_executable(kafka_consumer consumer.cpp)
target_link_libraries(kafka_consumer PRIVATE RdKafka::rdkafka)

주의: vcpkg 사용 시 -DCMAKE_TOOLCHAIN_FILE=[vcpkg root]/scripts/buildsystems/vcpkg.cmake를 CMake에 전달해야 합니다.

2. 완전한 프로듀서 예제

2.1 최소 동작 프로듀서

// producer.cpp
// 컴파일: g++ -std=c++17 -o producer producer.cpp -lrdkafka -lrdkafka++
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
int main(int argc, char** argv) {
    std::string brokers = argc > 1 ? argv[1] : "localhost:9092";
    std::string topic = argc > 2 ? argv[2] : "app-logs";
    std::string errstr;
    // 1. 전역 설정
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", brokers, errstr);
    // 2. 프로듀서 생성
    RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
    delete conf;
    if (!producer) {
        std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
        return 1;
    }
    // 3. 메시지 발행
    std::string payload = "Hello, Kafka from C++!";
    RdKafka::ErrorCode err = producer->produce(
        topic,
        RdKafka::Topic::PARTITION_UA,  // 파티션 자동 할당
        RdKafka::Producer::RK_MSG_COPY,
        const_cast<char*>(payload.data()),
        payload.size(),
        "key1", 5,  // 키 (선택)
        0, nullptr);
    if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "발행 실패: " << RdKafka::err2str(err) << std::endl;
    } else {
        std::cout << "메시지 발행 완료" << std::endl;
    }
    // 4. 대기 중인 delivery report 처리
    while (producer->outq_len() > 0) {
        producer->poll(100);
    }
    delete producer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

코드 설명:

  • PARTITION_UA: 파티션을 자동 선택 (키 해시 또는 라운드로빈)
  • RK_MSG_COPY: 페이로드를 복사해 저장 (호출 후 버퍼 수정 가능)
  • outq_len(): 전송 대기 중인 메시지 수. 0이 될 때까지 poll() 호출 필요

2.2 Delivery Report 콜백 (실전용)

발행 결과를 콜백으로 받아 로깅·재시도할 수 있습니다.

// producer_with_dr_cb.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
class DeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message& msg) override {
        if (msg.err()) {
            std::cerr << "[DR] 전달 실패: " << msg.errstr() << std::endl;
        } else {
            std::cout << "[DR] 전달 완료: " << msg.topic_name() << "["
                      << msg.partition() << "] @ " << msg.offset() << std::endl;
        }
    }
};
int main() {
    std::string errstr;
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", "localhost:9092", errstr);
    DeliveryReportCb dr_cb;
    conf->set("dr_cb", &dr_cb, errstr);
    RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
    delete conf;
    if (!producer) {
        std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
        return 1;
    }
    std::string payload = "Test message with delivery report";
    producer->produce("app-logs", RdKafka::Topic::PARTITION_UA,
                      RdKafka::Producer::RK_MSG_COPY,
                      const_cast<char*>(payload.data()), payload.size(),
                      nullptr, 0, 0, nullptr);
    // poll을 호출해야 dr_cb가 실행됨
    for (int i = 0; i < 10 && producer->outq_len() > 0; ++i) {
        producer->poll(100);
    }
    producer->flush(5000);
    delete producer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

주의: poll()을 주기적으로 호출하지 않으면 delivery report 콜백이 실행되지 않습니다. 프로듀서 스레드에서 루프로 poll(100)을 호출하는 것이 일반적입니다.

2.3 RAII 래퍼 클래스 (재사용 가능)

// kafka_producer.hpp
#pragma once
#include <librdkafka/rdkafkacpp.h>
#include <memory>
#include <stdexcept>
#include <string>
class KafkaProducer {
public:
    KafkaProducer(const std::string& brokers,
                  const std::string& client_id = "cpp-producer") {
        std::string errstr;
        conf_.reset(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
        conf_->set("bootstrap.servers", brokers, errstr);
        conf_->set("client.id", client_id, errstr);
        conf_->set("dr_cb", &dr_cb_, errstr);
        producer_.reset(RdKafka::Producer::create(conf_.get(), errstr));
        if (!producer_) {
            throw std::runtime_error("프로듀서 생성 실패: " + errstr);
        }
    }
    void produce(const std::string& topic, const std::string& key,
                 const std::string& value) {
        RdKafka::ErrorCode err = producer_->produce(
            topic, RdKafka::Topic::PARTITION_UA,
            RdKafka::Producer::RK_MSG_COPY,
            const_cast<char*>(value.data()), value.size(),
            key.empty() ? nullptr : key.data(), key.size(),
            0, nullptr);
        if (err != RdKafka::ERR_NO_ERROR) {
            throw std::runtime_error("발행 실패: " + RdKafka::err2str(err));
        }
        producer_->poll(0);
    }
    void flush(int timeout_ms = 10000) {
        producer_->flush(timeout_ms);
    }
    void poll(int timeout_ms = 0) {
        producer_->poll(timeout_ms);
    }
private:
    struct DeliveryReportCb : public RdKafka::DeliveryReportCb {
        void dr_cb(RdKafka::Message& msg) override {
            if (msg.err()) {
                std::cerr << "[DR] 실패: " << msg.errstr() << std::endl;
            }
        }
    } dr_cb_;
    std::unique_ptr<RdKafka::Conf> conf_;
    std::unique_ptr<RdKafka::Producer> producer_;
};

3. 완전한 컨슈머 예제

3.1 최소 동작 컨슈머

// consumer.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
#include <csignal>
static volatile sig_atomic_t run = 1;
void sigterm_handler(int) { run = 0; }
int main(int argc, char** argv) {
    std::string brokers = argc > 1 ? argv[1] : "localhost:9092";
    std::string group_id = argc > 2 ? argv[2] : "cpp-consumer-group";
    std::string topic = argc > 3 ? argv[3] : "app-logs";
    std::string errstr;
    signal(SIGINT, sigterm_handler);
    signal(SIGTERM, sigterm_handler);
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", brokers, errstr);
    conf->set("group.id", group_id, errstr);
    conf->set("enable.auto.commit", "false", errstr);  // 수동 커밋
    RdKafka::KafkaConsumer* consumer =
        RdKafka::KafkaConsumer::create(conf, errstr);
    delete conf;
    if (!consumer) {
        std::cerr << "컨슈머 생성 실패: " << errstr << std::endl;
        return 1;
    }
    std::vector<std::string> topics = {topic};
    RdKafka::ErrorCode err = consumer->subscribe(topics);
    if (err) {
        std::cerr << "구독 실패: " << RdKafka::err2str(err) << std::endl;
        delete consumer;
        return 1;
    }
    std::cout << "메시지 수신 대기 중....(Ctrl+C로 종료)" << std::endl;
    while (run) {
        RdKafka::Message* msg = consumer->consume(1000);
        switch (msg->err()) {
            case RdKafka::ERR_NO_ERROR:
                std::cout << "[" << msg->topic_name() << ":" << msg->partition()
                          << "@" << msg->offset() << "] "
                          << std::string(static_cast<const char*>(msg->payload()),
                                         msg->len())
                          << std::endl;
                consumer->commit(msg);  // 오프셋 커밋
                break;
            case RdKafka::ERR__TIMED_OUT:
                break;
            case RdKafka::ERR__PARTITION_EOF:
                break;
            default:
                std::cerr << "소비 에러: " << msg->errstr() << std::endl;
                run = 0;
                break;
        }
        delete msg;
    }
    consumer->close();
    delete consumer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

코드 설명:

  • enable.auto.commit=false: 메시지 처리 완료 후 수동으로 commit() 호출. 처리 중 크래시 시 재처리 가능
  • consume(1000): 1초 타임아웃. 메시지 없으면 ERR__TIMED_OUT 반환
  • commit(msg): 해당 메시지의 오프셋까지 커밋

3.2 리밸런싱 콜백 (컨슈머 그룹)

파티션 재할당 시 리밸런싱 콜백에서 assign/unassign를 처리해야 합니다.

// consumer_rebalance.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <signal.h>
static volatile sig_atomic_t run = 1;
class RebalanceCb : public RdKafka::RebalanceCb {
public:
    void rebalance_cb(RdKafka::KafkaConsumer* consumer,
                     RdKafka::ErrorCode err,
                     std::vector<RdKafka::TopicPartition*>& partitions) override {
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
            consumer->assign(partitions);
            std::cerr << "[리밸런싱] 파티션 할당: ";
            for (auto* p : partitions)
                std::cerr << p->topic() << "[" << p->partition() << "] ";
            std::cerr << std::endl;
        } else {
            consumer->unassign();
            std::cerr << "[리밸런싱] 파티션 해제" << std::endl;
        }
    }
};
int main() {
    std::string errstr;
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", "localhost:9092", errstr);
    conf->set("group.id", "cpp-group", errstr);
    conf->set("enable.auto.commit", "false", errstr);
    RebalanceCb rebalance_cb;
    conf->set("rebalance_cb", &rebalance_cb, errstr);
    RdKafka::KafkaConsumer* consumer =
        RdKafka::KafkaConsumer::create(conf, errstr);
    delete conf;
    consumer->subscribe({"app-logs"});
    while (run) {
        RdKafka::Message* msg = consumer->consume(1000);
        if (msg->err() == RdKafka::ERR_NO_ERROR) {
            std::cout << std::string(static_cast<const char*>(msg->payload()),
                                     msg->len()) << std::endl;
            consumer->commit(msg);
        }
        delete msg;
    }
    consumer->close();
    delete consumer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

3.3 이벤트 콜백 (에러·통계)

// EventCb: 연결 에러, throttle 등 수신
class EventCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event& event) override {
        switch (event.type()) {
            case RdKafka::Event::EVENT_ERROR:
                if (event.fatal()) {
                    std::cerr << "치명적 에러: " << event.str() << std::endl;
                } else {
                    std::cerr << "에러: " << event.str() << std::endl;
                }
                break;
            case RdKafka::Event::EVENT_THROTTLE:
                std::cerr << "Throttled: " << event.throttle_time() << "ms by "
                          << event.broker_name() << std::endl;
                break;
            default:
                break;
        }
    }
};
// 설정 시
EventCb event_cb;
conf->set("event_cb", &event_cb, errstr);

4. 자주 발생하는 에러와 해결법

에러 1: “Connection refused” / “Broker: Connection refused”

증상: 프로듀서/컨슈머 생성은 되지만 메시지 발행·소비 시 연결 실패. 원인:

  • Kafka 브로커가 실행 중이 아님
  • 잘못된 주소/포트
  • Docker 환경에서 localhost vs 127.0.0.1 혼동
  • 방화벽 차단 해결법:
# 브로커 실행 확인
docker ps | grep kafka
# Docker 내부에서 접속 시
# bootstrap.servers = localhost:9092 (호스트에서)
# bootstrap.servers = host.docker.internal:9092 (Docker 컨테이너에서 호스트 접속)
// ✅ 여러 브로커 지정 (고가용성)
conf->set("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092", errstr);
// ✅ 연결 타임아웃 설정
conf->set("socket.connection.setup.timeout.ms", "5000", errstr);

에러 2: “Topic ‘xxx’ not present in metadata”

증상: produce() 호출 시 ERR__UNKNOWN_TOPIC 또는 토픽을 찾을 수 없음. 원인: 토픽이 아직 생성되지 않았거나, auto.create.topics.enable=true인데 브로커가 아직 메타데이터를 갱신하지 않음. 해결법:

# 토픽 사전 생성
kafka-topics --create --topic app-logs --partitions 3 --replication-factor 1 \
  --bootstrap-server localhost:9092
// ✅ 토픽 자동 생성 대기 (metadata.max.age.ms)
conf->set("metadata.max.age.ms", "5000", errstr);

에러 3: “Message delivery failed: Broker: Message size too large”

증상: 큰 메시지 발행 시 실패. 원인: 브로커의 message.max.bytes 설정보다 메시지가 큼. 해결법:

// 프로듀서: 메시지 크기 제한
conf->set("message.max.bytes", "10485760", errstr);  // 10MB
// 브로커 설정: message.max.bytes, replica.fetch.max.bytes 동일하게

에러 4: “Commit failed: Local: No offset stored”

증상: consumer->commit(msg) 호출 시 에러. 원인: enable.auto.commit=true이면서 수동 commit()을 호출하거나, 아직 커밋할 오프셋이 없음. 해결법:

// ✅ 수동 커밋 사용 시
conf->set("enable.auto.commit", "false", errstr);
// ✅ commitSync 호출
consumer->commit(msg);
// 또는
consumer->commitSync();  // 현재 오프셋까지 커밋

에러 5: 메시지 중복 처리

증상: 컨슈머 재시작 후 같은 메시지를 다시 처리. 원인: 메시지 처리 완료 전에 커밋하거나, 처리 후 커밋 전에 크래시. 해결법:

// ✅ 처리 완료 후 커밋 (at-least-once)
void process_msg(RdKafka::Message* msg) {
    // 1. 비즈니스 로직 처리
    doBusinessLogic(msg);
    // 2. 처리 성공 후 커밋
    consumer->commit(msg);
}
// ❌ 나쁜 예: 처리 전 커밋
consumer->commit(msg);  // 커밋
doBusinessLogic(msg);   // 처리 중 크래시 시 재처리됨

정확히 한 번 전달이 필요하면 프로듀서의 enable.idempotence=true와 트랜잭션을 사용합니다. Kafka 고급(#52-6)에서 다룹니다.

에러 6: “Out of memory” / “Queue full”

증상: 프로듀서가 produce() 호출 시 ERR__QUEUE_FULL 반환. 원인: 브로커 전송 속도보다 발행 속도가 빠름. 내부 큐가 가득 참. 해결법:

// ✅ 큐 크기 증가
conf->set("queue.buffering.max.messages", "1000000", errstr);
conf->set("queue.buffering.max.kbytes", "1048576", errstr);  // 1GB
// ✅ 발행 시 블로킹 대기
while (producer->outq_len() > 10000) {
    producer->poll(100);
}
producer->produce(...);

에러 7: “Broker: Not leader for partition”

증상: 리더가 변경된 파티션에 발행 시도. 원인: 브로커 장애·리밸런싱으로 리더가 변경됨. librdkafka는 자동으로 새 리더로 재시도합니다. 해결법:

// ✅ 재시도 설정 (기본값으로 충분)
conf->set("retries", "10", errstr);
conf->set("retry.backoff.ms", "100", errstr);

에러 8: “Consumer group is rebalancing”

증상: 컨슈머가 commit() 호출 시 ERR__REBALANCE_IN_PROGRESS. 원인: 컨슈머 그룹에 인스턴스 추가/제거로 리밸런싱 중. 해결법:

// ✅ 리밸런싱 콜백에서 assign 후 재처리
// rebalance_cb에서 파티션 해제 시 처리 중인 메시지 정리
// assign 후 새 파티션에서 consume 재개

에러 9: DeliveryReportCb가 호출되지 않음

증상: dr_cb가 한 번도 실행되지 않음. 원인: poll()을 호출하지 않음. delivery report는 poll() 호출 시 처리됩니다. 해결법:

// ✅ 프로듀서 스레드에서 주기적 poll
void producer_loop() {
    while (running) {
        producer->produce(...);
        producer->poll(100);  // 필수!
    }
    producer->flush(10000);
}

5. 성능 최적화

5.1 배치 발행 (Linger)

메시지를 모아서 한 번에 전송하면 RTT를 줄일 수 있습니다.

// 배치 대기 시간 (ms)
conf->set("linger.ms", "5", errstr);  // 5ms 대기 후 전송
// 배치 크기 (bytes)
conf->set("batch.size", "16384", errstr);  // 16KB

주의: linger.ms가 크면 지연이 증가합니다. 처리량 vs 지연 트레이드오프를 고려하세요.

5.2 압축

네트워크 대역폭을 줄입니다.

conf->set("compression.type", "gzip", errstr);
// none, gzip, snappy, lz4, zstd
압축속도압축률CPU
none빠름없음낮음
snappy빠름중간중간
lz4빠름좋음중간
gzip느림높음높음
zstd중간매우 높음중간

5.3 프로듀서 풀링

// ❌ 나쁜 예: 매 요청마다 새 프로듀서
void handle_request() {
    auto producer = create_producer();
    producer->produce(...);
    delete producer;
}
// ✅ 좋은 예: 프로듀서 재사용 (싱글톤 또는 풀)
static KafkaProducer* get_producer() {
    static KafkaProducer producer("localhost:9092");
    return &producer;
}

5.4 파티션 수와 컨슈머 수

  • 파티션 수 ≥ 컨슈머 수일 때만 모든 컨슈머가 활용됩니다.
  • 파티션 수를 늘리면 처리량은 늘지만, 순서 보장은 파티션 내에서만입니다.
// 키가 같은 메시지는 같은 파티션으로
producer->produce(topic, partition, ..., key, key_len, ...);
// partition = PARTITION_UA면 키 해시로 파티션 선택

5.5 성능 비교 (참고)

설정초당 메시지 (대략)지연
linger.ms=0, batch 작음5만1ms
linger.ms=5, batch 16KB50만5ms
linger.ms=5, 압축30만5ms

6. 프로덕션 패턴

6.1 Graceful Shutdown

static std::atomic<bool> running{true};
void sig_handler(int) {
    running = false;
}
int main() {
    signal(SIGINT, sig_handler);
    signal(SIGTERM, sig_handler);
    // ....프로듀서 생성
    while (running) {
        producer->produce(...);
        producer->poll(100);
    }
    // 종료 전 모든 메시지 전송 완료 대기
    producer->flush(10000);
    delete producer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

6.2 메시지 키 설계

// 파티션 내 순서 보장: 같은 키 → 같은 파티션
// 예: 사용자별 이벤트 순서
std::string key = "user:" + std::to_string(user_id);
producer->produce(topic, RdKafka::Topic::PARTITION_UA, ..., key.data(), key.size(), ...);

6.3 헬스 체크

// 프로듀서: 메타데이터 조회로 브로커 연결 확인
RdKafka::Metadata* metadata = nullptr;
RdKafka::ErrorCode err = producer->metadata(true, nullptr, &metadata, 5000);
bool healthy = (err == RdKafka::ERR_NO_ERROR && metadata);
if (metadata) delete metadata;

6.4 환경 변수 기반 설정

struct KafkaConfig {
    std::string brokers = "localhost:9092";
    std::string group_id = "cpp-consumer";
    int linger_ms = 5;
};
KafkaConfig load_config() {
    KafkaConfig c;
    if (const char* b = std::getenv("KAFKA_BROKERS")) c.brokers = b;
    if (const char* g = std::getenv("KAFKA_GROUP_ID")) c.group_id = g;
    return c;
}

6.5 로깅 (debug)

// 문제 디버깅 시
conf->set("debug", "broker,topic,msg", errstr);
// broker, topic, msg, protocol, cgrp 등

6.6 SSL/TLS (프로덕션)

conf->set("security.protocol", "ssl", errstr);
conf->set("ssl.ca.location", "/path/to/ca-cert", errstr);
conf->set("ssl.certificate.location", "/path/to/client-cert", errstr);
conf->set("ssl.key.location", "/path/to/client-key", errstr);

6.7 SASL 인증

conf->set("security.protocol", "sasl_plaintext", errstr);
conf->set("sasl.mechanisms", "PLAIN", errstr);
conf->set("sasl.username", "user", errstr);
conf->set("sasl.password", "pass", errstr);

7. 구현 체크리스트

환경 설정

  • Kafka 브로커 실행 확인
  • 토픽 사전 생성 (또는 auto.create 설정)
  • librdkafka 설치 및 CMake 연동

프로듀서

  • bootstrap.servers 설정
  • dr_cb 등록 및 poll() 주기적 호출
  • flush() 또는 outq_len()==0 대기 후 종료
  • 배치·압축 설정 (성능 요구 시)

컨슈머

  • group.id 설정
  • enable.auto.commit=false 시 수동 commit()
  • rebalance_cb 등록 (컨슈머 그룹 사용 시)
  • event_cb 등록 (에러 모니터링)

에러 처리

  • produce() 반환값 검사
  • msg->err() 검사
  • ERR__QUEUE_FULL 시 대기 또는 백프레셔

프로덕션

  • Graceful Shutdown (flush, wait_destroyed)
  • SSL/TLS, SASL (필요 시)
  • 환경 변수로 설정 외부화

정리

항목요약
librdkafkaC/C++ Kafka 클라이언트의 사실상 표준
프로듀서Conf → Producer::create → produce → poll → flush
컨슈머Conf → KafkaConsumer::create → subscribe → consume → commit
오프셋수동 커밋으로 메시지 처리 완료 후 commit
에러Connection refused, Topic not found, Queue full, Rebalance
성능linger.ms, batch.size, 압축, 프로듀서 재사용
프로덕션Graceful Shutdown, SSL/TLS, SASL, 환경 변수
핵심 원칙:
  1. poll() 필수: 프로듀서/컨슈머 모두 이벤트 루프에서 poll 호출
  2. 수동 커밋: 처리 완료 후 commit으로 중복·유실 최소화
  3. 리밸런싱: rebalance_cb에서 assign/unassign 처리
  4. 프로듀서 재사용: 매 요청마다 새 프로듀서 생성 금지

자주 묻는 질문 (FAQ)

Q. Kafka와 RabbitMQ 중 어떤 것을 써야 하나요?

A. 대용량 이벤트 스트리밍, 로그 수집, 재처리/이벤트 소싱이 필요하면 Kafka가 적합합니다. 작업 큐, 우선순위 큐, 복잡한 라우팅이 필요하면 RabbitMQ를 고려하세요.

Q. 정확히 한 번 전달(at-least-once vs exactly-once)은 어떻게 하나요?

A. at-least-once: 처리 후 커밋. 중복 가능성 있음. exactly-once: 프로듀서 enable.idempotence=true + 트랜잭션 + 컨슈머 read_committed. Kafka 고급(#52-6)에서 다룹니다.

Q. 메시지 순서가 보장되나요?

A. 파티션 내에서만 순서가 보장됩니다. 같은 키를 사용하면 같은 파티션으로 가므로, 키별 순서를 보장할 수 있습니다.

Q. librdkafka는 스레드 안전한가요?

A. 네. librdkafka API는 스레드 안전합니다. 단, 콜백 객체(dr_cb, rebalance_cb 등)는 프로듀서/컨슈머 생성 시점에 등록되어야 하며, 수명이 프로듀서/컨슈머보다 길어야 합니다. 한 줄 요약: librdkafka로 C++에서 Kafka 프로듀서·컨슈머를 구현하고, poll·commit·리밸런싱을 올바르게 처리하면 실무에 바로 적용할 수 있습니다. 다음 글: Kafka 고급: 스트림 처리·트랜잭션·정확히 한 번(#52-6) 이전 글: C++ 시리즈 목차

참고 자료


관련 글

심화 부록: 구현·운영 관점

이 부록은 앞선 본문에서 다룬 주제(「C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·오프셋·정확히 한 번 전달」)를 구현·런타임·운영 관점에서 다시 압축합니다. 도메인별 세부 구현은 글마다 다르지만, 입력 검증 → 핵심 연산 → 부작용(I/O·네트워크·동시성) → 관측의 흐름으로 장애를 나누면 원인 추적이 빨라집니다.

내부 동작과 핵심 메커니즘

flowchart TD
  A[입력·요청·이벤트] --> B[파싱·검증·디코딩]
  B --> C[핵심 연산·상태 전이]
  C --> D[부작용: I/O·네트워크·동시성]
  D --> E[결과·관측·저장]
sequenceDiagram
  participant C as 클라이언트/호출자
  participant B as 경계(런타임·게이트웨이·프로세스)
  participant D as 의존성(API·DB·큐·파일)
  C->>B: 요청/이벤트
  B->>D: 조회·쓰기·RPC
  D-->>B: 지연·부분 실패·재시도 가능
  B-->>C: 응답 또는 오류(코드·상관 ID)
  • 불변 조건(Invariant): 버퍼 경계, 프로토콜 상태, 트랜잭션 격리, FD 상한 등 단계별로 문장으로 적어 두면 디버깅 비용이 줄어듭니다.
  • 결정성: 순수 층과 시간·네트워크·스케줄에 의존하는 층을 분리해야 테스트와 장애 분석이 쉬워집니다.
  • 경계 비용: 직렬화, 인코딩, syscall 횟수, 락 경합, 할당·GC, 캐시 미스를 의심 목록에 둡니다.
  • 백프레셔: 생산자가 소비자보다 빠를 때 버퍼·큐·스트림에서 속도를 줄이는 신호를 어디에 둘지 정의합니다.

프로덕션 운영 패턴

영역운영 관점 질문
관측성요청 단위 상관 ID, 에러율·지연 p95/p99, 의존성 타임아웃·재시도가 대시보드에 보이는가
안전성입력 검증·권한·비밀·감사 로그가 코드 경로마다 일관적인가
신뢰성재시도는 멱등 연산에만 적용되는가, 서킷 브레이커·백오프·DLQ가 있는가
성능캐시·배치 크기·커넥션 풀·인덱스·백프레셔가 데이터 규모에 맞는가
배포롤백 룬북, 카나리/블루그린, 마이그레이션·피처 플래그가 문서화되어 있는가
용량피크 트래픽·디스크·FD·스레드 풀 상한을 주기적으로 검증하는가

스테이징은 데이터 양·네트워크 RTT·동시성을 프로덕션에 가깝게 맞출수록 재현율이 올라갑니다.

확장 예시: 엔드투엔드 미니 시나리오

앞선 본문 주제(「C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·오프셋·정확히 한 번 전달」)를 배포·운영 흐름에 맞춰 옮긴 체크리스트입니다. 도메인에 맞게 단계 이름만 바꿔 적용할 수 있습니다.

  1. 입력 계약 고정: 스키마·버전·최대 페이로드·타임아웃·에러 코드를 경계에 둔다.
  2. 핵심 경로 계측: 요청 ID, 단계별 지연, 외부 호출 결과 코드를 로그·메트릭·트레이스에서 한 흐름으로 본다.
  3. 실패 주입: 의존성 타임아웃·5xx·부분 데이터·락 대기를 스테이징에서 재현한다.
  4. 호환·롤백: 설정/마이그레이션/클라이언트 버전을 되돌릴 수 있는지 확인한다.
  5. 부하 후 검증: 피크 대비 p95/p99, 에러율, 리소스 상한, 알림 임계값을 점검한다.
handle(request):
  ctx = newCorrelationId()
  validated = validateSchema(request)
  authorize(validated, ctx)
  result = domainCore(validated)
  persistOrEmit(result, idempotentKey)
  recordMetrics(ctx, latency, outcome)
  return result

문제 해결(Troubleshooting)

증상가능 원인조치
간헐적 실패레이스, 타임아웃, 외부 의존성, DNS최소 재현 스크립트, 분산 트레이스·로그 상관관계, 재시도·서킷 설정 점검
성능 저하N+1, 동기 I/O, 락 경합, 과도한 직렬화, 캐시 미스프로파일러·APM으로 핫스팟 확인 후 한 가지씩 제거
메모리 증가캐시 무제한, 구독/리스너 누수, 대용량 버퍼, 커넥션 미반납상한·TTL·힙/FD 스냅샷 비교
빌드·배포만 실패환경 변수, 권한, 플랫폼 차이, lockfileCI 로그와 로컬 diff, 런타임·이미지 버전 핀
설정 불일치프로필·시크릿·기본값, 리전스키마 검증된 설정 단일 소스와 배포 매트릭스 표준화
데이터 불일치비멱등 재시도, 부분 쓰기, 캐시 무효화 누락멱등 키·아웃박스·트랜잭션 경계 재검토

권장 순서: (1) 최소 재현 (2) 최근 변경 범위 축소 (3) 환경·의존성 차이 (4) 관측으로 가설 검증 (5) 수정 후 회귀·부하 테스트.

배포 전에는 git addgit commitgit pushnpm run deploy 순서를 권장합니다.


같이 보면 좋은 글 (내부 링크)

이 주제와 연결되는 다른 글입니다.


이 글에서 다루는 키워드 (관련 검색어)

C++, Kafka, librdkafka, 이벤트 스트리밍, 메시지큐, 실전 등으로 검색하시면 이 글이 도움이 됩니다.