C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·콜백·트랜잭션·스트리밍 파이프라인
이 글의 핵심
C++에서 Apache Kafka 연동: librdkafka로 프로듀서·컨슈머·DeliveryReport·리밸런싱 콜백·트랜잭션·스트리밍 파이프라인 구현. | 시나리오 | 상황 | 문제 | 결과 | |----------|------|------|------| | 로그 수집 | 여러 서비스에서 DB에 직접 INSERT | DB 연결 폭증,.
들어가며: “C++에서 Kafka 연동이 막막해요”
실제 겪는 문제 시나리오
| 시나리오 | 상황 | 문제 | 결과 |
|---|---|---|---|
| 로그 수집 | 여러 서비스에서 DB에 직접 INSERT | DB 연결 폭증, 응답 저하 | Kafka 비동기 발행 → 부하 분산 |
| 주문 이벤트 | 재고·포인트·알림 등 여러 서비스 전달 | HTTP 순차 호출 시 지연·장애 전파 | Kafka 토픽 발행 → 독립 구독 |
| Connection refused | bootstrap.servers 설정 후 연결 실패 | localhost/Docker/방화벽 혼동 | 브로커·포트 점검, EventCb 모니터링 |
| 메시지 중복 | 처리 중 크래시 후 재시작 | at-least-once만으로 중복 제거 불가 | 멱등성 키 또는 트랜잭션 |
| dr_cb 미호출 | dr_cb 등록했는데 실행 안 됨 | poll() 호출 누락 | 주기적 poll(100) 필수 |
| 리밸런싱 | 컨슈머 그룹 인스턴스 추가 | 파티션 재할당 시 순서 꼬임·유실 | rebalance_cb에서 assign/unassign |
flowchart TB
subgraph 문제[실무 문제]
P1[DB 부하] --> S1[Kafka 비동기]
P2[서비스 결합] --> S2[이벤트 디커플링]
P3[Connection refused] --> S3[설정·EventCb]
P4[메시지 중복] --> S4[멱등성·트랜잭션]
P5[dr_cb 미호출] --> S5[poll 필수]
P6[리밸런싱] --> S6[rebalance_cb]
end
이 글에서 다루는 것:
- librdkafka 설치 및 CMake 연동
- 완전한 프로듀서·컨슈머 예제 (콜백 포함)
- 트랜잭션 프로듀서·정확히 한 번 전달
- 스트리밍 파이프라인 (read → transform → produce)
- 자주 발생하는 에러와 해결법
- 베스트 프랙티스·프로덕션 패턴
실무 적용 경험: 이 글은 대규모 C++ 프로젝트에서 실제로 겪은 문제와 해결 과정을 바탕으로 작성되었습니다. 책이나 문서에서 다루지 않는 실전 함정과 디버깅 팁을 포함합니다.
목차
- 환경 설정
- 완전한 프로듀서 예제
- 완전한 컨슈머 예제
- 콜백 (DeliveryReport·리밸런싱·Event)
- 트랜잭션·정확히 한 번 전달
- 스트리밍 파이프라인
- 자주 발생하는 에러와 해결법
- 베스트 프랙티스
- 프로덕션 패턴
- 구현 체크리스트
- 정리
1. 환경 설정
필수 의존성
| 항목 | 버전 | 비고 |
|---|---|---|
| C++ | C++14 이상 | C++17 권장 |
| librdkafka | 2.0+ | vcpkg, Homebrew, 또는 소스 빌드 |
| Apache Kafka | 2.8+ | 브로커 (Docker 권장) |
| CMake | 3.16+ | FindPackage 지원 |
Kafka 브로커 실행 (Docker)
# 단일 브로커 (개발용)
docker run -d --name kafka -p 9092:9092 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
apache/kafka:3.6
# 토픽 생성
docker exec kafka kafka-topics --create --topic app-logs --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
docker exec kafka kafka-topics --create --topic processed-logs --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
librdkafka 설치
# vcpkg (권장)
vcpkg install rdkafka
# macOS (Homebrew)
brew install librdkafka
# Ubuntu/Debian
sudo apt-get install librdkafka-dev
CMakeLists.txt 기본 설정
cmake_minimum_required(VERSION 3.16)
project(kafka_example LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
find_package(RdKafka CONFIG REQUIRED)
add_executable(kafka_producer producer.cpp)
target_link_libraries(kafka_producer PRIVATE RdKafka::rdkafka)
add_executable(kafka_consumer consumer.cpp)
target_link_libraries(kafka_consumer PRIVATE RdKafka::rdkafka)
주의: vcpkg 사용 시 -DCMAKE_TOOLCHAIN_FILE=[vcpkg root]/scripts/buildsystems/vcpkg.cmake를 CMake에 전달해야 합니다.
2. 완전한 프로듀서 예제
2.1 최소 동작 프로듀서
// producer.cpp
// 컴파일: g++ -std=c++17 -o producer producer.cpp -lrdkafka -lrdkafka++
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
int main(int argc, char** argv) {
std::string brokers = argc > 1 ? argv[1] : "localhost:9092";
std::string topic = argc > 2 ? argv[2] : "app-logs";
std::string errstr;
// 1. 전역 설정
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
// 2. 프로듀서 생성
RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
delete conf;
if (!producer) {
std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
return 1;
}
// 3. 메시지 발행
std::string payload = "Hello, Kafka from C++!";
RdKafka::ErrorCode err = producer->produce(
topic,
RdKafka::Topic::PARTITION_UA, // 파티션 자동 할당
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(payload.data()),
payload.size(),
"key1", 5, // 키 (선택)
0, nullptr);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "발행 실패: " << RdKafka::err2str(err) << std::endl;
} else {
std::cout << "메시지 발행 완료" << std::endl;
}
// 4. 대기 중인 delivery report 처리 (필수!)
while (producer->outq_len() > 0) {
producer->poll(100);
}
delete producer;
RdKafka::wait_destroyed(5000);
return 0;
}
코드 설명:
PARTITION_UA: 파티션 자동 선택 (키 해시 또는 라운드로빈)RK_MSG_COPY: 페이로드 복사 저장 (호출 후 버퍼 수정 가능)outq_len(): 전송 대기 메시지 수. 0이 될 때까지poll()호출 필수
2.2 Delivery Report 콜백 (실전용)
발행 결과를 콜백으로 받아 로깅·재시도할 수 있습니다.
// producer_with_dr_cb.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
class DeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
void dr_cb(RdKafka::Message& msg) override {
if (msg.err()) {
std::cerr << "[DR] 전달 실패: " << msg.errstr() << std::endl;
} else {
std::cout << "[DR] 전달 완료: " << msg.topic_name() << "["
<< msg.partition() << "] @ " << msg.offset() << std::endl;
}
}
};
int main() {
std::string errstr;
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", "localhost:9092", errstr);
DeliveryReportCb dr_cb;
conf->set("dr_cb", &dr_cb, errstr);
RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
delete conf;
if (!producer) {
std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
return 1;
}
std::string payload = "Test message with delivery report";
producer->produce("app-logs", RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(payload.data()), payload.size(),
nullptr, 0, 0, nullptr);
// poll을 호출해야 dr_cb가 실행됨
for (int i = 0; i < 10 && producer->outq_len() > 0; ++i) {
producer->poll(100);
}
producer->flush(5000);
delete producer;
RdKafka::wait_destroyed(5000);
return 0;
}
주의: poll()을 주기적으로 호출하지 않으면 delivery report 콜백이 실행되지 않습니다.
2.3 RAII 래퍼 클래스 (재사용 가능)
// kafka_producer.hpp
class KafkaProducer {
public:
KafkaProducer(const std::string& brokers, const std::string& client_id = "cpp-producer");
void produce(const std::string& topic, const std::string& key, const std::string& value);
void flush(int timeout_ms = 10000);
void poll(int timeout_ms = 0);
private:
struct DeliveryReportCb : public RdKafka::DeliveryReportCb {
void dr_cb(RdKafka::Message& msg) override {
if (msg.err()) std::cerr << "[DR] 실패: " << msg.errstr() << std::endl;
}
} dr_cb_;
std::unique_ptr<RdKafka::Conf> conf_;
std::unique_ptr<RdKafka::Producer> producer_;
};
// produce() 후 poll(0) 호출 필수
3. 완전한 컨슈머 예제
3.1 최소 동작 컨슈머
// consumer.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
#include <csignal>
static volatile sig_atomic_t run = 1;
void sigterm_handler(int) { run = 0; }
int main(int argc, char** argv) {
std::string brokers = argc > 1 ? argv[1] : "localhost:9092";
std::string group_id = argc > 2 ? argv[2] : "cpp-consumer-group";
std::string topic = argc > 3 ? argv[3] : "app-logs";
std::string errstr;
signal(SIGINT, sigterm_handler);
signal(SIGTERM, sigterm_handler);
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
conf->set("group.id", group_id, errstr);
conf->set("enable.auto.commit", "false", errstr); // 수동 커밋
RdKafka::KafkaConsumer* consumer =
RdKafka::KafkaConsumer::create(conf, errstr);
delete conf;
if (!consumer) {
std::cerr << "컨슈머 생성 실패: " << errstr << std::endl;
return 1;
}
std::vector<std::string> topics = {topic};
RdKafka::ErrorCode err = consumer->subscribe(topics);
if (err) {
std::cerr << "구독 실패: " << RdKafka::err2str(err) << std::endl;
delete consumer;
return 1;
}
std::cout << "메시지 수신 대기 중... (Ctrl+C로 종료)" << std::endl;
while (run) {
RdKafka::Message* msg = consumer->consume(1000);
switch (msg->err()) {
case RdKafka::ERR_NO_ERROR:
std::cout << "[" << msg->topic_name() << ":" << msg->partition()
<< "@" << msg->offset() << "] "
<< std::string(static_cast<const char*>(msg->payload()),
msg->len())
<< std::endl;
consumer->commit(msg); // 오프셋 커밋
break;
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR__PARTITION_EOF:
break;
default:
std::cerr << "소비 에러: " << msg->errstr() << std::endl;
run = 0;
break;
}
delete msg;
}
consumer->close();
delete consumer;
RdKafka::wait_destroyed(5000);
return 0;
}
코드 설명:
enable.auto.commit=false: 처리 완료 후 수동commit(). 크래시 시 재처리 가능consume(1000): 1초 타임아웃. 메시지 없으면ERR__TIMED_OUTcommit(msg): 해당 메시지 오프셋까지 커밋
3.2 consume 루프 시퀀스
sequenceDiagram
participant C as 컨슈머
participant B as 브로커
C->>B: subscribe(topics)
B->>C: 파티션 할당
loop consume
C->>B: consume(1000)
alt 메시지 있음
B->>C: Message
C->>C: 비즈니스 로직
C->>B: commit(msg)
else 타임아웃
B->>C: ERR__TIMED_OUT
end
end
4. 콜백 (DeliveryReport·리밸런싱·Event)
4.1 DeliveryReportCb (프로듀서)
발행 결과(성공/실패, 파티션, 오프셋)를 비동기로 수신합니다.
class DeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
void dr_cb(RdKafka::Message& msg) override {
if (msg.err()) {
std::cerr << "[DR] 실패: " << msg.errstr()
<< " topic=" << msg.topic_name()
<< " partition=" << msg.partition() << std::endl;
} else {
std::cout << "[DR] 성공: " << msg.topic_name()
<< "[" << msg.partition() << "] @ " << msg.offset() << std::endl;
}
}
};
// 설정
DeliveryReportCb dr_cb;
conf->set("dr_cb", &dr_cb, errstr);
4.2 RebalanceCb (컨슈머 그룹)
파티션 재할당 시 호출됩니다. ERR__ASSIGN_PARTITIONS면 assign, ERR__REVOKE_PARTITIONS면 unassign 처리.
class RebalanceCb : public RdKafka::RebalanceCb {
public:
void rebalance_cb(RdKafka::KafkaConsumer* consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*>& partitions) override {
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
consumer->assign(partitions);
std::cout << "[리밸런싱] 파티션 할당: ";
for (auto* p : partitions)
std::cout << p->topic() << "[" << p->partition() << "] ";
std::cout << std::endl;
} else {
consumer->unassign();
std::cout << "[리밸런싱] 파티션 해제" << std::endl;
}
}
};
// 설정
RebalanceCb rebalance_cb;
conf->set("rebalance_cb", &rebalance_cb, errstr);
4.3 EventCb (에러·통계)
연결 에러, throttle, 통계 이벤트를 수신합니다.
class EventCb : public RdKafka::EventCb {
public:
void event_cb(RdKafka::Event& event) override {
switch (event.type()) {
case RdKafka::Event::EVENT_ERROR:
if (event.fatal()) {
std::cerr << "치명적 에러: " << event.str() << std::endl;
} else {
std::cerr << "에러: " << event.str() << std::endl;
}
break;
case RdKafka::Event::EVENT_THROTTLE:
std::cerr << "Throttled: " << event.throttle_time() << "ms by "
<< event.broker_name() << std::endl;
break;
case RdKafka::Event::EVENT_STATS:
// event.str()에 JSON 통계
break;
default:
break;
}
}
};
// 설정
EventCb event_cb;
conf->set("event_cb", &event_cb, errstr);
콜백 등록: conf->set("rebalance_cb", &rebalance_cb, errstr), conf->set("event_cb", &event_cb, errstr)로 등록. 콜백 객체는 컨슈머 수명보다 길어야 합니다.
5. 트랜잭션·정확히 한 번 전달
5.1 전달 시맨틱 비교
| 시맨틱 | 설명 | 중복 | 유실 |
|---|---|---|---|
| at-most-once | 커밋 후 처리 | 없음 | 가능 |
| at-least-once | 처리 후 커밋 | 가능 | 없음 |
| exactly-once | 멱등성·트랜잭션 | 없음 | 없음 |
5.2 프로듀서 멱등성 (enable.idempotence)
conf->set("enable.idempotence", "true", errstr);
// retries, acks, max.in.flight.requests.per.connection가 자동으로 안전한 값으로 설정됨
5.3 트랜잭션 프로듀서
여러 토픽에 원자적으로 쓰기, consume-transform-produce를 원자적으로 처리할 때 사용합니다.
// transactional_producer.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <string>
int main() {
std::string errstr;
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", "localhost:9092", errstr);
conf->set("transactional.id", "my-txn-id", errstr); // 유일값 필수
RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
delete conf;
if (!producer) {
std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
return 1;
}
// 1. 트랜잭션 초기화
RdKafka::ErrorCode err = producer->init_transactions(5000);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "init_transactions 실패: " << RdKafka::err2str(err) << std::endl;
delete producer;
return 1;
}
// 2. 트랜잭션 시작
err = producer->begin_transaction();
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "begin_transaction 실패: " << RdKafka::err2str(err) << std::endl;
delete producer;
return 1;
}
// 3. 여러 토픽에 발행
std::string msg1 = "order-123";
std::string msg2 = "event-order-123";
producer->produce("orders", RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(msg1.data()), msg1.size(),
nullptr, 0, 0, nullptr);
producer->produce("order-events", RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(msg2.data()), msg2.size(),
nullptr, 0, 0, nullptr);
// 4. 트랜잭션 커밋
err = producer->commit_transaction(5000);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "commit_transaction 실패: " << RdKafka::err2str(err) << std::endl;
producer->abort_transaction(5000);
}
producer->flush(5000);
delete producer;
RdKafka::wait_destroyed(5000);
return 0;
}
주의: transactional.id는 클러스터 내에서 유일해야 합니다.
5.4 컨슈머 read_committed
트랜잭션으로 커밋된 메시지만 읽으려면:
conf->set("isolation.level", "read_committed", errstr);
5.5 멱등성 키로 중복 제거
// 멱등성 캐시 (Redis/DB 권장)
class IdempotencyCache {
public:
bool try_acquire(const std::string& key) {
std::lock_guard<std::mutex> lock(mutex_);
if (seen_.count(key)) return false;
seen_.insert(key);
if (seen_.size() > 100000) seen_.erase(seen_.begin());
return true;
}
private:
std::unordered_set<std::string> seen_;
std::mutex mutex_;
};
// 메시지 처리 시
std::string idempotency_key = msg->key() ?
std::string(msg->key()->data(), msg->key()->size()) :
std::string(msg->topic_name()) + "-" + std::to_string(msg->partition()) +
"-" + std::to_string(msg->offset());
if (idempotency_cache.try_acquire(idempotency_key)) {
do_business_logic(msg);
}
consumer->commit(msg);
6. 스트리밍 파이프라인
6.1 topic-a → transform → topic-b
C++에서는 컨슈머로 읽고 → 변환 → 프로듀서로 쓰는 패턴으로 스트림 처리를 구현합니다.
sequenceDiagram
participant C as 컨슈머
participant App as C++ 앱
participant P as 프로듀서
C->>App: consume(topic-a)
App->>App: transform/filter/aggregate
App->>P: produce(topic-b)
App->>C: commit (발행 성공 후)
6.2 완전한 스트리밍 파이프라인 예제
// stream_pipeline.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <string>
#include <csignal>
static volatile sig_atomic_t run = 1;
void sig_handler(int) { run = 0; }
int main() {
std::string errstr;
std::string brokers = "localhost:9092";
RdKafka::Conf* c_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
c_conf->set("bootstrap.servers", brokers, errstr);
c_conf->set("group.id", "stream-pipeline-group", errstr);
c_conf->set("enable.auto.commit", "false", errstr);
RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(c_conf, errstr);
delete c_conf;
RdKafka::Conf* p_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
p_conf->set("bootstrap.servers", brokers, errstr);
RdKafka::Producer* producer = RdKafka::Producer::create(p_conf, errstr);
delete p_conf;
consumer->subscribe({"raw-logs"});
signal(SIGINT, sig_handler);
signal(SIGTERM, sig_handler);
while (run) {
RdKafka::Message* msg = consumer->consume(1000);
switch (msg->err()) {
case RdKafka::ERR_NO_ERROR: {
std::string payload(static_cast<const char*>(msg->payload()),
msg->len());
std::string transformed;
for (char c : payload) {
transformed += (c >= 'a' && c <= 'z') ? (c - 32) : c;
}
RdKafka::ErrorCode err = producer->produce(
"processed-logs",
RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(transformed.data()),
transformed.size(),
msg->key() ? msg->key()->data() : nullptr,
msg->key() ? msg->key()->size() : 0,
0, nullptr);
if (err == RdKafka::ERR_NO_ERROR) {
consumer->commit(msg);
}
break;
}
case RdKafka::ERR__TIMED_OUT:
case RdKafka::ERR__PARTITION_EOF:
break;
default:
run = 0;
break;
}
delete msg;
producer->poll(0);
}
producer->flush(5000);
consumer->close();
delete producer;
delete consumer;
RdKafka::wait_destroyed(5000);
return 0;
}
핵심: 발행 성공 후에만 commit(). at-least-once 보장.
6.3 필터링 파이프라인
// 에러 로그만 필터링
bool should_forward(const std::string& payload) {
return payload.find("ERROR") != std::string::npos ||
payload.find("FATAL") != std::string::npos;
}
if (msg->err() == RdKafka::ERR_NO_ERROR) {
std::string payload(static_cast<const char*>(msg->payload()), msg->len());
if (should_forward(payload)) {
producer->produce("error-logs", RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(payload.data()), payload.size(),
nullptr, 0, 0, nullptr);
}
consumer->commit(msg);
}
7. 자주 발생하는 에러와 해결법
에러 1: “Connection refused”
원인: 브로커 미실행, 주소/포트 오류, Docker/방화벽.
해결:
docker ps | grep kafka # 브로커 실행 확인
conf->set("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092", errstr);
conf->set("socket.connection.setup.timeout.ms", "5000", errstr);
에러 2: “Topic ‘xxx’ not present in metadata”
해결: kafka-topics --create로 토픽 사전 생성, metadata.max.age.ms=5000.
에러 3: “Message size too large”
해결: conf->set("message.max.bytes", "10485760", errstr); (10MB).
에러 4: “Commit failed: Local: No offset stored”
원인: enable.auto.commit=true + 수동 commit, 또는 리밸런싱 중. 해결: enable.auto.commit=false, rebalance_cb 구현.
에러 5: 메시지 중복 처리
해결:
// ✅ 처리 완료 후 커밋
doBusinessLogic(msg);
consumer->commit(msg);
// ❌ 나쁜 예: 처리 전 커밋 → 크래시 시 재처리됨
consumer->commit(msg);
doBusinessLogic(msg);
에러 6: “Queue full” (ERR__QUEUE_FULL)
해결: queue.buffering.max.messages, queue.buffering.max.kbytes 증가. 백프레셔: while (producer->outq_len() > 10000) producer->poll(100);
에러 7: DeliveryReportCb가 호출되지 않음
원인: poll() 미호출. 해결: while (running) { producer->produce(...); producer->poll(100); } — poll 필수.
에러 8: “Transaction coordinator not found”
원인: 브로커가 트랜잭션 미지원. 해결: init_transactions() 실패 시 멱등성 모드로 fallback.
에러 9: “Consumer group is rebalancing”
해결: rebalance_cb에서 ERR__REVOKE_PARTITIONS 시 wait_for_pending_processing() 후 unassign, ERR__ASSIGN_PARTITIONS 시 assign.
8. 베스트 프랙티스
| 항목 | 권장 |
|---|---|
| poll() | 프로듀서: produce() 후 poll(100) 필수. 컨슈머: consume() 내부적으로 poll |
| 커밋 | 처리 완료 후 consumer->commit(msg) |
| 프로듀서 | 매 요청마다 생성 금지 — 풀/싱글톤 재사용 |
| 메시지 키 | 같은 키 → 같은 파티션 (순서 보장): key = "user:" + std::to_string(user_id) |
| 에러 처리 | produce() 반환값 검사, ERR__QUEUE_FULL 시 백프레셔 |
9. 프로덕션 패턴
9.1 Graceful Shutdown
static std::atomic<bool> running{true};
void sig_handler(int) { running = false; }
int main() {
signal(SIGINT, sig_handler);
signal(SIGTERM, sig_handler);
while (running) {
producer->produce(...);
producer->poll(100);
}
producer->flush(10000);
delete producer;
RdKafka::wait_destroyed(5000);
return 0;
}
9.2 헬스 체크·환경 변수·SSL
- 헬스 체크:
producer->metadata(true, nullptr, &meta, 5000)로 브로커 연결 확인 - 환경 변수:
std::getenv("KAFKA_BROKERS"),KAFKA_GROUP_ID로 설정 외부화 - SSL/TLS + SASL:
security.protocol=sasl_ssl,sasl.mechanisms=PLAIN,ssl.ca.location - 성능:
linger.ms=5,batch.size=16384,compression.type=lz4
10. 구현 체크리스트
환경 설정
- Kafka 브로커 실행 확인
- 토픽 사전 생성
- librdkafka 설치 및 CMake 연동
프로듀서
-
bootstrap.servers설정 -
dr_cb등록 및poll()주기적 호출 -
flush()또는outq_len()==0대기 후 종료 - 배치·압축 설정 (성능 요구 시)
컨슈머
-
group.id설정 -
enable.auto.commit=false시 수동commit() -
rebalance_cb등록 (컨슈머 그룹 사용 시) -
event_cb등록 (에러 모니터링)
에러 처리
-
produce()반환값 검사 -
msg->err()검사 -
ERR__QUEUE_FULL시 백프레셔
프로덕션
- Graceful Shutdown (flush, wait_destroyed)
- SSL/TLS, SASL (필요 시)
- 환경 변수로 설정 외부화
11. 정리
| 항목 | 요약 |
|---|---|
| librdkafka | C/C++ Kafka 클라이언트의 사실상 표준 |
| 프로듀서 | Conf → Producer::create → produce → poll → flush |
| 컨슈머 | Conf → KafkaConsumer::create → subscribe → consume → commit |
| 콜백 | dr_cb, rebalance_cb, event_cb |
| 트랜잭션 | transactional.id, init_transactions, begin/commit/abort |
| 스트리밍 | consume → transform → produce, 발행 후 커밋 |
| 에러 | Connection refused, Topic not found, Queue full, Rebalance |
| 프로덕션 | Graceful Shutdown, SSL/TLS, SASL, 환경 변수 |
핵심 원칙:
- poll() 필수: 프로듀서/컨슈머 모두 이벤트 루프에서 poll 호출
- 수동 커밋: 처리 완료 후 commit으로 중복·유실 최소화
- 리밸런싱: rebalance_cb에서 assign/unassign 처리
- 프로듀서 재사용: 매 요청마다 새 프로듀서 생성 금지
자주 묻는 질문 (FAQ)
Q. Kafka와 RabbitMQ 중 어떤 것을 써야 하나요?
A. 대용량 이벤트 스트리밍, 로그 수집, 재처리/이벤트 소싱이 필요하면 Kafka가 적합합니다. 작업 큐, 우선순위 큐, 복잡한 라우팅이 필요하면 RabbitMQ를 고려하세요.
Q. 정확히 한 번 전달은 어떻게 하나요?
A. at-least-once: 처리 후 커밋. exactly-once: 프로듀서 enable.idempotence=true + 트랜잭션 + 컨슈머 read_committed. 멱등성 키로 비즈니스 레벨 중복 제거도 가능합니다.
Q. 메시지 순서가 보장되나요?
A. 파티션 내에서만 순서가 보장됩니다. 같은 키를 사용하면 같은 파티션으로 가므로, 키별 순서를 보장할 수 있습니다.
Q. librdkafka는 스레드 안전한가요?
A. 네. librdkafka API는 스레드 안전합니다. 콜백 객체는 프로듀서/컨슈머 수명보다 길어야 합니다.
한 줄 요약: librdkafka로 C++에서 Kafka 프로듀서·컨슈머·콜백·트랜잭션·스트리밍 파이프라인을 구현하고, poll·commit·리밸런싱을 올바르게 처리하면 실무에 바로 적용할 수 있습니다.
다음 글: C++ 시리즈 목차
이전 글: Kafka 완벽 가이드(#52-5)
참고 자료
- librdkafka 공식 문서
- Apache Kafka 문서
- 메시지 큐 개요(#50-7) — Kafka vs RabbitMQ
관련 글
- C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·오프셋·정확히 한 번 전달
- C++ Kafka 고급 활용 | 스트림 처리·트랜잭션·정확히 한 번 전달 완벽 가이드 [#52-6]
- C++ RabbitMQ 완벽 가이드 | SimpleAmqpClient·rabbitmq-c