본문으로 건너뛰기
Previous
Next
C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·콜백·트랜잭션·스트리밍 파이프라인

C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·콜백·트랜잭션·스트리밍 파이프라인

C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·콜백·트랜잭션·스트리밍 파이프라인

이 글의 핵심

C++에서 Apache Kafka 연동: librdkafka로 프로듀서·컨슈머·DeliveryReport·리밸런싱 콜백·트랜잭션·스트리밍 파이프라인 구현. | 시나리오 | 상황 | 문제 | 결과 | |----------|------|------|------| | 로그 수집 | 여러 서비스에서 DB에 직접 INSERT | DB 연결 폭증,.

들어가며: “C++에서 Kafka 연동이 막막해요”

실제 겪는 문제 시나리오

시나리오상황문제결과
로그 수집여러 서비스에서 DB에 직접 INSERTDB 연결 폭증, 응답 저하Kafka 비동기 발행 → 부하 분산
주문 이벤트재고·포인트·알림 등 여러 서비스 전달HTTP 순차 호출 시 지연·장애 전파Kafka 토픽 발행 → 독립 구독
Connection refusedbootstrap.servers 설정 후 연결 실패localhost/Docker/방화벽 혼동브로커·포트 점검, EventCb 모니터링
메시지 중복처리 중 크래시 후 재시작at-least-once만으로 중복 제거 불가멱등성 키 또는 트랜잭션
dr_cb 미호출dr_cb 등록했는데 실행 안 됨poll() 호출 누락주기적 poll(100) 필수
리밸런싱컨슈머 그룹 인스턴스 추가파티션 재할당 시 순서 꼬임·유실rebalance_cb에서 assign/unassign
flowchart TB
    subgraph 문제[실무 문제]
        P1[DB 부하] --> S1[Kafka 비동기]
        P2[서비스 결합] --> S2[이벤트 디커플링]
        P3[Connection refused] --> S3[설정·EventCb]
        P4[메시지 중복] --> S4[멱등성·트랜잭션]
        P5[dr_cb 미호출] --> S5[poll 필수]
        P6[리밸런싱] --> S6[rebalance_cb]
    end

이 글에서 다루는 것:

  • librdkafka 설치 및 CMake 연동
  • 완전한 프로듀서·컨슈머 예제 (콜백 포함)
  • 트랜잭션 프로듀서·정확히 한 번 전달
  • 스트리밍 파이프라인 (read → transform → produce)
  • 자주 발생하는 에러와 해결법
  • 베스트 프랙티스·프로덕션 패턴

실무 적용 경험: 이 글은 대규모 C++ 프로젝트에서 실제로 겪은 문제와 해결 과정을 바탕으로 작성되었습니다. 책이나 문서에서 다루지 않는 실전 함정과 디버깅 팁을 포함합니다.

1. 환경 설정

필수 의존성

항목버전비고
C++C++14 이상C++17 권장
librdkafka2.0+vcpkg, Homebrew, 또는 소스 빌드
Apache Kafka2.8+브로커 (Docker 권장)
CMake3.16+FindPackage 지원

Kafka 브로커 실행 (Docker)

# 단일 브로커 (개발용)
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_CFG_NODE_ID=0 \
  -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
  -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  apache/kafka:3.6
# 토픽 생성
docker exec kafka kafka-topics --create --topic app-logs --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
docker exec kafka kafka-topics --create --topic processed-logs --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092

librdkafka 설치

# vcpkg (권장)
vcpkg install rdkafka
# macOS (Homebrew)
brew install librdkafka
# Ubuntu/Debian
sudo apt-get install librdkafka-dev

CMakeLists.txt 기본 설정

cmake_minimum_required(VERSION 3.16)
project(kafka_example LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
find_package(RdKafka CONFIG REQUIRED)
add_executable(kafka_producer producer.cpp)
target_link_libraries(kafka_producer PRIVATE RdKafka::rdkafka)
add_executable(kafka_consumer consumer.cpp)
target_link_libraries(kafka_consumer PRIVATE RdKafka::rdkafka)

주의: vcpkg 사용 시 -DCMAKE_TOOLCHAIN_FILE=[vcpkg root]/scripts/buildsystems/vcpkg.cmake를 CMake에 전달해야 합니다.

2. 완전한 프로듀서 예제

2.1 최소 동작 프로듀서

// producer.cpp
// 컴파일: g++ -std=c++17 -o producer producer.cpp -lrdkafka -lrdkafka++
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
int main(int argc, char** argv) {
    std::string brokers = argc > 1 ? argv[1] : "localhost:9092";
    std::string topic = argc > 2 ? argv[2] : "app-logs";
    std::string errstr;
    // 1. 전역 설정
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", brokers, errstr);
    // 2. 프로듀서 생성
    RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
    delete conf;
    if (!producer) {
        std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
        return 1;
    }
    // 3. 메시지 발행
    std::string payload = "Hello, Kafka from C++!";
    RdKafka::ErrorCode err = producer->produce(
        topic,
        RdKafka::Topic::PARTITION_UA,  // 파티션 자동 할당
        RdKafka::Producer::RK_MSG_COPY,
        const_cast<char*>(payload.data()),
        payload.size(),
        "key1", 5,  // 키 (선택)
        0, nullptr);
    if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "발행 실패: " << RdKafka::err2str(err) << std::endl;
    } else {
        std::cout << "메시지 발행 완료" << std::endl;
    }
    // 4. 대기 중인 delivery report 처리 (필수!)
    while (producer->outq_len() > 0) {
        producer->poll(100);
    }
    delete producer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

코드 설명:

  • PARTITION_UA: 파티션 자동 선택 (키 해시 또는 라운드로빈)
  • RK_MSG_COPY: 페이로드 복사 저장 (호출 후 버퍼 수정 가능)
  • outq_len(): 전송 대기 메시지 수. 0이 될 때까지 poll() 호출 필수

2.2 Delivery Report 콜백 (실전용)

발행 결과를 콜백으로 받아 로깅·재시도할 수 있습니다.

// producer_with_dr_cb.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
class DeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message& msg) override {
        if (msg.err()) {
            std::cerr << "[DR] 전달 실패: " << msg.errstr() << std::endl;
        } else {
            std::cout << "[DR] 전달 완료: " << msg.topic_name() << "["
                      << msg.partition() << "] @ " << msg.offset() << std::endl;
        }
    }
};
int main() {
    std::string errstr;
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", "localhost:9092", errstr);
    DeliveryReportCb dr_cb;
    conf->set("dr_cb", &dr_cb, errstr);
    RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
    delete conf;
    if (!producer) {
        std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
        return 1;
    }
    std::string payload = "Test message with delivery report";
    producer->produce("app-logs", RdKafka::Topic::PARTITION_UA,
                      RdKafka::Producer::RK_MSG_COPY,
                      const_cast<char*>(payload.data()), payload.size(),
                      nullptr, 0, 0, nullptr);
    // poll을 호출해야 dr_cb가 실행됨
    for (int i = 0; i < 10 && producer->outq_len() > 0; ++i) {
        producer->poll(100);
    }
    producer->flush(5000);
    delete producer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

주의: poll()을 주기적으로 호출하지 않으면 delivery report 콜백이 실행되지 않습니다.

2.3 RAII 래퍼 클래스 (재사용 가능)

// kafka_producer.hpp
class KafkaProducer {
public:
    KafkaProducer(const std::string& brokers, const std::string& client_id = "cpp-producer");
    void produce(const std::string& topic, const std::string& key, const std::string& value);
    void flush(int timeout_ms = 10000);
    void poll(int timeout_ms = 0);
private:
    struct DeliveryReportCb : public RdKafka::DeliveryReportCb {
        void dr_cb(RdKafka::Message& msg) override {
            if (msg.err()) std::cerr << "[DR] 실패: " << msg.errstr() << std::endl;
        }
    } dr_cb_;
    std::unique_ptr<RdKafka::Conf> conf_;
    std::unique_ptr<RdKafka::Producer> producer_;
};
// produce() 후 poll(0) 호출 필수

3. 완전한 컨슈머 예제

3.1 최소 동작 컨슈머

// consumer.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
#include <csignal>
static volatile sig_atomic_t run = 1;
void sigterm_handler(int) { run = 0; }
int main(int argc, char** argv) {
    std::string brokers = argc > 1 ? argv[1] : "localhost:9092";
    std::string group_id = argc > 2 ? argv[2] : "cpp-consumer-group";
    std::string topic = argc > 3 ? argv[3] : "app-logs";
    std::string errstr;
    signal(SIGINT, sigterm_handler);
    signal(SIGTERM, sigterm_handler);
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", brokers, errstr);
    conf->set("group.id", group_id, errstr);
    conf->set("enable.auto.commit", "false", errstr);  // 수동 커밋
    RdKafka::KafkaConsumer* consumer =
        RdKafka::KafkaConsumer::create(conf, errstr);
    delete conf;
    if (!consumer) {
        std::cerr << "컨슈머 생성 실패: " << errstr << std::endl;
        return 1;
    }
    std::vector<std::string> topics = {topic};
    RdKafka::ErrorCode err = consumer->subscribe(topics);
    if (err) {
        std::cerr << "구독 실패: " << RdKafka::err2str(err) << std::endl;
        delete consumer;
        return 1;
    }
    std::cout << "메시지 수신 대기 중....(Ctrl+C로 종료)" << std::endl;
    while (run) {
        RdKafka::Message* msg = consumer->consume(1000);
        switch (msg->err()) {
            case RdKafka::ERR_NO_ERROR:
                std::cout << "[" << msg->topic_name() << ":" << msg->partition()
                          << "@" << msg->offset() << "] "
                          << std::string(static_cast<const char*>(msg->payload()),
                                         msg->len())
                          << std::endl;
                consumer->commit(msg);  // 오프셋 커밋
                break;
            case RdKafka::ERR__TIMED_OUT:
                break;
            case RdKafka::ERR__PARTITION_EOF:
                break;
            default:
                std::cerr << "소비 에러: " << msg->errstr() << std::endl;
                run = 0;
                break;
        }
        delete msg;
    }
    consumer->close();
    delete consumer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

코드 설명:

  • enable.auto.commit=false: 처리 완료 후 수동 commit(). 크래시 시 재처리 가능
  • consume(1000): 1초 타임아웃. 메시지 없으면 ERR__TIMED_OUT
  • commit(msg): 해당 메시지 오프셋까지 커밋

3.2 consume 루프 시퀀스

sequenceDiagram
    participant C as 컨슈머
    participant B as 브로커
    C->>B: subscribe(topics)
    B->>C: 파티션 할당
    loop consume
        C->>B: consume(1000)
        alt 메시지 있음
            B->>C: Message
            C->>C: 비즈니스 로직
            C->>B: commit(msg)
        else 타임아웃
            B->>C: ERR__TIMED_OUT
        end
    end

4. 콜백 (DeliveryReport·리밸런싱·Event)

4.1 DeliveryReportCb (프로듀서)

발행 결과(성공/실패, 파티션, 오프셋)를 비동기로 수신합니다.

class DeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message& msg) override {
        if (msg.err()) {
            std::cerr << "[DR] 실패: " << msg.errstr()
                      << " topic=" << msg.topic_name()
                      << " partition=" << msg.partition() << std::endl;
        } else {
            std::cout << "[DR] 성공: " << msg.topic_name()
                      << "[" << msg.partition() << "] @ " << msg.offset() << std::endl;
        }
    }
};
// 설정
DeliveryReportCb dr_cb;
conf->set("dr_cb", &dr_cb, errstr);

4.2 RebalanceCb (컨슈머 그룹)

파티션 재할당 시 호출됩니다. ERR__ASSIGN_PARTITIONSassign, ERR__REVOKE_PARTITIONSunassign 처리.

class RebalanceCb : public RdKafka::RebalanceCb {
public:
    void rebalance_cb(RdKafka::KafkaConsumer* consumer,
                     RdKafka::ErrorCode err,
                     std::vector<RdKafka::TopicPartition*>& partitions) override {
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
            consumer->assign(partitions);
            std::cout << "[리밸런싱] 파티션 할당: ";
            for (auto* p : partitions)
                std::cout << p->topic() << "[" << p->partition() << "] ";
            std::cout << std::endl;
        } else {
            consumer->unassign();
            std::cout << "[리밸런싱] 파티션 해제" << std::endl;
        }
    }
};
// 설정
RebalanceCb rebalance_cb;
conf->set("rebalance_cb", &rebalance_cb, errstr);

4.3 EventCb (에러·통계)

연결 에러, throttle, 통계 이벤트를 수신합니다.

class EventCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event& event) override {
        switch (event.type()) {
            case RdKafka::Event::EVENT_ERROR:
                if (event.fatal()) {
                    std::cerr << "치명적 에러: " << event.str() << std::endl;
                } else {
                    std::cerr << "에러: " << event.str() << std::endl;
                }
                break;
            case RdKafka::Event::EVENT_THROTTLE:
                std::cerr << "Throttled: " << event.throttle_time() << "ms by "
                          << event.broker_name() << std::endl;
                break;
            case RdKafka::Event::EVENT_STATS:
                // event.str()에 JSON 통계
                break;
            default:
                break;
        }
    }
};
// 설정
EventCb event_cb;
conf->set("event_cb", &event_cb, errstr);

콜백 등록: conf->set("rebalance_cb", &rebalance_cb, errstr), conf->set("event_cb", &event_cb, errstr)로 등록. 콜백 객체는 컨슈머 수명보다 길어야 합니다.

5. 트랜잭션·정확히 한 번 전달

5.1 전달 시맨틱 비교

시맨틱설명중복유실
at-most-once커밋 후 처리없음가능
at-least-once처리 후 커밋가능없음
exactly-once멱등성·트랜잭션없음없음

5.2 프로듀서 멱등성 (enable.idempotence)

conf->set("enable.idempotence", "true", errstr);
// retries, acks, max.in.flight.requests.per.connection가 자동으로 안전한 값으로 설정됨

5.3 트랜잭션 프로듀서

여러 토픽에 원자적으로 쓰기, consume-transform-produce를 원자적으로 처리할 때 사용합니다.

// transactional_producer.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <string>
int main() {
    std::string errstr;
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", "localhost:9092", errstr);
    conf->set("transactional.id", "my-txn-id", errstr);  // 유일값 필수
    RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
    delete conf;
    if (!producer) {
        std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
        return 1;
    }
    // 1. 트랜잭션 초기화
    RdKafka::ErrorCode err = producer->init_transactions(5000);
    if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "init_transactions 실패: " << RdKafka::err2str(err) << std::endl;
        delete producer;
        return 1;
    }
    // 2. 트랜잭션 시작
    err = producer->begin_transaction();
    if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "begin_transaction 실패: " << RdKafka::err2str(err) << std::endl;
        delete producer;
        return 1;
    }
    // 3. 여러 토픽에 발행
    std::string msg1 = "order-123";
    std::string msg2 = "event-order-123";
    producer->produce("orders", RdKafka::Topic::PARTITION_UA,
                     RdKafka::Producer::RK_MSG_COPY,
                     const_cast<char*>(msg1.data()), msg1.size(),
                     nullptr, 0, 0, nullptr);
    producer->produce("order-events", RdKafka::Topic::PARTITION_UA,
                     RdKafka::Producer::RK_MSG_COPY,
                     const_cast<char*>(msg2.data()), msg2.size(),
                     nullptr, 0, 0, nullptr);
    // 4. 트랜잭션 커밋
    err = producer->commit_transaction(5000);
    if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "commit_transaction 실패: " << RdKafka::err2str(err) << std::endl;
        producer->abort_transaction(5000);
    }
    producer->flush(5000);
    delete producer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

주의: transactional.id는 클러스터 내에서 유일해야 합니다.

5.4 컨슈머 read_committed

트랜잭션으로 커밋된 메시지만 읽으려면:

conf->set("isolation.level", "read_committed", errstr);

5.5 멱등성 키로 중복 제거

// 멱등성 캐시 (Redis/DB 권장)
class IdempotencyCache {
public:
    bool try_acquire(const std::string& key) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (seen_.count(key)) return false;
        seen_.insert(key);
        if (seen_.size() > 100000) seen_.erase(seen_.begin());
        return true;
    }
private:
    std::unordered_set<std::string> seen_;
    std::mutex mutex_;
};
// 메시지 처리 시
std::string idempotency_key = msg->key() ?
    std::string(msg->key()->data(), msg->key()->size()) :
    std::string(msg->topic_name()) + "-" + std::to_string(msg->partition()) +
    "-" + std::to_string(msg->offset());
if (idempotency_cache.try_acquire(idempotency_key)) {
    do_business_logic(msg);
}
consumer->commit(msg);

6. 스트리밍 파이프라인

6.1 topic-a → transform → topic-b

C++에서는 컨슈머로 읽고 → 변환 → 프로듀서로 쓰는 패턴으로 스트림 처리를 구현합니다.

sequenceDiagram
    participant C as 컨슈머
    participant App as C++ 앱
    participant P as 프로듀서
    C->>App: consume(topic-a)
    App->>App: transform/filter/aggregate
    App->>P: produce(topic-b)
    App->>C: commit (발행 성공 후)

6.2 완전한 스트리밍 파이프라인 예제

// stream_pipeline.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <string>
#include <csignal>
static volatile sig_atomic_t run = 1;
void sig_handler(int) { run = 0; }
int main() {
    std::string errstr;
    std::string brokers = "localhost:9092";
    RdKafka::Conf* c_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    c_conf->set("bootstrap.servers", brokers, errstr);
    c_conf->set("group.id", "stream-pipeline-group", errstr);
    c_conf->set("enable.auto.commit", "false", errstr);
    RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(c_conf, errstr);
    delete c_conf;
    RdKafka::Conf* p_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    p_conf->set("bootstrap.servers", brokers, errstr);
    RdKafka::Producer* producer = RdKafka::Producer::create(p_conf, errstr);
    delete p_conf;
    consumer->subscribe({"raw-logs"});
    signal(SIGINT, sig_handler);
    signal(SIGTERM, sig_handler);
    while (run) {
        RdKafka::Message* msg = consumer->consume(1000);
        switch (msg->err()) {
            case RdKafka::ERR_NO_ERROR: {
                std::string payload(static_cast<const char*>(msg->payload()),
                                    msg->len());
                std::string transformed;
                for (char c : payload) {
                    transformed += (c >= 'a' && c <= 'z') ? (c - 32) : c;
                }
                RdKafka::ErrorCode err = producer->produce(
                    "processed-logs",
                    RdKafka::Topic::PARTITION_UA,
                    RdKafka::Producer::RK_MSG_COPY,
                    const_cast<char*>(transformed.data()),
                    transformed.size(),
                    msg->key() ? msg->key()->data() : nullptr,
                    msg->key() ? msg->key()->size() : 0,
                    0, nullptr);
                if (err == RdKafka::ERR_NO_ERROR) {
                    consumer->commit(msg);
                }
                break;
            }
            case RdKafka::ERR__TIMED_OUT:
            case RdKafka::ERR__PARTITION_EOF:
                break;
            default:
                run = 0;
                break;
        }
        delete msg;
        producer->poll(0);
    }
    producer->flush(5000);
    consumer->close();
    delete producer;
    delete consumer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

핵심: 발행 성공 후에만 commit(). at-least-once 보장.

6.3 필터링 파이프라인

// 에러 로그만 필터링
bool should_forward(const std::string& payload) {
    return payload.find("ERROR") != std::string::npos ||
           payload.find("FATAL") != std::string::npos;
}
if (msg->err() == RdKafka::ERR_NO_ERROR) {
    std::string payload(static_cast<const char*>(msg->payload()), msg->len());
    if (should_forward(payload)) {
        producer->produce("error-logs", RdKafka::Topic::PARTITION_UA,
                         RdKafka::Producer::RK_MSG_COPY,
                         const_cast<char*>(payload.data()), payload.size(),
                         nullptr, 0, 0, nullptr);
    }
    consumer->commit(msg);
}

7. 자주 발생하는 에러와 해결법

에러 1: “Connection refused”

원인: 브로커 미실행, 주소/포트 오류, Docker/방화벽. 해결:

docker ps | grep kafka  # 브로커 실행 확인
conf->set("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092", errstr);
conf->set("socket.connection.setup.timeout.ms", "5000", errstr);

에러 2: “Topic ‘xxx’ not present in metadata”

해결: kafka-topics --create로 토픽 사전 생성, metadata.max.age.ms=5000.

에러 3: “Message size too large”

해결: conf->set("message.max.bytes", "10485760", errstr); (10MB).

에러 4: “Commit failed: Local: No offset stored”

원인: enable.auto.commit=true + 수동 commit, 또는 리밸런싱 중. 해결: enable.auto.commit=false, rebalance_cb 구현.

에러 5: 메시지 중복 처리

해결:

// ✅ 처리 완료 후 커밋
doBusinessLogic(msg);
consumer->commit(msg);
// ❌ 나쁜 예: 처리 전 커밋 → 크래시 시 재처리됨
consumer->commit(msg);
doBusinessLogic(msg);

에러 6: “Queue full” (ERR__QUEUE_FULL)

해결: queue.buffering.max.messages, queue.buffering.max.kbytes 증가. 백프레셔: while (producer->outq_len() > 10000) producer->poll(100);

에러 7: DeliveryReportCb가 호출되지 않음

원인: poll() 미호출. 해결: while (running) { producer->produce(...); producer->poll(100); } — poll 필수.

에러 8: “Transaction coordinator not found”

원인: 브로커가 트랜잭션 미지원. 해결: init_transactions() 실패 시 멱등성 모드로 fallback.

에러 9: “Consumer group is rebalancing”

해결: rebalance_cb에서 ERR__REVOKE_PARTITIONSwait_for_pending_processing()unassign, ERR__ASSIGN_PARTITIONSassign.

8. 베스트 프랙티스

항목권장
poll()프로듀서: produce()poll(100) 필수. 컨슈머: consume() 내부적으로 poll
커밋처리 완료 후 consumer->commit(msg)
프로듀서매 요청마다 생성 금지 — 풀/싱글톤 재사용
메시지 키같은 키 → 같은 파티션 (순서 보장): key = "user:" + std::to_string(user_id)
에러 처리produce() 반환값 검사, ERR__QUEUE_FULL 시 백프레셔

9. 프로덕션 패턴

9.1 Graceful Shutdown

static std::atomic<bool> running{true};
void sig_handler(int) { running = false; }
int main() {
    signal(SIGINT, sig_handler);
    signal(SIGTERM, sig_handler);
    while (running) {
        producer->produce(...);
        producer->poll(100);
    }
    producer->flush(10000);
    delete producer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

9.2 헬스 체크·환경 변수·SSL

  • 헬스 체크: producer->metadata(true, nullptr, &meta, 5000)로 브로커 연결 확인
  • 환경 변수: std::getenv("KAFKA_BROKERS"), KAFKA_GROUP_ID로 설정 외부화
  • SSL/TLS + SASL: security.protocol=sasl_ssl, sasl.mechanisms=PLAIN, ssl.ca.location
  • 성능: linger.ms=5, batch.size=16384, compression.type=lz4

10. 구현 체크리스트

환경 설정

  • Kafka 브로커 실행 확인
  • 토픽 사전 생성
  • librdkafka 설치 및 CMake 연동

프로듀서

  • bootstrap.servers 설정
  • dr_cb 등록 및 poll() 주기적 호출
  • flush() 또는 outq_len()==0 대기 후 종료
  • 배치·압축 설정 (성능 요구 시)

컨슈머

  • group.id 설정
  • enable.auto.commit=false 시 수동 commit()
  • rebalance_cb 등록 (컨슈머 그룹 사용 시)
  • event_cb 등록 (에러 모니터링)

에러 처리

  • produce() 반환값 검사
  • msg->err() 검사
  • ERR__QUEUE_FULL 시 백프레셔

프로덕션

  • Graceful Shutdown (flush, wait_destroyed)
  • SSL/TLS, SASL (필요 시)
  • 환경 변수로 설정 외부화

11. 정리

항목요약
librdkafkaC/C++ Kafka 클라이언트의 사실상 표준
프로듀서Conf → Producer::create → produce → poll → flush
컨슈머Conf → KafkaConsumer::create → subscribe → consume → commit
콜백dr_cb, rebalance_cb, event_cb
트랜잭션transactional.id, init_transactions, begin/commit/abort
스트리밍consume → transform → produce, 발행 후 커밋
에러Connection refused, Topic not found, Queue full, Rebalance
프로덕션Graceful Shutdown, SSL/TLS, SASL, 환경 변수
핵심 원칙:
  1. poll() 필수: 프로듀서/컨슈머 모두 이벤트 루프에서 poll 호출
  2. 수동 커밋: 처리 완료 후 commit으로 중복·유실 최소화
  3. 리밸런싱: rebalance_cb에서 assign/unassign 처리
  4. 프로듀서 재사용: 매 요청마다 새 프로듀서 생성 금지

자주 묻는 질문 (FAQ)

Q. Kafka와 RabbitMQ 중 어떤 것을 써야 하나요?

A. 대용량 이벤트 스트리밍, 로그 수집, 재처리/이벤트 소싱이 필요하면 Kafka가 적합합니다. 작업 큐, 우선순위 큐, 복잡한 라우팅이 필요하면 RabbitMQ를 고려하세요.

Q. 정확히 한 번 전달은 어떻게 하나요?

A. at-least-once: 처리 후 커밋. exactly-once: 프로듀서 enable.idempotence=true + 트랜잭션 + 컨슈머 read_committed. 멱등성 키로 비즈니스 레벨 중복 제거도 가능합니다.

Q. 메시지 순서가 보장되나요?

A. 파티션 내에서만 순서가 보장됩니다. 같은 키를 사용하면 같은 파티션으로 가므로, 키별 순서를 보장할 수 있습니다.

Q. librdkafka는 스레드 안전한가요?

A. 네. librdkafka API는 스레드 안전합니다. 콜백 객체는 프로듀서/컨슈머 수명보다 길어야 합니다. 한 줄 요약: librdkafka로 C++에서 Kafka 프로듀서·컨슈머·콜백·트랜잭션·스트리밍 파이프라인을 구현하고, poll·commit·리밸런싱을 올바르게 처리하면 실무에 바로 적용할 수 있습니다. 다음 글: C++ 시리즈 목차 이전 글: Kafka 완벽 가이드(#52-5)

참고 자료


관련 글

심화 부록: 구현·운영 관점

이 부록은 앞선 본문에서 다룬 주제(「C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·콜백·트랜잭션·스트리밍 파이프라인」)를 구현·런타임·운영 관점에서 다시 압축합니다. 도메인별 세부 구현은 글마다 다르지만, 입력 검증 → 핵심 연산 → 부작용(I/O·네트워크·동시성) → 관측의 흐름으로 장애를 나누면 원인 추적이 빨라집니다.

내부 동작과 핵심 메커니즘

flowchart TD
  A[입력·요청·이벤트] --> B[파싱·검증·디코딩]
  B --> C[핵심 연산·상태 전이]
  C --> D[부작용: I/O·네트워크·동시성]
  D --> E[결과·관측·저장]
sequenceDiagram
  participant C as 클라이언트/호출자
  participant B as 경계(런타임·게이트웨이·프로세스)
  participant D as 의존성(API·DB·큐·파일)
  C->>B: 요청/이벤트
  B->>D: 조회·쓰기·RPC
  D-->>B: 지연·부분 실패·재시도 가능
  B-->>C: 응답 또는 오류(코드·상관 ID)
  • 불변 조건(Invariant): 버퍼 경계, 프로토콜 상태, 트랜잭션 격리, FD 상한 등 단계별로 문장으로 적어 두면 디버깅 비용이 줄어듭니다.
  • 결정성: 순수 층과 시간·네트워크·스케줄에 의존하는 층을 분리해야 테스트와 장애 분석이 쉬워집니다.
  • 경계 비용: 직렬화, 인코딩, syscall 횟수, 락 경합, 할당·GC, 캐시 미스를 의심 목록에 둡니다.
  • 백프레셔: 생산자가 소비자보다 빠를 때 버퍼·큐·스트림에서 속도를 줄이는 신호를 어디에 둘지 정의합니다.

프로덕션 운영 패턴

영역운영 관점 질문
관측성요청 단위 상관 ID, 에러율·지연 p95/p99, 의존성 타임아웃·재시도가 대시보드에 보이는가
안전성입력 검증·권한·비밀·감사 로그가 코드 경로마다 일관적인가
신뢰성재시도는 멱등 연산에만 적용되는가, 서킷 브레이커·백오프·DLQ가 있는가
성능캐시·배치 크기·커넥션 풀·인덱스·백프레셔가 데이터 규모에 맞는가
배포롤백 룬북, 카나리/블루그린, 마이그레이션·피처 플래그가 문서화되어 있는가
용량피크 트래픽·디스크·FD·스레드 풀 상한을 주기적으로 검증하는가

스테이징은 데이터 양·네트워크 RTT·동시성을 프로덕션에 가깝게 맞출수록 재현율이 올라갑니다.

확장 예시: 엔드투엔드 미니 시나리오

앞선 본문 주제(「C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·콜백·트랜잭션·스트리밍 파이프라인」)를 배포·운영 흐름에 맞춰 옮긴 체크리스트입니다. 도메인에 맞게 단계 이름만 바꿔 적용할 수 있습니다.

  1. 입력 계약 고정: 스키마·버전·최대 페이로드·타임아웃·에러 코드를 경계에 둔다.
  2. 핵심 경로 계측: 요청 ID, 단계별 지연, 외부 호출 결과 코드를 로그·메트릭·트레이스에서 한 흐름으로 본다.
  3. 실패 주입: 의존성 타임아웃·5xx·부분 데이터·락 대기를 스테이징에서 재현한다.
  4. 호환·롤백: 설정/마이그레이션/클라이언트 버전을 되돌릴 수 있는지 확인한다.
  5. 부하 후 검증: 피크 대비 p95/p99, 에러율, 리소스 상한, 알림 임계값을 점검한다.
handle(request):
  ctx = newCorrelationId()
  validated = validateSchema(request)
  authorize(validated, ctx)
  result = domainCore(validated)
  persistOrEmit(result, idempotentKey)
  recordMetrics(ctx, latency, outcome)
  return result

문제 해결(Troubleshooting)

증상가능 원인조치
간헐적 실패레이스, 타임아웃, 외부 의존성, DNS최소 재현 스크립트, 분산 트레이스·로그 상관관계, 재시도·서킷 설정 점검
성능 저하N+1, 동기 I/O, 락 경합, 과도한 직렬화, 캐시 미스프로파일러·APM으로 핫스팟 확인 후 한 가지씩 제거
메모리 증가캐시 무제한, 구독/리스너 누수, 대용량 버퍼, 커넥션 미반납상한·TTL·힙/FD 스냅샷 비교
빌드·배포만 실패환경 변수, 권한, 플랫폼 차이, lockfileCI 로그와 로컬 diff, 런타임·이미지 버전 핀
설정 불일치프로필·시크릿·기본값, 리전스키마 검증된 설정 단일 소스와 배포 매트릭스 표준화
데이터 불일치비멱등 재시도, 부분 쓰기, 캐시 무효화 누락멱등 키·아웃박스·트랜잭션 경계 재검토

권장 순서: (1) 최소 재현 (2) 최근 변경 범위 축소 (3) 환경·의존성 차이 (4) 관측으로 가설 검증 (5) 수정 후 회귀·부하 테스트.

배포 전에는 git addgit commitgit pushnpm run deploy 순서를 권장합니다.


같이 보면 좋은 글 (내부 링크)

이 주제와 연결되는 다른 글입니다.


이 글에서 다루는 키워드 (관련 검색어)

C++, Kafka, librdkafka, 이벤트 스트리밍, 메시지큐, 트랜잭션, 실전 등으로 검색하시면 이 글이 도움이 됩니다.