C++ 스레드 풀 완벽 가이드 | 작업 큐·병렬 처리·성능 벤치마크 [#51-3]
이 글의 핵심
C++17 스레드 풀: 작업 큐, std::async 대안, 데드락·경쟁 조건 방지, 성능 측정, 프로덕션 패턴. 실무 문제 시나리오와 해결법. HTTP 요청을 병렬로 처리하려고 std::async나 std::thread를 루프 안에서 호출했습니다. 요청이 1000개 들어오니 스레드가 1000개 생성되었고, 컨텍스트 스위칭·스택 메모리로 인해 시스템이 멈추는 것처럼 느려졌습니다.
들어가며: 작업마다 스레드를 만들면 안 되는 이유
”요청 1000개 처리하는데 스레드 1000개가 생성돼요”
HTTP 요청을 병렬로 처리하려고 std::async나 std::thread를 루프 안에서 호출했습니다. 요청이 1000개 들어오니 스레드가 1000개 생성되었고, 컨텍스트 스위칭·스택 메모리로 인해 시스템이 멈추는 것처럼 느려졌습니다.
문제의 코드:
// ❌ 나쁜 예: 작업마다 스레드 생성
void process_requests(const std::vector<Request>& requests) {
std::vector<std::thread> threads;
for (const auto& req : requests) {
threads.emplace_back([&req]() {
handle_request(req); // 1000개 요청 → 1000개 스레드
});
}
for (auto& t : threads) t.join();
}
스레드 풀을 쓰면:
// ✅ 좋은 예: 고정된 워커 스레드가 작업 큐에서 가져와 처리
ThreadPool pool(std::thread::hardware_concurrency());
for (const auto& req : requests) {
pool.submit([&req]() { handle_request(req); });
}
pool.wait(); // CPU 코어 수만큼만 스레드 유지
원인: 스레드 생성·파괴 비용이 크고, 과도한 스레드는 메모리와 스케줄링 부담을 줍니다. 스레드 풀은 미리 생성한 워커들이 작업 큐에서 작업을 가져와 처리하므로, 스레드 수를 제한하면서도 병렬 처리가 가능합니다.
이 글을 읽으면:
- 스레드 풀의 개념과 동작 원리를 이해할 수 있습니다.
- C++17로 완전한 스레드 풀을 구현할 수 있습니다.
- 데드락·경쟁 조건 등 자주 발생하는 에러를 피할 수 있습니다.
- 성능 벤치마크와 프로덕션 패턴을 적용할 수 있습니다.
문제 시나리오
시나리오 1: 웹 서버 요청 처리
초당 1000개 HTTP 요청을 처리하는 서버에서, 요청마다 std::async를 호출하면 스레드가 무한정 늘어납니다. 스레드 풀에 작업을 제출하면 코어 수(예: 8개)만큼의 워커가 큐에서 순차적으로 가져와 처리합니다.
시나리오 2: 대량 이미지 리사이징
10,000장의 이미지를 썸네일로 변환할 때, std::thread를 10,000번 생성하면 메모리 부족이나 시스템 과부하가 발생할 수 있습니다. 스레드 풀에 8~16개 워커를 두고 작업을 분배하면 안정적으로 처리됩니다.
시나리오 3: 병렬 정렬/검색
대용량 배열을 병렬로 정렬하거나 검색할 때, 재귀적으로 std::async를 호출하면 작업 수만큼 future가 생성되어 오버헤드가 큽니다. 스레드 풀 + 작업 분할로 깊이를 제한하면 효율적입니다.
시나리오 4: 게임 엔진 작업 스케줄링
물리, 렌더링, AI 등 여러 시스템이 매 프레임 실행될 때, 각 시스템 내부에서 세부 작업을 스레드 풀에 제출하면 프레임 간 일관된 스레드 사용이 가능합니다.
시나리오 5: 배치 ETL 파이프라인
DB에서 읽은 레코드를 변환·필터링·저장하는 파이프라인에서, 변환 단계를 스레드 풀에 병렬로 제출하면 처리량이 향상됩니다.
시나리오 6: 실시간 로그 집계
여러 소스에서 들어오는 로그를 실시간으로 집계할 때, 파싱·집계 작업을 스레드 풀에 제출하고 결과를 메인 스레드에서 수집하면 블로킹 없이 처리할 수 있습니다.
시나리오 7: “메모리 사용량이 계속 증가해요”
// ❌ 나쁜 예: 제출 속도 > 처리 속도 → 큐 무한 증가
while (true) {
pool.submit([&]() { process_incoming_request(); });
// 처리보다 빠르게 제출 → OOM 위험
}
원인: 작업 큐에 상한이 없어 제출 속도가 처리 속도를 넘으면 메모리가 폭증합니다. 해결: 큐 크기 제한 + 블로킹 submit 또는 backpressure 적용.
시나리오 8: “서버 종료 시 진행 중 작업이 날아가요”
// ❌ 나쁜 예: 소멸자에서 즉시 stop_ = true
~ThreadPool() {
stop_ = true;
condition_.notify_all();
for (auto& w : workers_) w.join();
// 큐에 남은 1000개 작업은 처리되지 않고 버려짐
}
원인: Graceful shutdown 없이 즉시 중단하면 큐에 남은 작업이 손실됩니다. 해결: shutdown_when_idle()로 큐가 비고 진행 중 작업이 완료될 때까지 대기 후 종료합니다.
시나리오 9: “람다 캡처로 크래시가 나요”
// ❌ 나쁜 예: 참조 캡처 → use-after-free
void process(const std::string& path) {
pool.submit([&path]() {
load_file(path); // path는 process() 반환 후 무효화
});
} // path 소멸, 워커는 아직 실행 전
원인: [&]로 캡처한 지역 변수가 작업 실행 전에 스코프를 벗어납니다. 해결: 값 캡처 [path] 또는 std::shared_ptr 사용.
목차
- 스레드 풀 아키텍처
- 기본 스레드 풀 구현
- 완전한 스레드 풀 (future 반환)
- 완전한 스레드 풀 예제 (작업 큐·워커·제출·종료)
- 우선순위 큐·작업 취소
- 자주 발생하는 에러와 해결법
- 베스트 프랙티스
- 성능 벤치마크
- 프로덕션 패턴
- 실전 예제
1. 스레드 풀 아키텍처
핵심 구성 요소
flowchart TB
subgraph main["메인 스레드"]
S1[submit 작업1]
S2[submit 작업2]
S3[submit 작업3]
end
subgraph queue["작업 큐"]
Q[FIFO 큐]
end
subgraph workers["워커 스레드"]
W1[워커 1]
W2[워커 2]
W3[워커 3]
end
S1 --> Q
S2 --> Q
S3 --> Q
Q --> W1
Q --> W2
Q --> W3
동작 흐름:
- 메인 스레드가
submit()으로 작업(함수/람다)을 큐에 넣습니다. - 워커 스레드들이 큐에서 작업을 꺼내 실행합니다.
- 큐가 비면 워커는
condition_variable으로 대기합니다. - 새 작업이 들어오면 한 워커가 깨어나 처리합니다.
작업 제출 시퀀스 다이어그램
sequenceDiagram
participant M as 메인 스레드
participant Q as 작업 큐
participant W1 as 워커 1
participant W2 as 워커 2
M->>Q: submit(task1)
M->>Q: submit(task2)
Note over Q: notify_one()
Q->>W1: task1 꺼냄
W1->>W1: task1 실행
Q->>W2: task2 꺼냄
W2->>W2: task2 실행
M->>M: wait() 또는 get()
W1-->>M: 완료
W2-->>M: 완료
워커 상태 전이
stateDiagram-v2
[*] --> 대기: 풀 시작
대기 --> 실행: 큐에서 작업 획득
실행 --> 대기: 작업 완료, 큐 비어있음
실행 --> 대기: 작업 완료, 다음 작업 있음
대기 --> 종료: stop_ && 큐 비어있음
std::async vs 스레드 풀
| 항목 | std::async | 스레드 풀 |
|---|---|---|
| 스레드 수 | 작업마다 생성 가능 | 고정 (코어 수 등) |
| 오버헤드 | future·스레드 생성 비용 | 큐 접근만 |
| 제어 | 제한 어려움 | 명시적 제어 |
| 적합한 경우 | 소량 작업, 간단한 병렬화 | 대량 작업, 서버 |
2. 기본 스레드 풀 구현
최소 동작 코드
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <stdexcept>
class ThreadPool {
public:
explicit ThreadPool(size_t num_threads) : stop_(false) {
for (size_t i = 0; i < num_threads; ++i) {
workers_.emplace_back([this] {
for (;;) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex_);
condition_.wait(lock, [this] {
return stop_ || !tasks_.empty();
});
if (stop_ && tasks_.empty()) return;
task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
});
}
}
template<typename F>
void submit(F&& f) {
{
std::unique_lock<std::mutex> lock(queue_mutex_);
if (stop_) throw std::runtime_error("submit on stopped pool");
tasks_.emplace(std::forward<F>(f));
}
condition_.notify_one();
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex_);
stop_ = true;
}
condition_.notify_all();
for (auto& w : workers_) w.join();
}
private:
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex queue_mutex_;
std::condition_variable condition_;
bool stop_;
};
사용 예:
int main() {
ThreadPool pool(4);
for (int i = 0; i < 8; ++i) {
pool.submit([i] {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "task " << i << " done\n";
});
}
return 0; // 소멸자에서 join
}
주의: submit은 비동기입니다. 작업 완료를 기다리려면 별도 동기화가 필요합니다. 아래 완전한 구현에서 std::future 반환으로 해결합니다.
3. 완전한 스레드 풀 (future 반환)
결과를 받을 수 있는 submit
#include <atomic>
#include <future>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <stdexcept>
#include <memory>
class ThreadPool {
public:
explicit ThreadPool(size_t num_threads)
: stop_(false) {
for (size_t i = 0; i < num_threads; ++i) {
workers_.emplace_back([this] { worker_loop(); });
}
}
template<typename F, typename... Args>
auto submit(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> result = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex_);
if (stop_) throw std::runtime_error("submit on stopped pool");
tasks_.emplace([task]() { (*task)(); });
}
condition_.notify_one();
return result;
}
void wait() {
std::unique_lock<std::mutex> lock(queue_mutex_);
condition_empty_.wait(lock, [this] {
return tasks_.empty() && busy_count_ == 0;
});
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex_);
stop_ = true;
}
condition_.notify_all();
for (auto& w : workers_) w.join();
}
private:
void worker_loop() {
for (;;) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex_);
condition_.wait(lock, [this] {
return stop_ || !tasks_.empty();
});
if (stop_ && tasks_.empty()) return;
task = std::move(tasks_.front());
tasks_.pop();
++busy_count_;
}
task();
{
std::unique_lock<std::mutex> lock(queue_mutex_);
--busy_count_;
if (tasks_.empty() && busy_count_ == 0)
condition_empty_.notify_all();
}
}
}
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex queue_mutex_;
std::condition_variable condition_;
std::condition_variable condition_empty_;
std::atomic<size_t> busy_count_{0};
bool stop_;
};
C++17 result_of 대체 (C++20에서는 std::invoke_result 사용):
// C++17: result_of는 C++20에서 deprecated
// C++20에서는:
template<typename F, typename... Args>
auto submit(F&& f, Args&&... args)
-> std::future<std::invoke_result_t<F, Args...>> {
using return_type = std::invoke_result_t<F, Args...>;
// ...
}
사용 예: 병렬 합산:
int main() {
ThreadPool pool(std::thread::hardware_concurrency());
std::vector<int> data(1000000, 1);
auto sum_range = [&data](size_t start, size_t end) {
int sum = 0;
for (size_t i = start; i < end; ++i) sum += data[i];
return sum;
};
const size_t chunk = data.size() / 4;
std::vector<std::future<int>> futures;
for (int i = 0; i < 4; ++i) {
size_t s = i * chunk;
size_t e = (i == 3) ? data.size() : (i + 1) * chunk;
futures.push_back(pool.submit(sum_range, s, e));
}
int total = 0;
for (auto& f : futures) total += f.get();
std::cout << "sum = " << total << "\n";
}
4. 완전한 스레드 풀 예제 (작업 큐·워커·제출·종료)
단계별 동작 흐름
아래 예제는 작업 큐 생성 → 워커 스레드 기동 → 작업 제출(void/future) → Graceful Shutdown까지 한 번에 보여주는 실행 가능한 코드입니다.
#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>
class CompleteThreadPool {
public:
explicit CompleteThreadPool(size_t num_threads) : stop_(false) {
if (num_threads == 0) num_threads = 1;
workers_.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i) {
workers_.emplace_back([this, i] { worker_loop(i); });
}
}
// 1. void 반환 작업 제출 (fire-and-forget)
template<typename F>
void submit(F&& f) {
{
std::lock_guard<std::mutex> lock(queue_mutex_);
if (stop_) throw std::runtime_error("submit on stopped pool");
tasks_.emplace(std::function<void()>(std::forward<F>(f)));
}
condition_.notify_one();
}
// 2. future 반환 작업 제출 (결과 수신)
template<typename F, typename... Args>
auto submit_with_result(F&& f, Args&&... args)
-> std::future<std::invoke_result_t<F, Args...>> {
using return_type = std::invoke_result_t<F, Args...>;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> result = task->get_future();
submit([task]() { (*task)(); });
return result;
}
// 3. Graceful Shutdown: 큐 비우기 + 진행 중 작업 완료 대기
void shutdown() {
{
std::lock_guard<std::mutex> lock(queue_mutex_);
stop_ = true;
}
condition_.notify_all();
for (auto& w : workers_) {
if (w.joinable()) w.join();
}
workers_.clear();
}
// 4. 유휴 상태까지 대기 후 shutdown
void shutdown_when_idle() {
std::unique_lock<std::mutex> lock(queue_mutex_);
condition_idle_.wait(lock, [this] {
return tasks_.empty() && busy_count_ == 0;
});
stop_ = true;
lock.unlock();
condition_.notify_all();
for (auto& w : workers_) {
if (w.joinable()) w.join();
}
workers_.clear();
}
~CompleteThreadPool() { shutdown(); }
private:
void worker_loop(size_t id) {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex_);
condition_.wait(lock, [this] {
return stop_ || !tasks_.empty();
});
if (stop_ && tasks_.empty()) return;
task = std::move(tasks_.front());
tasks_.pop();
++busy_count_;
}
try {
task();
} catch (const std::exception& e) {
std::cerr << "[워커 " << id << "] 예외: " << e.what() << "\n";
}
{
std::lock_guard<std::mutex> lock(queue_mutex_);
--busy_count_;
if (tasks_.empty() && busy_count_ == 0)
condition_idle_.notify_all();
}
}
}
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex queue_mutex_;
std::condition_variable condition_;
std::condition_variable condition_idle_;
std::atomic<size_t> busy_count_{0};
std::atomic<bool> stop_;
};
사용 예: 작업 제출과 Shutdown
CompleteThreadPool pool(4);
for (int i = 0; i < 4; ++i)
pool.submit([i]() { /* 작업 처리 */ });
auto f = pool.submit_with_result( { return 42; });
std::cout << f.get() << "\n";
pool.shutdown_when_idle();
실행 흐름 요약:
- 작업 큐:
std::queue<std::function<void()>>+std::mutex+condition_variable - 워커 스레드:
worker_loop()에서 큐에서 작업을 꺼내 실행, 예외 처리 포함 - 작업 제출:
submit()(void),submit_with_result()(future) - Shutdown:
shutdown()(즉시),shutdown_when_idle()(유휴 시)
5. 우선순위 큐·작업 취소
우선순위 작업 큐
#include <queue>
struct PriorityTask {
int priority;
std::function<void()> task;
bool operator<(const PriorityTask& o) const {
return priority < o.priority; // 높은 숫자 = 높은 우선순위
}
};
class PriorityThreadPool {
// ...
std::priority_queue<PriorityTask> tasks_;
// submit 시 priority 인자 추가
};
Graceful Shutdown
void shutdown() {
{
std::unique_lock<std::mutex> lock(queue_mutex_);
stop_ = true;
}
condition_.notify_all();
for (auto& w : workers_) w.join();
workers_.clear();
}
// 소멸자에서 즉시 중단 대신, 큐 비우기 대기 옵션
void shutdown_when_idle() {
std::unique_lock<std::mutex> lock(queue_mutex_);
condition_empty_.wait(lock, [this] {
return tasks_.empty() && busy_count_ == 0;
});
stop_ = true;
condition_.notify_all();
lock.unlock();
for (auto& w : workers_) w.join();
}
6. 자주 발생하는 에러와 해결법
문제 1: 람다에서 참조 캡처로 use-after-free
증상: 크래시, 잘못된 결과
원인: [&]로 캡처한 지역 변수가 스레드가 실행되기 전에 스코프를 벗어남
// ❌ 잘못된 사용
void bad() {
int x = 42;
pool.submit([&]() { std::cout << x; }); // x는 이미 파괴됐을 수 있음
} // x 소멸
// ✅ 올바른 사용: 값 캡처
pool.submit([x]() { std::cout << x; });
// 또는 shared_ptr로 공유
auto data = std::make_shared<Data>(...);
pool.submit([data]() { process(data); });
문제 2: submit 내부에서 pool에 다시 submit (데드락 가능)
증상: 교착 상태, 프로그램 멈춤
원인: 큐 락을 잡은 채로 submit이 호출되고, 내부 작업이 또 submit을 기다리는 경우
// ❌ 위험한 패턴
pool.submit([&pool]() {
auto result = heavy_compute();
pool.submit([result]() { save(result); }); // 데드락 가능
});
// ✅ 해결: submit은 락 밖에서 notify
// (우리 구현은 이미 그렇게 되어 있음)
// 또는 재귀 submit을 별도 큐에 넣고 메인 루프에서 처리
문제 3: stop_ 플래그만으로는 진행 중 작업 중단 불가
증상: shutdown 후에도 작업이 계속 실행됨
해결법: 각 작업 내부에서 주기적으로 stop_requested() 체크
void long_running_task(std::atomic<bool>& stop) {
for (int i = 0; i < 1000000; ++i) {
if (stop.load()) return;
do_work(i);
}
}
문제 4: 예외가 워커를 종료시킴
증상: 워커가 예외로 죽어서 스레드 수가 줄어듦
해결법: 워커 루프에서 try-catch
void worker_loop() {
for (;;) {
// ...
try {
task();
} catch (const std::exception& e) {
std::cerr << "task exception: " << e.what() << "\n";
} catch (...) {
std::cerr << "unknown task exception\n";
}
}
}
문제 5: future.get()에서 데드락
증상: 메인 스레드가 멈춤
원인: 스레드 풀 크기가 1이고, 메인 스레드가 get()으로 대기하는 동안 그 유일한 워커가 다른 작업을 기다리는 경우
// ❌ 위험: 단일 스레드 풀
ThreadPool pool(1);
auto f = pool.submit([&pool]() {
auto inner = pool.submit( { return 1; });
return inner.get(); // 데드락: 유일한 워커가 inner 완료 대기
});
// ✅ 해결: 스레드 풀 크기 >= 2 이상
// 또는 재귀 submit을 피하고, 작업을 평탄화
문제 6: 캐시 라인 분할 (False Sharing)
증상: 멀티스레드에서 카운터 증가 시 예상보다 느림
해결법: 스레드별 데이터를 alignas(64)로 캐시 라인 경계 정렬
문제 7: 큐 무한 증가 (Backpressure 없음)
증상: 메모리 사용량이 계속 증가하다 OOM 발생
원인: 제출 속도가 처리 속도보다 빠를 때 큐에 작업이 무한히 쌓임
해결법: 큐 크기 제한 + 블로킹 submit
template<typename F>
void submit_blocking(F&& f, size_t max_queue_size = 1000) {
std::unique_lock<std::mutex> lock(queue_mutex_);
condition_full_.wait(lock, [this, max_queue_size] {
return tasks_.size() < max_queue_size;
});
tasks_.emplace(std::forward<F>(f));
condition_.notify_one();
}
문제 8: shared_ptr 순환 참조
증상: pool이 task를 들고, task가 pool을 캡처하면 메모리 누수
해결법: weak_ptr로 pool 참조 후 lock()으로 유효성 확인
7. 베스트 프랙티스
1. 람다 캡처: 값 캡처 우선
// ✅ 좋은 예: 값 캡처
pool.submit([path = std::string(path)]() { process(path); });
// ✅ shared_ptr로 공유 시
auto data = std::make_shared<Data>(...);
pool.submit([data]() { process(data); });
2. 워커 수 설정
// CPU 바운드: 코어 수
size_t n = std::thread::hardware_concurrency();
// I/O 바운드: 2~4배
size_t n = std::thread::hardware_concurrency() * 2;
3. 예외 처리: 워커 루프에서 try-catch
void worker_loop() {
for (;;) {
// ...
try {
task();
} catch (const std::exception& e) {
log_error(e.what());
} catch (...) {
log_error("unknown exception");
}
}
}
4. 재귀 submit 시 데드락 방지
// ❌ 위험: 단일 스레드 풀에서 재귀 submit + get()
ThreadPool pool(1);
auto f = pool.submit([&pool]() {
auto inner = pool.submit( { return 1; });
return inner.get(); // 데드락
});
// ✅ 해결: 풀 크기 >= 2 또는 작업 평탄화
ThreadPool pool(std::max(2uz, std::thread::hardware_concurrency()));
5. Shutdown 시점 명확히
// 서버 종료 시: 큐에 남은 작업까지 처리
pool.shutdown_when_idle();
// 긴급 종료 시: 즉시 중단
pool.shutdown();
6. 큐 크기 제한 (Backpressure)
// 제출 속도 > 처리 속도 시 OOM 방지
void submit_blocking(F&& f, size_t max_queue = 1000) {
std::unique_lock<std::mutex> lock(queue_mutex_);
condition_full_.wait(lock, [this, max_queue] {
return tasks_.size() < max_queue;
});
tasks_.emplace(std::forward<F>(f));
condition_.notify_one();
}
7. 모니터링 포인트
프로덕션에서 queue_size, busy_workers, total_submitted, total_completed 등을 추적합니다.
8. 성능 벤치마크
벤치마크 설정
- CPU: 8코어 (예: Apple M1 / Intel i7)
- 작업: 100만 개 정수 합산을 8개 청크로 분할
- 비교:
std::asyncvs 스레드 풀
예시 결과 (표)
| 방식 | 10만 작업 | 100만 작업 | 스레드 수 |
|---|---|---|---|
| std::async (매번) | 2.3초 | 23초 | 100만 개 생성 |
| 스레드 풀 (8 워커) | 0.15초 | 1.5초 | 8개 고정 |
| 스레드 풀 (16 워커) | 0.14초 | 1.4초 | 16개 고정 |
벤치마크 코드 예시
void benchmark_pool(ThreadPool& pool, size_t num_tasks) {
auto start = std::chrono::high_resolution_clock::now();
std::vector<std::future<int>> futures;
for (size_t i = 0; i < num_tasks; ++i)
futures.push_back(pool.submit([i]() { return static_cast<int>(i); }));
for (auto& f : futures) f.get();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start).count();
std::cout << "pool " << num_tasks << ": " << elapsed << " ms\n";
}
최적 워커 수
- CPU 바운드:
std::thread::hardware_concurrency()(코어 수) - I/O 바운드: 코어 수의 2~4배
- 혼합: 프로파일링으로 조정
7. 프로덕션 패턴
패턴 1: 싱글톤 스레드 풀
class ThreadPoolSingleton {
public:
static ThreadPool& instance() {
static ThreadPool pool(std::thread::hardware_concurrency());
return pool;
}
};
// 사용
ThreadPoolSingleton::instance().submit([]{ do_work(); });
패턴 2: 스레드 풀 크기 설정
// 환경 변수 또는 설정 파일에서 읽기
size_t get_pool_size() {
if (const char* p = std::getenv("THREAD_POOL_SIZE")) {
return std::max(1uz, static_cast<size_t>(std::atoi(p)));
}
return std::thread::hardware_concurrency();
}
패턴 3: 작업 타임아웃
template<typename F>
auto submit_with_timeout(F&& f, std::chrono::milliseconds timeout) {
auto result = submit(std::forward<F>(f));
if (result.wait_for(timeout) == std::future_status::timeout) {
// 타임아웃 처리: 작업은 백그라운드에서 계속 실행됨
throw std::runtime_error("task timeout");
}
return result.get();
}
패턴 4: 배치 제출로 락 경합 감소
template<typename F>
void submit_batch(std::vector<F>&& tasks) {
std::unique_lock<std::mutex> lock(queue_mutex_);
for (auto& t : tasks) {
tasks_.emplace(std::move(t));
}
condition_.notify_all(); // 한 번에 모두 깨움
}
패턴 5: Work Stealing (고급)
부하가 불균형할 때, 유휴 워커가 다른 워커의 로컬 큐에서 작업을 가져옵니다. 구현이 복잡하므로 cpp-series-51-3-multithreading-tuning 참고.
패턴 6: 모니터링
get_stats()로 queue_size, busy_workers, total_workers를 반환해 모니터링합니다.
패턴 7: 스레드 로컬 스토리지
thread_local로 워커별 캐시·DB 연결 등 독립 리소스를 둡니다.
패턴 8: 작업 그룹 (Task Group)
여러 작업을 한 번에 제출하고 모두 완료될 때까지 대기:
template<typename... Fs>
void submit_and_wait(Fs&&... fs) {
std::tuple<std::future<void>...> futures = {
submit(std::forward<Fs>(fs))...
};
std::apply( { (f.get(), ...); }, futures);
}
패턴 9: Work Stealing 개요
부하 불균형 시 유휴 워커가 다른 워커의 로컬 큐에서 작업을 훔칩니다. C++ 표준에는 없어 직접 구현하거나 BS::thread_pool 같은 라이브러리를 사용합니다.
10. 실전 예제
예제 1: 병렬 이미지 처리
void resize_images_parallel(const std::vector<std::string>& paths) {
ThreadPool pool(std::thread::hardware_concurrency());
std::vector<std::future<void>> futures;
for (const auto& path : paths)
futures.push_back(pool.submit([path]() {
auto thumb = resize(load_image(path), 128, 128);
save_image("thumb_" + path, thumb);
}));
for (auto& f : futures) f.get();
}
예제 2: Map-Reduce 스타일 합산
template<typename T>
T parallel_sum(const std::vector<T>& data) {
size_t num_workers = std::thread::hardware_concurrency();
ThreadPool pool(num_workers);
const size_t chunk = std::max(size_t(1), data.size() / num_workers);
std::vector<std::future<T>> futures;
for (size_t i = 0; i < data.size(); i += chunk) {
size_t end = std::min(i + chunk, data.size());
futures.push_back(pool.submit([&data, i, end]() {
T sum = 0;
for (size_t j = i; j < end; ++j) sum += data[j];
return sum;
}));
}
T total = 0;
for (auto& f : futures) total += f.get();
return total;
}
예제 3: 파이프라인 (Producer-Consumer)
Producer가 BlockingQueue에서 꺼낸 작업을 pool.submit()으로 제출하고, pool.wait()으로 완료를 기다립니다.
예제 4: 병렬 퀵소트 (작업 분할)
template<typename T>
void parallel_quicksort(std::vector<T>& arr, size_t left, size_t right,
ThreadPool& pool, int depth = 0) {
if (left >= right) return;
size_t pivot = partition(arr, left, right);
const int max_depth = 4; // 분할 깊이 제한
if (depth < max_depth) {
auto f = pool.submit([&, pivot, right]() {
parallel_quicksort(arr, pivot + 1, right, pool, depth + 1);
});
parallel_quicksort(arr, left, pivot - 1, pool, depth + 1);
f.get();
} else {
std::sort(arr.begin() + left, arr.begin() + right + 1);
}
}
예제 5: 병렬 HTTP 요청 배치
std::vector<Response> fetch_all(const std::vector<std::string>& urls) {
ThreadPool pool(std::min(urls.size(), size_t(16)));
std::vector<std::future<Response>> futures;
for (const auto& url : urls) futures.push_back(pool.submit([url]() { return http_get(url); }));
std::vector<Response> results;
for (auto& f : futures) results.push_back(f.get());
return results;
}
예제 6: 게임 렌더링 작업 분할
void render_frame(Scene& scene, ThreadPool& pool) {
const int num_chunks = 4;
std::vector<std::future<void>> futures;
for (int i = 0; i < num_chunks; ++i) {
size_t start = i * (scene.meshes.size() / num_chunks);
size_t end = (i + 1) * (scene.meshes.size() / num_chunks);
futures.push_back(pool.submit([&scene, start, end]() {
for (size_t j = start; j < end; ++j) scene.meshes[j].draw();
}));
}
for (auto& f : futures) f.get();
}
스레드 풀 라이브러리 비교
| 라이브러리 | 특징 | 라이선스 |
|---|---|---|
| BS::thread_pool | 헤더 전용, C++17, Work stealing | MIT |
| ctpl | 가벼움, 헤더 전용 | MIT |
| ThreadPool (progschj) | 단순, C++11 | Zlib |
| HPX | 대규모 분산, C++20 | BSL |
참고 자료
- cppreference - thread
- cppreference - condition_variable
- C++ Concurrency in Action
- BS::thread_pool, ctpl
구현 체크리스트
- 워커 수를
hardware_concurrency()또는 설정값으로 결정 -
submit시 람다는 값 캡처 또는shared_ptr사용 - 워커 루프에서 예외 처리 (try-catch)
- shutdown 시 진행 중 작업 완료 대기 옵션 제공
- 재귀 submit 시 데드락 가능성 검토 (풀 크기 >= 2)
- 프로덕션에서 큐 크기·통계 모니터링
- False sharing 방지 (스레드별 데이터 정렬)
정리
| 항목 | 설명 |
|---|---|
| 아키텍처 | 작업 큐 + 고정 워커 스레드 |
| 기본 구현 | mutex + condition_variable |
| future 반환 | packaged_task로 결과 수신 |
| 에러 방지 | 참조 캡처 금지, 예외 처리, 데드락 주의 |
| 성능 | std::async 대비 대량 작업에서 유리 |
| 프로덕션 | 싱글톤, 설정, 모니터링, 배치 제출 |
핵심 원칙:
- 스레드 수를 제한하고 작업 큐로 부하 분산
- 람다 캡처와 예외 처리로 안정성 확보
- 벤치마크로 워커 수와 패턴 검증
- 프로덕션에서는 모니터링과 graceful shutdown 적용
자주 묻는 질문 (FAQ)
Q. std::async 대신 스레드 풀을 써야 할 때는?
A. 작업 수가 수백~수천 개 이상이고, 스레드 생성 비용을 줄이고 싶을 때 스레드 풀을 사용합니다. 소량의 일회성 병렬 작업은 std::async로 충분합니다.
Q. 워커 수는 어떻게 정하면 되나요?
A. CPU 바운드 작업은 코어 수, I/O 바운드는 2~4배를 시도해 보고 프로파일링으로 조정합니다.
Q. 기존 라이브러리를 써도 되나요?
A. BS::thread_pool, ctpl 등 검증된 라이브러리를 사용해도 됩니다. 이 글의 구현은 원리 이해와 커스터마이징용입니다.
Q. 선행으로 읽으면 좋은 글은?
A. 원자 연산, 데이터 레이스와 뮤텍스, 태스크 큐를 먼저 읽으면 도움이 됩니다.
Q. 더 깊이 공부하려면?
A. cppreference - thread, C++ Concurrency in Action을 참고하세요.
한 줄 요약: 스레드 풀로 스레드 수를 제한하면서 대량 작업을 안정적으로 병렬 처리할 수 있습니다.
관련 글
- C++ 고급 프로파일링 완벽 가이드 | perf·gprof
- C++ 멀티스레드 성능 튜닝 | 락 프리·작업 훔치기·스레드 풀 [#51-3]
- C++ Lock-Free 프로그래밍 실전 | CAS·ABA·메모리 순서·고성능 큐 [#51-5]
- C++ Lock-Free 프로그래밍 실전 | CAS·ABA·메모리 순서·고성능 큐 [#34-3]
- C++ condition_variable 기초 |
- C++ Atomic Operations |