C++ Kafka 고급 활용 | 스트림 처리·트랜잭션·정확히 한 번 전달 완벽 가이드 [#52-6]
이 글의 핵심
C++ Kafka 심화: librdkafka 스트림 처리 패턴, 트랜잭션 프로듀서, 정확히 한 번 전달, KSQL 대안. 실무 문제 시나리오, 완전한 예제, 자주 발생하는 에러, 성능 최적화, 프로덕션 패턴까지 900줄 분량으로 다룹니다.
들어가며: “스트림 처리·정확히 한 번이 막막해요”
실제 겪는 문제 시나리오
Kafka 기본(#52-5)에서 프로듀서·컨슈머·오프셋·리밸런싱을 다뤘다면, 이 글에서는 고급 기능을 다룹니다. 실무에서 자주 맞닥뜨리는 문제와 해결 방법을 제시합니다.
시나리오 1: 토픽 A → 처리 → 토픽 B 파이프라인
상황: 클릭 로그를 읽어 집계 후 요약 토픽에 저장하는 파이프라인
문제: C++에서 Kafka Streams/KSQL 없이 어떻게 구현하나?
결과: 컨슈머-프로듀서 패턴으로 read → transform → produce
시나리오 2: 주문 이벤트 중복 처리로 재고가 음수
상황: 컨슈머 재시작 시 같은 메시지를 다시 처리해 재고가 두 번 차감됨
문제: at-least-once만으로는 중복 제거 불가
결과: 멱등성 키 설계 또는 정확히 한 번(idempotence + 트랜잭션) 적용
시나리오 3: 여러 토픽에 원자적으로 쓰기
상황: 주문 생성 시 orders 토픽 + order-events 토픽에 동시에 저장해야 함
문제: 중간에 실패하면 일부만 반영되어 데이터 불일치
결과: 트랜잭션 프로듀서로 여러 토픽에 원자적 발행
시나리오 4: 실시간 윈도우 집계
상황: 1분 단위로 API 호출 수를 집계해 대시보드에 표시
문제: C++에서 Kafka Streams 없이 윈도우 집계를 어떻게?
결과: 시간 윈도우 버퍼 + 주기적 flush로 시뮬레이션
시나리오 5: 컨슈머 처리 중 오프셋 커밋 타이밍
상황: 배치 처리 후 커밋 vs 메시지별 커밋 시 리밸런싱 시 중복 유실
문제: 리밸런싱 시 처리 중인 메시지가 다른 컨슈머로 넘어감
결과: 처리 완료 직후 커밋, 리밸런싱 콜백에서 상태 정리
flowchart TB
subgraph 문제[실무 문제]
P1[스트림 파이프라인] --> S1[컨슈머-프로듀서]
P2[중복 처리] --> S2[멱등성·정확히 한 번]
P3[원자적 다중 토픽] --> S3[트랜잭션]
P4[윈도우 집계] --> S4[버퍼·주기 flush]
P5[리밸런싱] --> S5[상태·커밋 전략]
end
목표:
- 스트림 처리: read → transform → produce
- 트랜잭션: 여러 토픽 원자적 발행
- 정확히 한 번: idempotence·멱등성·트랜잭션
- 윈도우 집계: C++에서 시뮬레이션
- 프로덕션: 모니터링·백프레셔·재시도
요구 환경: C++17 이상, librdkafka 2.0+, Kafka 기본(#52-5) 선행
이 글을 읽으면:
- 스트림 처리 파이프라인을 C++로 구현할 수 있습니다.
- 트랜잭션·정확히 한 번 전달을 적용할 수 있습니다.
- 실무 에러·성능·프로덕션 패턴을 활용할 수 있습니다.
실무 적용 경험: 이 글은 대규모 C++ 프로젝트에서 실제로 겪은 문제와 해결 과정을 바탕으로 작성되었습니다. 책이나 문서에서 다루지 않는 실전 함정과 디버깅 팁을 포함합니다.
목차
1. 스트림 처리 패턴
C++에서 스트림 처리란?
Kafka Streams나 KSQL은 JVM 기반입니다. C++에서는 컨슈머로 읽고 → 변환 → 프로듀서로 쓰는 패턴으로 스트림 처리를 구현합니다.
sequenceDiagram
participant C as 컨슈머
participant App as C++ 앱
participant P as 프로듀서
C->>App: consume(topic-a)
App->>App: transform/filter/aggregate
App->>P: produce(topic-b)
1.1 단순 파이프라인: topic-a → topic-b
// stream_pipeline.cpp
// 컴파일: g++ -std=c++17 -o stream_pipeline stream_pipeline.cpp -lrdkafka -lrdkafka++
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
#include <csignal>
static volatile sig_atomic_t run = 1;
void sig_handler(int) { run = 0; }
int main() {
std::string errstr;
std::string brokers = "localhost:9092";
// 1. 컨슈머 설정
RdKafka::Conf* c_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
c_conf->set("bootstrap.servers", brokers, errstr);
c_conf->set("group.id", "stream-pipeline-group", errstr);
c_conf->set("enable.auto.commit", "false", errstr);
RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(c_conf, errstr);
delete c_conf;
if (!consumer) {
std::cerr << "컨슈머 생성 실패: " << errstr << std::endl;
return 1;
}
// 2. 프로듀서 설정
RdKafka::Conf* p_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
p_conf->set("bootstrap.servers", brokers, errstr);
RdKafka::Producer* producer = RdKafka::Producer::create(p_conf, errstr);
delete p_conf;
if (!producer) {
std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
delete consumer;
return 1;
}
consumer->subscribe({"raw-logs"});
signal(SIGINT, sig_handler);
signal(SIGTERM, sig_handler);
std::cout << "스트림 파이프라인 시작 (raw-logs → processed-logs)" << std::endl;
while (run) {
RdKafka::Message* msg = consumer->consume(1000);
switch (msg->err()) {
case RdKafka::ERR_NO_ERROR: {
std::string payload(static_cast<const char*>(msg->payload()),
msg->len());
// 변환: 대문자 변환 (예시)
std::string transformed;
for (char c : payload) {
transformed += (c >= 'a' && c <= 'z') ? (c - 32) : c;
}
// 출력 토픽에 발행
RdKafka::ErrorCode err = producer->produce(
"processed-logs",
RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(transformed.data()),
transformed.size(),
msg->key() ? msg->key()->data() : nullptr,
msg->key() ? msg->key()->size() : 0,
0, nullptr);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "발행 실패: " << RdKafka::err2str(err) << std::endl;
} else {
consumer->commit(msg); // 처리 완료 후 커밋
}
break;
}
case RdKafka::ERR__TIMED_OUT:
case RdKafka::ERR__PARTITION_EOF:
break;
default:
std::cerr << "소비 에러: " << msg->errstr() << std::endl;
run = 0;
break;
}
delete msg;
producer->poll(0);
}
producer->flush(5000);
consumer->close();
delete producer;
delete consumer;
RdKafka::wait_destroyed(5000);
return 0;
}
코드 설명:
raw-logs→ 읽기 → 변환 →processed-logs발행- 처리 완료 후
commit(): 발행 성공 후에만 커밋해 유실 방지 producer->poll(0): delivery report 처리
주의: 발행 실패 시 commit()하지 않으면 재시작 시 재처리됩니다. at-least-once 보장.
1.2 필터링: 조건에 맞는 메시지만 전달
// stream_filter.cpp - 에러 로그만 필터링
bool should_forward(const std::string& payload) {
return payload.find("ERROR") != std::string::npos ||
payload.find("FATAL") != std::string::npos;
}
// consume 루프 내부
if (msg->err() == RdKafka::ERR_NO_ERROR) {
std::string payload(static_cast<const char*>(msg->payload()), msg->len());
if (should_forward(payload)) {
producer->produce("error-logs", RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(payload.data()), payload.size(),
nullptr, 0, 0, nullptr);
}
consumer->commit(msg); // 필터 통과 여부와 관계없이 소비 완료
}
1.3 시간 윈도우 집계 (1분 단위)
// stream_window_aggregate.cpp - 1분 윈도우 카운트
#include <librdkafka/rdkafkacpp.h>
#include <chrono>
#include <map>
#include <string>
class WindowAggregator {
public:
using WindowKey = std::string;
using CountMap = std::map<WindowKey, int64_t>;
void add(const std::string& key, int64_t count = 1) {
auto window = current_window();
counts_[window][key] += count;
}
void flush_if_needed(RdKafka::Producer* producer) {
auto now = std::chrono::system_clock::now();
auto current = current_window();
for (auto it = counts_.begin(); it != counts_.end();) {
if (it->first < current) {
for (const auto& [k, v] : it->second) {
std::string msg = k + ":" + std::to_string(v);
producer->produce("aggregated", RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(msg.data()), msg.size(),
nullptr, 0, 0, nullptr);
}
it = counts_.erase(it);
} else {
++it;
}
}
}
private:
std::string current_window() {
auto now = std::chrono::system_clock::now();
auto sec = std::chrono::duration_cast<std::chrono::seconds>(
now.time_since_epoch()).count();
return std::to_string((sec / 60) * 60); // 1분 단위
}
std::map<std::string, CountMap> counts_;
};
2. 정확히 한 번 전달
2.1 전달 시맨틱 비교
| 시맨틱 | 설명 | 중복 | 유실 |
|---|---|---|---|
| at-most-once | 커밋 후 처리 | 없음 | 가능 |
| at-least-once | 처리 후 커밋 | 가능 | 없음 |
| exactly-once | 멱등성·트랜잭션 | 없음 | 없음 |
2.2 멱등성 키로 중복 제거
비즈니스 로직에서 멱등성 키(idempotency key)를 사용해 중복 처리를 방지합니다.
// exactly_once_idempotent.cpp - 멱등성 키 기반 중복 제거
#include <unordered_set>
#include <string>
class IdempotencyCache {
public:
bool try_acquire(const std::string& key) {
std::lock_guard<std::mutex> lock(mutex_);
if (seen_.count(key)) return false; // 이미 처리됨
seen_.insert(key);
if (seen_.size() > max_size_) {
// FIFO: 오래된 키 제거 (간단히 처음 것 제거)
seen_.erase(seen_.begin());
}
return true;
}
private:
std::unordered_set<std::string> seen_;
std::mutex mutex_;
size_t max_size_ = 100000;
};
// 메시지 처리 시
std::string idempotency_key = msg->key() ?
std::string(msg->key()->data(), msg->key()->size()) :
std::string(msg->topic_name()) + "-" + std::to_string(msg->partition()) +
"-" + std::to_string(msg->offset());
if (idempotency_cache.try_acquire(idempotency_key)) {
do_business_logic(msg); // 실제 처리
}
consumer->commit(msg); // 커밋은 항상 (재처리 시 멱등성으로 스킵)
주의: idempotency_key는 Redis·DB에 저장해 재시작 후에도 유지하는 것이 좋습니다. 메모리 캐시는 재시작 시 초기화됩니다.
2.3 프로듀서 멱등성 (enable.idempotence)
프로듀서가 메시지를 중복 발행하지 않도록 브로커 설정을 합니다.
// 프로듀서 설정
conf->set("enable.idempotence", "true", errstr);
// 이 설정 시 retries, acks, max.in.flight.requests.per.connection가 자동으로
// 안전한 값으로 설정됨
효과: 네트워크 오류로 재시도 시 브로커가 중복을 제거합니다. 단일 토픽 발행에만 적용됩니다.
3. 트랜잭션 프로듀서
3.1 트랜잭션이 필요한 경우
- 여러 토픽에 원자적으로 쓰기
- consume → transform → produce를 원자적으로 처리 (read-process-write)
sequenceDiagram
participant C as 컨슈머
participant App as 앱
participant P as 트랜잭션 프로듀서
P->>P: init_transactions()
P->>P: begin_transaction()
C->>App: consume
App->>P: produce (topic-a)
App->>P: produce (topic-b)
P->>P: commit_transaction()
App->>C: commit (offsets)
3.2 librdkafka 트랜잭션 API (C++)
librdkafka는 C API로 트랜잭션을 지원합니다. C++에서는 RdKafka::Producer의 init_transactions() 등이 있습니다.
// transactional_producer.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <string>
int main() {
std::string errstr;
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", "localhost:9092", errstr);
conf->set("transactional.id", "my-txn-id", errstr); // 트랜잭션 ID 필수
RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
delete conf;
if (!producer) {
std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
return 1;
}
// 1. 트랜잭션 초기화
RdKafka::ErrorCode err = producer->init_transactions(5000);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "init_transactions 실패: " << RdKafka::err2str(err) << std::endl;
delete producer;
return 1;
}
// 2. 트랜잭션 시작
err = producer->begin_transaction();
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "begin_transaction 실패: " << RdKafka::err2str(err) << std::endl;
delete producer;
return 1;
}
// 3. 여러 토픽에 발행
std::string msg1 = "order-123";
std::string msg2 = "event-order-123";
producer->produce("orders", RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(msg1.data()), msg1.size(),
nullptr, 0, 0, nullptr);
producer->produce("order-events", RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(msg2.data()), msg2.size(),
nullptr, 0, 0, nullptr);
// 4. 트랜잭션 커밋
err = producer->commit_transaction(5000);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "commit_transaction 실패: " << RdKafka::err2str(err) << std::endl;
producer->abort_transaction(5000);
}
producer->flush(5000);
delete producer;
RdKafka::wait_destroyed(5000);
return 0;
}
주의: transactional.id는 클러스터 내에서 유일해야 합니다. enable.idempotence는 트랜잭션 사용 시 자동으로 활성화됩니다.
3.3 컨슈머 read_committed
트랜잭션으로 커밋된 메시지만 읽으려면:
conf->set("isolation.level", "read_committed", errstr);
3.4 트랜잭션 미지원 브로커
Kafka 2.8+ 및 transaction.state.log.replication.factor가 1 이상이어야 합니다. 구버전이면 트랜잭션 API가 실패합니다. 이 경우 멱등성 키 + at-least-once로 대체합니다.
4. 완전한 Kafka 예제
예제 1: 로그 집계 파이프라인 (에러율 계산)
// log_aggregator.cpp - 에러 로그 비율 집계
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <chrono>
#include <atomic>
struct LogStats {
std::atomic<int64_t> total{0};
std::atomic<int64_t> errors{0};
};
void run_log_aggregator(const std::string& brokers) {
std::string errstr;
RdKafka::Conf* c_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
c_conf->set("bootstrap.servers", brokers, errstr);
c_conf->set("group.id", "log-aggregator", errstr);
c_conf->set("enable.auto.commit", "false", errstr);
auto* consumer = RdKafka::KafkaConsumer::create(c_conf, errstr);
delete c_conf;
RdKafka::Conf* p_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
p_conf->set("bootstrap.servers", brokers, errstr);
auto* producer = RdKafka::Producer::create(p_conf, errstr);
delete p_conf;
consumer->subscribe({"app-logs"});
LogStats stats;
while (true) {
RdKafka::Message* msg = consumer->consume(1000);
if (msg->err() == RdKafka::ERR_NO_ERROR) {
std::string payload(static_cast<const char*>(msg->payload()), msg->len());
stats.total++;
if (payload.find("ERROR") != std::string::npos) stats.errors++;
// 1분마다 요약 발행
auto now = std::chrono::system_clock::now();
static auto last_flush = now;
if (std::chrono::duration_cast<std::chrono::milliseconds>(now - last_flush).count() > 60000) {
double rate = (stats.total > 0) ? (100.0 * stats.errors / stats.total) : 0;
std::string summary = "{\"total\":" + std::to_string(stats.total) +
",\"errors\":" + std::to_string(stats.errors) +
",\"error_rate\":" + std::to_string(rate) + "}";
producer->produce("log-summary", RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(summary.data()), summary.size(),
nullptr, 0, 0, nullptr);
last_flush = now;
}
consumer->commit(msg);
}
delete msg;
producer->poll(0);
}
delete producer;
delete consumer;
}
예제 2: 주문 이벤트 처리 (멱등성)
// order_processor.cpp - 멱등성 키로 중복 주문 방지
#include <librdkafka/rdkafkacpp.h>
#include <unordered_set>
#include <mutex>
#include <string>
class OrderProcessor {
public:
void process(RdKafka::Message* msg, RdKafka::KafkaConsumer* consumer) {
if (msg->err() != RdKafka::ERR_NO_ERROR) return;
std::string order_id = extract_order_id(msg);
if (order_id.empty()) {
consumer->commit(msg);
return;
}
{
std::lock_guard<std::mutex> lock(mutex_);
if (processed_.count(order_id)) {
consumer->commit(msg); // 이미 처리됨, 스킵
return;
}
processed_.insert(order_id);
if (processed_.size() > 100000) {
processed_.erase(processed_.begin());
}
}
// 실제 비즈니스 로직: 재고 차감, DB 저장 등
do_order_processing(order_id, msg);
consumer->commit(msg);
}
private:
std::string extract_order_id(RdKafka::Message* msg) {
if (msg->key()) {
return std::string(msg->key()->data(), msg->key()->size());
}
return "";
}
void do_order_processing(const std::string& order_id, RdKafka::Message* msg) {
(void)order_id;
(void)msg;
// 재고 차감, 주문 저장 등
}
std::unordered_set<std::string> processed_;
std::mutex mutex_;
};
예제 3: Graceful Shutdown + Flush
// graceful_shutdown.cpp
static std::atomic<bool> running{true};
void shutdown_handler(int sig) {
(void)sig;
running = false;
}
int main() {
signal(SIGINT, shutdown_handler);
signal(SIGTERM, shutdown_handler);
// ... 프로듀서·컨슈머 생성
while (running) {
RdKafka::Message* msg = consumer->consume(1000);
if (msg->err() == RdKafka::ERR_NO_ERROR) {
process(msg);
consumer->commit(msg);
}
delete msg;
}
// 1. 프로듀서: 대기 중인 메시지 전송 완료
producer->flush(10000);
delete producer;
// 2. 컨슈머: 정리
consumer->close();
delete consumer;
RdKafka::wait_destroyed(5000);
return 0;
}
5. 자주 발생하는 에러와 해결법
에러 1: “Transaction coordinator not found”
증상: init_transactions() 또는 begin_transaction() 실패.
원인: 브로커가 트랜잭션을 지원하지 않거나, transaction.state.log.replication.factor가 0.
해결법:
# 브로커 설정 확인
# broker 설정에서 transaction.state.log.replication.factor >= 1
// ✅ 트랜잭션 미지원 시 fallback: 멱등성 키 + at-least-once
err = producer->init_transactions(5000);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "트랜잭션 미지원, 멱등성 모드로 전환" << std::endl;
use_transaction_ = false;
}
에러 2: “Fatal error: Local: Invalid argument (transactional.id)”
증상: transactional.id 설정 후 오류.
원인: transactional.id가 비어 있거나, 이미 사용 중인 ID.
해결법:
// ✅ 유일한 ID 사용 (인스턴스별)
conf->set("transactional.id", "my-app-" + std::to_string(getpid()), errstr);
에러 3: “Commit failed: Local: No offset stored”
증상: 리밸런싱 직후 commit() 호출 시 ERR__REBALANCE_IN_PROGRESS.
원인: 컨슈머 그룹 리밸런싱 중에는 커밋이 거부됨.
해결법:
// ✅ 리밸런싱 콜백에서 assign 후에만 commit
if (msg->err() == RdKafka::ERR_NO_ERROR) {
consumer->commit(msg);
} else if (msg->err() == RdKafka::ERR__REBALANCE_IN_PROGRESS) {
// rebalance_cb에서 처리됨, 여기서는 consume 계속
}
에러 4: “Producer: Queue full”
증상: produce() 호출 시 ERR__QUEUE_FULL.
원인: 발행 속도 > 브로커 수신 속도.
해결법:
// ✅ 백프레셔: 큐가 비워질 때까지 대기
while (producer->outq_len() > 10000) {
producer->poll(100);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
producer->produce(...);
에러 5: 스트림 파이프라인에서 메시지 유실
증상: 컨슈머가 commit() 후 프로듀서 발행 전에 크래시.
원인: 처리 후 커밋 → 발행 순서가 잘못되면, 커밋은 됐는데 발행이 안 된 상태로 재시작됨.
해결법:
// ✅ 발행 성공 후 커밋 (at-least-once)
RdKafka::ErrorCode err = producer->produce(...);
if (err == RdKafka::ERR_NO_ERROR) {
producer->poll(0); // dr_cb 확인
consumer->commit(msg);
}
// ✅ 정확히 한 번이 필요하면 트랜잭션 + send_offsets_to_transaction
에러 6: “Broker: Not leader for partition” 반복
증상: 리더가 자주 바뀌는 파티션에서 지속적 실패.
원인: 브로커 불안정, 네트워크 지연.
해결법:
// ✅ 재시도·백오프
conf->set("retries", "10", errstr);
conf->set("retry.backoff.ms", "500", errstr);
conf->set("message.send.max.retries", "5", errstr);
에러 7: “Consumer group is rebalancing” 후 메시지 중복
증상: 리밸런싱 시 처리 중이던 메시지가 다른 컨슈머로 재할당되어 중복 처리.
원인: rebalance_cb에서 unassign 시 처리 중인 메시지 정리 전에 새 컨슈머가 할당받음.
해결법:
// ✅ rebalance_cb에서 파티션 해제 시 처리 중인 작업 완료 대기
void rebalance_cb(RdKafka::KafkaConsumer* consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*>& partitions) override {
if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
// 처리 중인 메시지 완료 대기
wait_for_pending_processing();
consumer->unassign();
} else if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
consumer->assign(partitions);
}
}
에러 8: “Message delivery failed: Broker: Message size too large”
증상: 큰 메시지 발행 시 실패.
해결법:
// 프로듀서
conf->set("message.max.bytes", "10485760", errstr); // 10MB
// 브로커: message.max.bytes, replica.fetch.max.bytes 동일하게
6. 성능 최적화 팁
6.1 스트림 파이프라인 배치 처리
메시지를 모아서 한 번에 처리하면 오버헤드가 줄어듭니다.
// 배치 크기만큼 모아서 처리
const size_t BATCH_SIZE = 100;
std::vector<RdKafka::Message*> batch;
while (batch.size() < BATCH_SIZE) {
RdKafka::Message* msg = consumer->consume(100);
if (msg->err() == RdKafka::ERR_NO_ERROR) {
batch.push_back(msg);
} else {
bool timed_out = (msg->err() == RdKafka::ERR__TIMED_OUT);
delete msg;
if (timed_out && !batch.empty()) break;
}
}
for (auto* m : batch) {
process_and_produce(m);
consumer->commit(m);
delete m;
}
6.2 프로듀서 병렬화
// ✅ 여러 스레드에서 동일 프로듀서 사용 (librdkafka는 스레드 안전)
std::thread t1([&]() { producer->produce(...); producer->poll(100); });
std::thread t2([&]() { producer->produce(...); producer->poll(100); });
6.3 압축
conf->set("compression.type", "lz4", errstr); // 속도·압축률 균형
6.4 linger.ms vs 처리량
| linger.ms | 처리량 | 지연 |
|---|---|---|
| 0 | 낮음 | 1ms |
| 5 | 중간 | 5ms |
| 50 | 높음 | 50ms |
conf->set("linger.ms", "5", errstr);
conf->set("batch.size", "32768", errstr); // 32KB
6.5 파티션 수와 컨슈머 수
- 파티션 수 ≥ 컨슈머 수: 모든 컨슈머가 활용됨
- 파티션 수 < 컨슈머 수: 일부 컨슈머는 idle
# 토픽 생성 시 파티션 수 결정
kafka-topics --create --topic events --partitions 12 --replication-factor 2 \
--bootstrap-server localhost:9092
7. 프로덕션 패턴
7.1 헬스 체크
bool health_check(RdKafka::Producer* producer) {
RdKafka::Metadata* meta = nullptr;
RdKafka::ErrorCode err = producer->metadata(true, nullptr, &meta, 5000);
bool ok = (err == RdKafka::ERR_NO_ERROR && meta);
if (meta) delete meta;
return ok;
}
bool health_check(RdKafka::KafkaConsumer* consumer) {
std::vector<RdKafka::TopicPartition*> partitions;
RdKafka::ErrorCode err = consumer->assignment(partitions);
bool ok = (err == RdKafka::ERR_NO_ERROR);
for (auto* p : partitions) delete p;
return ok;
}
7.2 메트릭 수집
// librdkafka 통계 (JSON)
conf->set("statistics.interval.ms", "10000", errstr);
class StatsCb : public RdKafka::EventCb {
public:
void event_cb(RdKafka::Event& event) override {
if (event.type() == RdKafka::Event::EVENT_STATS) {
// event.str()에 JSON 통계
std::cout << event.str() << std::endl;
}
}
};
7.3 환경 변수 기반 설정
struct KafkaConfig {
std::string brokers = "localhost:9092";
std::string group_id = "cpp-consumer";
std::string transactional_id;
int linger_ms = 5;
};
KafkaConfig load_config() {
KafkaConfig c;
if (const char* b = std::getenv("KAFKA_BROKERS")) c.brokers = b;
if (const char* g = std::getenv("KAFKA_GROUP_ID")) c.group_id = g;
if (const char* t = std::getenv("KAFKA_TRANSACTIONAL_ID")) c.transactional_id = t;
return c;
}
7.4 SSL/TLS + SASL
conf->set("security.protocol", "sasl_ssl", errstr);
conf->set("sasl.mechanisms", "PLAIN", errstr);
conf->set("sasl.username", std::getenv("KAFKA_USER"), errstr);
conf->set("sasl.password", std::getenv("KAFKA_PASSWORD"), errstr);
conf->set("ssl.ca.location", "/etc/ssl/certs/ca-certificates.crt", errstr);
7.5 멀티 브로커
conf->set("bootstrap.servers",
"broker1:9092,broker2:9092,broker3:9092", errstr);
7.6 디버그 로깅
conf->set("debug", "broker,topic,msg", errstr);
// 프로덕션에서는 제거
8. 구현 체크리스트
스트림 처리
- consume → transform → produce 순서 준수
- 발행 성공 후 commit (유실 방지)
- producer->poll() 주기적 호출
정확히 한 번
- 멱등성 키 설계 (키 또는 topic-partition-offset)
- 멱등성 캐시 저장소 (Redis/DB 권장)
- enable.idempotence (프로듀서)
트랜잭션
- transactional.id 설정 (유일값)
- init_transactions → begin → produce → commit
- 실패 시 abort_transaction
- isolation.level=read_committed (컨슈머)
에러 처리
- produce() 반환값 검사
- msg->err() 검사
- ERR__QUEUE_FULL 시 백프레셔
- rebalance_cb 구현
프로덕션
- Graceful Shutdown (flush, close)
- SSL/TLS, SASL (필요 시)
- 환경 변수로 설정 외부화
- 헬스 체크·메트릭
9. 정리
| 항목 | 요약 |
|---|---|
| 스트림 처리 | consume → transform → produce, 발행 후 커밋 |
| 정확히 한 번 | 멱등성 키 + enable.idempotence + 트랜잭션 |
| 트랜잭션 | transactional.id, init_transactions, begin/commit/abort |
| 에러 | Transaction coordinator, Queue full, Rebalance |
| 성능 | 배치 처리, linger.ms, 압축, 파티션 수 |
| 프로덕션 | Graceful Shutdown, SSL/SASL, 헬스 체크 |
핵심 원칙:
- 발행 후 커밋: 처리 완료 후 프로듀서 발행 성공 확인 후 commit
- 멱등성: 비즈니스 키로 중복 처리 방지
- 트랜잭션: 다중 토픽 원자적 쓰기
- 리밸런싱: rebalance_cb에서 상태 정리·재할당 처리
자주 묻는 질문 (FAQ)
Q. 이 내용을 실무에서 언제 쓰나요?
A. 이벤트 스트리밍, 로그 수집, 실시간 데이터 파이프라인, CDC, 금융·주문 시스템의 정확히 한 번 전달 등에 활용합니다. Kafka 기본(#52-5)을 선행으로 읽으세요.
Q. C++에서 Kafka Streams 대신 뭘 쓰나요?
A. Kafka Streams는 JVM 전용입니다. C++에서는 컨슈머-프로듀서 패턴으로 read → transform → produce를 구현합니다. 복잡한 윈도우·조인은 별도 상태 저장소(Redis, DB)와 함께 구현합니다.
Q. 정확히 한 번 전달이 꼭 필요한가요?
A. 금융·주문 시스템에서는 필수입니다. 로그·메트릭은 at-least-once로도 충분할 수 있습니다. 멱등성 키 설계가 가능하면 정확히 한 번 전달을 시뮬레이션할 수 있습니다.
Q. 트랜잭션 프로듀서가 실패하면?
A. abort_transaction()을 호출해 롤백합니다. 재시도 시 begin_transaction()부터 다시 시작합니다.
한 줄 요약: 스트림 처리·트랜잭션·정확히 한 번 전달을 C++로 구현해 실무에 적용할 수 있습니다.
다음 글: C++ 시리즈 목차
이전 글: Kafka 완벽 가이드(#52-5)
참고 자료
- librdkafka 공식 문서
- Apache Kafka 트랜잭션
- Kafka 완벽 가이드(#52-5) — 프로듀서·컨슈머 기초
관련 글
- C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·콜백·트랜잭션·스트리밍 파이프라인