C++ 스레드 풀 | "Thread Pool" 구현 가이드

C++ 스레드 풀 | "Thread Pool" 구현 가이드

이 글의 핵심

C++ 스레드 풀에 대한 실전 가이드입니다. 개념부터 실무 활용까지 예제와 함께 상세히 설명합니다.

스레드 풀이란?

미리 생성된 스레드들로 작업을 효율적으로 처리

// 매번 스레드 생성 (비효율)
for (int i = 0; i < 1000; i++) {
    thread t(task);
    t.detach();
}

// 스레드 풀 (효율적)
ThreadPool pool(4);
for (int i = 0; i < 1000; i++) {
    pool.enqueue(task);
}

기본 구현

#include <thread>
#include <queue>
#include <functional>
#include <condition_variable>
#include <future>

class ThreadPool {
private:
    vector<thread> workers;
    queue<function<void()>> tasks;
    
    mutex mtx;
    condition_variable cv;
    bool stop = false;
    
public:
    ThreadPool(size_t numThreads) {
        for (size_t i = 0; i < numThreads; i++) {
            workers.emplace_back([this] {
                while (true) {
                    function<void()> task;
                    
                    {
                        unique_lock<mutex> lock(mtx);
                        cv.wait(lock, [this] {
                            return stop || !tasks.empty();
                        });
                        
                        if (stop && tasks.empty()) {
                            return;
                        }
                        
                        task = move(tasks.front());
                        tasks.pop();
                    }
                    
                    task();
                }
            });
        }
    }
    
    template<class F>
    void enqueue(F&& f) {
        {
            unique_lock<mutex> lock(mtx);
            tasks.emplace(forward<F>(f));
        }
        cv.notify_one();
    }
    
    ~ThreadPool() {
        {
            unique_lock<mutex> lock(mtx);
            stop = true;
        }
        cv.notify_all();
        
        for (auto& worker : workers) {
            worker.join();
        }
    }
};

future 지원

class ThreadPool {
private:
    vector<thread> workers;
    queue<function<void()>> tasks;
    mutex mtx;
    condition_variable cv;
    bool stop = false;
    
public:
    ThreadPool(size_t numThreads) {
        for (size_t i = 0; i < numThreads; i++) {
            workers.emplace_back([this] {
                while (true) {
                    function<void()> task;
                    
                    {
                        unique_lock<mutex> lock(mtx);
                        cv.wait(lock, [this] {
                            return stop || !tasks.empty();
                        });
                        
                        if (stop && tasks.empty()) {
                            return;
                        }
                        
                        task = move(tasks.front());
                        tasks.pop();
                    }
                    
                    task();
                }
            });
        }
    }
    
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> future<typename result_of<F(Args...)>::type> 
    {
        using return_type = typename result_of<F(Args...)>::type;
        
        auto task = make_shared<packaged_task<return_type()>>(
            bind(forward<F>(f), forward<Args>(args)...)
        );
        
        future<return_type> res = task->get_future();
        
        {
            unique_lock<mutex> lock(mtx);
            tasks.emplace([task]() { (*task)(); });
        }
        cv.notify_one();
        
        return res;
    }
    
    ~ThreadPool() {
        {
            unique_lock<mutex> lock(mtx);
            stop = true;
        }
        cv.notify_all();
        
        for (auto& worker : workers) {
            worker.join();
        }
    }
};

실전 예시

예시 1: 병렬 계산

int compute(int x) {
    this_thread::sleep_for(chrono::milliseconds(100));
    return x * x;
}

int main() {
    ThreadPool pool(4);
    vector<future<int>> results;
    
    // 작업 제출
    for (int i = 0; i < 10; i++) {
        results.emplace_back(pool.enqueue(compute, i));
    }
    
    // 결과 수집
    for (auto& result : results) {
        cout << result.get() << endl;
    }
}

예시 2: 파일 처리

void processFile(const string& filename) {
    cout << "처리 중: " << filename << endl;
    this_thread::sleep_for(chrono::milliseconds(500));
}

int main() {
    ThreadPool pool(4);
    
    vector<string> files = {
        "file1.txt", "file2.txt", "file3.txt",
        "file4.txt", "file5.txt", "file6.txt"
    };
    
    for (const auto& file : files) {
        pool.enqueue(processFile, file);
    }
    
    // 자동으로 완료 대기 (소멸자)
}

예시 3: 웹 크롤러

#include <set>

class WebCrawler {
private:
    ThreadPool pool;
    set<string> visited;
    mutex mtx;
    
public:
    WebCrawler(size_t numThreads) : pool(numThreads) {}
    
    void crawl(const string& url) {
        {
            lock_guard<mutex> lock(mtx);
            if (visited.count(url)) {
                return;
            }
            visited.insert(url);
        }
        
        pool.enqueue([this, url]() {
            cout << "크롤링: " << url << endl;
            
            // 페이지 다운로드
            this_thread::sleep_for(chrono::milliseconds(100));
            
            // 링크 추출 (시뮬레이션)
            vector<string> links = {
                url + "/page1",
                url + "/page2"
            };
            
            for (const auto& link : links) {
                crawl(link);
            }
        });
    }
};

int main() {
    WebCrawler crawler(4);
    crawler.crawl("http://example.com");
    
    this_thread::sleep_for(chrono::seconds(2));
}

예시 4: 이미지 처리

struct Image {
    string filename;
    int width, height;
};

Image processImage(const string& filename) {
    cout << "처리 중: " << filename << endl;
    this_thread::sleep_for(chrono::milliseconds(200));
    return {filename, 800, 600};
}

int main() {
    ThreadPool pool(4);
    
    vector<string> images = {
        "img1.jpg", "img2.jpg", "img3.jpg",
        "img4.jpg", "img5.jpg", "img6.jpg"
    };
    
    vector<future<Image>> results;
    
    for (const auto& img : images) {
        results.emplace_back(pool.enqueue(processImage, img));
    }
    
    for (auto& result : results) {
        Image img = result.get();
        cout << img.filename << ": " 
             << img.width << "x" << img.height << endl;
    }
}

우선순위 큐

class PriorityThreadPool {
private:
    struct Task {
        int priority;
        function<void()> func;
        
        bool operator<(const Task& other) const {
            return priority < other.priority;  // 높은 우선순위 먼저
        }
    };
    
    vector<thread> workers;
    priority_queue<Task> tasks;
    mutex mtx;
    condition_variable cv;
    bool stop = false;
    
public:
    PriorityThreadPool(size_t numThreads) {
        for (size_t i = 0; i < numThreads; i++) {
            workers.emplace_back([this] {
                while (true) {
                    function<void()> task;
                    
                    {
                        unique_lock<mutex> lock(mtx);
                        cv.wait(lock, [this] {
                            return stop || !tasks.empty();
                        });
                        
                        if (stop && tasks.empty()) {
                            return;
                        }
                        
                        task = move(tasks.top().func);
                        tasks.pop();
                    }
                    
                    task();
                }
            });
        }
    }
    
    template<class F>
    void enqueue(int priority, F&& f) {
        {
            unique_lock<mutex> lock(mtx);
            tasks.push({priority, forward<F>(f)});
        }
        cv.notify_one();
    }
    
    ~PriorityThreadPool() {
        {
            unique_lock<mutex> lock(mtx);
            stop = true;
        }
        cv.notify_all();
        
        for (auto& worker : workers) {
            worker.join();
        }
    }
};

자주 발생하는 문제

문제 1: 데드락

// ❌ 작업이 다른 작업 대기
pool.enqueue([&pool]() {
    auto f = pool.enqueue( { return 42; });
    f.get();  // 데드락 가능
});

// ✅ 중첩 작업 피하기

문제 2: 예외 처리

// ❌ 예외 무시
pool.enqueue( {
    throw runtime_error("에러");
});

// ✅ future로 예외 전달
auto f = pool.enqueue( {
    throw runtime_error("에러");
    return 42;
});

try {
    f.get();
} catch (const exception& e) {
    cout << e.what() << endl;
}

문제 3: 스레드 수

// ❌ 너무 많은 스레드
ThreadPool pool(1000);  // 오버헤드

// ✅ CPU 코어 수 기반
ThreadPool pool(thread::hardware_concurrency());

FAQ

Q1: 스레드 풀은 언제 사용하나요?

A:

  • 많은 작은 작업
  • 스레드 생성 비용 절감
  • 리소스 제한

Q2: 스레드 수는?

A:

  • CPU 바운드: hardware_concurrency()
  • I/O 바운드: 더 많이 (2-4배)

Q3: 성능 향상은?

A: 스레드 생성/소멸 비용 제거. 작은 작업에서 큰 효과.

Q4: 우선순위는?

A: priority_queue 사용.

Q5: 작업 취소는?

A: atomic<bool> 플래그로 구현.

Q6: 스레드 풀 학습 리소스는?

A:

  • “C++ Concurrency in Action”
  • Boost.Asio
  • “Effective Modern C++“

같이 보면 좋은 글 (내부 링크)

이 주제와 연결되는 다른 글입니다.

  • C++ 멀티스레딩 | “thread/mutex” 기초 가이드
  • C++ condition_variable | “조건 변수” 가이드
  • C++ Expression Templates | “지연 평가” 고급 기법

관련 글

  • C++ 메모리 정렬 |
  • C++ condition_variable 기초 |
  • C++ RVO·NRVO |
  • C++ Expression Templates |
  • C++ 인라인 어셈블리 |