C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·오프셋·정확히 한 번 전달

C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·오프셋·정확히 한 번 전달

이 글의 핵심

C++에서 Apache Kafka 연동: librdkafka로 프로듀서·컨슈머 그룹·오프셋 관리 구현. Connection timeout·메시지 유실·리밸런싱 등 흔한 에러 해결, 배치·압축 성능 최적화, 프로덕션 패턴까지 900줄 분량으로 다룹니다.

들어가며: C++에서 Kafka를 왜 쓰나요?

실제 겪는 문제 시나리오

시나리오 1: 로그 수집 시 DB 부하 폭증
여러 마이크로서비스에서 로그를 중앙 DB에 직접 INSERT합니다. 트래픽이 늘면 DB 연결 수가 폭증하고, 로그 쓰기 지연으로 애플리케이션 응답이 느려집니다. “로그 때문에 메인 서비스가 죽어요.”

시나리오 2: 주문 이벤트를 여러 서비스에 전달
주문 완료 시 재고 차감, 포인트 적립, 알림 발송 등 여러 서비스에 이벤트를 보내야 합니다. HTTP로 순차 호출하면 지연이 누적되고, 한 서비스 장애 시 전체가 멈춥니다. “어떤 서비스가 느리면 전체 주문 처리가 막혀요.”

시나리오 3: 실시간 분석 파이프라인
클릭 스트림, 센서 데이터를 실시간으로 수집해 분석 엔진에 전달해야 합니다. 폴링 방식은 지연이 크고, 직접 DB 조회는 부하가 큽니다. “1초 단위로 집계해야 하는데 10초씩 밀려요.”

시나리오 4: librdkafka 도입 후 “Connection refused” 에러
Kafka 브로커 주소를 설정했는데 연결이 안 됩니다. localhost:9092 vs 127.0.0.1:9092, 방화벽, Docker 네트워크 등 확인할 것이 많아 막막합니다.

시나리오 5: 컨슈머 재시작 시 메시지 중복 처리
컨슈머가 메시지 처리 중 크래시했습니다. 재시작 후 같은 메시지를 다시 읽어 중복 처리됩니다. “주문이 두 번 차감돼요.”

시나리오 6: 파티션 리밸런싱 시 처리 중단
컨슈머 그룹에 새 인스턴스를 추가했더니 기존 컨슈머가 담당하던 파티션이 재할당됩니다. 리밸런싱 중 메시지 처리 순서가 꼬이거나 유실될 수 있습니다.

Kafka로 해결:

  • 이벤트 스트리밍: 프로듀서가 메시지를 토픽에 발행, 컨슈머가 구독. DB에 직접 쓰지 않아 메인 서비스 부하 감소
  • 비동기·디커플링: 프로듀서는 발행만 하고, 여러 컨슈머가 각자 처리. 한 서비스 장애가 다른 서비스에 영향 없음
  • 오프셋 관리: 컨슈머 그룹이 읽은 위치를 커밋해, 재시작 시 이어서 처리
  • 파티션·스케일아웃: 파티션 수만큼 컨슈머를 늘려 처리량 확장
flowchart LR
  subgraph Producer["프로듀서 (C++)"]
    P1[앱 로그]
    P2[주문 이벤트]
    P3[센서 데이터]
  end

  subgraph Kafka[Apache Kafka]
    T1[logs 토픽]
    T2[orders 토픽]
    T3[events 토픽]
  end

  subgraph Consumer["컨슈머 (C++)"]
    C1[로그 저장]
    C2[재고 차감]
    C3[실시간 분석]
  end

  P1 --> T1
  P2 --> T2
  P3 --> T3
  T1 --> C1
  T2 --> C2
  T3 --> C3

Kafka 프로듀서-컨슈머 흐름

sequenceDiagram
  participant P as 프로듀서
  participant B as 브로커
  participant C as 컨슈머

  P->>B: produce(topic, key, value)
  B->>B: 파티션에 저장
  B->>P: dr_cb (delivery report)

  C->>B: subscribe(topic)
  B->>C: 파티션 할당
  loop consume
    C->>B: poll() → 메시지 수신
    C->>C: 비즈니스 로직 처리
    C->>B: commit() 오프셋
  end

RabbitMQ vs Kafka 비교

항목RabbitMQKafka
모델큐, Exchange토픽, 파티션
메시지 보존소비 후 삭제 (기본)보존 기간 동안 유지
재처리별도 구현오프셋 이동으로 가능
처리량수만 msg/s수백만 msg/s
C++ 클라이언트rabbitmq-clibrdkafka

이 글에서 다루는 것:

  • librdkafka 설치 및 CMake 연동
  • 완전한 프로듀서·컨슈머 C++ 예제
  • 자주 발생하는 에러와 해결법
  • 성능 최적화 (배치, 압축, 파티션)
  • 프로덕션 배포 패턴

실무 적용 경험: 이 글은 대규모 C++ 프로젝트에서 실제로 겪은 문제와 해결 과정을 바탕으로 작성되었습니다. 책이나 문서에서 다루지 않는 실전 함정과 디버깅 팁을 포함합니다.

목차

  1. 환경 설정
  2. 완전한 프로듀서 예제
  3. 완전한 컨슈머 예제
  4. 자주 발생하는 에러와 해결법
  5. 성능 최적화
  6. 프로덕션 패턴
  7. 구현 체크리스트

1. 환경 설정

필수 의존성

항목버전비고
C++C++14 이상C++17 권장
librdkafka2.0+vcpkg, Homebrew, 또는 소스 빌드
Apache Kafka2.8+브로커 (Docker 권장)
CMake3.16+FindPackage 지원

Kafka 브로커 실행 (Docker)

# 단일 브로커 (개발용)
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_CFG_NODE_ID=0 \
  -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  apache/kafka:3.6

# 토픽 생성
docker exec kafka kafka-topics --create --topic app-logs --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092

librdkafka 설치

# vcpkg (권장)
vcpkg install rdkafka

# macOS (Homebrew)
brew install librdkafka

# Ubuntu/Debian
sudo apt-get install librdkafka-dev

CMakeLists.txt 기본 설정

cmake_minimum_required(VERSION 3.16)
project(kafka_example LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 17)

find_package(RdKafka CONFIG REQUIRED)

add_executable(kafka_producer producer.cpp)
target_link_libraries(kafka_producer PRIVATE RdKafka::rdkafka)

add_executable(kafka_consumer consumer.cpp)
target_link_libraries(kafka_consumer PRIVATE RdKafka::rdkafka)

주의: vcpkg 사용 시 -DCMAKE_TOOLCHAIN_FILE=[vcpkg root]/scripts/buildsystems/vcpkg.cmake를 CMake에 전달해야 합니다.


2. 완전한 프로듀서 예제

2.1 최소 동작 프로듀서

// producer.cpp
// 컴파일: g++ -std=c++17 -o producer producer.cpp -lrdkafka -lrdkafka++

#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>

int main(int argc, char** argv) {
    std::string brokers = argc > 1 ? argv[1] : "localhost:9092";
    std::string topic = argc > 2 ? argv[2] : "app-logs";
    std::string errstr;

    // 1. 전역 설정
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", brokers, errstr);

    // 2. 프로듀서 생성
    RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
    delete conf;
    if (!producer) {
        std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
        return 1;
    }

    // 3. 메시지 발행
    std::string payload = "Hello, Kafka from C++!";
    RdKafka::ErrorCode err = producer->produce(
        topic,
        RdKafka::Topic::PARTITION_UA,  // 파티션 자동 할당
        RdKafka::Producer::RK_MSG_COPY,
        const_cast<char*>(payload.data()),
        payload.size(),
        "key1", 5,  // 키 (선택)
        0, nullptr);

    if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "발행 실패: " << RdKafka::err2str(err) << std::endl;
    } else {
        std::cout << "메시지 발행 완료" << std::endl;
    }

    // 4. 대기 중인 delivery report 처리
    while (producer->outq_len() > 0) {
        producer->poll(100);
    }

    delete producer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

코드 설명:

  • PARTITION_UA: 파티션을 자동 선택 (키 해시 또는 라운드로빈)
  • RK_MSG_COPY: 페이로드를 복사해 저장 (호출 후 버퍼 수정 가능)
  • outq_len(): 전송 대기 중인 메시지 수. 0이 될 때까지 poll() 호출 필요

2.2 Delivery Report 콜백 (실전용)

발행 결과를 콜백으로 받아 로깅·재시도할 수 있습니다.

// producer_with_dr_cb.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>

class DeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message& msg) override {
        if (msg.err()) {
            std::cerr << "[DR] 전달 실패: " << msg.errstr() << std::endl;
        } else {
            std::cout << "[DR] 전달 완료: " << msg.topic_name() << "["
                      << msg.partition() << "] @ " << msg.offset() << std::endl;
        }
    }
};

int main() {
    std::string errstr;
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", "localhost:9092", errstr);

    DeliveryReportCb dr_cb;
    conf->set("dr_cb", &dr_cb, errstr);

    RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
    delete conf;
    if (!producer) {
        std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
        return 1;
    }

    std::string payload = "Test message with delivery report";
    producer->produce("app-logs", RdKafka::Topic::PARTITION_UA,
                      RdKafka::Producer::RK_MSG_COPY,
                      const_cast<char*>(payload.data()), payload.size(),
                      nullptr, 0, 0, nullptr);

    // poll을 호출해야 dr_cb가 실행됨
    for (int i = 0; i < 10 && producer->outq_len() > 0; ++i) {
        producer->poll(100);
    }

    producer->flush(5000);
    delete producer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

주의: poll()을 주기적으로 호출하지 않으면 delivery report 콜백이 실행되지 않습니다. 프로듀서 스레드에서 루프로 poll(100)을 호출하는 것이 일반적입니다.

2.3 RAII 래퍼 클래스 (재사용 가능)

// kafka_producer.hpp
#pragma once

#include <librdkafka/rdkafkacpp.h>
#include <memory>
#include <stdexcept>
#include <string>

class KafkaProducer {
public:
    KafkaProducer(const std::string& brokers,
                  const std::string& client_id = "cpp-producer") {
        std::string errstr;
        conf_.reset(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
        conf_->set("bootstrap.servers", brokers, errstr);
        conf_->set("client.id", client_id, errstr);
        conf_->set("dr_cb", &dr_cb_, errstr);

        producer_.reset(RdKafka::Producer::create(conf_.get(), errstr));
        if (!producer_) {
            throw std::runtime_error("프로듀서 생성 실패: " + errstr);
        }
    }

    void produce(const std::string& topic, const std::string& key,
                 const std::string& value) {
        RdKafka::ErrorCode err = producer_->produce(
            topic, RdKafka::Topic::PARTITION_UA,
            RdKafka::Producer::RK_MSG_COPY,
            const_cast<char*>(value.data()), value.size(),
            key.empty() ? nullptr : key.data(), key.size(),
            0, nullptr);

        if (err != RdKafka::ERR_NO_ERROR) {
            throw std::runtime_error("발행 실패: " + RdKafka::err2str(err));
        }
        producer_->poll(0);
    }

    void flush(int timeout_ms = 10000) {
        producer_->flush(timeout_ms);
    }

    void poll(int timeout_ms = 0) {
        producer_->poll(timeout_ms);
    }

private:
    struct DeliveryReportCb : public RdKafka::DeliveryReportCb {
        void dr_cb(RdKafka::Message& msg) override {
            if (msg.err()) {
                std::cerr << "[DR] 실패: " << msg.errstr() << std::endl;
            }
        }
    } dr_cb_;
    std::unique_ptr<RdKafka::Conf> conf_;
    std::unique_ptr<RdKafka::Producer> producer_;
};

3. 완전한 컨슈머 예제

3.1 최소 동작 컨슈머

// consumer.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
#include <csignal>

static volatile sig_atomic_t run = 1;

void sigterm_handler(int) { run = 0; }

int main(int argc, char** argv) {
    std::string brokers = argc > 1 ? argv[1] : "localhost:9092";
    std::string group_id = argc > 2 ? argv[2] : "cpp-consumer-group";
    std::string topic = argc > 3 ? argv[3] : "app-logs";
    std::string errstr;

    signal(SIGINT, sigterm_handler);
    signal(SIGTERM, sigterm_handler);

    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", brokers, errstr);
    conf->set("group.id", group_id, errstr);
    conf->set("enable.auto.commit", "false", errstr);  // 수동 커밋

    RdKafka::KafkaConsumer* consumer =
        RdKafka::KafkaConsumer::create(conf, errstr);
    delete conf;
    if (!consumer) {
        std::cerr << "컨슈머 생성 실패: " << errstr << std::endl;
        return 1;
    }

    std::vector<std::string> topics = {topic};
    RdKafka::ErrorCode err = consumer->subscribe(topics);
    if (err) {
        std::cerr << "구독 실패: " << RdKafka::err2str(err) << std::endl;
        delete consumer;
        return 1;
    }

    std::cout << "메시지 수신 대기 중... (Ctrl+C로 종료)" << std::endl;

    while (run) {
        RdKafka::Message* msg = consumer->consume(1000);
        switch (msg->err()) {
            case RdKafka::ERR_NO_ERROR:
                std::cout << "[" << msg->topic_name() << ":" << msg->partition()
                          << "@" << msg->offset() << "] "
                          << std::string(static_cast<const char*>(msg->payload()),
                                         msg->len())
                          << std::endl;
                consumer->commit(msg);  // 오프셋 커밋
                break;
            case RdKafka::ERR__TIMED_OUT:
                break;
            case RdKafka::ERR__PARTITION_EOF:
                break;
            default:
                std::cerr << "소비 에러: " << msg->errstr() << std::endl;
                run = 0;
                break;
        }
        delete msg;
    }

    consumer->close();
    delete consumer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

코드 설명:

  • enable.auto.commit=false: 메시지 처리 완료 후 수동으로 commit() 호출. 처리 중 크래시 시 재처리 가능
  • consume(1000): 1초 타임아웃. 메시지 없으면 ERR__TIMED_OUT 반환
  • commit(msg): 해당 메시지의 오프셋까지 커밋

3.2 리밸런싱 콜백 (컨슈머 그룹)

파티션 재할당 시 리밸런싱 콜백에서 assign/unassign를 처리해야 합니다.

// consumer_rebalance.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <signal.h>

static volatile sig_atomic_t run = 1;

class RebalanceCb : public RdKafka::RebalanceCb {
public:
    void rebalance_cb(RdKafka::KafkaConsumer* consumer,
                     RdKafka::ErrorCode err,
                     std::vector<RdKafka::TopicPartition*>& partitions) override {
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
            consumer->assign(partitions);
            std::cerr << "[리밸런싱] 파티션 할당: ";
            for (auto* p : partitions)
                std::cerr << p->topic() << "[" << p->partition() << "] ";
            std::cerr << std::endl;
        } else {
            consumer->unassign();
            std::cerr << "[리밸런싱] 파티션 해제" << std::endl;
        }
    }
};

int main() {
    std::string errstr;
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", "localhost:9092", errstr);
    conf->set("group.id", "cpp-group", errstr);
    conf->set("enable.auto.commit", "false", errstr);

    RebalanceCb rebalance_cb;
    conf->set("rebalance_cb", &rebalance_cb, errstr);

    RdKafka::KafkaConsumer* consumer =
        RdKafka::KafkaConsumer::create(conf, errstr);
    delete conf;

    consumer->subscribe({"app-logs"});

    while (run) {
        RdKafka::Message* msg = consumer->consume(1000);
        if (msg->err() == RdKafka::ERR_NO_ERROR) {
            std::cout << std::string(static_cast<const char*>(msg->payload()),
                                     msg->len()) << std::endl;
            consumer->commit(msg);
        }
        delete msg;
    }

    consumer->close();
    delete consumer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

3.3 이벤트 콜백 (에러·통계)

// EventCb: 연결 에러, throttle 등 수신
class EventCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event& event) override {
        switch (event.type()) {
            case RdKafka::Event::EVENT_ERROR:
                if (event.fatal()) {
                    std::cerr << "치명적 에러: " << event.str() << std::endl;
                } else {
                    std::cerr << "에러: " << event.str() << std::endl;
                }
                break;
            case RdKafka::Event::EVENT_THROTTLE:
                std::cerr << "Throttled: " << event.throttle_time() << "ms by "
                          << event.broker_name() << std::endl;
                break;
            default:
                break;
        }
    }
};

// 설정 시
EventCb event_cb;
conf->set("event_cb", &event_cb, errstr);

4. 자주 발생하는 에러와 해결법

에러 1: “Connection refused” / “Broker: Connection refused”

증상: 프로듀서/컨슈머 생성은 되지만 메시지 발행·소비 시 연결 실패.

원인:

  • Kafka 브로커가 실행 중이 아님
  • 잘못된 주소/포트
  • Docker 환경에서 localhost vs 127.0.0.1 혼동
  • 방화벽 차단

해결법:

# 브로커 실행 확인
docker ps | grep kafka

# Docker 내부에서 접속 시
# bootstrap.servers = localhost:9092 (호스트에서)
# bootstrap.servers = host.docker.internal:9092 (Docker 컨테이너에서 호스트 접속)
// ✅ 여러 브로커 지정 (고가용성)
conf->set("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092", errstr);

// ✅ 연결 타임아웃 설정
conf->set("socket.connection.setup.timeout.ms", "5000", errstr);

에러 2: “Topic ‘xxx’ not present in metadata”

증상: produce() 호출 시 ERR__UNKNOWN_TOPIC 또는 토픽을 찾을 수 없음.

원인: 토픽이 아직 생성되지 않았거나, auto.create.topics.enable=true인데 브로커가 아직 메타데이터를 갱신하지 않음.

해결법:

# 토픽 사전 생성
kafka-topics --create --topic app-logs --partitions 3 --replication-factor 1 \
  --bootstrap-server localhost:9092
// ✅ 토픽 자동 생성 대기 (metadata.max.age.ms)
conf->set("metadata.max.age.ms", "5000", errstr);

에러 3: “Message delivery failed: Broker: Message size too large”

증상: 큰 메시지 발행 시 실패.

원인: 브로커의 message.max.bytes 설정보다 메시지가 큼.

해결법:

// 프로듀서: 메시지 크기 제한
conf->set("message.max.bytes", "10485760", errstr);  // 10MB

// 브로커 설정: message.max.bytes, replica.fetch.max.bytes 동일하게

에러 4: “Commit failed: Local: No offset stored”

증상: consumer->commit(msg) 호출 시 에러.

원인: enable.auto.commit=true이면서 수동 commit()을 호출하거나, 아직 커밋할 오프셋이 없음.

해결법:

// ✅ 수동 커밋 사용 시
conf->set("enable.auto.commit", "false", errstr);

// ✅ commitSync 호출
consumer->commit(msg);
// 또는
consumer->commitSync();  // 현재 오프셋까지 커밋

에러 5: 메시지 중복 처리

증상: 컨슈머 재시작 후 같은 메시지를 다시 처리.

원인: 메시지 처리 완료 전에 커밋하거나, 처리 후 커밋 전에 크래시.

해결법:

// ✅ 처리 완료 후 커밋 (at-least-once)
void process_msg(RdKafka::Message* msg) {
    // 1. 비즈니스 로직 처리
    doBusinessLogic(msg);
    // 2. 처리 성공 후 커밋
    consumer->commit(msg);
}

// ❌ 나쁜 예: 처리 전 커밋
consumer->commit(msg);  // 커밋
doBusinessLogic(msg);   // 처리 중 크래시 시 재처리됨

정확히 한 번 전달이 필요하면 프로듀서의 enable.idempotence=true와 트랜잭션을 사용합니다. Kafka 고급(#52-6)에서 다룹니다.

에러 6: “Out of memory” / “Queue full”

증상: 프로듀서가 produce() 호출 시 ERR__QUEUE_FULL 반환.

원인: 브로커 전송 속도보다 발행 속도가 빠름. 내부 큐가 가득 참.

해결법:

// ✅ 큐 크기 증가
conf->set("queue.buffering.max.messages", "1000000", errstr);
conf->set("queue.buffering.max.kbytes", "1048576", errstr);  // 1GB

// ✅ 발행 시 블로킹 대기
while (producer->outq_len() > 10000) {
    producer->poll(100);
}
producer->produce(...);

에러 7: “Broker: Not leader for partition”

증상: 리더가 변경된 파티션에 발행 시도.

원인: 브로커 장애·리밸런싱으로 리더가 변경됨. librdkafka는 자동으로 새 리더로 재시도합니다.

해결법:

// ✅ 재시도 설정 (기본값으로 충분)
conf->set("retries", "10", errstr);
conf->set("retry.backoff.ms", "100", errstr);

에러 8: “Consumer group is rebalancing”

증상: 컨슈머가 commit() 호출 시 ERR__REBALANCE_IN_PROGRESS.

원인: 컨슈머 그룹에 인스턴스 추가/제거로 리밸런싱 중.

해결법:

// ✅ 리밸런싱 콜백에서 assign 후 재처리
// rebalance_cb에서 파티션 해제 시 처리 중인 메시지 정리
// assign 후 새 파티션에서 consume 재개

에러 9: DeliveryReportCb가 호출되지 않음

증상: dr_cb가 한 번도 실행되지 않음.

원인: poll()을 호출하지 않음. delivery report는 poll() 호출 시 처리됩니다.

해결법:

// ✅ 프로듀서 스레드에서 주기적 poll
void producer_loop() {
    while (running) {
        producer->produce(...);
        producer->poll(100);  // 필수!
    }
    producer->flush(10000);
}

5. 성능 최적화

5.1 배치 발행 (Linger)

메시지를 모아서 한 번에 전송하면 RTT를 줄일 수 있습니다.

// 배치 대기 시간 (ms)
conf->set("linger.ms", "5", errstr);  // 5ms 대기 후 전송

// 배치 크기 (bytes)
conf->set("batch.size", "16384", errstr);  // 16KB

주의: linger.ms가 크면 지연이 증가합니다. 처리량 vs 지연 트레이드오프를 고려하세요.

5.2 압축

네트워크 대역폭을 줄입니다.

conf->set("compression.type", "gzip", errstr);
// none, gzip, snappy, lz4, zstd
압축속도압축률CPU
none빠름없음낮음
snappy빠름중간중간
lz4빠름좋음중간
gzip느림높음높음
zstd중간매우 높음중간

5.3 프로듀서 풀링

// ❌ 나쁜 예: 매 요청마다 새 프로듀서
void handle_request() {
    auto producer = create_producer();
    producer->produce(...);
    delete producer;
}

// ✅ 좋은 예: 프로듀서 재사용 (싱글톤 또는 풀)
static KafkaProducer* get_producer() {
    static KafkaProducer producer("localhost:9092");
    return &producer;
}

5.4 파티션 수와 컨슈머 수

  • 파티션 수 ≥ 컨슈머 수일 때만 모든 컨슈머가 활용됩니다.
  • 파티션 수를 늘리면 처리량은 늘지만, 순서 보장은 파티션 내에서만입니다.
// 키가 같은 메시지는 같은 파티션으로
producer->produce(topic, partition, ..., key, key_len, ...);
// partition = PARTITION_UA면 키 해시로 파티션 선택

5.5 성능 비교 (참고)

설정초당 메시지 (대략)지연
linger.ms=0, batch 작음5만1ms
linger.ms=5, batch 16KB50만5ms
linger.ms=5, 압축30만5ms

6. 프로덕션 패턴

6.1 Graceful Shutdown

static std::atomic<bool> running{true};

void sig_handler(int) {
    running = false;
}

int main() {
    signal(SIGINT, sig_handler);
    signal(SIGTERM, sig_handler);

    // ... 프로듀서 생성

    while (running) {
        producer->produce(...);
        producer->poll(100);
    }

    // 종료 전 모든 메시지 전송 완료 대기
    producer->flush(10000);
    delete producer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

6.2 메시지 키 설계

// 파티션 내 순서 보장: 같은 키 → 같은 파티션
// 예: 사용자별 이벤트 순서
std::string key = "user:" + std::to_string(user_id);
producer->produce(topic, RdKafka::Topic::PARTITION_UA, ..., key.data(), key.size(), ...);

6.3 헬스 체크

// 프로듀서: 메타데이터 조회로 브로커 연결 확인
RdKafka::Metadata* metadata = nullptr;
RdKafka::ErrorCode err = producer->metadata(true, nullptr, &metadata, 5000);
bool healthy = (err == RdKafka::ERR_NO_ERROR && metadata);
if (metadata) delete metadata;

6.4 환경 변수 기반 설정

struct KafkaConfig {
    std::string brokers = "localhost:9092";
    std::string group_id = "cpp-consumer";
    int linger_ms = 5;
};

KafkaConfig load_config() {
    KafkaConfig c;
    if (const char* b = std::getenv("KAFKA_BROKERS")) c.brokers = b;
    if (const char* g = std::getenv("KAFKA_GROUP_ID")) c.group_id = g;
    return c;
}

6.5 로깅 (debug)

// 문제 디버깅 시
conf->set("debug", "broker,topic,msg", errstr);
// broker, topic, msg, protocol, cgrp 등

6.6 SSL/TLS (프로덕션)

conf->set("security.protocol", "ssl", errstr);
conf->set("ssl.ca.location", "/path/to/ca-cert", errstr);
conf->set("ssl.certificate.location", "/path/to/client-cert", errstr);
conf->set("ssl.key.location", "/path/to/client-key", errstr);

6.7 SASL 인증

conf->set("security.protocol", "sasl_plaintext", errstr);
conf->set("sasl.mechanisms", "PLAIN", errstr);
conf->set("sasl.username", "user", errstr);
conf->set("sasl.password", "pass", errstr);

7. 구현 체크리스트

환경 설정

  • Kafka 브로커 실행 확인
  • 토픽 사전 생성 (또는 auto.create 설정)
  • librdkafka 설치 및 CMake 연동

프로듀서

  • bootstrap.servers 설정
  • dr_cb 등록 및 poll() 주기적 호출
  • flush() 또는 outq_len()==0 대기 후 종료
  • 배치·압축 설정 (성능 요구 시)

컨슈머

  • group.id 설정
  • enable.auto.commit=false 시 수동 commit()
  • rebalance_cb 등록 (컨슈머 그룹 사용 시)
  • event_cb 등록 (에러 모니터링)

에러 처리

  • produce() 반환값 검사
  • msg->err() 검사
  • ERR__QUEUE_FULL 시 대기 또는 백프레셔

프로덕션

  • Graceful Shutdown (flush, wait_destroyed)
  • SSL/TLS, SASL (필요 시)
  • 환경 변수로 설정 외부화

정리

항목요약
librdkafkaC/C++ Kafka 클라이언트의 사실상 표준
프로듀서Conf → Producer::create → produce → poll → flush
컨슈머Conf → KafkaConsumer::create → subscribe → consume → commit
오프셋수동 커밋으로 메시지 처리 완료 후 commit
에러Connection refused, Topic not found, Queue full, Rebalance
성능linger.ms, batch.size, 압축, 프로듀서 재사용
프로덕션Graceful Shutdown, SSL/TLS, SASL, 환경 변수

핵심 원칙:

  1. poll() 필수: 프로듀서/컨슈머 모두 이벤트 루프에서 poll 호출
  2. 수동 커밋: 처리 완료 후 commit으로 중복·유실 최소화
  3. 리밸런싱: rebalance_cb에서 assign/unassign 처리
  4. 프로듀서 재사용: 매 요청마다 새 프로듀서 생성 금지

자주 묻는 질문 (FAQ)

Q. Kafka와 RabbitMQ 중 어떤 것을 써야 하나요?

A. 대용량 이벤트 스트리밍, 로그 수집, 재처리/이벤트 소싱이 필요하면 Kafka가 적합합니다. 작업 큐, 우선순위 큐, 복잡한 라우팅이 필요하면 RabbitMQ를 고려하세요.

Q. 정확히 한 번 전달(at-least-once vs exactly-once)은 어떻게 하나요?

A. at-least-once: 처리 후 커밋. 중복 가능성 있음. exactly-once: 프로듀서 enable.idempotence=true + 트랜잭션 + 컨슈머 read_committed. Kafka 고급(#52-6)에서 다룹니다.

Q. 메시지 순서가 보장되나요?

A. 파티션 내에서만 순서가 보장됩니다. 같은 키를 사용하면 같은 파티션으로 가므로, 키별 순서를 보장할 수 있습니다.

Q. librdkafka는 스레드 안전한가요?

A. 네. librdkafka API는 스레드 안전합니다. 단, 콜백 객체(dr_cb, rebalance_cb 등)는 프로듀서/컨슈머 생성 시점에 등록되어야 하며, 수명이 프로듀서/컨슈머보다 길어야 합니다.

한 줄 요약: librdkafka로 C++에서 Kafka 프로듀서·컨슈머를 구현하고, poll·commit·리밸런싱을 올바르게 처리하면 실무에 바로 적용할 수 있습니다.

다음 글: Kafka 고급: 스트림 처리·트랜잭션·정확히 한 번(#52-6)

이전 글: C++ 시리즈 목차


참고 자료


관련 글

  • C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·콜백·트랜잭션·스트리밍 파이프라인
  • C++ Kafka 고급 활용 | 스트림 처리·트랜잭션·정확히 한 번 전달 완벽 가이드 [#52-6]
  • C++ RabbitMQ 완벽 가이드 | SimpleAmqpClient·rabbitmq-c
  • C++ RabbitMQ 고급 | 토픽 라우팅·우선순위 큐·Dead Letter·TTL 가이드
... 996 lines not shown ... Token usage: 63706/1000000; 936294 remaining Start-Sleep -Seconds 3