C++ Redis 고급 활용 | Pub/Sub·파이프라인·Lua 스크립팅 완벽 가이드 [#52-3]
이 글의 핵심
C++ Redis 심화: Pub/Sub 실시간 알림, 파이프라인 RTT 최적화, Lua 원자적 스크립팅, 트랜잭션, 클러스터. 실무 문제 시나리오, 완전한 예제, 자주 발생하는 에러, 성능 최적화, 프로덕션 패턴까지 900줄 분량으로 다룹니다.
들어가며: “실시간 알림·대량 처리·원자적 작업이 막막해요”
실제 겪는 문제 시나리오
52-2에서 Redis 기본 연결·GET/SET·캐싱·분산 락을 다뤘다면, 이 글에서는 고급 기능을 다룹니다. 실무에서 자주 맞닥뜨리는 문제와 해결 방법을 제시합니다.
시나리오 1: 실시간 알림·채팅에서 서버가 클라이언트에게 푸시
상황: 주문 완료·채팅 메시지 등 이벤트 발생 시 연결된 모든 클라이언트가 즉시 알림을 받아야 함
문제: 단순 폴링은 지연·부하가 크고, WebSocket만으로는 다중 서버 간 메시지 공유 불가
결과: Redis Pub/Sub으로 메시지 브로드캐스트 → 여러 서버가 같은 채널 구독·발행
시나리오 2: 1000개 키를 한 번에 SET/GET할 때 RTT 폭증
상황: 캐시 워밍업·대량 조회 시 명령당 1ms RTT라면 1000건 = 1초 지연
문제: 단일 명령 반복으로 네트워크 왕복이 누적됨
결과: 파이프라인(Pipeline)으로 여러 명령을 한 번에 전송 → RTT 1회로 감소
시나리오 3: 재고 차감·분산 락 해제 시 경쟁 조건
상황: GET → 값 감소 → SET은 원자적이지 않아, 동시 요청 시 재고가 음수로 떨어질 수 있음
문제: 락 해제 시 "같은 토큰일 때만 DEL"을 클라이언트에서 여러 명령으로 처리하면 경쟁 조건
결과: Lua 스크립트로 서버에서 원자적으로 실행 → EVAL/EVALSHA
시나리오 4: 여러 키에 걸친 원자적 작업
상황: 주문 생성 시 재고 차감 + 주문 기록 + 포인트 적립을 한 번에 처리해야 함
문제: 중간에 실패하면 일부만 반영되어 데이터 불일치
결과: MULTI/EXEC 트랜잭션 또는 Lua 스크립트로 원자적 실행
시나리오 5: Redis Cluster에서 MOVED/ASK 에러
상황: 단일 연결로 Cluster에 접속 시 "MOVED 1234 127.0.0.1:7001" 에러
문제: hiredis 단일 연결은 슬롯 리다이렉트를 자동 처리하지 않음
결과: redis-plus-plus RedisCluster 또는 hiredis Cluster API 사용
flowchart TB
subgraph 문제[실무 문제]
P1[실시간 알림] --> S1[Pub/Sub]
P2[대량 RTT] --> S2[파이프라인]
P3[경쟁 조건] --> S3[Lua 스크립트]
P4[원자적 작업] --> S4[트랜잭션/Lua]
P5[Cluster 분산] --> S5[RedisCluster]
end
목표:
- Pub/Sub: PubLISH/SUBSCRIBE로 실시간 메시지 브로드캐스트
- 파이프라인: 여러 명령을 한 번에 전송해 RTT 최소화
- Lua: EVAL/EVALSHA로 원자적 스크립트 실행
- 트랜잭션: MULTI/EXEC로 명령 그룹 실행
- Cluster: redis-plus-plus RedisCluster로 슬롯 분산
요구 환경: C++17 이상, Redis 6.x 이상, hiredis·redis-plus-plus
이 글을 읽으면:
- Pub/Sub으로 실시간 알림·채팅을 구현할 수 있습니다.
- 파이프라인으로 대량 명령을 효율적으로 처리할 수 있습니다.
- Lua 스크립트로 원자적 재고 차감·락 해제를 구현할 수 있습니다.
- 트랜잭션과 Cluster를 실무에 적용할 수 있습니다.
실무 적용 경험: 이 글은 대규모 C++ 프로젝트에서 실제로 겪은 문제와 해결 과정을 바탕으로 작성되었습니다. 책이나 문서에서 다루지 않는 실전 함정과 디버깅 팁을 포함합니다.
목차
- Pub/Sub 실시간 메시징
- 파이프라인 RTT 최적화
- Lua 스크립팅
- 트랜잭션 MULTI/EXEC
- Redis Cluster
- 완전한 Redis 고급 예제
- 자주 발생하는 에러와 해결법
- 성능 최적화 팁
- 프로덕션 패턴
- 구현 체크리스트
- 정리
1. Pub/Sub 실시간 메시징
Pub/Sub이란?
발행자(Publisher)가 채널에 메시지를 보내면, 구독자(Subscriber)가 해당 채널을 구독 중인 모든 연결에 메시지가 전달됩니다. 1:N, N:N 브로드캐스트에 적합합니다.
sequenceDiagram
participant P as Publisher
participant R as Redis
participant S1 as Subscriber 1
participant S2 as Subscriber 2
S1->>R: SUBSCRIBE channel:orders
S2->>R: SUBSCRIBE channel:orders
P->>R: PUBLISH channel:orders "주문 완료 #123"
R->>S1: message
R->>S2: message
hiredis로 Pub/Sub
hiredis는 redisCommand를 사용하면 블로킹됩니다. SUBSCRIBE 후에는 다른 명령을 보낼 수 없으므로 별도 연결(또는 스레드)에서 구독해야 합니다.
// pubsub_hiredis.cpp
// 컴파일: g++ -std=c++17 -o pubsub pubsub_hiredis.cpp -lhiredis -lpthread
#include <hiredis/hiredis.h>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <chrono>
struct RedisConnection {
redisContext* ctx = nullptr;
RedisConnection(const char* host, int port) {
ctx = redisConnect(host, port);
if (!ctx || ctx->err) {
throw std::runtime_error(ctx ? ctx->errstr : "연결 실패");
}
}
~RedisConnection() { if (ctx) redisFree(ctx); }
};
// 발행자: 별도 연결로 PUBLISH
void publisher(const std::string& channel) {
RedisConnection conn("127.0.0.1", 6379);
for (int i = 0; i < 5; ++i) {
redisReply* r = (redisReply*)redisCommand(conn.ctx, "PUBLISH %s %s",
channel.c_str(), ("메시지 " + std::to_string(i)).c_str());
if (r) {
std::cout << "[Publish] 수신자 수: " << r->integer << "\n";
freeReplyObject(r);
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
// 구독자: SUBSCRIBE는 블로킹이므로 별도 스레드
void subscriber(const std::string& channel) {
RedisConnection conn("127.0.0.1", 6379);
redisReply* r = (redisReply*)redisCommand(conn.ctx, "SUBSCRIBE %s", channel.c_str());
freeReplyObject(r);
while (true) {
if (redisGetReply(conn.ctx, (void**)&r) != REDIS_OK) break;
if (r->type == REDIS_REPLY_ARRAY && r->elements >= 3) {
std::string type = r->element[0]->str;
std::string msg = r->element[2]->str;
std::cout << "[Sub] " << type << ": " << msg << "\n";
}
freeReplyObject(r);
}
}
int main() {
std::string channel = "channel:orders";
std::thread sub(subscriber, channel);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::thread pub(publisher, channel);
pub.join();
sub.join(); // Ctrl+C로 종료 전까지 대기
return 0;
}
주의: SUBSCRIBE 이후 해당 연결은 구독 모드가 되어 GET, SET 등 일반 명령을 사용할 수 없습니다. 발행·구독은 독립된 연결로 처리해야 합니다.
redis-plus-plus로 Pub/Sub
redis-plus-plus는 subscriber()로 구독 전용 연결을 만들고, on_message 콜백으로 메시지를 수신합니다.
// pubsub_redispp.cpp
// vcpkg install redis-plus-plus
#include <sw/redis++/redis++.h>
#include <iostream>
#include <thread>
#include <chrono>
using namespace sw::redis;
void run_publisher(const std::string& channel) {
auto redis = Redis("tcp://127.0.0.1:6379");
for (int i = 0; i < 5; ++i) {
auto count = redis.publish(channel, "주문 #" + std::to_string(i + 100));
std::cout << "[Publish] 수신자: " << count << "\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
void run_subscriber(const std::string& channel) {
auto sub = Redis("tcp://127.0.0.1:6379").subscriber();
sub.on_message([channel](std::string ch, std::string msg) {
std::cout << "[Sub] " << ch << ": " << msg << "\n";
});
sub.subscribe(channel);
sub.consume(); // 블로킹
}
int main() {
std::string channel = "channel:orders";
std::thread sub(run_subscriber, channel);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::thread pub(run_publisher, channel);
pub.join();
sub.join();
return 0;
}
패턴 구독 (PSUBSCRIBE)
channel:orders:*처럼 와일드카드로 여러 채널을 한 번에 구독할 수 있습니다.
// hiredis PSUBSCRIBE
redisReply* r = (redisReply*)redisCommand(ctx, "PSUBSCRIBE channel:orders:*");
freeReplyObject(r);
// redis-plus-plus
sub.psubscribe("channel:orders:*");
2. 파이프라인 RTT 최적화
파이프라인이란?
여러 명령을 한 번에 전송하고, 응답을 순서대로 수신합니다. RTT는 1회로 줄어들어 대량 명령 처리 시 성능이 크게 향상됩니다.
flowchart LR
subgraph 단일[단일 명령 (RTT 3회)]
A1[Req1] --> A2[Res1]
A2 --> B1[Req2]
B1 --> B2[Res2]
B2 --> C1[Req3]
C1 --> C2[Res3]
end
subgraph 파이프[파이프라인 (RTT 1회)]
P1[Req1+Req2+Req3] --> P2[Res1+Res2+Res3]
end
hiredis 파이프라인
redisAppendCommand로 명령을 버퍼에 추가하고, redisGetReply로 응답을 순서대로 받습니다.
// pipeline_hiredis.cpp
// 컴파일: g++ -std=c++17 -o pipeline pipeline_hiredis.cpp -lhiredis
#include <hiredis/hiredis.h>
#include <iostream>
#include <vector>
#include <string>
struct RedisConnection {
redisContext* ctx = nullptr;
RedisConnection(const char* host, int port) {
ctx = redisConnect(host, port);
if (!ctx || ctx->err) throw std::runtime_error("Redis 연결 실패");
}
~RedisConnection() { if (ctx) redisFree(ctx); }
};
void pipeline_batch_set(redisContext* ctx, int count) {
for (int i = 0; i < count; ++i) {
std::string key = "key:" + std::to_string(i);
std::string val = "value:" + std::to_string(i);
redisAppendCommand(ctx, "SET %s %s", key.c_str(), val.c_str());
}
redisReply* reply = nullptr;
for (int i = 0; i < count; ++i) {
if (redisGetReply(ctx, (void**)&reply) != REDIS_OK) {
std::cerr << "GetReply 실패\n";
break;
}
freeReplyObject(reply);
}
}
void pipeline_batch_get(redisContext* ctx, int count) {
for (int i = 0; i < count; ++i) {
std::string key = "key:" + std::to_string(i);
redisAppendCommand(ctx, "GET %s", key.c_str());
}
redisReply* reply = nullptr;
for (int i = 0; i < count; ++i) {
if (redisGetReply(ctx, (void**)&reply) != REDIS_OK) break;
if (reply->type == REDIS_REPLY_STRING) {
// std::cout << reply->str << "\n";
}
freeReplyObject(reply);
}
}
int main() {
RedisConnection conn("127.0.0.1", 6379);
auto start = std::chrono::high_resolution_clock::now();
pipeline_batch_set(conn.ctx, 1000);
pipeline_batch_get(conn.ctx, 1000);
auto end = std::chrono::high_resolution_clock::now();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
std::cout << "파이프라인 2000건: " << ms << " ms\n";
return 0;
}
주의: redisAppendCommand와 redisGetReply 사이에 다른 명령을 보내면 파이프라인 순서가 깨집니다. 한 번에 묶어서 처리하세요.
redis-plus-plus 파이프라인
redis.pipeline()으로 Pipeline 객체를 만들고, command()로 명령을 추가한 뒤 exec()로 실행합니다.
// pipeline_redispp.cpp
#include <sw/redis++/redis++.h>
#include <iostream>
#include <chrono>
using namespace sw::redis;
int main() {
auto redis = Redis("tcp://127.0.0.1:6379");
auto pipe = redis.pipeline();
// 1000개 SET
for (int i = 0; i < 1000; ++i) {
pipe.set("key:" + std::to_string(i), "value:" + std::to_string(i));
}
auto replies = pipe.exec();
std::cout << "파이프라인 SET 완료: " << replies.size() << " 건\n";
// 1000개 GET
auto pipe2 = redis.pipeline();
std::vector<std::string> keys;
for (int i = 0; i < 1000; ++i) {
keys.push_back("key:" + std::to_string(i));
}
for (const auto& k : keys) {
pipe2.get(k);
}
auto get_replies = pipe2.exec();
std::cout << "파이프라인 GET 완료: " << get_replies.size() << " 건\n";
return 0;
}
주의: redis-plus-plus Pipeline은 스레드 안전하지 않습니다. 멀티스레드에서 사용 시 std::mutex로 보호하세요.
3. Lua 스크립팅
Lua가 필요한 이유
Redis 명령은 각각 원자적이지만, 여러 명령을 묶어서 원자적으로 실행하려면 Lua가 필요합니다. Lua 스크립트는 Redis 서버에서 한 번에 실행되므로 네트워크 왕복 없이 원자성이 보장됩니다.
flowchart TB
subgraph 클라이언트[클라이언트 (경쟁 조건)]
C1[GET stock] --> C2[감소]
C2 --> C3[SET stock]
C3 --> C4[다른 요청이 동시에 GET]
end
subgraph Lua["Lua (원자적)"]
L1[EVAL 스크립트] --> L2[GET → 감소 → SET 한 번에]
end
Lua 스크립트 기본
-- decrement_stock.lua
-- KEYS[1]: 재고 키
-- ARGV[1]: 차감 수량
local current = redis.call('GET', KEYS[1])
if not current then
return -1 -- 키 없음
end
local stock = tonumber(current)
local amount = tonumber(ARGV[1])
if stock < amount then
return -2 -- 재고 부족
end
redis.call('SET', KEYS[1], stock - amount)
return stock - amount
hiredis로 EVAL
// lua_hiredis.cpp
#include <hiredis/hiredis.h>
#include <iostream>
#include <string>
const char* DECREMENT_STOCK = R"(
local current = redis.call('GET', KEYS[1])
if not current then return -1 end
local stock = tonumber(current)
local amount = tonumber(ARGV[1])
if stock < amount then return -2 end
redis.call('SET', KEYS[1], stock - amount)
return stock - amount
)";
int main() {
redisContext* ctx = redisConnect("127.0.0.1", 6379);
if (!ctx || ctx->err) {
std::cerr << "연결 실패\n";
return 1;
}
// SET stock 100
redisReply* r = (redisReply*)redisCommand(ctx, "SET stock 100");
freeReplyObject(r);
// EVAL decrement_stock
r = (redisReply*)redisCommand(ctx,
"EVAL %s 1 stock 5",
DECREMENT_STOCK);
if (r && r->type == REDIS_REPLY_INTEGER) {
std::cout << "차감 후 재고: " << r->integer << "\n"; // 95
}
freeReplyObject(r);
redisFree(ctx);
return 0;
}
redis-plus-plus로 EVAL
// lua_redispp.cpp
#include <sw/redis++/redis++.h>
#include <iostream>
using namespace sw::redis;
int main() {
auto redis = Redis("tcp://127.0.0.1:6379");
redis.set("stock", "100");
std::string script = R"(
local current = redis.call('GET', KEYS[1])
if not current then return -1 end
local stock = tonumber(current)
local amount = tonumber(ARGV[1])
if stock < amount then return -2 end
redis.call('SET', KEYS[1], stock - amount)
return stock - amount
)";
auto result = redis.eval<long long>(script, {"stock"}, {"5"});
std::cout << "차감 후 재고: " << result << "\n";
return 0;
}
EVALSHA로 스크립트 캐싱
동일 스크립트를 반복 실행할 때, SCRIPT LOAD로 SHA1 해시를 얻고 EVALSHA로 호출하면 네트워크 전송량이 줄어듭니다.
// hiredis EVALSHA
redisReply* r = (redisReply*)redisCommand(ctx, "SCRIPT LOAD %s", DECREMENT_STOCK);
std::string sha = r->str;
freeReplyObject(r);
r = (redisReply*)redisCommand(ctx, "EVALSHA %s 1 stock 5", sha.c_str());
freeReplyObject(r);
// redis-plus-plus는 eval 시 내부적으로 스크립트 캐싱 활용 가능
분산 락 해제 Lua (같은 토큰일 때만 DEL)
-- unlock.lua
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
// C++에서 Lua 락 해제
const char* UNLOCK = "if redis.call('GET',KEYS[1])==ARGV[1] then return redis.call('DEL',KEYS[1]) else return 0 end";
redisReply* r = (redisReply*)redisCommand(ctx, "EVAL %s 1 lock:resource %s", UNLOCK, token.c_str());
freeReplyObject(r);
4. 트랜잭션 MULTI/EXEC
트랜잭션이란?
MULTI로 트랜잭션을 시작하고, EXEC로 큐에 쌓인 명령을 한 번에 실행합니다. 중간에 DISCARD로 취소할 수 있습니다.
sequenceDiagram
participant C as 클라이언트
participant R as Redis
C->>R: MULTI
R->>C: OK
C->>R: DECRBY stock 1
R->>C: QUEUED
C->>R: HSET order:123 item A
R->>C: QUEUED
C->>R: EXEC
R->>C: [1, 1] (순서대로 결과)
hiredis 트랜잭션
// transaction_hiredis.cpp
#include <hiredis/hiredis.h>
#include <iostream>
int main() {
redisContext* ctx = redisConnect("127.0.0.1", 6379);
if (!ctx || ctx->err) return 1;
redisReply* r;
r = (redisReply*)redisCommand(ctx, "MULTI");
freeReplyObject(r);
r = (redisReply*)redisCommand(ctx, "SET a 1");
freeReplyObject(r);
r = (redisReply*)redisCommand(ctx, "SET b 2");
freeReplyObject(r);
r = (redisReply*)redisCommand(ctx, "EXEC");
if (r->type == REDIS_REPLY_ARRAY) {
for (size_t i = 0; i < r->elements; ++i) {
std::cout << "결과 " << i << ": " << r->element[i]->str << "\n";
}
}
freeReplyObject(r);
redisFree(ctx);
return 0;
}
redis-plus-plus 트랜잭션
// transaction_redispp.cpp
#include <sw/redis++/redis++.h>
#include <iostream>
using namespace sw::redis;
int main() {
auto redis = Redis("tcp://127.0.0.1:6379");
auto tx = redis.transaction();
tx.set("a", "1");
tx.set("b", "2");
auto replies = tx.exec();
std::cout << "트랜잭션 완료: " << replies.size() << " 건\n";
return 0;
}
주의: Redis 트랜잭션은 롤백을 지원하지 않습니다. 명령 실행 중 에러가 나도 이미 실행된 명령은 되돌릴 수 없습니다. 원자적 롤백이 필요하면 Lua를 사용하세요.
5. Redis Cluster
Cluster 개요
Redis Cluster는 데이터를 16384개 슬롯으로 나누어 여러 노드에 분산합니다. 키의 해시로 슬롯을 결정하고, 해당 슬롯을 가진 노드로 요청합니다.
flowchart TB
subgraph Cluster[Redis Cluster]
N1[노드 1: 0-5460]
N2[노드 2: 5461-10922]
N3[노드 3: 10923-16383]
end
C[클라이언트] --> N1
C --> N2
C --> N3
hiredis Cluster 한계
hiredis 단일 연결은 MOVED/ASK 리다이렉트를 자동 처리하지 않습니다. Cluster용으로는 redis-cluster 또는 redis-plus-plus RedisCluster를 사용하는 것이 좋습니다.
redis-plus-plus RedisCluster
// cluster_redispp.cpp
#include <sw/redis++/redis++.h>
#include <iostream>
using namespace sw::redis;
int main() {
// Cluster 노드 주소 (하나만 지정해도 됨)
RedisCluster cluster("tcp://127.0.0.1:7000");
cluster.set("key", "value");
auto val = cluster.get("key");
std::cout << "GET: " << *val << "\n";
return 0;
}
주의: Cluster에서 다중 키 명령(MGET, MSET 등)은 키가 같은 슬롯에 있어야 합니다. {hash_tag}를 사용해 슬롯을 맞출 수 있습니다.
// 같은 슬롯: user:123:name, user:123:age
cluster.set("user:{123}:name", "홍길동");
cluster.set("user:{123}:age", "30");
cluster.mget({"user:{123}:name", "user:{123}:age"});
6. 완전한 Redis 고급 예제
예제 1: 실시간 주문 알림 (Pub/Sub + 채널)
// order_notifier.cpp - 주문 완료 시 Pub/Sub으로 알림
#include <sw/redis++/redis++.h>
#include <iostream>
#include <thread>
#include <chrono>
#include <string>
using namespace sw::redis;
class OrderNotifier {
public:
OrderNotifier() : redis_("tcp://127.0.0.1:6379") {}
void start_subscriber() {
sub_thread_ = std::thread([this]() {
auto sub = redis_.subscriber();
sub.on_message([this](std::string channel, std::string msg) {
std::cout << "[알림] " << channel << ": " << msg << "\n";
});
sub.subscribe("channel:orders");
sub.consume();
});
}
void publish_order(const std::string& order_id) {
redis_.publish("channel:orders", "주문 완료 #" + order_id);
}
~OrderNotifier() {
if (sub_thread_.joinable()) sub_thread_.join();
}
private:
Redis redis_;
std::thread sub_thread_;
};
int main() {
OrderNotifier notifier;
notifier.start_subscriber();
std::this_thread::sleep_for(std::chrono::milliseconds(500));
notifier.publish_order("12345");
std::this_thread::sleep_for(std::chrono::seconds(2));
return 0;
}
예제 2: 대량 캐시 워밍업 (파이프라인)
// cache_warmup.cpp - 파이프라인으로 10000건 SET
#include <sw/redis++/redis++.h>
#include <iostream>
#include <chrono>
#include <string>
using namespace sw::redis;
void warmup_cache(Redis& redis, int count) {
auto pipe = redis.pipeline();
for (int i = 0; i < count; ++i) {
std::string key = "cache:product:" + std::to_string(i);
std::string val = "{\"id\":" + std::to_string(i) + ",\"name\":\"상품" + std::to_string(i) + "\"}";
pipe.set(key, val, std::chrono::seconds(300));
}
auto start = std::chrono::high_resolution_clock::now();
pipe.exec();
auto end = std::chrono::high_resolution_clock::now();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
std::cout << "파이프라인 " << count << "건 SET: " << ms << " ms\n";
}
int main() {
auto redis = Redis("tcp://127.0.0.1:6379");
warmup_cache(redis, 10000);
return 0;
}
예제 3: Lua로 원자적 재고 차감·분산 락
// atomic_stock.cpp - Lua로 재고 차감 + 락 해제
#include <sw/redis++/redis++.h>
#include <iostream>
#include <string>
using namespace sw::redis;
const std::string DECREMENT_STOCK = R"(
local current = redis.call('GET', KEYS[1])
if not current then return {-1, '키 없음'} end
local stock = tonumber(current)
local amount = tonumber(ARGV[1])
if stock < amount then return {-2, '재고 부족'} end
redis.call('SET', KEYS[1], stock - amount)
return {stock - amount, 'OK'}
)";
int main() {
auto redis = Redis("tcp://127.0.0.1:6379");
redis.set("stock:item1", "100");
auto result = redis.eval<std::vector<std::string>>(DECREMENT_STOCK, {"stock:item1"}, {"5"});
std::cout << "차감 후: " << result[0] << ", " << result[1] << "\n";
return 0;
}
7. 자주 발생하는 에러와 해결법
에러 1: SUBSCRIBE 연결에서 GET/SET 시도
증상: (error) ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context
원인: SUBSCRIBE한 연결은 구독 모드가 되어 일반 명령 사용 불가
해결법:
// ❌ 잘못된 사용
std::thread t([&]() {
redis.subscribe("channel");
redis.get("key"); // 에러!
});
// ✅ 발행·구독은 별도 연결
auto sub_redis = Redis("tcp://127.0.0.1:6379");
auto pub_redis = Redis("tcp://127.0.0.1:6379");
sub_redis.subscriber().subscribe("channel");
pub_redis.publish("channel", "msg");
에러 2: 파이프라인 중간에 다른 명령
증상: 응답 순서가 꼬이거나 잘못된 결과
원인: redisAppendCommand와 redisGetReply 사이에 redisCommand 호출
해결법:
// ❌ 잘못된 사용
redisAppendCommand(ctx, "SET a 1");
redisCommand(ctx, "GET b"); // 파이프라인 깨짐
redisGetReply(ctx, &reply);
// ✅ 파이프라인은 한 번에
redisAppendCommand(ctx, "SET a 1");
redisAppendCommand(ctx, "SET b 2");
redisGetReply(ctx, &reply); freeReplyObject(reply);
redisGetReply(ctx, &reply); freeReplyObject(reply);
에러 3: Lua 스크립트에서 nil 반환
증상: (error) ERR Error running script (call to f_xxx): @user_script:0: user_script:0: attempt to compare nil with number
원인: redis.call('GET', key)가 키가 없으면 nil 반환
해결법:
-- ❌ nil 체크 없음
local current = redis.call('GET', KEYS[1])
local stock = tonumber(current) -- nil이면 에러
-- ✅ nil 체크
local current = redis.call('GET', KEYS[1])
if not current then
return -1
end
local stock = tonumber(current)
에러 4: Cluster에서 CROSSSLOT
증상: (error) CROSSSLOT Keys in request don't hash to the same slot
원인: MGET, MSET 등에서 키가 서로 다른 슬롯에 있음
해결법:
// ❌ 다른 슬롯
cluster.mget({"user:1:name", "user:2:name"});
// ✅ 같은 슬롯: {tag} 사용
cluster.mget({"user:{1}:name", "user:{1}:age"});
에러 5: 파이프라인 스레드 안전성
증상: redis-plus-plus에서 멀티스레드에서 Pipeline 사용 시 크래시 또는 잘못된 결과
원인: Pipeline은 스레드 안전하지 않음
해결법:
// ❌ 스레드 안전하지 않음
auto pipe = redis.pipeline();
std::thread t1([&]() { pipe.set("a", "1"); });
std::thread t2([&]() { pipe.set("b", "2"); });
// ✅ 스레드당 별도 Pipeline 또는 mutex
std::mutex mtx;
std::thread t1([&]() {
std::lock_guard<std::mutex> lk(mtx);
auto pipe = redis.pipeline();
pipe.set("a", "1");
pipe.exec();
});
에러 6: EVAL 스크립트 인자 이스케이프
증상: Lua 스크립트에 전달한 문자열에 따옴표나 특수문자가 있으면 파싱 에러
원인: Redis 프로토콜에서 인자 이스케이프 필요
해결법:
// redis-plus-plus는 자동 이스케이프
redis.eval(script, {"key"}, {user_input}); // 안전
// hiredis 직접 호출 시 주의
// redisCommand에서 %s는 이스케이프되지 않음
// 사용자 입력은 반드시 검증 후 사용
8. 성능 최적화 팁
팁 1: 파이프라인 배치 크기
너무 큰 배치는 메모리 부담과 블로킹 시간 증가를 유발합니다. 100~500건 단위로 나누는 것이 권장됩니다.
const int BATCH_SIZE = 200;
for (int offset = 0; offset < total; offset += BATCH_SIZE) {
auto pipe = redis.pipeline();
for (int i = 0; i < BATCH_SIZE && offset + i < total; ++i) {
pipe.set(keys[offset + i], values[offset + i]);
}
pipe.exec();
}
팁 2: Lua 스크립트는 짧고 단순하게
Lua 스크립트는 Redis를 블로킹합니다. 긴 스크립트는 다른 클라이언트에 지연을 줍니다. 복잡한 로직은 클라이언트에서 처리하고, 원자성이 필요한 부분만 Lua로 처리하세요.
팁 3: Pub/Sub 대안 — Redis Streams
Pub/Sub은 메시지가 구독자에게만 전달되고 저장되지 않습니다. 메시지가 끊기면 유실됩니다. 영속성이 필요하면 Redis Streams를 사용하세요.
// Redis Streams: XADD, XREADGROUP
redis.xadd("stream:orders", "*", {"event", "order_complete", "id", "123"});
팁 4: 대량 조회 시 MGET + 파이프라인
MGET은 같은 슬롯(Cluster) 또는 단일 노드에서 여러 키를 한 번에 가져옵니다. 파이프라인과 함께 사용하면 대량 조회 시 효율적입니다.
// 1000개 키를 100개씩 MGET
for (size_t i = 0; i < keys.size(); i += 100) {
auto end = std::min(i + 100, keys.size());
std::vector<std::string> batch(keys.begin() + i, keys.begin() + end);
auto vals = redis.mget(batch);
}
팁 5: 연결 풀 크기 조정
ConnectionOptions opts;
opts.host = "127.0.0.1";
opts.port = 6379;
ConnectionPoolOptions pool_opts;
pool_opts.size = 20; // 동시 요청 수에 맞게 조정
pool_opts.wait_timeout = std::chrono::milliseconds(100);
auto redis = Redis(opts, pool_opts);
9. 프로덕션 패턴
패턴 1: Pub/Sub + 구독자 재연결
구독 연결이 끊어지면 자동 재연결하는 루프를 두는 것이 좋습니다.
void run_subscriber_with_reconnect(Redis& redis, const std::string& channel) {
while (true) {
try {
auto sub = redis.subscriber();
sub.on_message( {
std::cout << ch << ": " << msg << "\n";
});
sub.subscribe(channel);
sub.consume();
} catch (const std::exception& e) {
std::cerr << "구독 재연결: " << e.what() << "\n";
std::this_thread::sleep_for(std::chrono::seconds(5));
}
}
}
패턴 2: Lua 스크립트 파일로 관리
스크립트가 길어지면 .lua 파일로 분리하고, 빌드 시 포함하거나 런타임에 읽어옵니다.
std::string load_script(const std::string& path) {
std::ifstream f(path);
return std::string(std::istreambuf_iterator<char>(f), {});
}
auto script = load_script("scripts/decrement_stock.lua");
redis.eval<long long>(script, {"stock"}, {"5"});
패턴 3: 트랜잭션 + WATCH (낙관적 락)
WATCH로 키를 감시하고, EXEC 시점에 값이 바뀌었으면 재시도합니다.
// hiredis WATCH
redisReply* r = (redisReply*)redisCommand(ctx, "WATCH stock");
freeReplyObject(r);
r = (redisReply*)redisCommand(ctx, "MULTI");
// ... 명령 추가 ...
r = (redisReply*)redisCommand(ctx, "EXEC");
if (r->type == REDIS_REPLY_NIL) {
// 재시도
}
패턴 4: 설정 외부화
struct RedisAdvancedConfig {
std::string host = "127.0.0.1";
int port = 6379;
int pipeline_batch_size = 200;
int pool_size = 10;
};
RedisAdvancedConfig load_config() {
RedisAdvancedConfig c;
if (const char* h = std::getenv("REDIS_HOST")) c.host = h;
if (const char* p = std::getenv("REDIS_PORT")) c.port = std::stoi(p);
return c;
}
10. 구현 체크리스트
Pub/Sub
- 발행·구독을 별도 연결로 처리
- 구독 연결 끊김 시 재연결 로직
- 채널 이름 규칙 일관성 (예:
channel:orders)
파이프라인
- 배치 크기 100~500 권장
- 멀티스레드에서 Pipeline 사용 시 mutex 또는 스레드당 별도 Pipeline
-
redisAppendCommand와redisGetReply사이에 다른 명령 금지
Lua
- nil 체크·타입 검증
- KEYS·ARGV 개수 명시
- EVALSHA로 스크립트 캐싱 (선택)
트랜잭션
- Redis 트랜잭션은 롤백 없음 인지
- WATCH로 낙관적 락 (필요 시)
Cluster
- 다중 키 명령 시
{hash_tag}로 같은 슬롯 보장 - redis-plus-plus
RedisCluster또는 Cluster 지원 클라이언트 사용
11. 정리
| 기능 | 용도 | C++ 클라이언트 |
|---|---|---|
| Pub/Sub | 실시간 알림·채팅 | hiredis, redis-plus-plus |
| 파이프라인 | 대량 명령 RTT 감소 | hiredis, redis-plus-plus |
| Lua | 원자적 재고 차감·락 해제 | EVAL/EVALSHA |
| 트랜잭션 | MULTI/EXEC 명령 그룹 | hiredis, redis-plus-plus |
| Cluster | 슬롯 분산 | redis-plus-plus RedisCluster |
핵심 원칙:
- Pub/Sub은 발행·구독 연결 분리
- 파이프라인은 배치 크기 조절, 스레드 안전성 주의
- Lua는 nil·타입 검증, 원자성 활용
- Cluster는 hash tag로 같은 슬롯 보장
이전 글 Redis 클라이언트 완벽 가이드(#52-2)에서 기초를 익혔다면, 이 글의 Pub/Sub·파이프라인·Lua·트랜잭션·Cluster를 실무에 적용해 보세요.
참고 자료
- Redis 공식 문서 — Pub/Sub
- Redis 공식 문서 — Lua 스크립팅
- Redis 공식 문서 — 트랜잭션
- Redis 공식 문서 — Cluster
- redis-plus-plus GitHub
- hiredis GitHub
한 줄 요약: Pub/Sub·파이프라인·Lua 스크립팅·트랜잭션·Cluster를 마스터해 Redis 고급 활용을 완성하세요.
관련 글
- C++ Redis 클라이언트 완벽 가이드 | hiredis·redis-plus-plus·캐싱·세션·분산락
- C++ Redis 완전 실전 가이드 | hiredis·redis-plus-plus
- C++ 데이터베이스 연동 완벽 가이드 | SQLite·PostgreSQL·연결 풀·트랜잭션 [#31-3]