본문으로 건너뛰기
Previous
Next
C++ 스레드 풀 완벽 가이드 | 작업 큐·병렬 처리·성능 벤치마크 [#51-3]

C++ 스레드 풀 완벽 가이드 | 작업 큐·병렬 처리·성능 벤치마크 [#51-3]

C++ 스레드 풀 완벽 가이드 | 작업 큐·병렬 처리·성능 벤치마크 [#51-3]

이 글의 핵심

C++17 스레드 풀: 작업 큐, std::async 대안, 데드락·경쟁 조건 방지, 성능 측정, 프로덕션 패턴. 실무 문제 시나리오와 해결법. HTTP 요청을 병렬로 처리하려고 std::async나 std::thread를 루프 안에서 호출했습니다. 요청이 1000개 들어오니 스레드가 1000개 생성되었고, 컨텍스트 스위칭·스택 메모리로 인해 시스템이 멈추는 것처럼 느려졌습니다.

들어가며: 작업마다 스레드

일상 비유로 이해하기: 동시성은 주방에서 여러 요리를 동시에 하는 것과 비슷합니다. 한 명의 요리사(싱글 스레드)가 국을 끓이다가 불을 줄이고, 그 사이에 야채를 썰고, 다시 국을 확인하는 식이죠. 반면 병렬성은 요리사 여러 명(멀티 스레드)이 각자 다른 요리를 동시에 만드는 겁니다. 를 만들면 안 되는 이유

”요청 1000개 처리하는데 스레드 1000개가 생성돼요”

HTTP 요청을 병렬로 처리하려고 std::asyncstd::thread를 루프 안에서 호출했습니다. 요청이 1000개 들어오니 스레드가 1000개 생성되었고, 컨텍스트 스위칭·스택 메모리로 인해 시스템이 멈추는 것처럼 느려졌습니다. 문제의 코드:

// ❌ 나쁜 예: 작업마다 스레드 생성
void process_requests(const std::vector<Request>& requests) {
    std::vector<std::thread> threads;
    for (const auto& req : requests) {
        threads.emplace_back([&req]() {
            handle_request(req);  // 1000개 요청 → 1000개 스레드
        });
    }
    for (auto& t : threads) t.join();
}

스레드 풀을 쓰면:

// ✅ 좋은 예: 고정된 워커 스레드가 작업 큐에서 가져와 처리
// 실행 예제
ThreadPool pool(std::thread::hardware_concurrency());
for (const auto& req : requests) {
    pool.submit([&req]() { handle_request(req); });
}
pool.wait();  // CPU 코어 수만큼만 스레드 유지

원인: 스레드 생성·파괴 비용이 크고, 과도한 스레드는 메모리와 스케줄링 부담을 줍니다. 스레드 풀은 미리 생성한 워커들이 작업 큐에서 작업을 가져와 처리하므로, 스레드 수를 제한하면서도 병렬 처리가 가능합니다. 이 글을 읽으면:

  • 스레드 풀의 개념과 동작 원리를 이해할 수 있습니다.
  • C++17로 완전한 스레드 풀을 구현할 수 있습니다.
  • 데드락·경쟁 조건 등 자주 발생하는 에러를 피할 수 있습니다.
  • 성능 벤치마크와 프로덕션 패턴을 적용할 수 있습니다.

실무 적용 경험: 이 글은 대규모 C++ 프로젝트에서 실제로 겪은 문제와 해결 과정을 바탕으로 작성되었습니다. 책이나 문서에서 다루지 않는 실전 함정과 디버깅 팁을 포함합니다.

문제 시나리오

시나리오 1: 웹 서버 요청 처리

초당 1000개 HTTP 요청을 처리하는 서버에서, 요청마다 std::async를 호출하면 스레드가 무한정 늘어납니다. 스레드 풀에 작업을 제출하면 코어 수(예: 8개)만큼의 워커가 큐에서 순차적으로 가져와 처리합니다.

시나리오 2: 대량 이미지 리사이징

10,000장의 이미지를 썸네일로 변환할 때, std::thread를 10,000번 생성하면 메모리 부족이나 시스템 과부하가 발생할 수 있습니다. 스레드 풀에 8~16개 워커를 두고 작업을 분배하면 안정적으로 처리됩니다.

시나리오 3: 병렬 정렬/검색

대용량 배열을 병렬로 정렬하거나 검색할 때, 재귀적으로 std::async를 호출하면 작업 수만큼 future가 생성되어 오버헤드가 큽니다. 스레드 풀 + 작업 분할로 깊이를 제한하면 효율적입니다.

시나리오 4: 게임 엔진 작업 스케줄링

물리, 렌더링, AI 등 여러 시스템이 매 프레임 실행될 때, 각 시스템 내부에서 세부 작업을 스레드 풀에 제출하면 프레임 간 일관된 스레드 사용이 가능합니다.

시나리오 5: 배치 ETL 파이프라인

DB에서 읽은 레코드를 변환·필터링·저장하는 파이프라인에서, 변환 단계를 스레드 풀에 병렬로 제출하면 처리량이 향상됩니다.

시나리오 6: 실시간 로그 집계

여러 소스에서 들어오는 로그를 실시간으로 집계할 때, 파싱·집계 작업을 스레드 풀에 제출하고 결과를 메인 스레드에서 수집하면 블로킹 없이 처리할 수 있습니다.

시나리오 7: “메모리 사용량이 계속 증가해요”

// ❌ 나쁜 예: 제출 속도 > 처리 속도 → 큐 무한 증가
while (true) {
    pool.submit([&]() { process_incoming_request(); });
    // 처리보다 빠르게 제출 → OOM 위험
}

원인: 작업 큐에 상한이 없어 제출 속도가 처리 속도를 넘으면 메모리가 폭증합니다. 해결: 큐 크기 제한 + 블로킹 submit 또는 backpressure 적용.

시나리오 8: “서버 종료 시 진행 중 작업이 날아가요”

// ❌ 나쁜 예: 소멸자에서 즉시 stop_ = true
~ThreadPool() {
    stop_ = true;
    condition_.notify_all();
    for (auto& w : workers_) w.join();
    // 큐에 남은 1000개 작업은 처리되지 않고 버려짐
}

원인: Graceful shutdown 없이 즉시 중단하면 큐에 남은 작업이 손실됩니다. 해결: shutdown_when_idle()로 큐가 비고 진행 중 작업이 완료될 때까지 대기 후 종료합니다.

시나리오 9: “람다 캡처로 크래시가 나요”

// ❌ 나쁜 예: 참조 캡처 → use-after-free
void process(const std::string& path) {
    pool.submit([&path]() {
        load_file(path);  // path는 process() 반환 후 무효화
    });
}  // path 소멸, 워커는 아직 실행 전

원인: [&]로 캡처한 지역 변수가 작업 실행 전에 스코프를 벗어납니다. 해결: 값 캡처 [path] 또는 std::shared_ptr 사용.

1. 스레드 풀 아키텍처

핵심 구성 요소

flowchart TB
    subgraph main[메인 스레드]
        S1[submit 작업1]
        S2[submit 작업2]
        S3[submit 작업3]
    end
    subgraph queue[작업 큐]
        Q[FIFO 큐]
    end
    subgraph workers[워커 스레드]
        W1[워커 1]
        W2[워커 2]
        W3[워커 3]
    end
    S1 --> Q
    S2 --> Q
    S3 --> Q
    Q --> W1
    Q --> W2
    Q --> W3

동작 흐름:

  1. 메인 스레드가 submit()으로 작업(함수/람다)을 큐에 넣습니다.
  2. 워커 스레드들이 큐에서 작업을 꺼내 실행합니다.
  3. 큐가 비면 워커는 condition_variable으로 대기합니다.
  4. 새 작업이 들어오면 한 워커가 깨어나 처리합니다.

작업 제출 시퀀스 다이어그램

sequenceDiagram
    participant M as 메인 스레드
    participant Q as 작업 큐
    participant W1 as 워커 1
    participant W2 as 워커 2
    M->>Q: submit(task1)
    M->>Q: submit(task2)
    Note over Q: notify_one()
    Q->>W1: task1 꺼냄
    W1->>W1: task1 실행
    Q->>W2: task2 꺼냄
    W2->>W2: task2 실행
    M->>M: wait() 또는 get()
    W1-->>M: 완료
    W2-->>M: 완료

워커 상태 전이

stateDiagram-v2
    [*] --> 대기: 풀 시작
    대기 --> 실행: 큐에서 작업 획득
    실행 --> 대기: 작업 완료, 큐 비어있음
    실행 --> 대기: 작업 완료, 다음 작업 있음
    대기 --> 종료: stop_ && 큐 비어있음

std::async vs 스레드 풀

항목std::async스레드 풀
스레드 수작업마다 생성 가능고정 (코어 수 등)
오버헤드future·스레드 생성 비용큐 접근만
제어제한 어려움명시적 제어
적합한 경우소량 작업, 간단한 병렬화대량 작업, 서버

2. 기본 스레드 풀 구현

최소 동작 코드

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <stdexcept>
class ThreadPool {
public:
    explicit ThreadPool(size_t num_threads) : stop_(false) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers_.emplace_back([this] {
                for (;;) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex_);
                        condition_.wait(lock, [this] {
                            return stop_ || !tasks_.empty();
                        });
                        if (stop_ && tasks_.empty()) return;
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    task();
                }
            });
        }
    }
    template<typename F>
    void submit(F&& f) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            if (stop_) throw std::runtime_error("submit on stopped pool");
            tasks_.emplace(std::forward<F>(f));
        }
        condition_.notify_one();
    }
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (auto& w : workers_) w.join();
    }
private:
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    bool stop_;
};

사용 예:

int main() {
    ThreadPool pool(4);
    for (int i = 0; i < 8; ++i) {
        pool.submit([i] {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            std::cout << "task " << i << " done\n";
        });
    }
    return 0;  // 소멸자에서 join
}

주의: submit은 비동기입니다. 작업 완료를 기다리려면 별도 동기화가 필요합니다. 아래 완전한 구현에서 std::future 반환으로 해결합니다.

3. 완전한 스레드 풀 (future 반환)

결과를 받을 수 있는 submit

#include <atomic>
#include <future>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <stdexcept>
#include <memory>
class ThreadPool {
public:
    explicit ThreadPool(size_t num_threads)
        : stop_(false) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers_.emplace_back([this] { worker_loop(); });
        }
    }
    template<typename F, typename....Args>
    auto submit(F&& f, Args&&....args)
        -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        std::future<return_type> result = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            if (stop_) throw std::runtime_error("submit on stopped pool");
            tasks_.emplace([task]() { (*task)(); });
        }
        condition_.notify_one();
        return result;
    }
    void wait() {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        condition_empty_.wait(lock, [this] {
            return tasks_.empty() && busy_count_ == 0;
        });
    }
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (auto& w : workers_) w.join();
    }
private:
    void worker_loop() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                condition_.wait(lock, [this] {
                    return stop_ || !tasks_.empty();
                });
                if (stop_ && tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop();
                ++busy_count_;
            }
            task();
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                --busy_count_;
                if (tasks_.empty() && busy_count_ == 0)
                    condition_empty_.notify_all();
            }
        }
    }
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    std::condition_variable condition_empty_;
    std::atomic<size_t> busy_count_{0};
    bool stop_;
};

C++17 result_of 대체 (C++20에서는 std::invoke_result 사용):

// C++17: result_of는 C++20에서 deprecated
// C++20에서는:
template<typename F, typename....Args>
auto submit(F&& f, Args&&....args)
    -> std::future<std::invoke_result_t<F, Args...>> {
    using return_type = std::invoke_result_t<F, Args...>;
    // ...
}

사용 예: 병렬 합산:

int main() {
    ThreadPool pool(std::thread::hardware_concurrency());
    std::vector<int> data(1000000, 1);
    auto sum_range = [&data](size_t start, size_t end) {
        int sum = 0;
        for (size_t i = start; i < end; ++i) sum += data[i];
        return sum;
    };
    const size_t chunk = data.size() / 4;
    std::vector<std::future<int>> futures;
    for (int i = 0; i < 4; ++i) {
        size_t s = i * chunk;
        size_t e = (i == 3) ? data.size() : (i + 1) * chunk;
        futures.push_back(pool.submit(sum_range, s, e));
    }
    int total = 0;
    for (auto& f : futures) total += f.get();
    std::cout << "sum = " << total << "\n";
}

4. 완전한 스레드 풀 예제 (작업 큐·워커·제출·종료)

단계별 동작 흐름

아래 예제는 작업 큐 생성 → 워커 스레드 기동 → 작업 제출(void/future) → Graceful Shutdown까지 한 번에 보여주는 실행 가능한 코드입니다.

#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>
class CompleteThreadPool {
public:
    explicit CompleteThreadPool(size_t num_threads) : stop_(false) {
        if (num_threads == 0) num_threads = 1;
        workers_.reserve(num_threads);
        for (size_t i = 0; i < num_threads; ++i) {
            workers_.emplace_back([this, i] { worker_loop(i); });
        }
    }
    // 1. void 반환 작업 제출 (fire-and-forget)
    template<typename F>
    void submit(F&& f) {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            if (stop_) throw std::runtime_error("submit on stopped pool");
            tasks_.emplace(std::function<void()>(std::forward<F>(f)));
        }
        condition_.notify_one();
    }
    // 2. future 반환 작업 제출 (결과 수신)
    template<typename F, typename....Args>
    auto submit_with_result(F&& f, Args&&....args)
        -> std::future<std::invoke_result_t<F, Args...>> {
        using return_type = std::invoke_result_t<F, Args...>;
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<return_type> result = task->get_future();
        submit([task]() { (*task)(); });
        return result;
    }
    // 3. Graceful Shutdown: 큐 비우기 + 진행 중 작업 완료 대기
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (auto& w : workers_) {
            if (w.joinable()) w.join();
        }
        workers_.clear();
    }
    // 4. 유휴 상태까지 대기 후 shutdown
    void shutdown_when_idle() {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        condition_idle_.wait(lock, [this] {
            return tasks_.empty() && busy_count_ == 0;
        });
        stop_ = true;
        lock.unlock();
        condition_.notify_all();
        for (auto& w : workers_) {
            if (w.joinable()) w.join();
        }
        workers_.clear();
    }
    ~CompleteThreadPool() { shutdown(); }
private:
    void worker_loop(size_t id) {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                condition_.wait(lock, [this] {
                    return stop_ || !tasks_.empty();
                });
                if (stop_ && tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop();
                ++busy_count_;
            }
            try {
                task();
            } catch (const std::exception& e) {
                std::cerr << "[워커 " << id << "] 예외: " << e.what() << "\n";
            }
            {
                std::lock_guard<std::mutex> lock(queue_mutex_);
                --busy_count_;
                if (tasks_.empty() && busy_count_ == 0)
                    condition_idle_.notify_all();
            }
        }
    }
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    std::condition_variable condition_idle_;
    std::atomic<size_t> busy_count_{0};
    std::atomic<bool> stop_;
};

사용 예: 작업 제출과 Shutdown

CompleteThreadPool pool(4);
for (int i = 0; i < 4; ++i)
    pool.submit([i]() { /* 작업 처리 */ });
auto f = pool.submit_with_result( { return 42; });
std::cout << f.get() << "\n";
pool.shutdown_when_idle();

실행 흐름 요약:

  1. 작업 큐: std::queue<std::function<void()>> + std::mutex + condition_variable
  2. 워커 스레드: worker_loop()에서 큐에서 작업을 꺼내 실행, 예외 처리 포함
  3. 작업 제출: submit() (void), submit_with_result() (future)
  4. Shutdown: shutdown() (즉시), shutdown_when_idle() (유휴 시)

5. 우선순위 큐·작업 취소

우선순위 작업 큐

#include <queue>
struct PriorityTask {
    int priority;
    std::function<void()> task;
    bool operator<(const PriorityTask& o) const {
        return priority < o.priority;  // 높은 숫자 = 높은 우선순위
    }
};
class PriorityThreadPool {
    // ...
    std::priority_queue<PriorityTask> tasks_;
    // submit 시 priority 인자 추가
};

Graceful Shutdown

void shutdown() {
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        stop_ = true;
    }
    condition_.notify_all();
    for (auto& w : workers_) w.join();
    workers_.clear();
}
// 소멸자에서 즉시 중단 대신, 큐 비우기 대기 옵션
void shutdown_when_idle() {
    std::unique_lock<std::mutex> lock(queue_mutex_);
    condition_empty_.wait(lock, [this] {
        return tasks_.empty() && busy_count_ == 0;
    });
    stop_ = true;
    condition_.notify_all();
    lock.unlock();
    for (auto& w : workers_) w.join();
}

6. 자주 발생하는 에러와 해결법

문제 1: 람다에서 참조 캡처로 use-after-free

증상: 크래시, 잘못된 결과 원인: [&]로 캡처한 지역 변수가 스레드가 실행되기 전에 스코프를 벗어남

// ❌ 잘못된 사용
void bad() {
    int x = 42;
    pool.submit([&]() { std::cout << x; });  // x는 이미 파괴됐을 수 있음
}  // x 소멸
// ✅ 올바른 사용: 값 캡처
pool.submit([x]() { std::cout << x; });
// 또는 shared_ptr로 공유
auto data = std::make_shared<Data>(...);
pool.submit([data]() { process(data); });

문제 2: submit 내부에서 pool에 다시 submit (데드락 가능)

증상: 교착 상태, 프로그램 멈춤 원인: 큐 락을 잡은 채로 submit이 호출되고, 내부 작업이 또 submit을 기다리는 경우

// ❌ 위험한 패턴
pool.submit([&pool]() {
    auto result = heavy_compute();
    pool.submit([result]() { save(result); });  // 데드락 가능
});
// ✅ 해결: submit은 락 밖에서 notify
// (우리 구현은 이미 그렇게 되어 있음)
// 또는 재귀 submit을 별도 큐에 넣고 메인 루프에서 처리

문제 3: stop_ 플래그만으로는 진행 중 작업 중단 불가

증상: shutdown 후에도 작업이 계속 실행됨 해결법: 각 작업 내부에서 주기적으로 stop_requested() 체크

void long_running_task(std::atomic<bool>& stop) {
    for (int i = 0; i < 1000000; ++i) {
        if (stop.load()) return;
        do_work(i);
    }
}

문제 4: 예외가 워커를 종료시킴

증상: 워커가 예외로 죽어서 스레드 수가 줄어듦 해결법: 워커 루프에서 try-catch

void worker_loop() {
    for (;;) {
        // ...
        try {
            task();
        } catch (const std::exception& e) {
            std::cerr << "task exception: " << e.what() << "\n";
        } catch (...) {
            std::cerr << "unknown task exception\n";
        }
    }
}

문제 5: future.get()에서 데드락

증상: 메인 스레드가 멈춤 원인: 스레드 풀 크기가 1이고, 메인 스레드가 get()으로 대기하는 동안 그 유일한 워커가 다른 작업을 기다리는 경우

// ❌ 위험: 단일 스레드 풀
ThreadPool pool(1);
auto f = pool.submit([&pool]() {
    auto inner = pool.submit( { return 1; });
    return inner.get();  // 데드락: 유일한 워커가 inner 완료 대기
});
// ✅ 해결: 스레드 풀 크기 >= 2 이상
// 또는 재귀 submit을 피하고, 작업을 평탄화

문제 6: 캐시 라인 분할 (False Sharing)

증상: 멀티스레드에서 카운터 증가 시 예상보다 느림
해결법: 스레드별 데이터를 alignas(64)로 캐시 라인 경계 정렬

문제 7: 큐 무한 증가 (Backpressure 없음)

증상: 메모리 사용량이 계속 증가하다 OOM 발생 원인: 제출 속도가 처리 속도보다 빠를 때 큐에 작업이 무한히 쌓임 해결법: 큐 크기 제한 + 블로킹 submit

template<typename F>
void submit_blocking(F&& f, size_t max_queue_size = 1000) {
    std::unique_lock<std::mutex> lock(queue_mutex_);
    condition_full_.wait(lock, [this, max_queue_size] {
        return tasks_.size() < max_queue_size;
    });
    tasks_.emplace(std::forward<F>(f));
    condition_.notify_one();
}

문제 8: shared_ptr 순환 참조

증상: pool이 task를 들고, task가 pool을 캡처하면 메모리 누수
해결법: weak_ptr로 pool 참조 후 lock()으로 유효성 확인

7. 베스트 프랙티스

1. 람다 캡처: 값 캡처 우선

// ✅ 좋은 예: 값 캡처
pool.submit([path = std::string(path)]() { process(path); });
// ✅ shared_ptr로 공유 시
auto data = std::make_shared<Data>(...);
pool.submit([data]() { process(data); });

2. 워커 수 설정

// CPU 바운드: 코어 수
size_t n = std::thread::hardware_concurrency();
// I/O 바운드: 2~4배
size_t n = std::thread::hardware_concurrency() * 2;

3. 예외 처리: 워커 루프에서 try-catch

void worker_loop() {
    for (;;) {
        // ...
        try {
            task();
        } catch (const std::exception& e) {
            log_error(e.what());
        } catch (...) {
            log_error("unknown exception");
        }
    }
}

4. 재귀 submit 시 데드락 방지

// ❌ 위험: 단일 스레드 풀에서 재귀 submit + get()
ThreadPool pool(1);
auto f = pool.submit([&pool]() {
    auto inner = pool.submit( { return 1; });
    return inner.get();  // 데드락
});
// ✅ 해결: 풀 크기 >= 2 또는 작업 평탄화
ThreadPool pool(std::max(2uz, std::thread::hardware_concurrency()));

5. Shutdown 시점 명확히

// 서버 종료 시: 큐에 남은 작업까지 처리
pool.shutdown_when_idle();
// 긴급 종료 시: 즉시 중단
pool.shutdown();

6. 큐 크기 제한 (Backpressure)

// 제출 속도 > 처리 속도 시 OOM 방지
void submit_blocking(F&& f, size_t max_queue = 1000) {
    std::unique_lock<std::mutex> lock(queue_mutex_);
    condition_full_.wait(lock, [this, max_queue] {
        return tasks_.size() < max_queue;
    });
    tasks_.emplace(std::forward<F>(f));
    condition_.notify_one();
}

7. 모니터링 포인트

프로덕션에서 queue_size, busy_workers, total_submitted, total_completed 등을 추적합니다.

8. 성능 벤치마크

벤치마크 설정

  • CPU: 8코어 (예: Apple M1 / Intel i7)
  • 작업: 100만 개 정수 합산을 8개 청크로 분할
  • 비교: std::async vs 스레드 풀

예시 결과 (표)

방식10만 작업100만 작업스레드 수
std::async (매번)2.3초23초100만 개 생성
스레드 풀 (8 워커)0.15초1.5초8개 고정
스레드 풀 (16 워커)0.14초1.4초16개 고정

벤치마크 코드 예시

void benchmark_pool(ThreadPool& pool, size_t num_tasks) {
    auto start = std::chrono::high_resolution_clock::now();
    std::vector<std::future<int>> futures;
    for (size_t i = 0; i < num_tasks; ++i)
        futures.push_back(pool.submit([i]() { return static_cast<int>(i); }));
    for (auto& f : futures) f.get();
    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::high_resolution_clock::now() - start).count();
    std::cout << "pool " << num_tasks << ": " << elapsed << " ms\n";
}

최적 워커 수

  • CPU 바운드: std::thread::hardware_concurrency() (코어 수)
  • I/O 바운드: 코어 수의 2~4배
  • 혼합: 프로파일링으로 조정

7. 프로덕션 패턴

패턴 1: 싱글톤 스레드 풀

class ThreadPoolSingleton {
public:
    static ThreadPool& instance() {
        static ThreadPool pool(std::thread::hardware_concurrency());
        return pool;
    }
};
// 사용
ThreadPoolSingleton::instance().submit([]{ do_work(); });

패턴 2: 스레드 풀 크기 설정

// 환경 변수 또는 설정 파일에서 읽기
size_t get_pool_size() {
    if (const char* p = std::getenv("THREAD_POOL_SIZE")) {
        return std::max(1uz, static_cast<size_t>(std::atoi(p)));
    }
    return std::thread::hardware_concurrency();
}

패턴 3: 작업 타임아웃

template<typename F>
auto submit_with_timeout(F&& f, std::chrono::milliseconds timeout) {
    auto result = submit(std::forward<F>(f));
    if (result.wait_for(timeout) == std::future_status::timeout) {
        // 타임아웃 처리: 작업은 백그라운드에서 계속 실행됨
        throw std::runtime_error("task timeout");
    }
    return result.get();
}

패턴 4: 배치 제출로 락 경합 감소

template<typename F>
void submit_batch(std::vector<F>&& tasks) {
    std::unique_lock<std::mutex> lock(queue_mutex_);
    for (auto& t : tasks) {
        tasks_.emplace(std::move(t));
    }
    condition_.notify_all();  // 한 번에 모두 깨움
}

패턴 5: Work Stealing (고급)

부하가 불균형할 때, 유휴 워커가 다른 워커의 로컬 큐에서 작업을 가져옵니다. 구현이 복잡하므로 cpp-series-51-3-multithreading-tuning 참고.

패턴 6: 모니터링

get_stats()queue_size, busy_workers, total_workers를 반환해 모니터링합니다.

패턴 7: 스레드 로컬 스토리지

thread_local로 워커별 캐시·DB 연결 등 독립 리소스를 둡니다.

패턴 8: 작업 그룹 (Task Group)

여러 작업을 한 번에 제출하고 모두 완료될 때까지 대기:

template<typename....Fs>
void submit_and_wait(Fs&&....fs) {
    std::tuple<std::future<void>...> futures = {
        submit(std::forward<Fs>(fs))...
    };
    std::apply( { (f.get(), ...); }, futures);
}

패턴 9: Work Stealing 개요

부하 불균형 시 유휴 워커가 다른 워커의 로컬 큐에서 작업을 훔칩니다. C++ 표준에는 없어 직접 구현하거나 BS::thread_pool 같은 라이브러리를 사용합니다.

10. 실전 예제

예제 1: 병렬 이미지 처리

void resize_images_parallel(const std::vector<std::string>& paths) {
    ThreadPool pool(std::thread::hardware_concurrency());
    std::vector<std::future<void>> futures;
    for (const auto& path : paths)
        futures.push_back(pool.submit([path]() {
            auto thumb = resize(load_image(path), 128, 128);
            save_image("thumb_" + path, thumb);
        }));
    for (auto& f : futures) f.get();
}

예제 2: Map-Reduce 스타일 합산

template<typename T>
T parallel_sum(const std::vector<T>& data) {
    size_t num_workers = std::thread::hardware_concurrency();
    ThreadPool pool(num_workers);
    const size_t chunk = std::max(size_t(1), data.size() / num_workers);
    std::vector<std::future<T>> futures;
    for (size_t i = 0; i < data.size(); i += chunk) {
        size_t end = std::min(i + chunk, data.size());
        futures.push_back(pool.submit([&data, i, end]() {
            T sum = 0;
            for (size_t j = i; j < end; ++j) sum += data[j];
            return sum;
        }));
    }
    T total = 0;
    for (auto& f : futures) total += f.get();
    return total;
}

예제 3: 파이프라인 (Producer-Consumer)

Producer가 BlockingQueue에서 꺼낸 작업을 pool.submit()으로 제출하고, pool.wait()으로 완료를 기다립니다.

예제 4: 병렬 퀵소트 (작업 분할)

template<typename T>
void parallel_quicksort(std::vector<T>& arr, size_t left, size_t right,
                        ThreadPool& pool, int depth = 0) {
    if (left >= right) return;
    size_t pivot = partition(arr, left, right);
    const int max_depth = 4;  // 분할 깊이 제한
    if (depth < max_depth) {
        auto f = pool.submit([&, pivot, right]() {
            parallel_quicksort(arr, pivot + 1, right, pool, depth + 1);
        });
        parallel_quicksort(arr, left, pivot - 1, pool, depth + 1);
        f.get();
    } else {
        std::sort(arr.begin() + left, arr.begin() + right + 1);
    }
}

예제 5: 병렬 HTTP 요청 배치

std::vector<Response> fetch_all(const std::vector<std::string>& urls) {
    ThreadPool pool(std::min(urls.size(), size_t(16)));
    std::vector<std::future<Response>> futures;
    for (const auto& url : urls) futures.push_back(pool.submit([url]() { return http_get(url); }));
    std::vector<Response> results;
    for (auto& f : futures) results.push_back(f.get());
    return results;
}

예제 6: 게임 렌더링 작업 분할

void render_frame(Scene& scene, ThreadPool& pool) {
    const int num_chunks = 4;
    std::vector<std::future<void>> futures;
    for (int i = 0; i < num_chunks; ++i) {
        size_t start = i * (scene.meshes.size() / num_chunks);
        size_t end = (i + 1) * (scene.meshes.size() / num_chunks);
        futures.push_back(pool.submit([&scene, start, end]() {
            for (size_t j = start; j < end; ++j) scene.meshes[j].draw();
        }));
    }
    for (auto& f : futures) f.get();
}

스레드 풀 라이브러리 비교

라이브러리특징라이선스
BS::thread_pool헤더 전용, C++17, Work stealingMIT
ctpl가벼움, 헤더 전용MIT
ThreadPool (progschj)단순, C++11Zlib
HPX대규모 분산, C++20BSL

참고 자료


구현 체크리스트

  • 워커 수를 hardware_concurrency() 또는 설정값으로 결정
  • submit 시 람다는 값 캡처 또는 shared_ptr 사용
  • 워커 루프에서 예외 처리 (try-catch)
  • shutdown 시 진행 중 작업 완료 대기 옵션 제공
  • 재귀 submit 시 데드락 가능성 검토 (풀 크기 >= 2)
  • 프로덕션에서 큐 크기·통계 모니터링
  • False sharing 방지 (스레드별 데이터 정렬)

정리

항목설명
아키텍처작업 큐 + 고정 워커 스레드
기본 구현mutex + condition_variable
future 반환packaged_task로 결과 수신
에러 방지참조 캡처 금지, 예외 처리, 데드락 주의
성능std::async 대비 대량 작업에서 유리
프로덕션싱글톤, 설정, 모니터링, 배치 제출
핵심 원칙:
  1. 스레드 수를 제한하고 작업 큐로 부하 분산
  2. 람다 캡처와 예외 처리로 안정성 확보
  3. 벤치마크로 워커 수와 패턴 검증
  4. 프로덕션에서는 모니터링과 graceful shutdown 적용

자주 묻는 질문 (FAQ)

Q. std::async 대신 스레드 풀을 써야 할 때는?

A. 작업 수가 수백~수천 개 이상이고, 스레드 생성 비용을 줄이고 싶을 때 스레드 풀을 사용합니다. 소량의 일회성 병렬 작업은 std::async로 충분합니다.

Q. 워커 수는 어떻게 정하면 되나요?

A. CPU 바운드 작업은 코어 수, I/O 바운드는 2~4배를 시도해 보고 프로파일링으로 조정합니다.

Q. 기존 라이브러리를 써도 되나요?

A. BS::thread_pool, ctpl 등 검증된 라이브러리를 사용해도 됩니다. 이 글의 구현은 원리 이해와 커스터마이징용입니다.

Q. 선행으로 읽으면 좋은 글은?

A. 원자 연산, 데이터 레이스와 뮤텍스, 태스크 큐를 먼저 읽으면 도움이 됩니다.

Q. 더 깊이 공부하려면?

A. cppreference - thread, C++ Concurrency in Action을 참고하세요.

한 줄 요약: 스레드 풀로 스레드 수를 제한하면서 대량 작업을 안정적으로 병렬 처리할 수 있습니다.

관련 글

  • C++ 고급 프로파일링 완벽 가이드 | perf·gprof
  • C++ 멀티스레드 성능 튜닝 | 락 프리·작업 훔치기·스레드 풀 [#51-3]
  • C++ Lock-Free 프로그래밍 실전 | CAS·ABA·메모리 순서·고성능 큐 [#51-5]
  • C++ Lock-Free 프로그래밍 실전 | CAS·ABA·메모리 순서·고성능 큐 [#34-3]
  • C++ condition_variable 기초 |
  • C++ Atomic Operations |

심화 부록: 구현·운영 관점

이 부록은 앞선 본문에서 다룬 주제(「C++ 스레드 풀 완벽 가이드 | 작업 큐·병렬 처리·성능 벤치마크 [#51-3]」)를 구현·런타임·운영 관점에서 다시 압축합니다. 도메인별 세부 구현은 글마다 다르지만, 입력 검증 → 핵심 연산 → 부작용(I/O·네트워크·동시성) → 관측의 흐름으로 장애를 나누면 원인 추적이 빨라집니다.

내부 동작과 핵심 메커니즘

flowchart TD
  A[입력·요청·이벤트] --> B[파싱·검증·디코딩]
  B --> C[핵심 연산·상태 전이]
  C --> D[부작용: I/O·네트워크·동시성]
  D --> E[결과·관측·저장]
sequenceDiagram
  participant C as 클라이언트/호출자
  participant B as 경계(런타임·게이트웨이·프로세스)
  participant D as 의존성(API·DB·큐·파일)
  C->>B: 요청/이벤트
  B->>D: 조회·쓰기·RPC
  D-->>B: 지연·부분 실패·재시도 가능
  B-->>C: 응답 또는 오류(코드·상관 ID)
  • 불변 조건(Invariant): 버퍼 경계, 프로토콜 상태, 트랜잭션 격리, FD 상한 등 단계별로 문장으로 적어 두면 디버깅 비용이 줄어듭니다.
  • 결정성: 순수 층과 시간·네트워크·스케줄에 의존하는 층을 분리해야 테스트와 장애 분석이 쉬워집니다.
  • 경계 비용: 직렬화, 인코딩, syscall 횟수, 락 경합, 할당·GC, 캐시 미스를 의심 목록에 둡니다.
  • 백프레셔: 생산자가 소비자보다 빠를 때 버퍼·큐·스트림에서 속도를 줄이는 신호를 어디에 둘지 정의합니다.

프로덕션 운영 패턴

영역운영 관점 질문
관측성요청 단위 상관 ID, 에러율·지연 p95/p99, 의존성 타임아웃·재시도가 대시보드에 보이는가
안전성입력 검증·권한·비밀·감사 로그가 코드 경로마다 일관적인가
신뢰성재시도는 멱등 연산에만 적용되는가, 서킷 브레이커·백오프·DLQ가 있는가
성능캐시·배치 크기·커넥션 풀·인덱스·백프레셔가 데이터 규모에 맞는가
배포롤백 룬북, 카나리/블루그린, 마이그레이션·피처 플래그가 문서화되어 있는가
용량피크 트래픽·디스크·FD·스레드 풀 상한을 주기적으로 검증하는가

스테이징은 데이터 양·네트워크 RTT·동시성을 프로덕션에 가깝게 맞출수록 재현율이 올라갑니다.

확장 예시: 엔드투엔드 미니 시나리오

앞선 본문 주제(「C++ 스레드 풀 완벽 가이드 | 작업 큐·병렬 처리·성능 벤치마크 [#51-3]」)를 배포·운영 흐름에 맞춰 옮긴 체크리스트입니다. 도메인에 맞게 단계 이름만 바꿔 적용할 수 있습니다.

  1. 입력 계약 고정: 스키마·버전·최대 페이로드·타임아웃·에러 코드를 경계에 둔다.
  2. 핵심 경로 계측: 요청 ID, 단계별 지연, 외부 호출 결과 코드를 로그·메트릭·트레이스에서 한 흐름으로 본다.
  3. 실패 주입: 의존성 타임아웃·5xx·부분 데이터·락 대기를 스테이징에서 재현한다.
  4. 호환·롤백: 설정/마이그레이션/클라이언트 버전을 되돌릴 수 있는지 확인한다.
  5. 부하 후 검증: 피크 대비 p95/p99, 에러율, 리소스 상한, 알림 임계값을 점검한다.
handle(request):
  ctx = newCorrelationId()
  validated = validateSchema(request)
  authorize(validated, ctx)
  result = domainCore(validated)
  persistOrEmit(result, idempotentKey)
  recordMetrics(ctx, latency, outcome)
  return result

문제 해결(Troubleshooting)

증상가능 원인조치
간헐적 실패레이스, 타임아웃, 외부 의존성, DNS최소 재현 스크립트, 분산 트레이스·로그 상관관계, 재시도·서킷 설정 점검
성능 저하N+1, 동기 I/O, 락 경합, 과도한 직렬화, 캐시 미스프로파일러·APM으로 핫스팟 확인 후 한 가지씩 제거
메모리 증가캐시 무제한, 구독/리스너 누수, 대용량 버퍼, 커넥션 미반납상한·TTL·힙/FD 스냅샷 비교
빌드·배포만 실패환경 변수, 권한, 플랫폼 차이, lockfileCI 로그와 로컬 diff, 런타임·이미지 버전 핀
설정 불일치프로필·시크릿·기본값, 리전스키마 검증된 설정 단일 소스와 배포 매트릭스 표준화
데이터 불일치비멱등 재시도, 부분 쓰기, 캐시 무효화 누락멱등 키·아웃박스·트랜잭션 경계 재검토

권장 순서: (1) 최소 재현 (2) 최근 변경 범위 축소 (3) 환경·의존성 차이 (4) 관측으로 가설 검증 (5) 수정 후 회귀·부하 테스트.

배포 전에는 git addgit commitgit pushnpm run deploy 순서를 권장합니다.


같이 보면 좋은 글 (내부 링크)

이 주제와 연결되는 다른 글입니다.


이 글에서 다루는 키워드 (관련 검색어)

C++, 멀티스레드, 스레드풀, thread-pool, 병렬처리, 성능최적화 등으로 검색하시면 이 글이 도움이 됩니다.