C++ Kafka 고급 활용 | 스트림 처리·트랜잭션·정확히 한 번 전달 완벽 가이드 [#52-6]

C++ Kafka 고급 활용 | 스트림 처리·트랜잭션·정확히 한 번 전달 완벽 가이드 [#52-6]

이 글의 핵심

C++ Kafka 심화: librdkafka 스트림 처리 패턴, 트랜잭션 프로듀서, 정확히 한 번 전달, KSQL 대안. 실무 문제 시나리오, 완전한 예제, 자주 발생하는 에러, 성능 최적화, 프로덕션 패턴까지 900줄 분량으로 다룹니다.

들어가며: “스트림 처리·정확히 한 번이 막막해요”

실제 겪는 문제 시나리오

Kafka 기본(#52-5)에서 프로듀서·컨슈머·오프셋·리밸런싱을 다뤘다면, 이 글에서는 고급 기능을 다룹니다. 실무에서 자주 맞닥뜨리는 문제와 해결 방법을 제시합니다.

시나리오 1: 토픽 A → 처리 → 토픽 B 파이프라인

상황: 클릭 로그를 읽어 집계 후 요약 토픽에 저장하는 파이프라인
문제: C++에서 Kafka Streams/KSQL 없이 어떻게 구현하나?
결과: 컨슈머-프로듀서 패턴으로 read → transform → produce

시나리오 2: 주문 이벤트 중복 처리로 재고가 음수

상황: 컨슈머 재시작 시 같은 메시지를 다시 처리해 재고가 두 번 차감됨
문제: at-least-once만으로는 중복 제거 불가
결과: 멱등성 키 설계 또는 정확히 한 번(idempotence + 트랜잭션) 적용

시나리오 3: 여러 토픽에 원자적으로 쓰기

상황: 주문 생성 시 orders 토픽 + order-events 토픽에 동시에 저장해야 함
문제: 중간에 실패하면 일부만 반영되어 데이터 불일치
결과: 트랜잭션 프로듀서로 여러 토픽에 원자적 발행

시나리오 4: 실시간 윈도우 집계

상황: 1분 단위로 API 호출 수를 집계해 대시보드에 표시
문제: C++에서 Kafka Streams 없이 윈도우 집계를 어떻게?
결과: 시간 윈도우 버퍼 + 주기적 flush로 시뮬레이션

시나리오 5: 컨슈머 처리 중 오프셋 커밋 타이밍

상황: 배치 처리 후 커밋 vs 메시지별 커밋 시 리밸런싱 시 중복 유실
문제: 리밸런싱 시 처리 중인 메시지가 다른 컨슈머로 넘어감
결과: 처리 완료 직후 커밋, 리밸런싱 콜백에서 상태 정리
flowchart TB
    subgraph 문제[실무 문제]
        P1[스트림 파이프라인] --> S1[컨슈머-프로듀서]
        P2[중복 처리] --> S2[멱등성·정확히 한 번]
        P3[원자적 다중 토픽] --> S3[트랜잭션]
        P4[윈도우 집계] --> S4[버퍼·주기 flush]
        P5[리밸런싱] --> S5[상태·커밋 전략]
    end

목표:

  • 스트림 처리: read → transform → produce
  • 트랜잭션: 여러 토픽 원자적 발행
  • 정확히 한 번: idempotence·멱등성·트랜잭션
  • 윈도우 집계: C++에서 시뮬레이션
  • 프로덕션: 모니터링·백프레셔·재시도

요구 환경: C++17 이상, librdkafka 2.0+, Kafka 기본(#52-5) 선행

이 글을 읽으면:

  • 스트림 처리 파이프라인을 C++로 구현할 수 있습니다.
  • 트랜잭션·정확히 한 번 전달을 적용할 수 있습니다.
  • 실무 에러·성능·프로덕션 패턴을 활용할 수 있습니다.

실무 적용 경험: 이 글은 대규모 C++ 프로젝트에서 실제로 겪은 문제와 해결 과정을 바탕으로 작성되었습니다. 책이나 문서에서 다루지 않는 실전 함정과 디버깅 팁을 포함합니다.

목차

  1. 스트림 처리 패턴
  2. 정확히 한 번 전달
  3. 트랜잭션 프로듀서
  4. 완전한 Kafka 예제
  5. 자주 발생하는 에러와 해결법
  6. 성능 최적화 팁
  7. 프로덕션 패턴
  8. 구현 체크리스트
  9. 정리

1. 스트림 처리 패턴

C++에서 스트림 처리란?

Kafka Streams나 KSQL은 JVM 기반입니다. C++에서는 컨슈머로 읽고 → 변환 → 프로듀서로 쓰는 패턴으로 스트림 처리를 구현합니다.

sequenceDiagram
    participant C as 컨슈머
    participant App as C++ 앱
    participant P as 프로듀서

    C->>App: consume(topic-a)
    App->>App: transform/filter/aggregate
    App->>P: produce(topic-b)

1.1 단순 파이프라인: topic-a → topic-b

// stream_pipeline.cpp
// 컴파일: g++ -std=c++17 -o stream_pipeline stream_pipeline.cpp -lrdkafka -lrdkafka++

#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
#include <csignal>

static volatile sig_atomic_t run = 1;
void sig_handler(int) { run = 0; }

int main() {
    std::string errstr;
    std::string brokers = "localhost:9092";

    // 1. 컨슈머 설정
    RdKafka::Conf* c_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    c_conf->set("bootstrap.servers", brokers, errstr);
    c_conf->set("group.id", "stream-pipeline-group", errstr);
    c_conf->set("enable.auto.commit", "false", errstr);

    RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(c_conf, errstr);
    delete c_conf;
    if (!consumer) {
        std::cerr << "컨슈머 생성 실패: " << errstr << std::endl;
        return 1;
    }

    // 2. 프로듀서 설정
    RdKafka::Conf* p_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    p_conf->set("bootstrap.servers", brokers, errstr);

    RdKafka::Producer* producer = RdKafka::Producer::create(p_conf, errstr);
    delete p_conf;
    if (!producer) {
        std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
        delete consumer;
        return 1;
    }

    consumer->subscribe({"raw-logs"});
    signal(SIGINT, sig_handler);
    signal(SIGTERM, sig_handler);

    std::cout << "스트림 파이프라인 시작 (raw-logs → processed-logs)" << std::endl;

    while (run) {
        RdKafka::Message* msg = consumer->consume(1000);
        switch (msg->err()) {
            case RdKafka::ERR_NO_ERROR: {
                std::string payload(static_cast<const char*>(msg->payload()),
                                    msg->len());
                // 변환: 대문자 변환 (예시)
                std::string transformed;
                for (char c : payload) {
                    transformed += (c >= 'a' && c <= 'z') ? (c - 32) : c;
                }
                // 출력 토픽에 발행
                RdKafka::ErrorCode err = producer->produce(
                    "processed-logs",
                    RdKafka::Topic::PARTITION_UA,
                    RdKafka::Producer::RK_MSG_COPY,
                    const_cast<char*>(transformed.data()),
                    transformed.size(),
                    msg->key() ? msg->key()->data() : nullptr,
                    msg->key() ? msg->key()->size() : 0,
                    0, nullptr);
                if (err != RdKafka::ERR_NO_ERROR) {
                    std::cerr << "발행 실패: " << RdKafka::err2str(err) << std::endl;
                } else {
                    consumer->commit(msg);  // 처리 완료 후 커밋
                }
                break;
            }
            case RdKafka::ERR__TIMED_OUT:
            case RdKafka::ERR__PARTITION_EOF:
                break;
            default:
                std::cerr << "소비 에러: " << msg->errstr() << std::endl;
                run = 0;
                break;
        }
        delete msg;
        producer->poll(0);
    }

    producer->flush(5000);
    consumer->close();
    delete producer;
    delete consumer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

코드 설명:

  • raw-logs → 읽기 → 변환 → processed-logs 발행
  • 처리 완료 후 commit(): 발행 성공 후에만 커밋해 유실 방지
  • producer->poll(0): delivery report 처리

주의: 발행 실패 시 commit()하지 않으면 재시작 시 재처리됩니다. at-least-once 보장.

1.2 필터링: 조건에 맞는 메시지만 전달

// stream_filter.cpp - 에러 로그만 필터링

bool should_forward(const std::string& payload) {
    return payload.find("ERROR") != std::string::npos ||
           payload.find("FATAL") != std::string::npos;
}

// consume 루프 내부
if (msg->err() == RdKafka::ERR_NO_ERROR) {
    std::string payload(static_cast<const char*>(msg->payload()), msg->len());
    if (should_forward(payload)) {
        producer->produce("error-logs", RdKafka::Topic::PARTITION_UA,
                         RdKafka::Producer::RK_MSG_COPY,
                         const_cast<char*>(payload.data()), payload.size(),
                         nullptr, 0, 0, nullptr);
    }
    consumer->commit(msg);  // 필터 통과 여부와 관계없이 소비 완료
}

1.3 시간 윈도우 집계 (1분 단위)

// stream_window_aggregate.cpp - 1분 윈도우 카운트

#include <librdkafka/rdkafkacpp.h>
#include <chrono>
#include <map>
#include <string>

class WindowAggregator {
public:
    using WindowKey = std::string;
    using CountMap = std::map<WindowKey, int64_t>;

    void add(const std::string& key, int64_t count = 1) {
        auto window = current_window();
        counts_[window][key] += count;
    }

    void flush_if_needed(RdKafka::Producer* producer) {
        auto now = std::chrono::system_clock::now();
        auto current = current_window();
        for (auto it = counts_.begin(); it != counts_.end();) {
            if (it->first < current) {
                for (const auto& [k, v] : it->second) {
                    std::string msg = k + ":" + std::to_string(v);
                    producer->produce("aggregated", RdKafka::Topic::PARTITION_UA,
                                     RdKafka::Producer::RK_MSG_COPY,
                                     const_cast<char*>(msg.data()), msg.size(),
                                     nullptr, 0, 0, nullptr);
                }
                it = counts_.erase(it);
            } else {
                ++it;
            }
        }
    }

private:
    std::string current_window() {
        auto now = std::chrono::system_clock::now();
        auto sec = std::chrono::duration_cast<std::chrono::seconds>(
            now.time_since_epoch()).count();
        return std::to_string((sec / 60) * 60);  // 1분 단위
    }
    std::map<std::string, CountMap> counts_;
};

2. 정확히 한 번 전달

2.1 전달 시맨틱 비교

시맨틱설명중복유실
at-most-once커밋 후 처리없음가능
at-least-once처리 후 커밋가능없음
exactly-once멱등성·트랜잭션없음없음

2.2 멱등성 키로 중복 제거

비즈니스 로직에서 멱등성 키(idempotency key)를 사용해 중복 처리를 방지합니다.

// exactly_once_idempotent.cpp - 멱등성 키 기반 중복 제거

#include <unordered_set>
#include <string>

class IdempotencyCache {
public:
    bool try_acquire(const std::string& key) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (seen_.count(key)) return false;  // 이미 처리됨
        seen_.insert(key);
        if (seen_.size() > max_size_) {
            // FIFO: 오래된 키 제거 (간단히 처음 것 제거)
            seen_.erase(seen_.begin());
        }
        return true;
    }
private:
    std::unordered_set<std::string> seen_;
    std::mutex mutex_;
    size_t max_size_ = 100000;
};

// 메시지 처리 시
std::string idempotency_key = msg->key() ?
    std::string(msg->key()->data(), msg->key()->size()) :
    std::string(msg->topic_name()) + "-" + std::to_string(msg->partition()) +
    "-" + std::to_string(msg->offset());

if (idempotency_cache.try_acquire(idempotency_key)) {
    do_business_logic(msg);  // 실제 처리
}
consumer->commit(msg);  // 커밋은 항상 (재처리 시 멱등성으로 스킵)

주의: idempotency_key는 Redis·DB에 저장해 재시작 후에도 유지하는 것이 좋습니다. 메모리 캐시는 재시작 시 초기화됩니다.

2.3 프로듀서 멱등성 (enable.idempotence)

프로듀서가 메시지를 중복 발행하지 않도록 브로커 설정을 합니다.

// 프로듀서 설정
conf->set("enable.idempotence", "true", errstr);
// 이 설정 시 retries, acks, max.in.flight.requests.per.connection가 자동으로
// 안전한 값으로 설정됨

효과: 네트워크 오류로 재시도 시 브로커가 중복을 제거합니다. 단일 토픽 발행에만 적용됩니다.


3. 트랜잭션 프로듀서

3.1 트랜잭션이 필요한 경우

  • 여러 토픽에 원자적으로 쓰기
  • consume → transform → produce를 원자적으로 처리 (read-process-write)
sequenceDiagram
    participant C as 컨슈머
    participant App as 앱
    participant P as 트랜잭션 프로듀서

    P->>P: init_transactions()
    P->>P: begin_transaction()
    C->>App: consume
    App->>P: produce (topic-a)
    App->>P: produce (topic-b)
    P->>P: commit_transaction()
    App->>C: commit (offsets)

3.2 librdkafka 트랜잭션 API (C++)

librdkafka는 C API로 트랜잭션을 지원합니다. C++에서는 RdKafka::Producerinit_transactions() 등이 있습니다.

// transactional_producer.cpp

#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <string>

int main() {
    std::string errstr;
    RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", "localhost:9092", errstr);
    conf->set("transactional.id", "my-txn-id", errstr);  // 트랜잭션 ID 필수

    RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
    delete conf;
    if (!producer) {
        std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
        return 1;
    }

    // 1. 트랜잭션 초기화
    RdKafka::ErrorCode err = producer->init_transactions(5000);
    if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "init_transactions 실패: " << RdKafka::err2str(err) << std::endl;
        delete producer;
        return 1;
    }

    // 2. 트랜잭션 시작
    err = producer->begin_transaction();
    if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "begin_transaction 실패: " << RdKafka::err2str(err) << std::endl;
        delete producer;
        return 1;
    }

    // 3. 여러 토픽에 발행
    std::string msg1 = "order-123";
    std::string msg2 = "event-order-123";
    producer->produce("orders", RdKafka::Topic::PARTITION_UA,
                     RdKafka::Producer::RK_MSG_COPY,
                     const_cast<char*>(msg1.data()), msg1.size(),
                     nullptr, 0, 0, nullptr);
    producer->produce("order-events", RdKafka::Topic::PARTITION_UA,
                     RdKafka::Producer::RK_MSG_COPY,
                     const_cast<char*>(msg2.data()), msg2.size(),
                     nullptr, 0, 0, nullptr);

    // 4. 트랜잭션 커밋
    err = producer->commit_transaction(5000);
    if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "commit_transaction 실패: " << RdKafka::err2str(err) << std::endl;
        producer->abort_transaction(5000);
    }

    producer->flush(5000);
    delete producer;
    RdKafka::wait_destroyed(5000);
    return 0;
}

주의: transactional.id는 클러스터 내에서 유일해야 합니다. enable.idempotence는 트랜잭션 사용 시 자동으로 활성화됩니다.

3.3 컨슈머 read_committed

트랜잭션으로 커밋된 메시지만 읽으려면:

conf->set("isolation.level", "read_committed", errstr);

3.4 트랜잭션 미지원 브로커

Kafka 2.8+ 및 transaction.state.log.replication.factor가 1 이상이어야 합니다. 구버전이면 트랜잭션 API가 실패합니다. 이 경우 멱등성 키 + at-least-once로 대체합니다.


4. 완전한 Kafka 예제

예제 1: 로그 집계 파이프라인 (에러율 계산)

// log_aggregator.cpp - 에러 로그 비율 집계

#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <chrono>
#include <atomic>

struct LogStats {
    std::atomic<int64_t> total{0};
    std::atomic<int64_t> errors{0};
};

void run_log_aggregator(const std::string& brokers) {
    std::string errstr;
    RdKafka::Conf* c_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    c_conf->set("bootstrap.servers", brokers, errstr);
    c_conf->set("group.id", "log-aggregator", errstr);
    c_conf->set("enable.auto.commit", "false", errstr);

    auto* consumer = RdKafka::KafkaConsumer::create(c_conf, errstr);
    delete c_conf;

    RdKafka::Conf* p_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    p_conf->set("bootstrap.servers", brokers, errstr);
    auto* producer = RdKafka::Producer::create(p_conf, errstr);
    delete p_conf;

    consumer->subscribe({"app-logs"});
    LogStats stats;

    while (true) {
        RdKafka::Message* msg = consumer->consume(1000);
        if (msg->err() == RdKafka::ERR_NO_ERROR) {
            std::string payload(static_cast<const char*>(msg->payload()), msg->len());
            stats.total++;
            if (payload.find("ERROR") != std::string::npos) stats.errors++;

            // 1분마다 요약 발행
            auto now = std::chrono::system_clock::now();
            static auto last_flush = now;
            if (std::chrono::duration_cast<std::chrono::milliseconds>(now - last_flush).count() > 60000) {
                double rate = (stats.total > 0) ? (100.0 * stats.errors / stats.total) : 0;
                std::string summary = "{\"total\":" + std::to_string(stats.total) +
                    ",\"errors\":" + std::to_string(stats.errors) +
                    ",\"error_rate\":" + std::to_string(rate) + "}";
                producer->produce("log-summary", RdKafka::Topic::PARTITION_UA,
                                 RdKafka::Producer::RK_MSG_COPY,
                                 const_cast<char*>(summary.data()), summary.size(),
                                 nullptr, 0, 0, nullptr);
                last_flush = now;
            }
            consumer->commit(msg);
        }
        delete msg;
        producer->poll(0);
    }

    delete producer;
    delete consumer;
}

예제 2: 주문 이벤트 처리 (멱등성)

// order_processor.cpp - 멱등성 키로 중복 주문 방지

#include <librdkafka/rdkafkacpp.h>
#include <unordered_set>
#include <mutex>
#include <string>

class OrderProcessor {
public:
    void process(RdKafka::Message* msg, RdKafka::KafkaConsumer* consumer) {
        if (msg->err() != RdKafka::ERR_NO_ERROR) return;

        std::string order_id = extract_order_id(msg);
        if (order_id.empty()) {
            consumer->commit(msg);
            return;
        }

        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (processed_.count(order_id)) {
                consumer->commit(msg);  // 이미 처리됨, 스킵
                return;
            }
            processed_.insert(order_id);
            if (processed_.size() > 100000) {
                processed_.erase(processed_.begin());
            }
        }

        // 실제 비즈니스 로직: 재고 차감, DB 저장 등
        do_order_processing(order_id, msg);
        consumer->commit(msg);
    }

private:
    std::string extract_order_id(RdKafka::Message* msg) {
        if (msg->key()) {
            return std::string(msg->key()->data(), msg->key()->size());
        }
        return "";
    }
    void do_order_processing(const std::string& order_id, RdKafka::Message* msg) {
        (void)order_id;
        (void)msg;
        // 재고 차감, 주문 저장 등
    }
    std::unordered_set<std::string> processed_;
    std::mutex mutex_;
};

예제 3: Graceful Shutdown + Flush

// graceful_shutdown.cpp

static std::atomic<bool> running{true};

void shutdown_handler(int sig) {
    (void)sig;
    running = false;
}

int main() {
    signal(SIGINT, shutdown_handler);
    signal(SIGTERM, shutdown_handler);

    // ... 프로듀서·컨슈머 생성

    while (running) {
        RdKafka::Message* msg = consumer->consume(1000);
        if (msg->err() == RdKafka::ERR_NO_ERROR) {
            process(msg);
            consumer->commit(msg);
        }
        delete msg;
    }

    // 1. 프로듀서: 대기 중인 메시지 전송 완료
    producer->flush(10000);
    delete producer;

    // 2. 컨슈머: 정리
    consumer->close();
    delete consumer;

    RdKafka::wait_destroyed(5000);
    return 0;
}

5. 자주 발생하는 에러와 해결법

에러 1: “Transaction coordinator not found”

증상: init_transactions() 또는 begin_transaction() 실패.

원인: 브로커가 트랜잭션을 지원하지 않거나, transaction.state.log.replication.factor가 0.

해결법:

# 브로커 설정 확인
# broker 설정에서 transaction.state.log.replication.factor >= 1
// ✅ 트랜잭션 미지원 시 fallback: 멱등성 키 + at-least-once
err = producer->init_transactions(5000);
if (err != RdKafka::ERR_NO_ERROR) {
    std::cerr << "트랜잭션 미지원, 멱등성 모드로 전환" << std::endl;
    use_transaction_ = false;
}

에러 2: “Fatal error: Local: Invalid argument (transactional.id)”

증상: transactional.id 설정 후 오류.

원인: transactional.id가 비어 있거나, 이미 사용 중인 ID.

해결법:

// ✅ 유일한 ID 사용 (인스턴스별)
conf->set("transactional.id", "my-app-" + std::to_string(getpid()), errstr);

에러 3: “Commit failed: Local: No offset stored”

증상: 리밸런싱 직후 commit() 호출 시 ERR__REBALANCE_IN_PROGRESS.

원인: 컨슈머 그룹 리밸런싱 중에는 커밋이 거부됨.

해결법:

// ✅ 리밸런싱 콜백에서 assign 후에만 commit
if (msg->err() == RdKafka::ERR_NO_ERROR) {
    consumer->commit(msg);
} else if (msg->err() == RdKafka::ERR__REBALANCE_IN_PROGRESS) {
    // rebalance_cb에서 처리됨, 여기서는 consume 계속
}

에러 4: “Producer: Queue full”

증상: produce() 호출 시 ERR__QUEUE_FULL.

원인: 발행 속도 > 브로커 수신 속도.

해결법:

// ✅ 백프레셔: 큐가 비워질 때까지 대기
while (producer->outq_len() > 10000) {
    producer->poll(100);
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
producer->produce(...);

에러 5: 스트림 파이프라인에서 메시지 유실

증상: 컨슈머가 commit() 후 프로듀서 발행 전에 크래시.

원인: 처리 후 커밋 → 발행 순서가 잘못되면, 커밋은 됐는데 발행이 안 된 상태로 재시작됨.

해결법:

// ✅ 발행 성공 후 커밋 (at-least-once)
RdKafka::ErrorCode err = producer->produce(...);
if (err == RdKafka::ERR_NO_ERROR) {
    producer->poll(0);  // dr_cb 확인
    consumer->commit(msg);
}

// ✅ 정확히 한 번이 필요하면 트랜잭션 + send_offsets_to_transaction

에러 6: “Broker: Not leader for partition” 반복

증상: 리더가 자주 바뀌는 파티션에서 지속적 실패.

원인: 브로커 불안정, 네트워크 지연.

해결법:

// ✅ 재시도·백오프
conf->set("retries", "10", errstr);
conf->set("retry.backoff.ms", "500", errstr);
conf->set("message.send.max.retries", "5", errstr);

에러 7: “Consumer group is rebalancing” 후 메시지 중복

증상: 리밸런싱 시 처리 중이던 메시지가 다른 컨슈머로 재할당되어 중복 처리.

원인: rebalance_cb에서 unassign 시 처리 중인 메시지 정리 전에 새 컨슈머가 할당받음.

해결법:

// ✅ rebalance_cb에서 파티션 해제 시 처리 중인 작업 완료 대기
void rebalance_cb(RdKafka::KafkaConsumer* consumer,
                  RdKafka::ErrorCode err,
                  std::vector<RdKafka::TopicPartition*>& partitions) override {
    if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
        // 처리 중인 메시지 완료 대기
        wait_for_pending_processing();
        consumer->unassign();
    } else if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
        consumer->assign(partitions);
    }
}

에러 8: “Message delivery failed: Broker: Message size too large”

증상: 큰 메시지 발행 시 실패.

해결법:

// 프로듀서
conf->set("message.max.bytes", "10485760", errstr);  // 10MB

// 브로커: message.max.bytes, replica.fetch.max.bytes 동일하게

6. 성능 최적화 팁

6.1 스트림 파이프라인 배치 처리

메시지를 모아서 한 번에 처리하면 오버헤드가 줄어듭니다.

// 배치 크기만큼 모아서 처리
const size_t BATCH_SIZE = 100;
std::vector<RdKafka::Message*> batch;
while (batch.size() < BATCH_SIZE) {
    RdKafka::Message* msg = consumer->consume(100);
    if (msg->err() == RdKafka::ERR_NO_ERROR) {
        batch.push_back(msg);
    } else {
        bool timed_out = (msg->err() == RdKafka::ERR__TIMED_OUT);
        delete msg;
        if (timed_out && !batch.empty()) break;
    }
}
for (auto* m : batch) {
    process_and_produce(m);
    consumer->commit(m);
    delete m;
}

6.2 프로듀서 병렬화

// ✅ 여러 스레드에서 동일 프로듀서 사용 (librdkafka는 스레드 안전)
std::thread t1([&]() { producer->produce(...); producer->poll(100); });
std::thread t2([&]() { producer->produce(...); producer->poll(100); });

6.3 압축

conf->set("compression.type", "lz4", errstr);  // 속도·압축률 균형

6.4 linger.ms vs 처리량

linger.ms처리량지연
0낮음1ms
5중간5ms
50높음50ms
conf->set("linger.ms", "5", errstr);
conf->set("batch.size", "32768", errstr);  // 32KB

6.5 파티션 수와 컨슈머 수

  • 파티션 수 ≥ 컨슈머 수: 모든 컨슈머가 활용됨
  • 파티션 수 < 컨슈머 수: 일부 컨슈머는 idle
# 토픽 생성 시 파티션 수 결정
kafka-topics --create --topic events --partitions 12 --replication-factor 2 \
  --bootstrap-server localhost:9092

7. 프로덕션 패턴

7.1 헬스 체크

bool health_check(RdKafka::Producer* producer) {
    RdKafka::Metadata* meta = nullptr;
    RdKafka::ErrorCode err = producer->metadata(true, nullptr, &meta, 5000);
    bool ok = (err == RdKafka::ERR_NO_ERROR && meta);
    if (meta) delete meta;
    return ok;
}

bool health_check(RdKafka::KafkaConsumer* consumer) {
    std::vector<RdKafka::TopicPartition*> partitions;
    RdKafka::ErrorCode err = consumer->assignment(partitions);
    bool ok = (err == RdKafka::ERR_NO_ERROR);
    for (auto* p : partitions) delete p;
    return ok;
}

7.2 메트릭 수집

// librdkafka 통계 (JSON)
conf->set("statistics.interval.ms", "10000", errstr);

class StatsCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event& event) override {
        if (event.type() == RdKafka::Event::EVENT_STATS) {
            // event.str()에 JSON 통계
            std::cout << event.str() << std::endl;
        }
    }
};

7.3 환경 변수 기반 설정

struct KafkaConfig {
    std::string brokers = "localhost:9092";
    std::string group_id = "cpp-consumer";
    std::string transactional_id;
    int linger_ms = 5;
};

KafkaConfig load_config() {
    KafkaConfig c;
    if (const char* b = std::getenv("KAFKA_BROKERS")) c.brokers = b;
    if (const char* g = std::getenv("KAFKA_GROUP_ID")) c.group_id = g;
    if (const char* t = std::getenv("KAFKA_TRANSACTIONAL_ID")) c.transactional_id = t;
    return c;
}

7.4 SSL/TLS + SASL

conf->set("security.protocol", "sasl_ssl", errstr);
conf->set("sasl.mechanisms", "PLAIN", errstr);
conf->set("sasl.username", std::getenv("KAFKA_USER"), errstr);
conf->set("sasl.password", std::getenv("KAFKA_PASSWORD"), errstr);
conf->set("ssl.ca.location", "/etc/ssl/certs/ca-certificates.crt", errstr);

7.5 멀티 브로커

conf->set("bootstrap.servers",
          "broker1:9092,broker2:9092,broker3:9092", errstr);

7.6 디버그 로깅

conf->set("debug", "broker,topic,msg", errstr);
// 프로덕션에서는 제거

8. 구현 체크리스트

스트림 처리

  • consume → transform → produce 순서 준수
  • 발행 성공 후 commit (유실 방지)
  • producer->poll() 주기적 호출

정확히 한 번

  • 멱등성 키 설계 (키 또는 topic-partition-offset)
  • 멱등성 캐시 저장소 (Redis/DB 권장)
  • enable.idempotence (프로듀서)

트랜잭션

  • transactional.id 설정 (유일값)
  • init_transactions → begin → produce → commit
  • 실패 시 abort_transaction
  • isolation.level=read_committed (컨슈머)

에러 처리

  • produce() 반환값 검사
  • msg->err() 검사
  • ERR__QUEUE_FULL 시 백프레셔
  • rebalance_cb 구현

프로덕션

  • Graceful Shutdown (flush, close)
  • SSL/TLS, SASL (필요 시)
  • 환경 변수로 설정 외부화
  • 헬스 체크·메트릭

9. 정리

항목요약
스트림 처리consume → transform → produce, 발행 후 커밋
정확히 한 번멱등성 키 + enable.idempotence + 트랜잭션
트랜잭션transactional.id, init_transactions, begin/commit/abort
에러Transaction coordinator, Queue full, Rebalance
성능배치 처리, linger.ms, 압축, 파티션 수
프로덕션Graceful Shutdown, SSL/SASL, 헬스 체크

핵심 원칙:

  1. 발행 후 커밋: 처리 완료 후 프로듀서 발행 성공 확인 후 commit
  2. 멱등성: 비즈니스 키로 중복 처리 방지
  3. 트랜잭션: 다중 토픽 원자적 쓰기
  4. 리밸런싱: rebalance_cb에서 상태 정리·재할당 처리

자주 묻는 질문 (FAQ)

Q. 이 내용을 실무에서 언제 쓰나요?

A. 이벤트 스트리밍, 로그 수집, 실시간 데이터 파이프라인, CDC, 금융·주문 시스템의 정확히 한 번 전달 등에 활용합니다. Kafka 기본(#52-5)을 선행으로 읽으세요.

Q. C++에서 Kafka Streams 대신 뭘 쓰나요?

A. Kafka Streams는 JVM 전용입니다. C++에서는 컨슈머-프로듀서 패턴으로 read → transform → produce를 구현합니다. 복잡한 윈도우·조인은 별도 상태 저장소(Redis, DB)와 함께 구현합니다.

Q. 정확히 한 번 전달이 꼭 필요한가요?

A. 금융·주문 시스템에서는 필수입니다. 로그·메트릭은 at-least-once로도 충분할 수 있습니다. 멱등성 키 설계가 가능하면 정확히 한 번 전달을 시뮬레이션할 수 있습니다.

Q. 트랜잭션 프로듀서가 실패하면?

A. abort_transaction()을 호출해 롤백합니다. 재시도 시 begin_transaction()부터 다시 시작합니다.

한 줄 요약: 스트림 처리·트랜잭션·정확히 한 번 전달을 C++로 구현해 실무에 적용할 수 있습니다.

다음 글: C++ 시리즈 목차

이전 글: Kafka 완벽 가이드(#52-5)


참고 자료


관련 글

  • C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·콜백·트랜잭션·스트리밍 파이프라인
... 996 lines not shown ... Token usage: 63706/1000000; 936294 remaining Start-Sleep -Seconds 3