C++ Elasticsearch 완벽 가이드 | Elasticlient·REST API
이 글의 핵심
C++에서 Elasticsearch 연동: Elasticlient·libcurl REST API 설치·연결, 인덱싱·검색·집계 실전 코드. Connection refused·JSON 파싱·벌크 타임아웃 등 흔한 에러 해결, 성능 최적화, 프로덕션 패턴까지 900줄 분량으로 다룹니다.
들어가며: C++에서 Elasticsearch를 왜 쓰나요?
실제 겪는 문제 시나리오
시나리오 1: 로그 검색이 너무 느림
수백만 건의 로그를 MySQL LIKE '%keyword%'로 검색하면 풀 스캔이 발생하고 수 초씩 걸립니다. “에러 로그를 1초 안에 찾아야 하는데 10초 넘게 걸려요.”
시나리오 2: 전문 검색(Full-Text Search) 필요
제품명, 설명에서 “무선 이어폰 블루투스”처럼 여러 단어로 검색하고, 유사어·동의어까지 매칭해야 합니다. 관계형 DB는 전문 검색에 한계가 있습니다.
시나리오 3: 실시간 집계·대시보드
클릭 스트림, API 호출 수를 실시간으로 집계해 대시보드에 표시해야 합니다. 1분 단위로 COUNT, AVG를 계산하는 쿼리가 필요합니다.
시나리오 4: “Connection refused” 에러
Elasticsearch 서버 주소를 설정했는데 연결이 안 됩니다. localhost:9200 vs 127.0.0.1:9200, Docker 네트워크, 방화벽 등 확인할 것이 많습니다.
시나리오 5: JSON 직렬화/파싱 에러
Elasticsearch REST API는 JSON을 사용합니다. C++에서 JSON을 잘못 구성하면 "mapper_parsing_exception" 또는 400 Bad Request가 발생합니다.
시나리오 6: 벌크 인덱싱 시 타임아웃
수십만 건을 한 번에 인덱싱하려다 RequestTimeout 또는 EsRejectedExecutionException이 발생합니다. 배치 크기와 재시도 전략이 필요합니다.
Elasticsearch C++ 연동으로 해결:
- 전문 검색: 역인덱스 기반 빠른 검색, 유사어·푸지 매칭
- 집계: 실시간 COUNT, AVG, 히스토그램, 날짜 히스토그램
- 스케일아웃: 샤드 분산으로 수평 확장
- REST API: 공식 C++ 클라이언트는 없지만 REST API로 모든 기능 사용 가능
flowchart LR
subgraph App[C++ 애플리케이션]
P1[로그 수집]
P2[검색 요청]
P3[집계 쿼리]
end
subgraph Client[클라이언트]
E1[Elasticlient]
C1[libcurl REST]
end
subgraph ES[Elasticsearch]
I1[(인덱스)]
I2[(인덱스)]
end
P1 --> E1
P2 --> C1
P3 --> E1
E1 --> I1
C1 --> I2
Elasticsearch C++ 클라이언트 옵션
flowchart TB
subgraph Options[C++ 연동 방식]
A[Elasticlient]
B[libcurl REST API]
C[기타 elasticpp 등]
end
subgraph A_Desc[Elasticlient]
A1[커뮤니티 라이브러리]
A2[인덱싱·검색·스크롤·벌크]
A3[cpr 의존]
end
subgraph B_Desc[REST API]
B1[libcurl + nlohmann/json]
B2[모든 API 지원]
B3[의존성 최소]
end
A --> A_Desc
B --> B_Desc
중요: Elasticsearch는 공식 C++ 클라이언트가 없습니다. Elastic에서 공식 지원하는 클라이언트는 Java, Python, Go, .NET 등이며, C++는 커뮤니티 라이브러리 또는 REST API 직접 호출을 사용합니다.
| 방식 | 장점 | 단점 |
|---|---|---|
| Elasticlient | API 단순, 검색·벌크·스크롤 지원 | 유지보수 느림, cpr 의존 |
| libcurl + JSON | 의존성 최소, 모든 API 지원 | 코드 직접 작성 필요 |
이 글에서 다루는 것:
- Elasticlient 설치 및 기본 사용 예제
- libcurl REST API 직접 호출 예제 (의존성 최소)
- 완전한 인덱싱·검색·집계 C++ 예제
- 자주 발생하는 에러와 해결법
- 성능 최적화 (벌크, 연결 재사용, 스크롤)
- 프로덕션 배포 패턴
실무 적용 경험: 이 글은 대규모 C++ 프로젝트에서 실제로 겪은 문제와 해결 과정을 바탕으로 작성되었습니다. 책이나 문서에서 다루지 않는 실전 함정과 디버깅 팁을 포함합니다.
목차
1. 환경 설정
필수 의존성
| 항목 | 버전 | 비고 |
|---|---|---|
| C++ | C++14 이상 | C++17 권장 |
| Elasticsearch | 7.x/8.x | Docker 권장 |
| Elasticlient | - | 선택, cpr 의존 |
| libcurl | 7.x+ | REST API용 |
| nlohmann/json | 3.x | JSON 파싱 |
Elasticsearch 서버 실행 (Docker)
# 단일 노드 (개발용)
docker run -d --name elasticsearch -p 9200:9200 -p 9300:9300 \
-e "discovery.type=single-node" \
-e "xpack.security.enabled=false" \
-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
docker.elastic.co/elasticsearch/elasticsearch:8.11.0
# 연결 확인
curl -X GET "localhost:9200/?pretty"
출력 예시:
{
"name" : "node-1",
"cluster_name" : "docker-cluster",
"version" : {
"number" : "8.11.0"
}
}
Elasticlient 설치 (선택)
Elasticlient는 cpr(HTTP 클라이언트)에 의존합니다.
# 소스 빌드 (GitHub)
git clone https://github.com/seznam/elasticlient.git
cd elasticlient
mkdir build && cd build
cmake .. -DCMAKE_BUILD_TYPE=Release
cmake --build .
sudo cmake --build . --target install
# cpr 설치 (Ubuntu/Debian)
sudo apt-get install libcurl4-openssl-dev
# cpr은 헤더 전용 또는 vcpkg로 설치
libcurl + nlohmann/json 설치 (REST API용)
# Ubuntu/Debian
sudo apt-get install libcurl4-openssl-dev
# macOS (Homebrew)
brew install curl
brew install nlohmann-json
# vcpkg
vcpkg install curl
vcpkg install nlohmann-json
CMakeLists.txt 기본 설정
Elasticlient 사용 시:
cmake_minimum_required(VERSION 3.16)
project(elasticsearch_demo LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
find_package(elasticlient REQUIRED)
find_package(cpr REQUIRED)
add_executable(es_demo main.cpp)
target_link_libraries(es_demo PRIVATE elasticlient::elasticlient cpr::cpr)
REST API 직접 사용 시 (권장):
cmake_minimum_required(VERSION 3.16)
project(elasticsearch_demo LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 17)
find_package(CURL REQUIRED)
find_package(nlohmann_json REQUIRED)
add_executable(es_demo main.cpp)
target_link_libraries(es_demo PRIVATE CURL::libcurl nlohmann_json::nlohmann_json)
2. 완전한 Elasticlient 예제
2.1 최소 동작: 인덱싱·조회·삭제
Elasticlient를 사용한 기본 예제입니다. (Elasticlient는 7.x 이전 API 스타일을 사용할 수 있으므로, 실제 Elasticsearch 버전에 맞게 조정이 필요할 수 있습니다.)
// elasticlient_basic.cpp
// Elasticlient 사용 시 (docType은 7.x 이후 deprecated, _doc 사용 권장)
#include <elasticlient/client.h>
#include <iostream>
#include <string>
int main() {
try {
// 1. 클라이언트 생성 (여러 노드 가능)
elasticlient::Client client({"http://localhost:9200/"});
// 2. 문서 인덱싱
std::string doc = R"({"message": "Hello Elasticsearch!", "timestamp": "2024-01-15T10:00:00"})";
cpr::Response indexResp = client.index("logs", "_doc", "1", doc);
if (indexResp.status_code != 200 && indexResp.status_code != 201) {
std::cerr << "인덱싱 실패: " << indexResp.status_code << " " << indexResp.text << std::endl;
return 1;
}
std::cout << "인덱싱 완료: " << indexResp.text << std::endl;
// 3. 문서 조회
cpr::Response getResp = client.get("logs", "_doc", "1");
if (getResp.status_code == 200) {
std::cout << "조회 결과: " << getResp.text << std::endl;
}
// 4. 문서 삭제
cpr::Response delResp = client.remove("logs", "_doc", "1");
std::cout << "삭제 완료: " << delResp.status_code << std::endl;
} catch (const std::exception& e) {
std::cerr << "에러: " << e.what() << std::endl;
return 1;
}
return 0;
}
코드 설명:
Client: Elasticsearch 클러스터 URL 목록index(index, type, id, doc): 문서 인덱싱get(index, type, id): 문서 조회remove(index, type, id): 문서 삭제
2.2 검색 (Search)
// elasticlient_search.cpp
#include <elasticlient/client.h>
#include <iostream>
#include <string>
int main() {
elasticlient::Client client({"http://localhost:9200/"});
// match_all 쿼리
std::string query = R"({
"query": {
"match_all": {}
},
"size": 10
})";
cpr::Response resp = client.search("logs", query);
if (resp.status_code == 200) {
std::cout << "검색 결과:\n" << resp.text << std::endl;
} else {
std::cerr << "검색 실패: " << resp.status_code << resp.text << std::endl;
}
return 0;
}
2.3 벌크 인덱싱 (Bulk)
// elasticlient_bulk.cpp
#include <elasticlient/client.h>
#include <elasticlient/bulk.h>
#include <iostream>
#include <string>
int main() {
elasticlient::Client client({"http://localhost:9200/"});
elasticlient::Bulk bulk(client);
// 벌크 액션 추가
bulk.index("logs", "_doc", "1", R"({"msg":"log1","level":"info"})");
bulk.index("logs", "_doc", "2", R"({"msg":"log2","level":"error"})");
bulk.index("logs", "_doc", "3", R"({"msg":"log3","level":"info"})");
cpr::Response resp = bulk.perform();
if (resp.status_code == 200) {
std::cout << "벌크 완료: " << resp.text << std::endl;
} else {
std::cerr << "벌크 실패: " << resp.status_code << resp.text << std::endl;
}
return 0;
}
3. 완전한 REST API 예제
Elasticlient 없이 libcurl과 nlohmann/json으로 REST API를 직접 호출하는 방식입니다. 의존성 최소이며 모든 Elasticsearch API를 사용할 수 있습니다.
3.1 HTTP 헬퍼 클래스
// es_client.hpp
#pragma once
#include <curl/curl.h>
#include <nlohmann/json.hpp>
#include <string>
#include <stdexcept>
#include <sstream>
using json = nlohmann::json;
class EsClient {
public:
EsClient(const std::string& base_url = "http://localhost:9200")
: base_url_(base_url.rfind('/') == base_url.size() - 1 ? base_url : base_url + "/") {
curl_global_init(CURL_GLOBAL_DEFAULT);
curl_ = curl_easy_init();
if (!curl_) {
throw std::runtime_error("curl 초기화 실패");
}
}
~EsClient() {
if (curl_) curl_easy_cleanup(curl_);
curl_global_cleanup();
}
// GET 요청
std::string get(const std::string& path) {
return request("GET", path, "");
}
// PUT 요청 (인덱스 생성, 문서 인덱싱)
std::string put(const std::string& path, const std::string& body = "{}") {
return request("PUT", path, body);
}
// POST 요청 (검색, 벌크)
std::string post(const std::string& path, const std::string& body = "{}") {
return request("POST", path, body);
}
// DELETE 요청
std::string del(const std::string& path) {
return request("DELETE", path, "");
}
private:
static size_t write_callback(void* contents, size_t size, size_t nmemb, void* userp) {
size_t total = size * nmemb;
static_cast<std::string*>(userp)->append(static_cast<char*>(contents), total);
return total;
}
std::string request(const std::string& method, const std::string& path,
const std::string& body) {
std::string url = base_url_ + (path.front() == '/' ? path.substr(1) : path);
std::string response;
curl_easy_reset(curl_);
curl_easy_setopt(curl_, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, write_callback);
curl_easy_setopt(curl_, CURLOPT_WRITEDATA, &response);
curl_easy_setopt(curl_, CURLOPT_TIMEOUT, 30L);
if (method == "GET") {
curl_easy_setopt(curl_, CURLOPT_HTTPGET, 1L);
} else if (method == "PUT") {
curl_easy_setopt(curl_, CURLOPT_CUSTOMREQUEST, "PUT");
curl_easy_setopt(curl_, CURLOPT_POSTFIELDS, body.c_str());
curl_easy_setopt(curl_, CURLOPT_POSTFIELDSIZE, body.size());
} else if (method == "POST") {
curl_easy_setopt(curl_, CURLOPT_POST, 1L);
curl_easy_setopt(curl_, CURLOPT_POSTFIELDS, body.c_str());
curl_easy_setopt(curl_, CURLOPT_POSTFIELDSIZE, body.size());
} else if (method == "DELETE") {
curl_easy_setopt(curl_, CURLOPT_CUSTOMREQUEST, "DELETE");
}
struct curl_slist* headers = nullptr;
headers = curl_slist_append(headers, "Content-Type: application/json");
curl_easy_setopt(curl_, CURLOPT_HTTPHEADER, headers);
CURLcode res = curl_easy_perform(curl_);
curl_slist_free_all(headers);
if (res != CURLE_OK) {
throw std::runtime_error(std::string("curl 실패: ") + curl_easy_strerror(res));
}
long http_code = 0;
curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &http_code);
if (http_code >= 400) {
throw std::runtime_error("HTTP " + std::to_string(http_code) + ": " + response);
}
return response;
}
std::string base_url_;
CURL* curl_;
};
3.2 최소 동작: Ping·인덱싱·조회
// rest_basic.cpp
#include "es_client.hpp"
#include <iostream>
int main() {
try {
EsClient client("http://localhost:9200");
// 1. Ping (클러스터 상태 확인)
std::string ping = client.get("");
std::cout << "Ping: " << ping << std::endl;
// 2. 인덱스 매핑 생성 (선택)
std::string mapping = R"({
"mappings": {
"properties": {
"message": { "type": "text" },
"level": { "type": "keyword" },
"timestamp": { "type": "date" }
}
}
})";
client.put("logs", mapping);
// 3. 문서 인덱싱
std::string doc = R"({
"message": "Application started",
"level": "info",
"timestamp": "2024-01-15T10:00:00Z"
})";
std::string indexResp = client.put("logs/_doc/1", doc);
std::cout << "인덱싱: " << indexResp << std::endl;
// 4. 문서 조회
std::string getResp = client.get("logs/_doc/1");
std::cout << "조회: " << getResp << std::endl;
} catch (const std::exception& e) {
std::cerr << "에러: " << e.what() << std::endl;
return 1;
}
return 0;
}
3.3 전문 검색 (Match Query)
// rest_search.cpp
#include "es_client.hpp"
#include <iostream>
#include <nlohmann/json.hpp>
int main() {
try {
EsClient client("http://localhost:9200");
// match 쿼리: message 필드에서 "error" 검색
nlohmann::json query = {
{"query", {
{"match", {
{"message", "error"}
}}
}},
{"size", 10},
{"_source", {"message", "level", "timestamp"}}
};
std::string body = query.dump();
std::string resp = client.post("logs/_search", body);
auto j = nlohmann::json::parse(resp);
auto hits = j[hits][hits];
std::cout << "총 " << j[hits][total][value] << "건\n";
for (const auto& hit : hits) {
std::cout << " - " << hit[_source][message] << " ["
<< hit[_source][level] << "]\n";
}
} catch (const std::exception& e) {
std::cerr << "에러: " << e.what() << std::endl;
return 1;
}
return 0;
}
3.4 집계 (Aggregation)
// rest_aggregation.cpp
#include "es_client.hpp"
#include <iostream>
#include <nlohmann/json.hpp>
int main() {
try {
EsClient client("http://localhost:9200");
// level별 문서 수 집계
nlohmann::json query = {
{"size", 0},
{"aggs", {
{"levels", {
{"terms", {
{"field", "level"}
}}
}}
}}
};
std::string resp = client.post("logs/_search", query.dump());
auto j = nlohmann::json::parse(resp);
std::cout << "level별 건수:\n";
for (const auto& bucket : j[aggregations][levels][buckets]) {
std::cout << " " << bucket[key] << ": " << bucket[doc_count] << "\n";
}
} catch (const std::exception& e) {
std::cerr << "에러: " << e.what() << std::endl;
return 1;
}
return 0;
}
3.5 벌크 인덱싱 (Bulk API)
// rest_bulk.cpp
#include "es_client.hpp"
#include <iostream>
#include <sstream>
#include <nlohmann/json.hpp>
int main() {
try {
EsClient client("http://localhost:9200");
// 벌크 형식: 액션\n문서\n액션\n문서...
std::ostringstream bulk;
for (int i = 1; i <= 100; ++i) {
nlohmann::json action = {
{"index", {
{"_index", "logs"},
{"_id", std::to_string(i)}
}}
};
nlohmann::json doc = {
{"message", "Log entry " + std::to_string(i)},
{"level", (i % 5 == 0 ? "error" : "info")},
{"timestamp", "2024-01-15T10:00:00Z"}
};
bulk << action.dump() << "\n" << doc.dump() << "\n";
}
std::string resp = client.post("_bulk", bulk.str());
auto j = nlohmann::json::parse(resp);
if (j.contains("errors") && j[errors]) {
std::cerr << "벌크 중 일부 실패\n";
for (const auto& item : j[items]) {
if (item.contains("index") && item[index].contains("error")) {
std::cerr << " " << item[index][error][reason] << "\n";
}
}
} else {
std::cout << "벌크 인덱싱 완료: 100건\n";
}
} catch (const std::exception& e) {
std::cerr << "에러: " << e.what() << std::endl;
return 1;
}
return 0;
}
3.6 스크롤 API (대용량 검색)
// rest_scroll.cpp
#include "es_client.hpp"
#include <iostream>
#include <nlohmann/json.hpp>
int main() {
try {
EsClient client("http://localhost:9200");
// 1. 스크롤 시작
nlohmann::json search = {
{"query", {{"match_all", {}}}},
{"size", 100},
{"sort", {{"_id", "asc"}}}
};
std::string resp = client.post("logs/_search?scroll=1m", search.dump());
auto j = nlohmann::json::parse(resp);
std::string scroll_id = j[_scroll_id];
auto hits = j[hits][hits];
size_t total = 0;
while (!hits.empty()) {
total += hits.size();
for (const auto& hit : hits) {
// 문서 처리
}
// 2. 다음 스크롤
nlohmann::json scroll_req = {{"scroll", "1m"}, {"scroll_id", scroll_id}};
resp = client.post("_search/scroll", scroll_req.dump());
j = nlohmann::json::parse(resp);
scroll_id = j[_scroll_id];
hits = j[hits][hits];
}
std::cout << "총 처리: " << total << "건\n";
} catch (const std::exception& e) {
std::cerr << "에러: " << e.what() << std::endl;
return 1;
}
return 0;
}
4. 자주 발생하는 에러와 해결법
에러 1: “Connection refused” / “Could not resolve host”
증상: Elasticsearch 연결 시도 시 실패.
원인:
- Elasticsearch 서버가 실행 중이 아님
- 잘못된 주소/포트 (기본 9200)
- Docker 환경에서
localhostvshost.docker.internal - 방화벽 차단
해결법:
# Elasticsearch 실행 확인
curl -X GET "localhost:9200/"
# Docker 컨테이너에서 호스트 접속 시
# URL: http://host.docker.internal:9200
// ✅ 여러 노드 지정 (고가용성)
EsClient client("http://node1:9200,http://node2:9200");
// ✅ 타임아웃 설정 (es_client에 추가)
curl_easy_setopt(curl_, CURLOPT_CONNECTTIMEOUT, 5L);
curl_easy_setopt(curl_, CURLOPT_TIMEOUT, 30L);
에러 2: “mapper_parsing_exception” / “400 Bad Request”
증상: 문서 인덱싱 시 400 응답.
원인:
- JSON 형식 오류 (쉼표 누락, 따옴표 오류)
- 필드 타입 불일치 (예: 숫자 필드에 문자열 전달)
- 매핑에 없는 필드 타입
해결법:
// ❌ 잘못된 JSON
std::string bad = R"({"message": "test" "level": "info"})"; // 쉼표 누락
// ✅ nlohmann::json으로 안전하게 구성
nlohmann::json doc = {
{"message", "test"},
{"level", "info"},
{"timestamp", "2024-01-15T10:00:00Z"}
};
std::string body = doc.dump();
// ✅ 에러 응답 파싱
try {
client.put("logs/_doc/1", body);
} catch (const std::exception& e) {
// "HTTP 400: {\"error\":{\"type\":\"mapper_parsing_exception\",...}}"
std::cerr << e.what() << std::endl;
// JSON 파싱해서 error.reason 확인
}
에러 3: “index_not_found_exception”
증상: 검색/조회 시 인덱스가 없다는 에러.
원인: 인덱스를 생성하지 않았거나 이름 오타.
해결법:
// ✅ 인덱스 사전 생성
std::string mapping = R"({
"mappings": {
"properties": {
"message": { "type": "text" },
"level": { "type": "keyword" }
}
}
})";
client.put("logs", mapping);
// ✅ 동적 매핑 허용 시 인덱스 자동 생성 가능 (첫 문서 인덱싱 시)
// 단, 필드 타입이 고정되지 않을 수 있음
에러 4: “RequestTimeout” / “EsRejectedExecutionException”
증상: 벌크 인덱싱 또는 대용량 검색 시 타임아웃.
원인:
- 한 번에 너무 많은 문서 전송
- Elasticsearch 스레드 풀 포화
- 네트워크 지연
해결법:
// ✅ 벌크 배치 크기 제한 (1000~5000 권장)
const size_t BATCH_SIZE = 1000;
for (size_t offset = 0; offset < total; offset += BATCH_SIZE) {
std::ostringstream bulk;
for (size_t i = offset; i < std::min(offset + BATCH_SIZE, total); ++i) {
// bulk 액션 추가
}
client.post("_bulk", bulk.str());
// 배치 간 짧은 대기 (선택)
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
// ✅ 타임아웃 증가
curl_easy_setopt(curl_, CURLOPT_TIMEOUT, 120L); // 2분
에러 5: “version_conflict_engine_exception”
증상: 동일 ID로 동시 인덱싱 시 충돌.
원인: 낙관적 잠금(optimistic locking)으로 버전 불일치.
해결법:
// ✅ 다른 ID 사용 (자동 생성)
std::string resp = client.post("logs/_doc", doc); // ID 없이 POST
// ✅ 또는 if_seq_no, if_primary_term으로 낙관적 잠금
// PUT logs/_doc/1?if_seq_no=5&if_primary_term=1
에러 6: JSON 파싱 에러 (nlohmann::json::parse)
증상: parse_error 예외 발생.
원인: Elasticsearch 응답이 유효한 JSON이 아님 (에러 HTML, 잘린 응답 등).
해결법:
// ✅ try-catch로 파싱
std::string resp = client.post("logs/_search", body);
try {
auto j = nlohmann::json::parse(resp);
// ...
} catch (const nlohmann::json::exception& e) {
std::cerr << "JSON 파싱 실패: " << e.what() << "\n원본: " << resp << std::endl;
}
에러 7: “security_exception” / 401 Unauthorized
증상: Elasticsearch 8.x 보안 활성화 시 인증 실패.
원인: xpack.security.enabled=true인데 인증 정보 없음.
해결법:
// ✅ Basic 인증 추가 (es_client 수정)
#include <curl/curl.h>
#include <curl/easy.h>
// URL에 user:password 포함
// http://elastic:password@localhost:9200
// 또는 CURLOPT_USERPWD 설정
curl_easy_setopt(curl_, CURLOPT_USERPWD, "elastic:password");
에러 8: 메모리 부족 (대용량 응답)
증상: 수십만 건 검색 시 OOM.
원인: 전체 결과를 한 번에 메모리에 로드.
해결법:
// ✅ 스크롤 API 사용 (위 rest_scroll.cpp 참고)
// size를 100~1000으로 제한하고 scroll_id로 순차 조회
5. 성능 최적화
5.1 벌크 인덱싱 배치 크기
// 배치 크기: 1000~5000 권장
// 너무 크면: 메모리·타임아웃
// 너무 작으면: HTTP 오버헤드
const size_t BATCH_SIZE = 2000;
void bulk_index(EsClient& client, const std::vector<Document>& docs) {
for (size_t i = 0; i < docs.size(); i += BATCH_SIZE) {
std::ostringstream bulk;
for (size_t j = i; j < std::min(i + BATCH_SIZE, docs.size()); ++j) {
bulk << R"({"index":{"_index":"logs","_id":")" << docs[j].id << R"("}})" << "\n"
<< docs[j].to_json() << "\n";
}
client.post("_bulk", bulk.str());
}
}
5.2 연결 재사용
// ❌ 나쁜 예: 매 요청마다 새 연결
void handle_request() {
EsClient client("http://localhost:9200");
client.get("logs/_search");
}
// ✅ 좋은 예: EsClient 싱글톤 또는 풀
class SearchService {
EsClient client_;
public:
SearchService() : client_("http://localhost:9200") {}
std::string search(const std::string& query) {
return client_.post("logs/_search", query);
}
};
5.3 _source 필드 제한
// 필요한 필드만 요청 (네트워크·파싱 부하 감소)
nlohmann::json query = {
{"query", {{"match_all", {}}}},
{"_source", {"message", "level"}}, // timestamp 등 제외
{"size", 100}
};
5.4 스크롤 vs Search After
| 방식 | 용도 | 메모리 |
|---|---|---|
| Scroll | 대용량 일괄 처리 | 서버에 스크롤 컨텍스트 유지 |
| Search After | 페이지네이션, 실시간에 가까움 | stateless |
// Search After (실시간에 가까운 페이지네이션)
nlohmann::json query = {
{"query", {{"match_all", {}}}},
{"size", 100},
{"sort", {{"_id", "asc"}}}
};
// 첫 요청
auto resp = client.post("logs/_search", query.dump());
auto j = nlohmann::json::parse(resp);
auto hits = j[hits][hits];
auto last = hits.back()[sort];
// 다음 페이지
query[search_after] = last;
resp = client.post("logs/_search", query.dump());
5.5 성능 비교 (참고)
| 방식 | 1만 건 인덱싱 (대략) | 10만 건 검색 |
|---|---|---|
| 단건 인덱싱 | 수 분 | - |
| 벌크 1000 | 10초 | 1초 |
| 벌크 5000 | 5초 | 1초 |
| 스크롤 100 | - | 5초 |
6. 프로덕션 패턴
6.1 재시도 로직
std::string request_with_retry(const std::string& method, const std::string& path,
const std::string& body, int max_retries = 3) {
for (int i = 0; i < max_retries; ++i) {
try {
if (method == "GET") return client_.get(path);
if (method == "POST") return client_.post(path, body);
if (method == "PUT") return client_.put(path, body);
} catch (const std::exception& e) {
if (i == max_retries - 1) throw;
std::this_thread::sleep_for(std::chrono::milliseconds(100 * (i + 1)));
}
}
throw std::runtime_error("재시도 실패");
}
6.2 헬스 체크
bool is_healthy() {
try {
std::string resp = client_.get("_cluster/health");
auto j = nlohmann::json::parse(resp);
std::string status = j[status];
return status == "green" || status == "yellow";
} catch (...) {
return false;
}
}
6.3 환경 변수 기반 설정
struct EsConfig {
std::string url = "http://localhost:9200";
int timeout_sec = 30;
size_t bulk_size = 2000;
};
EsConfig load_config() {
EsConfig c;
if (const char* u = std::getenv("ELASTICSEARCH_URL")) c.url = u;
if (const char* t = std::getenv("ES_TIMEOUT")) c.timeout_sec = std::stoi(t);
return c;
}
6.4 인덱스 별칭 (Zero-Downtime 재인덱싱)
// 1. 새 인덱스 생성 및 데이터 마이그레이션
// 2. 별칭 전환
client_.post("_aliases", R"({
"actions": [
{"remove": {"index": "logs_v1", "alias": "logs"}},
{"add": {"index": "logs_v2", "alias": "logs"}}
]
})");
6.5 로깅 (요청/응답)
std::string EsClient::request(const std::string& method, const std::string& path,
const std::string& body) {
// ...
std::string response = /* ... */;
if (log_level_ >= LogLevel::Debug) {
std::cerr << "[ES] " << method << " " << path << " -> " << response.size() << " bytes\n";
}
return response;
}
6.6 TLS/SSL (프로덕션)
// HTTPS 사용
EsClient client("https://elasticsearch.example.com:9200");
// 자체 서명 인증서 허용 (개발용만)
curl_easy_setopt(curl_, CURLOPT_SSL_VERIFYPEER, 0L);
curl_easy_setopt(curl_, CURLOPT_SSL_VERIFYHOST, 0L);
// 프로덕션: CA 번들 사용
curl_easy_setopt(curl_, CURLOPT_CAINFO, "/etc/ssl/certs/ca-certificates.crt");
7. 구현 체크리스트
환경 설정
- Elasticsearch 서버 실행 확인 (
curl localhost:9200) - libcurl, nlohmann/json 설치 (또는 Elasticlient)
- CMake/빌드 연동
REST API 클라이언트
- Content-Type: application/json 헤더 설정
- 타임아웃 설정 (connect, read)
- HTTP 에러 코드 검사 (4xx, 5xx)
- JSON 파싱 예외 처리
인덱싱
- 인덱스 매핑 사전 정의 (필드 타입)
- 벌크 배치 크기 1000~5000
- 벌크 에러 시 개별 항목 실패 처리
검색
- _source 필드 제한 (필요 시)
- size 제한 (기본 10, 최대 10000)
- 대용량 결과 시 스크롤 또는 Search After
에러 처리
- Connection refused 재시도
- 400/500 응답 파싱 및 로깅
- JSON 파싱 실패 처리
프로덕션
- 연결 재사용 (EsClient 풀/싱글톤)
- 헬스 체크 (/_cluster/health)
- 환경 변수로 URL·타임아웃 외부화
- TLS/SSL (HTTPS)
정리
| 항목 | 요약 |
|---|---|
| 공식 C++ 클라이언트 | 없음. Elasticlient 또는 REST API 사용 |
| REST API | libcurl + nlohmann/json으로 모든 API 호출 가능 |
| 인덱싱 | PUT/POST, 벌크는 _bulk API |
| 검색 | POST index/_search, query DSL |
| 집계 | aggs 필드로 terms, date_histogram 등 |
| 에러 | Connection refused, mapper_parsing, RequestTimeout |
| 성능 | 벌크 배치, 연결 재사용, _source 제한, 스크롤 |
| 프로덕션 | 재시도, 헬스 체크, 환경 변수, TLS |
핵심 원칙:
- 벌크 사용: 단건 인덱싱 대신 _bulk API로 배치 처리
- 연결 재사용: 매 요청마다 새 클라이언트 생성 금지
- 에러 파싱: HTTP 상태·JSON error.reason 확인
- 대용량 검색: 스크롤 또는 Search After
자주 묻는 질문 (FAQ)
Q. Elasticlient와 REST API 중 어떤 것을 써야 하나요?
A. Elasticlient는 API가 단순하지만 유지보수가 느리고 cpr 의존이 있습니다. 새 프로젝트에서는 libcurl + nlohmann/json으로 REST API를 직접 호출하는 방식을 권장합니다.
Q. 대용량 로그 인덱싱 시 주의할 점은?
A. 벌크 배치 크기 1000~5000, 인덱스별 refresh_interval 조정, ILM으로 오래된 인덱스 정리 등을 고려하세요.
한 줄 요약: Elasticsearch는 공식 C++ 클라이언트가 없지만, libcurl과 nlohmann/json으로 REST API를 호출하면 전문 검색·집계·벌크 인덱싱을 C++에서 완전히 활용할 수 있습니다.
다음 글: C++ 시리즈 목차
이전 글: Elasticsearch 개요(#52-5)
참고 자료
관련 글
- C++ Elasticsearch 통합 | 전문 검색·집계·실시간 인덱싱 [#52-5]
- C++ Elasticsearch 완전 실전 가이드 | Elasticlient·REST API
- C++ HTTP 클라이언트 완벽 가이드 | REST API 호출·연결 풀·타임아웃·프로덕션 패턴