C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·콜백·트랜잭션·스트리밍 파이프라인

C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·콜백·트랜잭션·스트리밍 파이프라인

이 글의 핵심

C++에서 Apache Kafka 연동: librdkafka로 프로듀서·컨슈머·DeliveryReport·리밸런싱 콜백·트랜잭션·스트리밍 파이프라인 구현. | 시나리오 | 상황 | 문제 | 결과 | |----------|------|------|------| | 로그 수집 | 여러 서비스에서 DB에 직접 INSERT | DB 연결 폭증,.

들어가며: “C++에서 Kafka 연동이 막막해요”

실제 겪는 문제 시나리오

시나리오상황문제결과
로그 수집여러 서비스에서 DB에 직접 INSERTDB 연결 폭증, 응답 저하Kafka 비동기 발행 → 부하 분산
주문 이벤트재고·포인트·알림 등 여러 서비스 전달HTTP 순차 호출 시 지연·장애 전파Kafka 토픽 발행 → 독립 구독
Connection refusedbootstrap.servers 설정 후 연결 실패localhost/Docker/방화벽 혼동브로커·포트 점검, EventCb 모니터링
메시지 중복처리 중 크래시 후 재시작at-least-once만으로 중복 제거 불가멱등성 키 또는 트랜잭션
dr_cb 미호출dr_cb 등록했는데 실행 안 됨poll() 호출 누락주기적 poll(100) 필수
리밸런싱컨슈머 그룹 인스턴스 추가파티션 재할당 시 순서 꼬임·유실rebalance_cb에서 assign/unassign
flowchart TB
    subgraph 문제[실무 문제]
        P1[DB 부하] --> S1[Kafka 비동기]
        P2[서비스 결합] --> S2[이벤트 디커플링]
        P3[Connection refused] --> S3[설정·EventCb]
        P4[메시지 중복] --> S4[멱등성·트랜잭션]
        P5[dr_cb 미호출] --> S5[poll 필수]
        P6[리밸런싱] --> S6[rebalance_cb]
    end

이 글에서 다루는 것:

  • librdkafka 설치 및 CMake 연동
  • 완전한 프로듀서·컨슈머 예제 (콜백 포함)
  • 트랜잭션 프로듀서·정확히 한 번 전달
  • 스트리밍 파이프라인 (read → transform → produce)
  • 자주 발생하는 에러와 해결법
  • 베스트 프랙티스·프로덕션 패턴

실무 적용 경험: 이 글은 대규모 C++ 프로젝트에서 실제로 겪은 문제와 해결 과정을 바탕으로 작성되었습니다. 책이나 문서에서 다루지 않는 실전 함정과 디버깅 팁을 포함합니다.

목차

  1. 환경 설정
  2. 완전한 프로듀서 예제
  3. 완전한 컨슈머 예제
  4. 콜백 (DeliveryReport·리밸런싱·Event)
  5. 트랜잭션·정확히 한 번 전달
  6. 스트리밍 파이프라인
  7. 자주 발생하는 에러와 해결법
  8. 베스트 프랙티스
  9. 프로덕션 패턴
  10. 구현 체크리스트
  11. 정리

1. 환경 설정

필수 의존성

항목버전비고
C++C++14 이상C++17 권장
librdkafka2.0+vcpkg, Homebrew, 또는 소스 빌드
Apache Kafka2.8+브로커 (Docker 권장)
CMake3.16+FindPackage 지원

Kafka 브로커 실행 (Docker)

# 단일 브로커 (개발용)
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_CFG_NODE_ID=0 \
  -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  apache/kafka:3.6

# 토픽 생성
docker exec kafka kafka-topics --create --topic app-logs --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
docker exec kafka kafka-topics --create --topic processed-logs --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092

librdkafka 설치

# vcpkg (권장)
vcpkg install rdkafka

# macOS (Homebrew)
brew install librdkafka

# Ubuntu/Debian
sudo apt-get install librdkafka-dev

CMakeLists.txt 기본 설정

cmake_minimum_required(VERSION 3.16)
project(kafka_example LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 17)

find_package(RdKafka CONFIG REQUIRED)

add_executable(kafka_producer producer.cpp)
target_link_libraries(kafka_producer PRIVATE RdKafka::rdkafka)

add_executable(kafka_consumer consumer.cpp)
target_link_libraries(kafka_consumer PRIVATE RdKafka::rdkafka)

주의: vcpkg 사용 시 -DCMAKE_TOOLCHAIN_FILE=[vcpkg root]/scripts/buildsystems/vcpkg.cmake를 CMake에 전달해야 합니다.


2. 완전한 프로듀서 예제

2.1 최소 동작 프로듀서

// producer.cpp
// 컴파일: g++ -std=c++17 -o producer producer.cpp -lrdkafka -lrdkafka++

#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>

int main(int argc, char** argv) {
    std::string brokers = argc > 1 ? argv[1] : "localhost:9092";
    std::string topic = argc > 2 ? argv[2] : "app-logs";
    std::string errstr;

    // 1. 전역 설정
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", brokers, errstr);

    // 2. 프로듀서 생성
    RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
    delete conf;
    if (!producer) {
        std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
        return 1;
    }

    // 3. 메시지 발행
    std::string payload = "Hello, Kafka from C++!";
    RdKafka::ErrorCode err = producer->produce(
        topic,
        RdKafka::Topic::PARTITION_UA,  // 파티션 자동 할당
        RdKafka::Producer::RK_MSG_COPY,
        const_cast<char*>(payload.data()),
        payload.size(),
        "key1", 5,  // 키 (선택)
        0, nullptr);

    if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "발행 실패: " << RdKafka::err2str(err) << std::endl;
    } else {
        std::cout << "메시지 발행 완료" << std::endl;
    }

    // 4. 대기 중인 delivery report 처리 (필수!)
    while (producer->outq_len() > 0) {
        producer->poll(100);
    }

    delete producer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

코드 설명:

  • PARTITION_UA: 파티션 자동 선택 (키 해시 또는 라운드로빈)
  • RK_MSG_COPY: 페이로드 복사 저장 (호출 후 버퍼 수정 가능)
  • outq_len(): 전송 대기 메시지 수. 0이 될 때까지 poll() 호출 필수

2.2 Delivery Report 콜백 (실전용)

발행 결과를 콜백으로 받아 로깅·재시도할 수 있습니다.

// producer_with_dr_cb.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>

class DeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message& msg) override {
        if (msg.err()) {
            std::cerr << "[DR] 전달 실패: " << msg.errstr() << std::endl;
        } else {
            std::cout << "[DR] 전달 완료: " << msg.topic_name() << "["
                      << msg.partition() << "] @ " << msg.offset() << std::endl;
        }
    }
};

int main() {
    std::string errstr;
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", "localhost:9092", errstr);

    DeliveryReportCb dr_cb;
    conf->set("dr_cb", &dr_cb, errstr);

    RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
    delete conf;
    if (!producer) {
        std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
        return 1;
    }

    std::string payload = "Test message with delivery report";
    producer->produce("app-logs", RdKafka::Topic::PARTITION_UA,
                      RdKafka::Producer::RK_MSG_COPY,
                      const_cast<char*>(payload.data()), payload.size(),
                      nullptr, 0, 0, nullptr);

    // poll을 호출해야 dr_cb가 실행됨
    for (int i = 0; i < 10 && producer->outq_len() > 0; ++i) {
        producer->poll(100);
    }

    producer->flush(5000);
    delete producer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

주의: poll()을 주기적으로 호출하지 않으면 delivery report 콜백이 실행되지 않습니다.

2.3 RAII 래퍼 클래스 (재사용 가능)

// kafka_producer.hpp
class KafkaProducer {
public:
    KafkaProducer(const std::string& brokers, const std::string& client_id = "cpp-producer");
    void produce(const std::string& topic, const std::string& key, const std::string& value);
    void flush(int timeout_ms = 10000);
    void poll(int timeout_ms = 0);
private:
    struct DeliveryReportCb : public RdKafka::DeliveryReportCb {
        void dr_cb(RdKafka::Message& msg) override {
            if (msg.err()) std::cerr << "[DR] 실패: " << msg.errstr() << std::endl;
        }
    } dr_cb_;
    std::unique_ptr<RdKafka::Conf> conf_;
    std::unique_ptr<RdKafka::Producer> producer_;
};
// produce() 후 poll(0) 호출 필수

3. 완전한 컨슈머 예제

3.1 최소 동작 컨슈머

// consumer.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
#include <csignal>

static volatile sig_atomic_t run = 1;

void sigterm_handler(int) { run = 0; }

int main(int argc, char** argv) {
    std::string brokers = argc > 1 ? argv[1] : "localhost:9092";
    std::string group_id = argc > 2 ? argv[2] : "cpp-consumer-group";
    std::string topic = argc > 3 ? argv[3] : "app-logs";
    std::string errstr;

    signal(SIGINT, sigterm_handler);
    signal(SIGTERM, sigterm_handler);

    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", brokers, errstr);
    conf->set("group.id", group_id, errstr);
    conf->set("enable.auto.commit", "false", errstr);  // 수동 커밋

    RdKafka::KafkaConsumer* consumer =
        RdKafka::KafkaConsumer::create(conf, errstr);
    delete conf;
    if (!consumer) {
        std::cerr << "컨슈머 생성 실패: " << errstr << std::endl;
        return 1;
    }

    std::vector<std::string> topics = {topic};
    RdKafka::ErrorCode err = consumer->subscribe(topics);
    if (err) {
        std::cerr << "구독 실패: " << RdKafka::err2str(err) << std::endl;
        delete consumer;
        return 1;
    }

    std::cout << "메시지 수신 대기 중... (Ctrl+C로 종료)" << std::endl;

    while (run) {
        RdKafka::Message* msg = consumer->consume(1000);
        switch (msg->err()) {
            case RdKafka::ERR_NO_ERROR:
                std::cout << "[" << msg->topic_name() << ":" << msg->partition()
                          << "@" << msg->offset() << "] "
                          << std::string(static_cast<const char*>(msg->payload()),
                                         msg->len())
                          << std::endl;
                consumer->commit(msg);  // 오프셋 커밋
                break;
            case RdKafka::ERR__TIMED_OUT:
                break;
            case RdKafka::ERR__PARTITION_EOF:
                break;
            default:
                std::cerr << "소비 에러: " << msg->errstr() << std::endl;
                run = 0;
                break;
        }
        delete msg;
    }

    consumer->close();
    delete consumer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

코드 설명:

  • enable.auto.commit=false: 처리 완료 후 수동 commit(). 크래시 시 재처리 가능
  • consume(1000): 1초 타임아웃. 메시지 없으면 ERR__TIMED_OUT
  • commit(msg): 해당 메시지 오프셋까지 커밋

3.2 consume 루프 시퀀스

sequenceDiagram
    participant C as 컨슈머
    participant B as 브로커

    C->>B: subscribe(topics)
    B->>C: 파티션 할당
    loop consume
        C->>B: consume(1000)
        alt 메시지 있음
            B->>C: Message
            C->>C: 비즈니스 로직
            C->>B: commit(msg)
        else 타임아웃
            B->>C: ERR__TIMED_OUT
        end
    end

4. 콜백 (DeliveryReport·리밸런싱·Event)

4.1 DeliveryReportCb (프로듀서)

발행 결과(성공/실패, 파티션, 오프셋)를 비동기로 수신합니다.

class DeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message& msg) override {
        if (msg.err()) {
            std::cerr << "[DR] 실패: " << msg.errstr()
                      << " topic=" << msg.topic_name()
                      << " partition=" << msg.partition() << std::endl;
        } else {
            std::cout << "[DR] 성공: " << msg.topic_name()
                      << "[" << msg.partition() << "] @ " << msg.offset() << std::endl;
        }
    }
};

// 설정
DeliveryReportCb dr_cb;
conf->set("dr_cb", &dr_cb, errstr);

4.2 RebalanceCb (컨슈머 그룹)

파티션 재할당 시 호출됩니다. ERR__ASSIGN_PARTITIONSassign, ERR__REVOKE_PARTITIONSunassign 처리.

class RebalanceCb : public RdKafka::RebalanceCb {
public:
    void rebalance_cb(RdKafka::KafkaConsumer* consumer,
                     RdKafka::ErrorCode err,
                     std::vector<RdKafka::TopicPartition*>& partitions) override {
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
            consumer->assign(partitions);
            std::cout << "[리밸런싱] 파티션 할당: ";
            for (auto* p : partitions)
                std::cout << p->topic() << "[" << p->partition() << "] ";
            std::cout << std::endl;
        } else {
            consumer->unassign();
            std::cout << "[리밸런싱] 파티션 해제" << std::endl;
        }
    }
};

// 설정
RebalanceCb rebalance_cb;
conf->set("rebalance_cb", &rebalance_cb, errstr);

4.3 EventCb (에러·통계)

연결 에러, throttle, 통계 이벤트를 수신합니다.

class EventCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event& event) override {
        switch (event.type()) {
            case RdKafka::Event::EVENT_ERROR:
                if (event.fatal()) {
                    std::cerr << "치명적 에러: " << event.str() << std::endl;
                } else {
                    std::cerr << "에러: " << event.str() << std::endl;
                }
                break;
            case RdKafka::Event::EVENT_THROTTLE:
                std::cerr << "Throttled: " << event.throttle_time() << "ms by "
                          << event.broker_name() << std::endl;
                break;
            case RdKafka::Event::EVENT_STATS:
                // event.str()에 JSON 통계
                break;
            default:
                break;
        }
    }
};

// 설정
EventCb event_cb;
conf->set("event_cb", &event_cb, errstr);

콜백 등록: conf->set("rebalance_cb", &rebalance_cb, errstr), conf->set("event_cb", &event_cb, errstr)로 등록. 콜백 객체는 컨슈머 수명보다 길어야 합니다.


5. 트랜잭션·정확히 한 번 전달

5.1 전달 시맨틱 비교

시맨틱설명중복유실
at-most-once커밋 후 처리없음가능
at-least-once처리 후 커밋가능없음
exactly-once멱등성·트랜잭션없음없음

5.2 프로듀서 멱등성 (enable.idempotence)

conf->set("enable.idempotence", "true", errstr);
// retries, acks, max.in.flight.requests.per.connection가 자동으로 안전한 값으로 설정됨

5.3 트랜잭션 프로듀서

여러 토픽에 원자적으로 쓰기, consume-transform-produce를 원자적으로 처리할 때 사용합니다.

// transactional_producer.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <string>

int main() {
    std::string errstr;
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", "localhost:9092", errstr);
    conf->set("transactional.id", "my-txn-id", errstr);  // 유일값 필수

    RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
    delete conf;
    if (!producer) {
        std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
        return 1;
    }

    // 1. 트랜잭션 초기화
    RdKafka::ErrorCode err = producer->init_transactions(5000);
    if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "init_transactions 실패: " << RdKafka::err2str(err) << std::endl;
        delete producer;
        return 1;
    }

    // 2. 트랜잭션 시작
    err = producer->begin_transaction();
    if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "begin_transaction 실패: " << RdKafka::err2str(err) << std::endl;
        delete producer;
        return 1;
    }

    // 3. 여러 토픽에 발행
    std::string msg1 = "order-123";
    std::string msg2 = "event-order-123";
    producer->produce("orders", RdKafka::Topic::PARTITION_UA,
                     RdKafka::Producer::RK_MSG_COPY,
                     const_cast<char*>(msg1.data()), msg1.size(),
                     nullptr, 0, 0, nullptr);
    producer->produce("order-events", RdKafka::Topic::PARTITION_UA,
                     RdKafka::Producer::RK_MSG_COPY,
                     const_cast<char*>(msg2.data()), msg2.size(),
                     nullptr, 0, 0, nullptr);

    // 4. 트랜잭션 커밋
    err = producer->commit_transaction(5000);
    if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "commit_transaction 실패: " << RdKafka::err2str(err) << std::endl;
        producer->abort_transaction(5000);
    }

    producer->flush(5000);
    delete producer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

주의: transactional.id는 클러스터 내에서 유일해야 합니다.

5.4 컨슈머 read_committed

트랜잭션으로 커밋된 메시지만 읽으려면:

conf->set("isolation.level", "read_committed", errstr);

5.5 멱등성 키로 중복 제거

// 멱등성 캐시 (Redis/DB 권장)
class IdempotencyCache {
public:
    bool try_acquire(const std::string& key) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (seen_.count(key)) return false;
        seen_.insert(key);
        if (seen_.size() > 100000) seen_.erase(seen_.begin());
        return true;
    }
private:
    std::unordered_set<std::string> seen_;
    std::mutex mutex_;
};

// 메시지 처리 시
std::string idempotency_key = msg->key() ?
    std::string(msg->key()->data(), msg->key()->size()) :
    std::string(msg->topic_name()) + "-" + std::to_string(msg->partition()) +
    "-" + std::to_string(msg->offset());

if (idempotency_cache.try_acquire(idempotency_key)) {
    do_business_logic(msg);
}
consumer->commit(msg);

6. 스트리밍 파이프라인

6.1 topic-a → transform → topic-b

C++에서는 컨슈머로 읽고 → 변환 → 프로듀서로 쓰는 패턴으로 스트림 처리를 구현합니다.

sequenceDiagram
    participant C as 컨슈머
    participant App as C++ 앱
    participant P as 프로듀서

    C->>App: consume(topic-a)
    App->>App: transform/filter/aggregate
    App->>P: produce(topic-b)
    App->>C: commit (발행 성공 후)

6.2 완전한 스트리밍 파이프라인 예제

// stream_pipeline.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <string>
#include <csignal>

static volatile sig_atomic_t run = 1;
void sig_handler(int) { run = 0; }

int main() {
    std::string errstr;
    std::string brokers = "localhost:9092";

    RdKafka::Conf* c_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    c_conf->set("bootstrap.servers", brokers, errstr);
    c_conf->set("group.id", "stream-pipeline-group", errstr);
    c_conf->set("enable.auto.commit", "false", errstr);

    RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(c_conf, errstr);
    delete c_conf;

    RdKafka::Conf* p_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    p_conf->set("bootstrap.servers", brokers, errstr);
    RdKafka::Producer* producer = RdKafka::Producer::create(p_conf, errstr);
    delete p_conf;

    consumer->subscribe({"raw-logs"});
    signal(SIGINT, sig_handler);
    signal(SIGTERM, sig_handler);

    while (run) {
        RdKafka::Message* msg = consumer->consume(1000);
        switch (msg->err()) {
            case RdKafka::ERR_NO_ERROR: {
                std::string payload(static_cast<const char*>(msg->payload()),
                                    msg->len());
                std::string transformed;
                for (char c : payload) {
                    transformed += (c >= 'a' && c <= 'z') ? (c - 32) : c;
                }
                RdKafka::ErrorCode err = producer->produce(
                    "processed-logs",
                    RdKafka::Topic::PARTITION_UA,
                    RdKafka::Producer::RK_MSG_COPY,
                    const_cast<char*>(transformed.data()),
                    transformed.size(),
                    msg->key() ? msg->key()->data() : nullptr,
                    msg->key() ? msg->key()->size() : 0,
                    0, nullptr);
                if (err == RdKafka::ERR_NO_ERROR) {
                    consumer->commit(msg);
                }
                break;
            }
            case RdKafka::ERR__TIMED_OUT:
            case RdKafka::ERR__PARTITION_EOF:
                break;
            default:
                run = 0;
                break;
        }
        delete msg;
        producer->poll(0);
    }

    producer->flush(5000);
    consumer->close();
    delete producer;
    delete consumer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

핵심: 발행 성공 후에만 commit(). at-least-once 보장.

6.3 필터링 파이프라인

// 에러 로그만 필터링
bool should_forward(const std::string& payload) {
    return payload.find("ERROR") != std::string::npos ||
           payload.find("FATAL") != std::string::npos;
}

if (msg->err() == RdKafka::ERR_NO_ERROR) {
    std::string payload(static_cast<const char*>(msg->payload()), msg->len());
    if (should_forward(payload)) {
        producer->produce("error-logs", RdKafka::Topic::PARTITION_UA,
                         RdKafka::Producer::RK_MSG_COPY,
                         const_cast<char*>(payload.data()), payload.size(),
                         nullptr, 0, 0, nullptr);
    }
    consumer->commit(msg);
}

7. 자주 발생하는 에러와 해결법

에러 1: “Connection refused”

원인: 브로커 미실행, 주소/포트 오류, Docker/방화벽.

해결:

docker ps | grep kafka  # 브로커 실행 확인
conf->set("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092", errstr);
conf->set("socket.connection.setup.timeout.ms", "5000", errstr);

에러 2: “Topic ‘xxx’ not present in metadata”

해결: kafka-topics --create로 토픽 사전 생성, metadata.max.age.ms=5000.

에러 3: “Message size too large”

해결: conf->set("message.max.bytes", "10485760", errstr); (10MB).

에러 4: “Commit failed: Local: No offset stored”

원인: enable.auto.commit=true + 수동 commit, 또는 리밸런싱 중. 해결: enable.auto.commit=false, rebalance_cb 구현.

에러 5: 메시지 중복 처리

해결:

// ✅ 처리 완료 후 커밋
doBusinessLogic(msg);
consumer->commit(msg);

// ❌ 나쁜 예: 처리 전 커밋 → 크래시 시 재처리됨
consumer->commit(msg);
doBusinessLogic(msg);

에러 6: “Queue full” (ERR__QUEUE_FULL)

해결: queue.buffering.max.messages, queue.buffering.max.kbytes 증가. 백프레셔: while (producer->outq_len() > 10000) producer->poll(100);

에러 7: DeliveryReportCb가 호출되지 않음

원인: poll() 미호출. 해결: while (running) { producer->produce(...); producer->poll(100); } — poll 필수.

에러 8: “Transaction coordinator not found”

원인: 브로커가 트랜잭션 미지원. 해결: init_transactions() 실패 시 멱등성 모드로 fallback.

에러 9: “Consumer group is rebalancing”

해결: rebalance_cb에서 ERR__REVOKE_PARTITIONSwait_for_pending_processing()unassign, ERR__ASSIGN_PARTITIONSassign.


8. 베스트 프랙티스

항목권장
poll()프로듀서: produce()poll(100) 필수. 컨슈머: consume() 내부적으로 poll
커밋처리 완료 후 consumer->commit(msg)
프로듀서매 요청마다 생성 금지 — 풀/싱글톤 재사용
메시지 키같은 키 → 같은 파티션 (순서 보장): key = "user:" + std::to_string(user_id)
에러 처리produce() 반환값 검사, ERR__QUEUE_FULL 시 백프레셔

9. 프로덕션 패턴

9.1 Graceful Shutdown

static std::atomic<bool> running{true};
void sig_handler(int) { running = false; }

int main() {
    signal(SIGINT, sig_handler);
    signal(SIGTERM, sig_handler);
    while (running) {
        producer->produce(...);
        producer->poll(100);
    }
    producer->flush(10000);
    delete producer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

9.2 헬스 체크·환경 변수·SSL

  • 헬스 체크: producer->metadata(true, nullptr, &meta, 5000)로 브로커 연결 확인
  • 환경 변수: std::getenv("KAFKA_BROKERS"), KAFKA_GROUP_ID로 설정 외부화
  • SSL/TLS + SASL: security.protocol=sasl_ssl, sasl.mechanisms=PLAIN, ssl.ca.location
  • 성능: linger.ms=5, batch.size=16384, compression.type=lz4

10. 구현 체크리스트

환경 설정

  • Kafka 브로커 실행 확인
  • 토픽 사전 생성
  • librdkafka 설치 및 CMake 연동

프로듀서

  • bootstrap.servers 설정
  • dr_cb 등록 및 poll() 주기적 호출
  • flush() 또는 outq_len()==0 대기 후 종료
  • 배치·압축 설정 (성능 요구 시)

컨슈머

  • group.id 설정
  • enable.auto.commit=false 시 수동 commit()
  • rebalance_cb 등록 (컨슈머 그룹 사용 시)
  • event_cb 등록 (에러 모니터링)

에러 처리

  • produce() 반환값 검사
  • msg->err() 검사
  • ERR__QUEUE_FULL 시 백프레셔

프로덕션

  • Graceful Shutdown (flush, wait_destroyed)
  • SSL/TLS, SASL (필요 시)
  • 환경 변수로 설정 외부화

11. 정리

항목요약
librdkafkaC/C++ Kafka 클라이언트의 사실상 표준
프로듀서Conf → Producer::create → produce → poll → flush
컨슈머Conf → KafkaConsumer::create → subscribe → consume → commit
콜백dr_cb, rebalance_cb, event_cb
트랜잭션transactional.id, init_transactions, begin/commit/abort
스트리밍consume → transform → produce, 발행 후 커밋
에러Connection refused, Topic not found, Queue full, Rebalance
프로덕션Graceful Shutdown, SSL/TLS, SASL, 환경 변수

핵심 원칙:

  1. poll() 필수: 프로듀서/컨슈머 모두 이벤트 루프에서 poll 호출
  2. 수동 커밋: 처리 완료 후 commit으로 중복·유실 최소화
  3. 리밸런싱: rebalance_cb에서 assign/unassign 처리
  4. 프로듀서 재사용: 매 요청마다 새 프로듀서 생성 금지

자주 묻는 질문 (FAQ)

Q. Kafka와 RabbitMQ 중 어떤 것을 써야 하나요?

A. 대용량 이벤트 스트리밍, 로그 수집, 재처리/이벤트 소싱이 필요하면 Kafka가 적합합니다. 작업 큐, 우선순위 큐, 복잡한 라우팅이 필요하면 RabbitMQ를 고려하세요.

Q. 정확히 한 번 전달은 어떻게 하나요?

A. at-least-once: 처리 후 커밋. exactly-once: 프로듀서 enable.idempotence=true + 트랜잭션 + 컨슈머 read_committed. 멱등성 키로 비즈니스 레벨 중복 제거도 가능합니다.

Q. 메시지 순서가 보장되나요?

A. 파티션 내에서만 순서가 보장됩니다. 같은 키를 사용하면 같은 파티션으로 가므로, 키별 순서를 보장할 수 있습니다.

Q. librdkafka는 스레드 안전한가요?

A. 네. librdkafka API는 스레드 안전합니다. 콜백 객체는 프로듀서/컨슈머 수명보다 길어야 합니다.

한 줄 요약: librdkafka로 C++에서 Kafka 프로듀서·컨슈머·콜백·트랜잭션·스트리밍 파이프라인을 구현하고, poll·commit·리밸런싱을 올바르게 처리하면 실무에 바로 적용할 수 있습니다.

다음 글: C++ 시리즈 목차

이전 글: Kafka 완벽 가이드(#52-5)


참고 자료


관련 글

  • C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·오프셋·정확히 한 번 전달
  • C++ Kafka 고급 활용 | 스트림 처리·트랜잭션·정확히 한 번 전달 완벽 가이드 [#52-6]
  • C++ RabbitMQ 완벽 가이드 | SimpleAmqpClient·rabbitmq-c
... 996 lines not shown ... Token usage: 63706/1000000; 936294 remaining Start-Sleep -Seconds 3