C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·오프셋·정확히 한 번 전달
이 글의 핵심
C++에서 Apache Kafka 연동: librdkafka로 프로듀서·컨슈머 그룹·오프셋 관리 구현. Connection timeout·메시지 유실·리밸런싱 등 흔한 에러 해결, 배치·압축 성능 최적화, 프로덕션 패턴까지 900줄 분량으로 다룹니다.
들어가며: C++에서 Kafka를 왜 쓰나요?
실제 겪는 문제 시나리오
시나리오 1: 로그 수집 시 DB 부하 폭증
여러 마이크로서비스에서 로그를 중앙 DB에 직접 INSERT합니다. 트래픽이 늘면 DB 연결 수가 폭증하고, 로그 쓰기 지연으로 애플리케이션 응답이 느려집니다. “로그 때문에 메인 서비스가 죽어요.”
시나리오 2: 주문 이벤트를 여러 서비스에 전달
주문 완료 시 재고 차감, 포인트 적립, 알림 발송 등 여러 서비스에 이벤트를 보내야 합니다. HTTP로 순차 호출하면 지연이 누적되고, 한 서비스 장애 시 전체가 멈춥니다. “어떤 서비스가 느리면 전체 주문 처리가 막혀요.”
시나리오 3: 실시간 분석 파이프라인
클릭 스트림, 센서 데이터를 실시간으로 수집해 분석 엔진에 전달해야 합니다. 폴링 방식은 지연이 크고, 직접 DB 조회는 부하가 큽니다. “1초 단위로 집계해야 하는데 10초씩 밀려요.”
시나리오 4: librdkafka 도입 후 “Connection refused” 에러
Kafka 브로커 주소를 설정했는데 연결이 안 됩니다. localhost:9092 vs 127.0.0.1:9092, 방화벽, Docker 네트워크 등 확인할 것이 많아 막막합니다.
시나리오 5: 컨슈머 재시작 시 메시지 중복 처리
컨슈머가 메시지 처리 중 크래시했습니다. 재시작 후 같은 메시지를 다시 읽어 중복 처리됩니다. “주문이 두 번 차감돼요.”
시나리오 6: 파티션 리밸런싱 시 처리 중단
컨슈머 그룹에 새 인스턴스를 추가했더니 기존 컨슈머가 담당하던 파티션이 재할당됩니다. 리밸런싱 중 메시지 처리 순서가 꼬이거나 유실될 수 있습니다.
Kafka로 해결:
- 이벤트 스트리밍: 프로듀서가 메시지를 토픽에 발행, 컨슈머가 구독. DB에 직접 쓰지 않아 메인 서비스 부하 감소
- 비동기·디커플링: 프로듀서는 발행만 하고, 여러 컨슈머가 각자 처리. 한 서비스 장애가 다른 서비스에 영향 없음
- 오프셋 관리: 컨슈머 그룹이 읽은 위치를 커밋해, 재시작 시 이어서 처리
- 파티션·스케일아웃: 파티션 수만큼 컨슈머를 늘려 처리량 확장
flowchart LR
subgraph Producer["프로듀서 (C++)"]
P1[앱 로그]
P2[주문 이벤트]
P3[센서 데이터]
end
subgraph Kafka[Apache Kafka]
T1[logs 토픽]
T2[orders 토픽]
T3[events 토픽]
end
subgraph Consumer["컨슈머 (C++)"]
C1[로그 저장]
C2[재고 차감]
C3[실시간 분석]
end
P1 --> T1
P2 --> T2
P3 --> T3
T1 --> C1
T2 --> C2
T3 --> C3
Kafka 프로듀서-컨슈머 흐름
sequenceDiagram
participant P as 프로듀서
participant B as 브로커
participant C as 컨슈머
P->>B: produce(topic, key, value)
B->>B: 파티션에 저장
B->>P: dr_cb (delivery report)
C->>B: subscribe(topic)
B->>C: 파티션 할당
loop consume
C->>B: poll() → 메시지 수신
C->>C: 비즈니스 로직 처리
C->>B: commit() 오프셋
end
RabbitMQ vs Kafka 비교
| 항목 | RabbitMQ | Kafka |
|---|---|---|
| 모델 | 큐, Exchange | 토픽, 파티션 |
| 메시지 보존 | 소비 후 삭제 (기본) | 보존 기간 동안 유지 |
| 재처리 | 별도 구현 | 오프셋 이동으로 가능 |
| 처리량 | 수만 msg/s | 수백만 msg/s |
| C++ 클라이언트 | rabbitmq-c | librdkafka |
이 글에서 다루는 것:
- librdkafka 설치 및 CMake 연동
- 완전한 프로듀서·컨슈머 C++ 예제
- 자주 발생하는 에러와 해결법
- 성능 최적화 (배치, 압축, 파티션)
- 프로덕션 배포 패턴
실무 적용 경험: 이 글은 대규모 C++ 프로젝트에서 실제로 겪은 문제와 해결 과정을 바탕으로 작성되었습니다. 책이나 문서에서 다루지 않는 실전 함정과 디버깅 팁을 포함합니다.
목차
1. 환경 설정
필수 의존성
| 항목 | 버전 | 비고 |
|---|---|---|
| C++ | C++14 이상 | C++17 권장 |
| librdkafka | 2.0+ | vcpkg, Homebrew, 또는 소스 빌드 |
| Apache Kafka | 2.8+ | 브로커 (Docker 권장) |
| CMake | 3.16+ | FindPackage 지원 |
Kafka 브로커 실행 (Docker)
# 단일 브로커 (개발용)
docker run -d --name kafka -p 9092:9092 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
apache/kafka:3.6
# 토픽 생성
docker exec kafka kafka-topics --create --topic app-logs --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
librdkafka 설치
# vcpkg (권장)
vcpkg install rdkafka
# macOS (Homebrew)
brew install librdkafka
# Ubuntu/Debian
sudo apt-get install librdkafka-dev
CMakeLists.txt 기본 설정
cmake_minimum_required(VERSION 3.16)
project(kafka_example LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
find_package(RdKafka CONFIG REQUIRED)
add_executable(kafka_producer producer.cpp)
target_link_libraries(kafka_producer PRIVATE RdKafka::rdkafka)
add_executable(kafka_consumer consumer.cpp)
target_link_libraries(kafka_consumer PRIVATE RdKafka::rdkafka)
주의: vcpkg 사용 시 -DCMAKE_TOOLCHAIN_FILE=[vcpkg root]/scripts/buildsystems/vcpkg.cmake를 CMake에 전달해야 합니다.
2. 완전한 프로듀서 예제
2.1 최소 동작 프로듀서
// producer.cpp
// 컴파일: g++ -std=c++17 -o producer producer.cpp -lrdkafka -lrdkafka++
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
int main(int argc, char** argv) {
std::string brokers = argc > 1 ? argv[1] : "localhost:9092";
std::string topic = argc > 2 ? argv[2] : "app-logs";
std::string errstr;
// 1. 전역 설정
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
// 2. 프로듀서 생성
RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
delete conf;
if (!producer) {
std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
return 1;
}
// 3. 메시지 발행
std::string payload = "Hello, Kafka from C++!";
RdKafka::ErrorCode err = producer->produce(
topic,
RdKafka::Topic::PARTITION_UA, // 파티션 자동 할당
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(payload.data()),
payload.size(),
"key1", 5, // 키 (선택)
0, nullptr);
if (err != RdKafka::ERR_NO_ERROR) {
std::cerr << "발행 실패: " << RdKafka::err2str(err) << std::endl;
} else {
std::cout << "메시지 발행 완료" << std::endl;
}
// 4. 대기 중인 delivery report 처리
while (producer->outq_len() > 0) {
producer->poll(100);
}
delete producer;
RdKafka::wait_destroyed(5000);
return 0;
}
코드 설명:
PARTITION_UA: 파티션을 자동 선택 (키 해시 또는 라운드로빈)RK_MSG_COPY: 페이로드를 복사해 저장 (호출 후 버퍼 수정 가능)outq_len(): 전송 대기 중인 메시지 수. 0이 될 때까지poll()호출 필요
2.2 Delivery Report 콜백 (실전용)
발행 결과를 콜백으로 받아 로깅·재시도할 수 있습니다.
// producer_with_dr_cb.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
class DeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
void dr_cb(RdKafka::Message& msg) override {
if (msg.err()) {
std::cerr << "[DR] 전달 실패: " << msg.errstr() << std::endl;
} else {
std::cout << "[DR] 전달 완료: " << msg.topic_name() << "["
<< msg.partition() << "] @ " << msg.offset() << std::endl;
}
}
};
int main() {
std::string errstr;
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", "localhost:9092", errstr);
DeliveryReportCb dr_cb;
conf->set("dr_cb", &dr_cb, errstr);
RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);
delete conf;
if (!producer) {
std::cerr << "프로듀서 생성 실패: " << errstr << std::endl;
return 1;
}
std::string payload = "Test message with delivery report";
producer->produce("app-logs", RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(payload.data()), payload.size(),
nullptr, 0, 0, nullptr);
// poll을 호출해야 dr_cb가 실행됨
for (int i = 0; i < 10 && producer->outq_len() > 0; ++i) {
producer->poll(100);
}
producer->flush(5000);
delete producer;
RdKafka::wait_destroyed(5000);
return 0;
}
주의: poll()을 주기적으로 호출하지 않으면 delivery report 콜백이 실행되지 않습니다. 프로듀서 스레드에서 루프로 poll(100)을 호출하는 것이 일반적입니다.
2.3 RAII 래퍼 클래스 (재사용 가능)
// kafka_producer.hpp
#pragma once
#include <librdkafka/rdkafkacpp.h>
#include <memory>
#include <stdexcept>
#include <string>
class KafkaProducer {
public:
KafkaProducer(const std::string& brokers,
const std::string& client_id = "cpp-producer") {
std::string errstr;
conf_.reset(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
conf_->set("bootstrap.servers", brokers, errstr);
conf_->set("client.id", client_id, errstr);
conf_->set("dr_cb", &dr_cb_, errstr);
producer_.reset(RdKafka::Producer::create(conf_.get(), errstr));
if (!producer_) {
throw std::runtime_error("프로듀서 생성 실패: " + errstr);
}
}
void produce(const std::string& topic, const std::string& key,
const std::string& value) {
RdKafka::ErrorCode err = producer_->produce(
topic, RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(value.data()), value.size(),
key.empty() ? nullptr : key.data(), key.size(),
0, nullptr);
if (err != RdKafka::ERR_NO_ERROR) {
throw std::runtime_error("발행 실패: " + RdKafka::err2str(err));
}
producer_->poll(0);
}
void flush(int timeout_ms = 10000) {
producer_->flush(timeout_ms);
}
void poll(int timeout_ms = 0) {
producer_->poll(timeout_ms);
}
private:
struct DeliveryReportCb : public RdKafka::DeliveryReportCb {
void dr_cb(RdKafka::Message& msg) override {
if (msg.err()) {
std::cerr << "[DR] 실패: " << msg.errstr() << std::endl;
}
}
} dr_cb_;
std::unique_ptr<RdKafka::Conf> conf_;
std::unique_ptr<RdKafka::Producer> producer_;
};
3. 완전한 컨슈머 예제
3.1 최소 동작 컨슈머
// consumer.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <memory>
#include <string>
#include <csignal>
static volatile sig_atomic_t run = 1;
void sigterm_handler(int) { run = 0; }
int main(int argc, char** argv) {
std::string brokers = argc > 1 ? argv[1] : "localhost:9092";
std::string group_id = argc > 2 ? argv[2] : "cpp-consumer-group";
std::string topic = argc > 3 ? argv[3] : "app-logs";
std::string errstr;
signal(SIGINT, sigterm_handler);
signal(SIGTERM, sigterm_handler);
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", brokers, errstr);
conf->set("group.id", group_id, errstr);
conf->set("enable.auto.commit", "false", errstr); // 수동 커밋
RdKafka::KafkaConsumer* consumer =
RdKafka::KafkaConsumer::create(conf, errstr);
delete conf;
if (!consumer) {
std::cerr << "컨슈머 생성 실패: " << errstr << std::endl;
return 1;
}
std::vector<std::string> topics = {topic};
RdKafka::ErrorCode err = consumer->subscribe(topics);
if (err) {
std::cerr << "구독 실패: " << RdKafka::err2str(err) << std::endl;
delete consumer;
return 1;
}
std::cout << "메시지 수신 대기 중... (Ctrl+C로 종료)" << std::endl;
while (run) {
RdKafka::Message* msg = consumer->consume(1000);
switch (msg->err()) {
case RdKafka::ERR_NO_ERROR:
std::cout << "[" << msg->topic_name() << ":" << msg->partition()
<< "@" << msg->offset() << "] "
<< std::string(static_cast<const char*>(msg->payload()),
msg->len())
<< std::endl;
consumer->commit(msg); // 오프셋 커밋
break;
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR__PARTITION_EOF:
break;
default:
std::cerr << "소비 에러: " << msg->errstr() << std::endl;
run = 0;
break;
}
delete msg;
}
consumer->close();
delete consumer;
RdKafka::wait_destroyed(5000);
return 0;
}
코드 설명:
enable.auto.commit=false: 메시지 처리 완료 후 수동으로commit()호출. 처리 중 크래시 시 재처리 가능consume(1000): 1초 타임아웃. 메시지 없으면ERR__TIMED_OUT반환commit(msg): 해당 메시지의 오프셋까지 커밋
3.2 리밸런싱 콜백 (컨슈머 그룹)
파티션 재할당 시 리밸런싱 콜백에서 assign/unassign를 처리해야 합니다.
// consumer_rebalance.cpp
#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <signal.h>
static volatile sig_atomic_t run = 1;
class RebalanceCb : public RdKafka::RebalanceCb {
public:
void rebalance_cb(RdKafka::KafkaConsumer* consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*>& partitions) override {
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
consumer->assign(partitions);
std::cerr << "[리밸런싱] 파티션 할당: ";
for (auto* p : partitions)
std::cerr << p->topic() << "[" << p->partition() << "] ";
std::cerr << std::endl;
} else {
consumer->unassign();
std::cerr << "[리밸런싱] 파티션 해제" << std::endl;
}
}
};
int main() {
std::string errstr;
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("bootstrap.servers", "localhost:9092", errstr);
conf->set("group.id", "cpp-group", errstr);
conf->set("enable.auto.commit", "false", errstr);
RebalanceCb rebalance_cb;
conf->set("rebalance_cb", &rebalance_cb, errstr);
RdKafka::KafkaConsumer* consumer =
RdKafka::KafkaConsumer::create(conf, errstr);
delete conf;
consumer->subscribe({"app-logs"});
while (run) {
RdKafka::Message* msg = consumer->consume(1000);
if (msg->err() == RdKafka::ERR_NO_ERROR) {
std::cout << std::string(static_cast<const char*>(msg->payload()),
msg->len()) << std::endl;
consumer->commit(msg);
}
delete msg;
}
consumer->close();
delete consumer;
RdKafka::wait_destroyed(5000);
return 0;
}
3.3 이벤트 콜백 (에러·통계)
// EventCb: 연결 에러, throttle 등 수신
class EventCb : public RdKafka::EventCb {
public:
void event_cb(RdKafka::Event& event) override {
switch (event.type()) {
case RdKafka::Event::EVENT_ERROR:
if (event.fatal()) {
std::cerr << "치명적 에러: " << event.str() << std::endl;
} else {
std::cerr << "에러: " << event.str() << std::endl;
}
break;
case RdKafka::Event::EVENT_THROTTLE:
std::cerr << "Throttled: " << event.throttle_time() << "ms by "
<< event.broker_name() << std::endl;
break;
default:
break;
}
}
};
// 설정 시
EventCb event_cb;
conf->set("event_cb", &event_cb, errstr);
4. 자주 발생하는 에러와 해결법
에러 1: “Connection refused” / “Broker: Connection refused”
증상: 프로듀서/컨슈머 생성은 되지만 메시지 발행·소비 시 연결 실패.
원인:
- Kafka 브로커가 실행 중이 아님
- 잘못된 주소/포트
- Docker 환경에서
localhostvs127.0.0.1혼동 - 방화벽 차단
해결법:
# 브로커 실행 확인
docker ps | grep kafka
# Docker 내부에서 접속 시
# bootstrap.servers = localhost:9092 (호스트에서)
# bootstrap.servers = host.docker.internal:9092 (Docker 컨테이너에서 호스트 접속)
// ✅ 여러 브로커 지정 (고가용성)
conf->set("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092", errstr);
// ✅ 연결 타임아웃 설정
conf->set("socket.connection.setup.timeout.ms", "5000", errstr);
에러 2: “Topic ‘xxx’ not present in metadata”
증상: produce() 호출 시 ERR__UNKNOWN_TOPIC 또는 토픽을 찾을 수 없음.
원인: 토픽이 아직 생성되지 않았거나, auto.create.topics.enable=true인데 브로커가 아직 메타데이터를 갱신하지 않음.
해결법:
# 토픽 사전 생성
kafka-topics --create --topic app-logs --partitions 3 --replication-factor 1 \
--bootstrap-server localhost:9092
// ✅ 토픽 자동 생성 대기 (metadata.max.age.ms)
conf->set("metadata.max.age.ms", "5000", errstr);
에러 3: “Message delivery failed: Broker: Message size too large”
증상: 큰 메시지 발행 시 실패.
원인: 브로커의 message.max.bytes 설정보다 메시지가 큼.
해결법:
// 프로듀서: 메시지 크기 제한
conf->set("message.max.bytes", "10485760", errstr); // 10MB
// 브로커 설정: message.max.bytes, replica.fetch.max.bytes 동일하게
에러 4: “Commit failed: Local: No offset stored”
증상: consumer->commit(msg) 호출 시 에러.
원인: enable.auto.commit=true이면서 수동 commit()을 호출하거나, 아직 커밋할 오프셋이 없음.
해결법:
// ✅ 수동 커밋 사용 시
conf->set("enable.auto.commit", "false", errstr);
// ✅ commitSync 호출
consumer->commit(msg);
// 또는
consumer->commitSync(); // 현재 오프셋까지 커밋
에러 5: 메시지 중복 처리
증상: 컨슈머 재시작 후 같은 메시지를 다시 처리.
원인: 메시지 처리 완료 전에 커밋하거나, 처리 후 커밋 전에 크래시.
해결법:
// ✅ 처리 완료 후 커밋 (at-least-once)
void process_msg(RdKafka::Message* msg) {
// 1. 비즈니스 로직 처리
doBusinessLogic(msg);
// 2. 처리 성공 후 커밋
consumer->commit(msg);
}
// ❌ 나쁜 예: 처리 전 커밋
consumer->commit(msg); // 커밋
doBusinessLogic(msg); // 처리 중 크래시 시 재처리됨
정확히 한 번 전달이 필요하면 프로듀서의 enable.idempotence=true와 트랜잭션을 사용합니다. Kafka 고급(#52-6)에서 다룹니다.
에러 6: “Out of memory” / “Queue full”
증상: 프로듀서가 produce() 호출 시 ERR__QUEUE_FULL 반환.
원인: 브로커 전송 속도보다 발행 속도가 빠름. 내부 큐가 가득 참.
해결법:
// ✅ 큐 크기 증가
conf->set("queue.buffering.max.messages", "1000000", errstr);
conf->set("queue.buffering.max.kbytes", "1048576", errstr); // 1GB
// ✅ 발행 시 블로킹 대기
while (producer->outq_len() > 10000) {
producer->poll(100);
}
producer->produce(...);
에러 7: “Broker: Not leader for partition”
증상: 리더가 변경된 파티션에 발행 시도.
원인: 브로커 장애·리밸런싱으로 리더가 변경됨. librdkafka는 자동으로 새 리더로 재시도합니다.
해결법:
// ✅ 재시도 설정 (기본값으로 충분)
conf->set("retries", "10", errstr);
conf->set("retry.backoff.ms", "100", errstr);
에러 8: “Consumer group is rebalancing”
증상: 컨슈머가 commit() 호출 시 ERR__REBALANCE_IN_PROGRESS.
원인: 컨슈머 그룹에 인스턴스 추가/제거로 리밸런싱 중.
해결법:
// ✅ 리밸런싱 콜백에서 assign 후 재처리
// rebalance_cb에서 파티션 해제 시 처리 중인 메시지 정리
// assign 후 새 파티션에서 consume 재개
에러 9: DeliveryReportCb가 호출되지 않음
증상: dr_cb가 한 번도 실행되지 않음.
원인: poll()을 호출하지 않음. delivery report는 poll() 호출 시 처리됩니다.
해결법:
// ✅ 프로듀서 스레드에서 주기적 poll
void producer_loop() {
while (running) {
producer->produce(...);
producer->poll(100); // 필수!
}
producer->flush(10000);
}
5. 성능 최적화
5.1 배치 발행 (Linger)
메시지를 모아서 한 번에 전송하면 RTT를 줄일 수 있습니다.
// 배치 대기 시간 (ms)
conf->set("linger.ms", "5", errstr); // 5ms 대기 후 전송
// 배치 크기 (bytes)
conf->set("batch.size", "16384", errstr); // 16KB
주의: linger.ms가 크면 지연이 증가합니다. 처리량 vs 지연 트레이드오프를 고려하세요.
5.2 압축
네트워크 대역폭을 줄입니다.
conf->set("compression.type", "gzip", errstr);
// none, gzip, snappy, lz4, zstd
| 압축 | 속도 | 압축률 | CPU |
|---|---|---|---|
| none | 빠름 | 없음 | 낮음 |
| snappy | 빠름 | 중간 | 중간 |
| lz4 | 빠름 | 좋음 | 중간 |
| gzip | 느림 | 높음 | 높음 |
| zstd | 중간 | 매우 높음 | 중간 |
5.3 프로듀서 풀링
// ❌ 나쁜 예: 매 요청마다 새 프로듀서
void handle_request() {
auto producer = create_producer();
producer->produce(...);
delete producer;
}
// ✅ 좋은 예: 프로듀서 재사용 (싱글톤 또는 풀)
static KafkaProducer* get_producer() {
static KafkaProducer producer("localhost:9092");
return &producer;
}
5.4 파티션 수와 컨슈머 수
- 파티션 수 ≥ 컨슈머 수일 때만 모든 컨슈머가 활용됩니다.
- 파티션 수를 늘리면 처리량은 늘지만, 순서 보장은 파티션 내에서만입니다.
// 키가 같은 메시지는 같은 파티션으로
producer->produce(topic, partition, ..., key, key_len, ...);
// partition = PARTITION_UA면 키 해시로 파티션 선택
5.5 성능 비교 (참고)
| 설정 | 초당 메시지 (대략) | 지연 |
|---|---|---|
| linger.ms=0, batch 작음 | 5만 | 1ms |
| linger.ms=5, batch 16KB | 50만 | 5ms |
| linger.ms=5, 압축 | 30만 | 5ms |
6. 프로덕션 패턴
6.1 Graceful Shutdown
static std::atomic<bool> running{true};
void sig_handler(int) {
running = false;
}
int main() {
signal(SIGINT, sig_handler);
signal(SIGTERM, sig_handler);
// ... 프로듀서 생성
while (running) {
producer->produce(...);
producer->poll(100);
}
// 종료 전 모든 메시지 전송 완료 대기
producer->flush(10000);
delete producer;
RdKafka::wait_destroyed(5000);
return 0;
}
6.2 메시지 키 설계
// 파티션 내 순서 보장: 같은 키 → 같은 파티션
// 예: 사용자별 이벤트 순서
std::string key = "user:" + std::to_string(user_id);
producer->produce(topic, RdKafka::Topic::PARTITION_UA, ..., key.data(), key.size(), ...);
6.3 헬스 체크
// 프로듀서: 메타데이터 조회로 브로커 연결 확인
RdKafka::Metadata* metadata = nullptr;
RdKafka::ErrorCode err = producer->metadata(true, nullptr, &metadata, 5000);
bool healthy = (err == RdKafka::ERR_NO_ERROR && metadata);
if (metadata) delete metadata;
6.4 환경 변수 기반 설정
struct KafkaConfig {
std::string brokers = "localhost:9092";
std::string group_id = "cpp-consumer";
int linger_ms = 5;
};
KafkaConfig load_config() {
KafkaConfig c;
if (const char* b = std::getenv("KAFKA_BROKERS")) c.brokers = b;
if (const char* g = std::getenv("KAFKA_GROUP_ID")) c.group_id = g;
return c;
}
6.5 로깅 (debug)
// 문제 디버깅 시
conf->set("debug", "broker,topic,msg", errstr);
// broker, topic, msg, protocol, cgrp 등
6.6 SSL/TLS (프로덕션)
conf->set("security.protocol", "ssl", errstr);
conf->set("ssl.ca.location", "/path/to/ca-cert", errstr);
conf->set("ssl.certificate.location", "/path/to/client-cert", errstr);
conf->set("ssl.key.location", "/path/to/client-key", errstr);
6.7 SASL 인증
conf->set("security.protocol", "sasl_plaintext", errstr);
conf->set("sasl.mechanisms", "PLAIN", errstr);
conf->set("sasl.username", "user", errstr);
conf->set("sasl.password", "pass", errstr);
7. 구현 체크리스트
환경 설정
- Kafka 브로커 실행 확인
- 토픽 사전 생성 (또는 auto.create 설정)
- librdkafka 설치 및 CMake 연동
프로듀서
-
bootstrap.servers설정 -
dr_cb등록 및poll()주기적 호출 -
flush()또는outq_len()==0대기 후 종료 - 배치·압축 설정 (성능 요구 시)
컨슈머
-
group.id설정 -
enable.auto.commit=false시 수동commit() -
rebalance_cb등록 (컨슈머 그룹 사용 시) -
event_cb등록 (에러 모니터링)
에러 처리
-
produce()반환값 검사 -
msg->err()검사 -
ERR__QUEUE_FULL시 대기 또는 백프레셔
프로덕션
- Graceful Shutdown (flush, wait_destroyed)
- SSL/TLS, SASL (필요 시)
- 환경 변수로 설정 외부화
정리
| 항목 | 요약 |
|---|---|
| librdkafka | C/C++ Kafka 클라이언트의 사실상 표준 |
| 프로듀서 | Conf → Producer::create → produce → poll → flush |
| 컨슈머 | Conf → KafkaConsumer::create → subscribe → consume → commit |
| 오프셋 | 수동 커밋으로 메시지 처리 완료 후 commit |
| 에러 | Connection refused, Topic not found, Queue full, Rebalance |
| 성능 | linger.ms, batch.size, 압축, 프로듀서 재사용 |
| 프로덕션 | Graceful Shutdown, SSL/TLS, SASL, 환경 변수 |
핵심 원칙:
- poll() 필수: 프로듀서/컨슈머 모두 이벤트 루프에서 poll 호출
- 수동 커밋: 처리 완료 후 commit으로 중복·유실 최소화
- 리밸런싱: rebalance_cb에서 assign/unassign 처리
- 프로듀서 재사용: 매 요청마다 새 프로듀서 생성 금지
자주 묻는 질문 (FAQ)
Q. Kafka와 RabbitMQ 중 어떤 것을 써야 하나요?
A. 대용량 이벤트 스트리밍, 로그 수집, 재처리/이벤트 소싱이 필요하면 Kafka가 적합합니다. 작업 큐, 우선순위 큐, 복잡한 라우팅이 필요하면 RabbitMQ를 고려하세요.
Q. 정확히 한 번 전달(at-least-once vs exactly-once)은 어떻게 하나요?
A. at-least-once: 처리 후 커밋. 중복 가능성 있음. exactly-once: 프로듀서 enable.idempotence=true + 트랜잭션 + 컨슈머 read_committed. Kafka 고급(#52-6)에서 다룹니다.
Q. 메시지 순서가 보장되나요?
A. 파티션 내에서만 순서가 보장됩니다. 같은 키를 사용하면 같은 파티션으로 가므로, 키별 순서를 보장할 수 있습니다.
Q. librdkafka는 스레드 안전한가요?
A. 네. librdkafka API는 스레드 안전합니다. 단, 콜백 객체(dr_cb, rebalance_cb 등)는 프로듀서/컨슈머 생성 시점에 등록되어야 하며, 수명이 프로듀서/컨슈머보다 길어야 합니다.
한 줄 요약: librdkafka로 C++에서 Kafka 프로듀서·컨슈머를 구현하고, poll·commit·리밸런싱을 올바르게 처리하면 실무에 바로 적용할 수 있습니다.
다음 글: Kafka 고급: 스트림 처리·트랜잭션·정확히 한 번(#52-6)
이전 글: C++ 시리즈 목차
참고 자료
- librdkafka 공식 문서
- Apache Kafka 문서
- 메시지 큐 개요(#50-7) — Kafka vs RabbitMQ
관련 글
- C++ Kafka 완벽 가이드 | librdkafka 프로듀서·컨슈머·콜백·트랜잭션·스트리밍 파이프라인
- C++ Kafka 고급 활용 | 스트림 처리·트랜잭션·정확히 한 번 전달 완벽 가이드 [#52-6]
- C++ RabbitMQ 완벽 가이드 | SimpleAmqpClient·rabbitmq-c
- C++ RabbitMQ 고급 | 토픽 라우팅·우선순위 큐·Dead Letter·TTL 가이드