C++ 스레드 풀 완벽 가이드 | 작업 큐·병렬 처리·성능 벤치마크 [#51-3]

C++ 스레드 풀 완벽 가이드 | 작업 큐·병렬 처리·성능 벤치마크 [#51-3]

이 글의 핵심

C++17 스레드 풀: 작업 큐, std::async 대안, 데드락·경쟁 조건 방지, 성능 측정, 프로덕션 패턴. 실무 문제 시나리오와 해결법. HTTP 요청을 병렬로 처리하려고 std::async나 std::thread를 루프 안에서 호출했습니다. 요청이 1000개 들어오니 스레드가 1000개 생성되었고, 컨텍스트 스위칭·스택 메모리로 인해 시스템이 멈추는 것처럼 느려졌습니다.

들어가며: 작업마다 스레드를 만들면 안 되는 이유

”요청 1000개 처리하는데 스레드 1000개가 생성돼요”

HTTP 요청을 병렬로 처리하려고 std::asyncstd::thread를 루프 안에서 호출했습니다. 요청이 1000개 들어오니 스레드가 1000개 생성되었고, 컨텍스트 스위칭·스택 메모리로 인해 시스템이 멈추는 것처럼 느려졌습니다.

문제의 코드:

// ❌ 나쁜 예: 작업마다 스레드 생성
void process_requests(const std::vector<Request>& requests) {
    std::vector<std::thread> threads;
    for (const auto& req : requests) {
        threads.emplace_back([&req]() {
            handle_request(req);  // 1000개 요청 → 1000개 스레드
        });
    }
    for (auto& t : threads) t.join();
}

스레드 풀을 쓰면:

// ✅ 좋은 예: 고정된 워커 스레드가 작업 큐에서 가져와 처리
ThreadPool pool(std::thread::hardware_concurrency());
for (const auto& req : requests) {
    pool.submit([&req]() { handle_request(req); });
}
pool.wait();  // CPU 코어 수만큼만 스레드 유지

원인: 스레드 생성·파괴 비용이 크고, 과도한 스레드는 메모리와 스케줄링 부담을 줍니다. 스레드 풀은 미리 생성한 워커들이 작업 큐에서 작업을 가져와 처리하므로, 스레드 수를 제한하면서도 병렬 처리가 가능합니다.

이 글을 읽으면:

  • 스레드 풀의 개념과 동작 원리를 이해할 수 있습니다.
  • C++17로 완전한 스레드 풀을 구현할 수 있습니다.
  • 데드락·경쟁 조건 등 자주 발생하는 에러를 피할 수 있습니다.
  • 성능 벤치마크와 프로덕션 패턴을 적용할 수 있습니다.

문제 시나리오

시나리오 1: 웹 서버 요청 처리

초당 1000개 HTTP 요청을 처리하는 서버에서, 요청마다 std::async를 호출하면 스레드가 무한정 늘어납니다. 스레드 풀에 작업을 제출하면 코어 수(예: 8개)만큼의 워커가 큐에서 순차적으로 가져와 처리합니다.

시나리오 2: 대량 이미지 리사이징

10,000장의 이미지를 썸네일로 변환할 때, std::thread를 10,000번 생성하면 메모리 부족이나 시스템 과부하가 발생할 수 있습니다. 스레드 풀에 8~16개 워커를 두고 작업을 분배하면 안정적으로 처리됩니다.

시나리오 3: 병렬 정렬/검색

대용량 배열을 병렬로 정렬하거나 검색할 때, 재귀적으로 std::async를 호출하면 작업 수만큼 future가 생성되어 오버헤드가 큽니다. 스레드 풀 + 작업 분할로 깊이를 제한하면 효율적입니다.

시나리오 4: 게임 엔진 작업 스케줄링

물리, 렌더링, AI 등 여러 시스템이 매 프레임 실행될 때, 각 시스템 내부에서 세부 작업을 스레드 풀에 제출하면 프레임 간 일관된 스레드 사용이 가능합니다.

시나리오 5: 배치 ETL 파이프라인

DB에서 읽은 레코드를 변환·필터링·저장하는 파이프라인에서, 변환 단계를 스레드 풀에 병렬로 제출하면 처리량이 향상됩니다.

시나리오 6: 실시간 로그 집계

여러 소스에서 들어오는 로그를 실시간으로 집계할 때, 파싱·집계 작업을 스레드 풀에 제출하고 결과를 메인 스레드에서 수집하면 블로킹 없이 처리할 수 있습니다.

시나리오 7: “메모리 사용량이 계속 증가해요”

// ❌ 나쁜 예: 제출 속도 > 처리 속도 → 큐 무한 증가
while (true) {
    pool.submit([&]() { process_incoming_request(); });
    // 처리보다 빠르게 제출 → OOM 위험
}

원인: 작업 큐에 상한이 없어 제출 속도가 처리 속도를 넘으면 메모리가 폭증합니다. 해결: 큐 크기 제한 + 블로킹 submit 또는 backpressure 적용.

시나리오 8: “서버 종료 시 진행 중 작업이 날아가요”

// ❌ 나쁜 예: 소멸자에서 즉시 stop_ = true
~ThreadPool() {
    stop_ = true;
    condition_.notify_all();
    for (auto& w : workers_) w.join();
    // 큐에 남은 1000개 작업은 처리되지 않고 버려짐
}

원인: Graceful shutdown 없이 즉시 중단하면 큐에 남은 작업이 손실됩니다. 해결: shutdown_when_idle()로 큐가 비고 진행 중 작업이 완료될 때까지 대기 후 종료합니다.

시나리오 9: “람다 캡처로 크래시가 나요”

// ❌ 나쁜 예: 참조 캡처 → use-after-free
void process(const std::string& path) {
    pool.submit([&path]() {
        load_file(path);  // path는 process() 반환 후 무효화
    });
}  // path 소멸, 워커는 아직 실행 전

원인: [&]로 캡처한 지역 변수가 작업 실행 전에 스코프를 벗어납니다. 해결: 값 캡처 [path] 또는 std::shared_ptr 사용.


목차

  1. 스레드 풀 아키텍처
  2. 기본 스레드 풀 구현
  3. 완전한 스레드 풀 (future 반환)
  4. 완전한 스레드 풀 예제 (작업 큐·워커·제출·종료)
  5. 우선순위 큐·작업 취소
  6. 자주 발생하는 에러와 해결법
  7. 베스트 프랙티스
  8. 성능 벤치마크
  9. 프로덕션 패턴
  10. 실전 예제

1. 스레드 풀 아키텍처

핵심 구성 요소

flowchart TB
    subgraph main["메인 스레드"]
        S1[submit 작업1]
        S2[submit 작업2]
        S3[submit 작업3]
    end
    subgraph queue["작업 큐"]
        Q[FIFO 큐]
    end
    subgraph workers["워커 스레드"]
        W1[워커 1]
        W2[워커 2]
        W3[워커 3]
    end
    S1 --> Q
    S2 --> Q
    S3 --> Q
    Q --> W1
    Q --> W2
    Q --> W3

동작 흐름:

  1. 메인 스레드가 submit()으로 작업(함수/람다)을 큐에 넣습니다.
  2. 워커 스레드들이 큐에서 작업을 꺼내 실행합니다.
  3. 큐가 비면 워커는 condition_variable으로 대기합니다.
  4. 새 작업이 들어오면 한 워커가 깨어나 처리합니다.

작업 제출 시퀀스 다이어그램

sequenceDiagram
    participant M as 메인 스레드
    participant Q as 작업 큐
    participant W1 as 워커 1
    participant W2 as 워커 2

    M->>Q: submit(task1)
    M->>Q: submit(task2)
    Note over Q: notify_one()
    Q->>W1: task1 꺼냄
    W1->>W1: task1 실행
    Q->>W2: task2 꺼냄
    W2->>W2: task2 실행
    M->>M: wait() 또는 get()
    W1-->>M: 완료
    W2-->>M: 완료

워커 상태 전이

stateDiagram-v2
    [*] --> 대기: 풀 시작
    대기 --> 실행: 큐에서 작업 획득
    실행 --> 대기: 작업 완료, 큐 비어있음
    실행 --> 대기: 작업 완료, 다음 작업 있음
    대기 --> 종료: stop_ && 큐 비어있음

std::async vs 스레드 풀

항목std::async스레드 풀
스레드 수작업마다 생성 가능고정 (코어 수 등)
오버헤드future·스레드 생성 비용큐 접근만
제어제한 어려움명시적 제어
적합한 경우소량 작업, 간단한 병렬화대량 작업, 서버

2. 기본 스레드 풀 구현

최소 동작 코드

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <stdexcept>

class ThreadPool {
public:
    explicit ThreadPool(size_t num_threads) : stop_(false) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers_.emplace_back([this] {
                for (;;) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex_);
                        condition_.wait(lock, [this] {
                            return stop_ || !tasks_.empty();
                        });
                        if (stop_ && tasks_.empty()) return;
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    task();
                }
            });
        }
    }

    template<typename F>
    void submit(F&& f) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            if (stop_) throw std::runtime_error("submit on stopped pool");
            tasks_.emplace(std::forward<F>(f));
        }
        condition_.notify_one();
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (auto& w : workers_) w.join();
    }

private:
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    bool stop_;
};

사용 예:

int main() {
    ThreadPool pool(4);
    for (int i = 0; i < 8; ++i) {
        pool.submit([i] {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            std::cout << "task " << i << " done\n";
        });
    }
    return 0;  // 소멸자에서 join
}

주의: submit은 비동기입니다. 작업 완료를 기다리려면 별도 동기화가 필요합니다. 아래 완전한 구현에서 std::future 반환으로 해결합니다.


3. 완전한 스레드 풀 (future 반환)

결과를 받을 수 있는 submit

#include <atomic>
#include <future>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <stdexcept>
#include <memory>

class ThreadPool {
public:
    explicit ThreadPool(size_t num_threads)
        : stop_(false) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers_.emplace_back([this] { worker_loop(); });
        }
    }

    template<typename F, typename... Args>
    auto submit(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type> {
        using return_type = typename std::result_of<F(Args...)>::type;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        std::future<return_type> result = task->get_future();

        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            if (stop_) throw std::runtime_error("submit on stopped pool");
            tasks_.emplace([task]() { (*task)(); });
        }
        condition_.notify_one();
        return result;
    }

    void wait() {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        condition_empty_.wait(lock, [this] {
            return tasks_.empty() && busy_count_ == 0;
        });
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (auto& w : workers_) w.join();
    }

private:
    void worker_loop() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                condition_.wait(lock, [this] {
                    return stop_ || !tasks_.empty();
                });
                if (stop_ && tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop();
                ++busy_count_;
            }
            task();
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                --busy_count_;
                if (tasks_.empty() && busy_count_ == 0)
                    condition_empty_.notify_all();
            }
        }
    }

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    std::condition_variable condition_empty_;
    std::atomic<size_t> busy_count_{0};
    bool stop_;
};

C++17 result_of 대체 (C++20에서는 std::invoke_result 사용):

// C++17: result_of는 C++20에서 deprecated
// C++20에서는:
template<typename F, typename... Args>
auto submit(F&& f, Args&&... args)
    -> std::future<std::invoke_result_t<F, Args...>> {
    using return_type = std::invoke_result_t<F, Args...>;
    // ...
}

사용 예: 병렬 합산:

int main() {
    ThreadPool pool(std::thread::hardware_concurrency());
    std::vector<int> data(1000000, 1);

    auto sum_range = [&data](size_t start, size_t end) {
        int sum = 0;
        for (size_t i = start; i < end; ++i) sum += data[i];
        return sum;
    };

    const size_t chunk = data.size() / 4;
    std::vector<std::future<int>> futures;
    for (int i = 0; i < 4; ++i) {
        size_t s = i * chunk;
        size_t e = (i == 3) ? data.size() : (i + 1) * chunk;
        futures.push_back(pool.submit(sum_range, s, e));
    }

    int total = 0;
    for (auto& f : futures) total += f.get();
    std::cout << "sum = " << total << "\n";
}

4. 완전한 스레드 풀 예제 (작업 큐·워커·제출·종료)

단계별 동작 흐름

아래 예제는 작업 큐 생성 → 워커 스레드 기동 → 작업 제출(void/future) → Graceful Shutdown까지 한 번에 보여주는 실행 가능한 코드입니다.

#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>

class CompleteThreadPool {
public:
    explicit CompleteThreadPool(size_t num_threads) : stop_(false) {
        if (num_threads == 0) num_threads = 1;
        workers_.reserve(num_threads);
        for (size_t i = 0; i < num_threads; ++i) {
            workers_.emplace_back([this, i] { worker_loop(i); });
        }
    }

    // 1. void 반환 작업 제출 (fire-and-forget)
    template<typename F>
    void submit(F&& f) {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            if (stop_) throw std::runtime_error("submit on stopped pool");
            tasks_.emplace(std::function<void()>(std::forward<F>(f)));
        }
        condition_.notify_one();
    }

    // 2. future 반환 작업 제출 (결과 수신)
    template<typename F, typename... Args>
    auto submit_with_result(F&& f, Args&&... args)
        -> std::future<std::invoke_result_t<F, Args...>> {
        using return_type = std::invoke_result_t<F, Args...>;
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<return_type> result = task->get_future();
        submit([task]() { (*task)(); });
        return result;
    }

    // 3. Graceful Shutdown: 큐 비우기 + 진행 중 작업 완료 대기
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (auto& w : workers_) {
            if (w.joinable()) w.join();
        }
        workers_.clear();
    }

    // 4. 유휴 상태까지 대기 후 shutdown
    void shutdown_when_idle() {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        condition_idle_.wait(lock, [this] {
            return tasks_.empty() && busy_count_ == 0;
        });
        stop_ = true;
        lock.unlock();
        condition_.notify_all();
        for (auto& w : workers_) {
            if (w.joinable()) w.join();
        }
        workers_.clear();
    }

    ~CompleteThreadPool() { shutdown(); }

private:
    void worker_loop(size_t id) {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                condition_.wait(lock, [this] {
                    return stop_ || !tasks_.empty();
                });
                if (stop_ && tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop();
                ++busy_count_;
            }
            try {
                task();
            } catch (const std::exception& e) {
                std::cerr << "[워커 " << id << "] 예외: " << e.what() << "\n";
            }
            {
                std::lock_guard<std::mutex> lock(queue_mutex_);
                --busy_count_;
                if (tasks_.empty() && busy_count_ == 0)
                    condition_idle_.notify_all();
            }
        }
    }

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    std::condition_variable condition_idle_;
    std::atomic<size_t> busy_count_{0};
    std::atomic<bool> stop_;
};

사용 예: 작업 제출과 Shutdown

CompleteThreadPool pool(4);
for (int i = 0; i < 4; ++i)
    pool.submit([i]() { /* 작업 처리 */ });
auto f = pool.submit_with_result( { return 42; });
std::cout << f.get() << "\n";
pool.shutdown_when_idle();

실행 흐름 요약:

  1. 작업 큐: std::queue<std::function<void()>> + std::mutex + condition_variable
  2. 워커 스레드: worker_loop()에서 큐에서 작업을 꺼내 실행, 예외 처리 포함
  3. 작업 제출: submit() (void), submit_with_result() (future)
  4. Shutdown: shutdown() (즉시), shutdown_when_idle() (유휴 시)

5. 우선순위 큐·작업 취소

우선순위 작업 큐

#include <queue>

struct PriorityTask {
    int priority;
    std::function<void()> task;
    bool operator<(const PriorityTask& o) const {
        return priority < o.priority;  // 높은 숫자 = 높은 우선순위
    }
};

class PriorityThreadPool {
    // ...
    std::priority_queue<PriorityTask> tasks_;
    // submit 시 priority 인자 추가
};

Graceful Shutdown

void shutdown() {
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        stop_ = true;
    }
    condition_.notify_all();
    for (auto& w : workers_) w.join();
    workers_.clear();
}

// 소멸자에서 즉시 중단 대신, 큐 비우기 대기 옵션
void shutdown_when_idle() {
    std::unique_lock<std::mutex> lock(queue_mutex_);
    condition_empty_.wait(lock, [this] {
        return tasks_.empty() && busy_count_ == 0;
    });
    stop_ = true;
    condition_.notify_all();
    lock.unlock();
    for (auto& w : workers_) w.join();
}

6. 자주 발생하는 에러와 해결법

문제 1: 람다에서 참조 캡처로 use-after-free

증상: 크래시, 잘못된 결과

원인: [&]로 캡처한 지역 변수가 스레드가 실행되기 전에 스코프를 벗어남

// ❌ 잘못된 사용
void bad() {
    int x = 42;
    pool.submit([&]() { std::cout << x; });  // x는 이미 파괴됐을 수 있음
}  // x 소멸

// ✅ 올바른 사용: 값 캡처
pool.submit([x]() { std::cout << x; });

// 또는 shared_ptr로 공유
auto data = std::make_shared<Data>(...);
pool.submit([data]() { process(data); });

문제 2: submit 내부에서 pool에 다시 submit (데드락 가능)

증상: 교착 상태, 프로그램 멈춤

원인: 큐 락을 잡은 채로 submit이 호출되고, 내부 작업이 또 submit을 기다리는 경우

// ❌ 위험한 패턴
pool.submit([&pool]() {
    auto result = heavy_compute();
    pool.submit([result]() { save(result); });  // 데드락 가능
});

// ✅ 해결: submit은 락 밖에서 notify
// (우리 구현은 이미 그렇게 되어 있음)
// 또는 재귀 submit을 별도 큐에 넣고 메인 루프에서 처리

문제 3: stop_ 플래그만으로는 진행 중 작업 중단 불가

증상: shutdown 후에도 작업이 계속 실행됨

해결법: 각 작업 내부에서 주기적으로 stop_requested() 체크

void long_running_task(std::atomic<bool>& stop) {
    for (int i = 0; i < 1000000; ++i) {
        if (stop.load()) return;
        do_work(i);
    }
}

문제 4: 예외가 워커를 종료시킴

증상: 워커가 예외로 죽어서 스레드 수가 줄어듦

해결법: 워커 루프에서 try-catch

void worker_loop() {
    for (;;) {
        // ...
        try {
            task();
        } catch (const std::exception& e) {
            std::cerr << "task exception: " << e.what() << "\n";
        } catch (...) {
            std::cerr << "unknown task exception\n";
        }
    }
}

문제 5: future.get()에서 데드락

증상: 메인 스레드가 멈춤

원인: 스레드 풀 크기가 1이고, 메인 스레드가 get()으로 대기하는 동안 그 유일한 워커가 다른 작업을 기다리는 경우

// ❌ 위험: 단일 스레드 풀
ThreadPool pool(1);
auto f = pool.submit([&pool]() {
    auto inner = pool.submit( { return 1; });
    return inner.get();  // 데드락: 유일한 워커가 inner 완료 대기
});

// ✅ 해결: 스레드 풀 크기 >= 2 이상
// 또는 재귀 submit을 피하고, 작업을 평탄화

문제 6: 캐시 라인 분할 (False Sharing)

증상: 멀티스레드에서 카운터 증가 시 예상보다 느림
해결법: 스레드별 데이터를 alignas(64)로 캐시 라인 경계 정렬

문제 7: 큐 무한 증가 (Backpressure 없음)

증상: 메모리 사용량이 계속 증가하다 OOM 발생

원인: 제출 속도가 처리 속도보다 빠를 때 큐에 작업이 무한히 쌓임

해결법: 큐 크기 제한 + 블로킹 submit

template<typename F>
void submit_blocking(F&& f, size_t max_queue_size = 1000) {
    std::unique_lock<std::mutex> lock(queue_mutex_);
    condition_full_.wait(lock, [this, max_queue_size] {
        return tasks_.size() < max_queue_size;
    });
    tasks_.emplace(std::forward<F>(f));
    condition_.notify_one();
}

문제 8: shared_ptr 순환 참조

증상: pool이 task를 들고, task가 pool을 캡처하면 메모리 누수
해결법: weak_ptr로 pool 참조 후 lock()으로 유효성 확인


7. 베스트 프랙티스

1. 람다 캡처: 값 캡처 우선

// ✅ 좋은 예: 값 캡처
pool.submit([path = std::string(path)]() { process(path); });

// ✅ shared_ptr로 공유 시
auto data = std::make_shared<Data>(...);
pool.submit([data]() { process(data); });

2. 워커 수 설정

// CPU 바운드: 코어 수
size_t n = std::thread::hardware_concurrency();

// I/O 바운드: 2~4배
size_t n = std::thread::hardware_concurrency() * 2;

3. 예외 처리: 워커 루프에서 try-catch

void worker_loop() {
    for (;;) {
        // ...
        try {
            task();
        } catch (const std::exception& e) {
            log_error(e.what());
        } catch (...) {
            log_error("unknown exception");
        }
    }
}

4. 재귀 submit 시 데드락 방지

// ❌ 위험: 단일 스레드 풀에서 재귀 submit + get()
ThreadPool pool(1);
auto f = pool.submit([&pool]() {
    auto inner = pool.submit( { return 1; });
    return inner.get();  // 데드락
});

// ✅ 해결: 풀 크기 >= 2 또는 작업 평탄화
ThreadPool pool(std::max(2uz, std::thread::hardware_concurrency()));

5. Shutdown 시점 명확히

// 서버 종료 시: 큐에 남은 작업까지 처리
pool.shutdown_when_idle();

// 긴급 종료 시: 즉시 중단
pool.shutdown();

6. 큐 크기 제한 (Backpressure)

// 제출 속도 > 처리 속도 시 OOM 방지
void submit_blocking(F&& f, size_t max_queue = 1000) {
    std::unique_lock<std::mutex> lock(queue_mutex_);
    condition_full_.wait(lock, [this, max_queue] {
        return tasks_.size() < max_queue;
    });
    tasks_.emplace(std::forward<F>(f));
    condition_.notify_one();
}

7. 모니터링 포인트

프로덕션에서 queue_size, busy_workers, total_submitted, total_completed 등을 추적합니다.


8. 성능 벤치마크

벤치마크 설정

  • CPU: 8코어 (예: Apple M1 / Intel i7)
  • 작업: 100만 개 정수 합산을 8개 청크로 분할
  • 비교: std::async vs 스레드 풀

예시 결과 (표)

방식10만 작업100만 작업스레드 수
std::async (매번)2.3초23초100만 개 생성
스레드 풀 (8 워커)0.15초1.5초8개 고정
스레드 풀 (16 워커)0.14초1.4초16개 고정

벤치마크 코드 예시

void benchmark_pool(ThreadPool& pool, size_t num_tasks) {
    auto start = std::chrono::high_resolution_clock::now();
    std::vector<std::future<int>> futures;
    for (size_t i = 0; i < num_tasks; ++i)
        futures.push_back(pool.submit([i]() { return static_cast<int>(i); }));
    for (auto& f : futures) f.get();
    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::high_resolution_clock::now() - start).count();
    std::cout << "pool " << num_tasks << ": " << elapsed << " ms\n";
}

최적 워커 수

  • CPU 바운드: std::thread::hardware_concurrency() (코어 수)
  • I/O 바운드: 코어 수의 2~4배
  • 혼합: 프로파일링으로 조정

7. 프로덕션 패턴

패턴 1: 싱글톤 스레드 풀

class ThreadPoolSingleton {
public:
    static ThreadPool& instance() {
        static ThreadPool pool(std::thread::hardware_concurrency());
        return pool;
    }
};

// 사용
ThreadPoolSingleton::instance().submit([]{ do_work(); });

패턴 2: 스레드 풀 크기 설정

// 환경 변수 또는 설정 파일에서 읽기
size_t get_pool_size() {
    if (const char* p = std::getenv("THREAD_POOL_SIZE")) {
        return std::max(1uz, static_cast<size_t>(std::atoi(p)));
    }
    return std::thread::hardware_concurrency();
}

패턴 3: 작업 타임아웃

template<typename F>
auto submit_with_timeout(F&& f, std::chrono::milliseconds timeout) {
    auto result = submit(std::forward<F>(f));
    if (result.wait_for(timeout) == std::future_status::timeout) {
        // 타임아웃 처리: 작업은 백그라운드에서 계속 실행됨
        throw std::runtime_error("task timeout");
    }
    return result.get();
}

패턴 4: 배치 제출로 락 경합 감소

template<typename F>
void submit_batch(std::vector<F>&& tasks) {
    std::unique_lock<std::mutex> lock(queue_mutex_);
    for (auto& t : tasks) {
        tasks_.emplace(std::move(t));
    }
    condition_.notify_all();  // 한 번에 모두 깨움
}

패턴 5: Work Stealing (고급)

부하가 불균형할 때, 유휴 워커가 다른 워커의 로컬 큐에서 작업을 가져옵니다. 구현이 복잡하므로 cpp-series-51-3-multithreading-tuning 참고.

패턴 6: 모니터링

get_stats()queue_size, busy_workers, total_workers를 반환해 모니터링합니다.

패턴 7: 스레드 로컬 스토리지

thread_local로 워커별 캐시·DB 연결 등 독립 리소스를 둡니다.

패턴 8: 작업 그룹 (Task Group)

여러 작업을 한 번에 제출하고 모두 완료될 때까지 대기:

template<typename... Fs>
void submit_and_wait(Fs&&... fs) {
    std::tuple<std::future<void>...> futures = {
        submit(std::forward<Fs>(fs))...
    };
    std::apply( { (f.get(), ...); }, futures);
}

패턴 9: Work Stealing 개요

부하 불균형 시 유휴 워커가 다른 워커의 로컬 큐에서 작업을 훔칩니다. C++ 표준에는 없어 직접 구현하거나 BS::thread_pool 같은 라이브러리를 사용합니다.


10. 실전 예제

예제 1: 병렬 이미지 처리

void resize_images_parallel(const std::vector<std::string>& paths) {
    ThreadPool pool(std::thread::hardware_concurrency());
    std::vector<std::future<void>> futures;
    for (const auto& path : paths)
        futures.push_back(pool.submit([path]() {
            auto thumb = resize(load_image(path), 128, 128);
            save_image("thumb_" + path, thumb);
        }));
    for (auto& f : futures) f.get();
}

예제 2: Map-Reduce 스타일 합산

template<typename T>
T parallel_sum(const std::vector<T>& data) {
    size_t num_workers = std::thread::hardware_concurrency();
    ThreadPool pool(num_workers);
    const size_t chunk = std::max(size_t(1), data.size() / num_workers);
    std::vector<std::future<T>> futures;

    for (size_t i = 0; i < data.size(); i += chunk) {
        size_t end = std::min(i + chunk, data.size());
        futures.push_back(pool.submit([&data, i, end]() {
            T sum = 0;
            for (size_t j = i; j < end; ++j) sum += data[j];
            return sum;
        }));
    }

    T total = 0;
    for (auto& f : futures) total += f.get();
    return total;
}

예제 3: 파이프라인 (Producer-Consumer)

Producer가 BlockingQueue에서 꺼낸 작업을 pool.submit()으로 제출하고, pool.wait()으로 완료를 기다립니다.

예제 4: 병렬 퀵소트 (작업 분할)

template<typename T>
void parallel_quicksort(std::vector<T>& arr, size_t left, size_t right,
                        ThreadPool& pool, int depth = 0) {
    if (left >= right) return;
    size_t pivot = partition(arr, left, right);

    const int max_depth = 4;  // 분할 깊이 제한
    if (depth < max_depth) {
        auto f = pool.submit([&, pivot, right]() {
            parallel_quicksort(arr, pivot + 1, right, pool, depth + 1);
        });
        parallel_quicksort(arr, left, pivot - 1, pool, depth + 1);
        f.get();
    } else {
        std::sort(arr.begin() + left, arr.begin() + right + 1);
    }
}

예제 5: 병렬 HTTP 요청 배치

std::vector<Response> fetch_all(const std::vector<std::string>& urls) {
    ThreadPool pool(std::min(urls.size(), size_t(16)));
    std::vector<std::future<Response>> futures;
    for (const auto& url : urls) futures.push_back(pool.submit([url]() { return http_get(url); }));
    std::vector<Response> results;
    for (auto& f : futures) results.push_back(f.get());
    return results;
}

예제 6: 게임 렌더링 작업 분할

void render_frame(Scene& scene, ThreadPool& pool) {
    const int num_chunks = 4;
    std::vector<std::future<void>> futures;
    for (int i = 0; i < num_chunks; ++i) {
        size_t start = i * (scene.meshes.size() / num_chunks);
        size_t end = (i + 1) * (scene.meshes.size() / num_chunks);
        futures.push_back(pool.submit([&scene, start, end]() {
            for (size_t j = start; j < end; ++j) scene.meshes[j].draw();
        }));
    }
    for (auto& f : futures) f.get();
}

스레드 풀 라이브러리 비교

라이브러리특징라이선스
BS::thread_pool헤더 전용, C++17, Work stealingMIT
ctpl가벼움, 헤더 전용MIT
ThreadPool (progschj)단순, C++11Zlib
HPX대규모 분산, C++20BSL

참고 자료


구현 체크리스트

  • 워커 수를 hardware_concurrency() 또는 설정값으로 결정
  • submit 시 람다는 값 캡처 또는 shared_ptr 사용
  • 워커 루프에서 예외 처리 (try-catch)
  • shutdown 시 진행 중 작업 완료 대기 옵션 제공
  • 재귀 submit 시 데드락 가능성 검토 (풀 크기 >= 2)
  • 프로덕션에서 큐 크기·통계 모니터링
  • False sharing 방지 (스레드별 데이터 정렬)

정리

항목설명
아키텍처작업 큐 + 고정 워커 스레드
기본 구현mutex + condition_variable
future 반환packaged_task로 결과 수신
에러 방지참조 캡처 금지, 예외 처리, 데드락 주의
성능std::async 대비 대량 작업에서 유리
프로덕션싱글톤, 설정, 모니터링, 배치 제출

핵심 원칙:

  1. 스레드 수를 제한하고 작업 큐로 부하 분산
  2. 람다 캡처와 예외 처리로 안정성 확보
  3. 벤치마크로 워커 수와 패턴 검증
  4. 프로덕션에서는 모니터링과 graceful shutdown 적용

자주 묻는 질문 (FAQ)

Q. std::async 대신 스레드 풀을 써야 할 때는?

A. 작업 수가 수백~수천 개 이상이고, 스레드 생성 비용을 줄이고 싶을 때 스레드 풀을 사용합니다. 소량의 일회성 병렬 작업은 std::async로 충분합니다.

Q. 워커 수는 어떻게 정하면 되나요?

A. CPU 바운드 작업은 코어 수, I/O 바운드는 2~4배를 시도해 보고 프로파일링으로 조정합니다.

Q. 기존 라이브러리를 써도 되나요?

A. BS::thread_pool, ctpl 등 검증된 라이브러리를 사용해도 됩니다. 이 글의 구현은 원리 이해와 커스터마이징용입니다.

Q. 선행으로 읽으면 좋은 글은?

A. 원자 연산, 데이터 레이스와 뮤텍스, 태스크 큐를 먼저 읽으면 도움이 됩니다.

Q. 더 깊이 공부하려면?

A. cppreference - thread, C++ Concurrency in Action을 참고하세요.


한 줄 요약: 스레드 풀로 스레드 수를 제한하면서 대량 작업을 안정적으로 병렬 처리할 수 있습니다.


관련 글

  • C++ 고급 프로파일링 완벽 가이드 | perf·gprof
  • C++ 멀티스레드 성능 튜닝 | 락 프리·작업 훔치기·스레드 풀 [#51-3]
  • C++ Lock-Free 프로그래밍 실전 | CAS·ABA·메모리 순서·고성능 큐 [#51-5]
  • C++ Lock-Free 프로그래밍 실전 | CAS·ABA·메모리 순서·고성능 큐 [#34-3]
  • C++ condition_variable 기초 |
  • C++ Atomic Operations |
... 996 lines not shown ... Token usage: 63706/1000000; 936294 remaining Start-Sleep -Seconds 3