C++ SIMD와 병렬화: std::execution과 인트린직 가이드
이 글의 핵심
C++ SIMD와 병렬화: std::execution과 인트린직 가이드에 대한 실전 가이드입니다.
들어가며: 루프가 느린데 컴파일러가 벡터화를 못 한다
”한 번에 여러 개씩 처리하고 싶다”
15번에서 컴파일 타임 최적화와 캐시를 다뤘고, 39-1에서 데이터 지향 설계로 캐시 효율을 올렸다면, SIMD(Single Instruction Multiple Data)는 한 명령으로 여러 데이터를 동시에 처리해 throughput을 높입니다.
컴파일러가 자동으로 자동 벡터화를 하기도 하지만, 복잡한 분기나 정렬이 있으면 실패합니다. 이때 std::execution::unsequenced_policy나 인트린직(intrinsics—컴파일러가 제공하는 저수준 SIMD 명령어 함수. SSE/AVX 등)으로 수동 벡터화를 적용할 수 있습니다. std::execution::par는 기존 알고리즘에 병렬 실행 정책을 붙여 멀티코어를 활용하는 가장 간단한 방법입니다.
이 글에서 다루는 것:
- 문제 시나리오: 프로파일에서 루프가 병목일 때, 자동 벡터화 실패 시
- std::execution: seq, par, par_unseq, unseq — 알고리즘에 정책 전달
- SIMD 개념: 벡터 레지스터, 자동 벡터화 조건
- 인트린직: SSE/AVX 헤더, 완전한 벡터 연산 예시
- 흔한 에러와 해결법: 정렬, CPU 기능 검사, par_unseq 제약
- 성능 벤치마크: 스칼라 vs SIMD vs par
- 프로덕션 패턴: CPU 디스패치, 폴백 경로, xsimd 활용
실무에서 겪은 문제
실제 프로젝트에서 이 개념을 적용하며 겪었던 경험을 공유합니다.
문제 상황과 해결
대규모 C++ 프로젝트를 진행하며 이 패턴/기법의 중요성을 체감했습니다. 책에서 배운 이론과 실제 코드는 많이 달랐습니다.
실전 경험:
- 문제: 처음에는 이 개념을 제대로 이해하지 못해 비효율적인 코드를 작성했습니다
- 해결: 코드 리뷰와 프로파일링을 통해 문제를 발견하고 개선했습니다
- 교훈: 이론만으로는 부족하고, 실제로 부딪혀보며 배워야 합니다
이 글이 여러분의 시행착오를 줄여주길 바랍니다.
목차
- 문제 시나리오: 루프가 병목일 때
- std::execution 정책
- SIMD와 자동 벡터화
- 인트린직 소개
- 완전한 SIMD·execution 예제
- 자주 발생하는 에러와 해결법
- 성능 벤치마크
- 프로덕션 패턴
- 정리
1. 문제 시나리오: 루프가 병목일 때
실제 겪는 상황
"100만 개 float 배열 덧셈이 프로파일에서 30%를 차지해요."
"이미지 픽셀 처리 루프가 너무 느려요."
"컴파일러가 벡터화했다고 하는데 실제로는 스칼라 코드가 나와요."
"멀티코어가 있는데 transform이 한 코어만 쓰고 있어요."
"AVX를 쓰려 했는데 구형 CPU에서 크래시가 나요."
원인 후보
- 자동 벡터화 실패: 반복 간 의존성, 복잡한 분기, 간접 접근 → 컴파일러가 SIMD로 변환하지 못함
- 단일 스레드 실행:
std::transform등에 실행 정책을 넘기지 않아 한 코어만 사용 - 정렬 요구 위반:
_mm256_load_ps는 32바이트 정렬 필요. 비정렬 주소 사용 시 크래시 - CPU 기능 미검사: AVX 코드를 AVX 미지원 CPU에서 실행 → SIGILL
시나리오별 해결 방향
| 시나리오 | 특징 | 권장 방법 |
|---|---|---|
| 단순 배열 연산 (덧셈, 곱셈) | 연속 접근, 독립 반복 | std::execution::par 또는 par_unseq |
| 자동 벡터화 실패 | 분기, 의존성 | 인트린직 수동 벡터화 |
| 다양한 CPU 지원 | AVX/SSE 혼재 | CPU 디스패치 + 폴백 |
| 이미지/신호 처리 | SoA, 고정 크기 블록 | AVX 인트린직 + 정렬된 버퍼 |
Before/After: 배열 덧셈 예시
Before (스칼라 + 순차): 한 번에 하나씩, 한 코어만 사용합니다.
// ❌ 스칼라 순차 — 느림
void add_arrays_scalar(const float* a, const float* b, float* out, size_t n) {
for (size_t i = 0; i < n; ++i) {
out[i] = a[i] + b[i];
}
}
After (std::execution::par): 멀티코어 활용으로 즉시 가속됩니다.
// ✅ std::execution::par — 멀티코어 활용
#include <algorithm>
#include <execution>
#include <vector>
void add_arrays_par(std::vector<float>& a, std::vector<float>& b,
std::vector<float>& out) {
std::transform(std::execution::par, a.begin(), a.end(), b.begin(),
out.begin(), { return x + y; });
}
2. std::execution 정책
알고리즘에 병렬·벡터화 힌트 주기
C++17부터 std::sort, std::transform, std::reduce 등에 실행 정책을 넘길 수 있습니다.
| 정책 | 설명 | 요구사항 |
|---|---|---|
| seq | 순차 실행 (기본) | 없음 |
| par | 멀티스레드 병렬 | 반복자·함수 스레드 안전 |
| par_unseq | 병렬 + 벡터화(SIMD) 허용 | 동기화 프리(락 등 금지) |
| unseq (C++20) | 단일 스레드 벡터화만 | 동기화 프리 |
std::transform의 첫 인자로 std::execution::par를 주면, a·b의 원소 쌍에 대해 람다가 여러 스레드에 나뉘어 실행됩니다. c[i] = a[i] + b[i]가 서로 다른 인덱스에서 독립적으로 계산되므로 스레드 안전하고, par만으로도 멀티코어 활용이 됩니다. par_unseq는 여기에 SIMD 벡터화까지 허용하지만, 람다가 동기화 없이 동작해야 하므로 먼저 par로 효과를 본 뒤 도입하는 것이 좋습니다.
#include <algorithm>
#include <execution>
#include <vector>
void add_vectors_par() {
std::vector<double> a(1000000, 1.0), b(1000000, 2.0), c(a.size());
std::transform(std::execution::par, a.begin(), a.end(), b.begin(),
c.begin(), { return x + y; });
}
par_unseq 사용 시 주의: 람다 내부에서 std::mutex, std::atomic 등 동기화를 사용하면 정의되지 않은 동작입니다. 원소별로 완전히 독립적인 연산만 허용됩니다.
// ✅ par_unseq — 독립적 연산만
std::transform(std::execution::par_unseq, a.begin(), a.end(), b.begin(),
c.begin(), { return x * y + 1.0; });
// ❌ par_unseq — 락 사용 시 UB
std::mutex mtx;
std::transform(std::execution::par_unseq, a.begin(), a.end(), c.begin(),
[&mtx](double x) {
std::lock_guard<std::mutex> lock(mtx); // UB!
return x * 2;
});
std::reduce와 std::transform_reduce
std::reduce는 순서에 의존하지 않는 축소 연산에 사용합니다. 덧셈·곱셈처럼 결합 법칙이 성립하면 병렬로 부분 합을 구한 뒤 합칩니다.
#include <numeric>
#include <execution>
double sum_par(const std::vector<double>& v) {
return std::reduce(std::execution::par, v.begin(), v.end(), 0.0);
}
double dot_product_par(const std::vector<double>& a,
const std::vector<double>& b) {
return std::transform_reduce(
std::execution::par, a.begin(), a.end(), b.begin(), 0.0,
std::plus<>(), std::multiplies<>());
}
3. SIMD와 자동 벡터화
한 명령으로 여러 데이터
SIMD: 하나의 명령으로 벡터 레지스터에 담긴 여러 값(예: 4개 float, 8개 float)에 동시에 연산합니다. 데이터 지향 설계(39-1)로 연속 메모리에 데이터를 두면 벡터화가 잘 됩니다.
flowchart LR
subgraph scalar["스칼라 (1개씩)"]
S1[a0] --> S2[+]
B1[b0] --> S2
S2 --> O1[out0]
end
subgraph simd["SIMD (8개씩, AVX)"]
V1["a0..a7"] --> V2["_mm256_add_ps"]
V3["b0..b7"] --> V2
V2 --> V4["out0..out7"]
end
자동 벡터화 조건
컴파일러가 루프를 SIMD로 바꾸려면:
- 연속 메모리 접근:
a[i],a[i+1]형태 - 반복 간 독립성:
out[i]가out[i-1]등에 의존하지 않음 - 단순 연산: 덧셈, 곱셈, 비트 연산 등
- 분기 최소화:
if가 있으면 마스킹이나 스칼라 폴백으로 나뉨
컴파일 옵션: -O3, -march=native(또는 -mavx2)로 타겟 CPU를 지정하면 더 공격적으로 벡터화합니다.
# AVX2 지원 CPU 타겟
g++ -O3 -march=native -o program program.cpp
# 벡터화 리포트 확인 (GCC)
g++ -O3 -march=native -fopt-info-vec-optimized program.cpp
벡터화 실패 패턴
| 패턴 | 원인 | 대안 |
|---|---|---|
a[i] = a[i-1] + b[i] | 반복 간 의존성 | 수동 인트린직 또는 알고리즘 변경 |
if (a[i] > 0) out[i] = ... | 분기 | 마스킹 또는 분리 루프 |
out[indices[i]] = a[i] | 간접 접근 | SoA 재구성 또는 수동 처리 |
a[i] = func(a[i]) | 외부 함수 호출 | 인라인 또는 인트린직 |
4. 인트린직 소개
수동 벡터 연산
인트린직은 컴파일러가 제공하는 내장 함수로, SSE(<xmmintrin.h> 등), AVX(<immintrin.h> 등)에 대응합니다. __m128, __m256 타입으로 128비트/256비트 벡터를 다룹니다.
- SSE: 128비트, float 4개 또는 double 2개
- AVX/AVX2: 256비트, float 8개 또는 double 4개
__m256은 256비트(8개 float) 벡터 타입입니다. _mm256_loadu_ps(a)로 정렬되지 않은 주소 a에서 8개 float를 로드하고, _mm256_add_ps(va, vb)로 8쌍을 한 번에 더한 뒤 _mm256_storeu_ps(out, …)로 out에 저장합니다. loadu/storeu는 정렬 요구가 없어 일반 배열에 그대로 쓸 수 있습니다.
#include <immintrin.h>
// 8개 float 덧셈 (AVX)
void add_float8(const float* a, const float* b, float* out) {
__m256 va = _mm256_loadu_ps(a);
__m256 vb = _mm256_loadu_ps(b);
_mm256_storeu_ps(out, _mm256_add_ps(va, vb));
}
정렬된 로드/스토어 (load_ps vs loadu_ps)
- _mm256_load_ps / _mm256_store_ps: 32바이트(256비트) 정렬 필요. 비정렬 주소 시 크래시.
- _mm256_loadu_ps / _mm256_storeu_ps: 정렬 불필요. 약간의 성능 손실 있을 수 있음.
// ✅ 정렬된 버퍼 사용 시 load_ps (더 빠를 수 있음)
void add_aligned(const float* a, const float* b, float* out, size_t n) {
const float* end = a + (n & ~7u); // 8의 배수
for (; a < end; a += 8, b += 8, out += 8) {
__m256 va = _mm256_load_ps(a); // 32바이트 정렬 가정
__m256 vb = _mm256_load_ps(b);
_mm256_store_ps(out, _mm256_add_ps(va, vb));
}
// 나머지 스칼라 처리
for (size_t i = n & ~7u; i < n; ++i)
out[i] = a[i] + b[i];
}
주요 AVX 인트린직
| 연산 | 인트린직 | 설명 |
|---|---|---|
| 로드 (비정렬) | _mm256_loadu_ps | 8개 float 로드 |
| 스토어 (비정렬) | _mm256_storeu_ps | 8개 float 저장 |
| 덧셈 | _mm256_add_ps | va + vb |
| 곱셈 | _mm256_mul_ps | va * vb |
| FMA | _mm256_fmadd_ps | va * vb + vc |
| 최대/최소 | _mm256_max_ps, _mm256_min_ps | 요소별 max/min |
| 비교 | _mm256_cmp_ps | 마스크 생성 |
| 블렌드 | _mm256_blendv_ps | 마스크로 선택적 병합 |
5. 완전한 SIMD·execution 예제
예제 1: AVX로 전체 배열 덧셈
#include <immintrin.h>
#include <cstddef>
void add_arrays_avx(const float* a, const float* b, float* out, size_t n) {
size_t i = 0;
// AVX: 8개씩 처리
for (; i + 8 <= n; i += 8) {
__m256 va = _mm256_loadu_ps(a + i);
__m256 vb = _mm256_loadu_ps(b + i);
_mm256_storeu_ps(out + i, _mm256_add_ps(va, vb));
}
// 나머지 스칼라
for (; i < n; ++i) {
out[i] = a[i] + b[i];
}
}
예제 2: 배열 합 (reduce) — 수평 합산
#include <immintrin.h>
#include <cstddef>
float sum_avx(const float* a, size_t n) {
__m256 sum8 = _mm256_setzero_ps();
size_t i = 0;
for (; i + 8 <= n; i += 8) {
__m256 v = _mm256_loadu_ps(a + i);
sum8 = _mm256_add_ps(sum8, v);
}
// 수평 합산: 8개 → 1개
__m128 hi = _mm256_extractf128_ps(sum8, 1);
__m128 lo = _mm256_castps256_ps128(sum8);
__m128 sum4 = _mm_add_ps(hi, lo);
sum4 = _mm_hadd_ps(sum4, sum4);
sum4 = _mm_hadd_ps(sum4, sum4);
float sum = _mm_cvtss_f32(sum4);
for (; i < n; ++i)
sum += a[i];
return sum;
}
예제 3: 조건부 연산 (마스킹)
0보다 큰 값만 2배로 만드는 예시입니다.
#include <immintrin.h>
#include <cstddef>
void clamp_positive_double_avx(const float* in, float* out, size_t n) {
__m256 zero = _mm256_setzero_ps();
__m256 two = _mm256_set1_ps(2.0f);
size_t i = 0;
for (; i + 8 <= n; i += 8) {
__m256 v = _mm256_loadu_ps(in + i);
__m256 mask = _mm256_cmp_ps(v, zero, _CMP_GT_OQ); // v > 0
__m256 doubled = _mm256_mul_ps(v, two);
__m256 result = _mm256_blendv_ps(v, doubled, mask); // mask면 doubled
_mm256_storeu_ps(out + i, result);
}
for (; i < n; ++i)
out[i] = (in[i] > 0) ? in[i] * 2.0f : in[i];
}
예제 4: 내적(dot product) — FMA 활용
FMA(Fused Multiply-Add)는 a*b+c를 한 사이클에 수행합니다. AVX2의 _mm256_fmadd_ps를 사용하면 내적 연산이 빨라집니다.
#include <immintrin.h>
#include <cstddef>
float dot_product_avx(const float* a, const float* b, size_t n) {
__m256 sum8 = _mm256_setzero_ps();
size_t i = 0;
for (; i + 8 <= n; i += 8) {
__m256 va = _mm256_loadu_ps(a + i);
__m256 vb = _mm256_loadu_ps(b + i);
sum8 = _mm256_fmadd_ps(va, vb, sum8); // sum8 += va * vb
}
// 수평 합산
__m128 hi = _mm256_extractf128_ps(sum8, 1);
__m128 lo = _mm256_castps256_ps128(sum8);
__m128 sum4 = _mm_add_ps(hi, lo);
sum4 = _mm_hadd_ps(sum4, sum4);
sum4 = _mm_hadd_ps(sum4, sum4);
float sum = _mm_cvtss_f32(sum4);
for (; i < n; ++i)
sum += a[i] * b[i];
return sum;
}
예제 5: std::execution + 인트린직 조합
먼저 par로 병렬화하고, 각 청크 내부는 스칼라 또는 인트린직으로 처리할 수 있습니다. 아래는 par로 청크를 나누고, 각 청크를 AVX로 처리하는 패턴입니다.
#include <algorithm>
#include <execution>
#include <immintrin.h>
#include <vector>
#include <cstddef>
void add_arrays_par_avx(std::vector<float>& a, std::vector<float>& b,
std::vector<float>& out) {
const size_t n = a.size();
std::vector<size_t> indices(n);
std::iota(indices.begin(), indices.end(), 0);
std::for_each(std::execution::par, indices.begin(), indices.end(),
[&](size_t i) {
if (i + 8 <= n) {
__m256 va = _mm256_loadu_ps(&a[i]);
__m256 vb = _mm256_loadu_ps(&b[i]);
_mm256_storeu_ps(&out[i], _mm256_add_ps(va, vb));
// 주의: 실제로는 청크 단위로 나누는 것이 더 효율적
}
});
}
실무에서는 청크 단위로 나누는 것이 캐시·오버헤드 측면에서 유리합니다.
void add_arrays_chunked_par(const float* a, const float* b, float* out,
size_t n) {
constexpr size_t chunk = 4096; // 청크 크기
std::vector<size_t> chunk_starts;
for (size_t i = 0; i < n; i += chunk)
chunk_starts.push_back(i);
std::for_each(std::execution::par, chunk_starts.begin(), chunk_starts.end(),
[&](size_t start) {
size_t end = std::min(start + chunk, n);
for (size_t i = start; i + 8 <= end; i += 8) {
__m256 va = _mm256_loadu_ps(a + i);
__m256 vb = _mm256_loadu_ps(b + i);
_mm256_storeu_ps(out + i, _mm256_add_ps(va, vb));
}
for (size_t i = (end & ~7u); i < end; ++i)
out[i] = a[i] + b[i];
});
}
6. 자주 발생하는 에러와 해결법
에러 1: 정렬되지 않은 주소에 load_ps 사용 (SIGSEGV)
증상: 특정 입력에서 크래시, SIGSEGV.
원인: _mm256_load_ps는 32바이트 정렬을 요구합니다. malloc이나 new로 할당한 버퍼는 보장되지 않습니다.
// ❌ 위험: 정렬 보장 안 됨
float* a = new float[1000];
__m256 v = _mm256_load_ps(a); // a가 32바이트 정렬이 아닐 수 있음 → 크래시
해결법: loadu_ps 사용 또는 정렬된 할당.
// ✅ loadu_ps 사용 (정렬 불필요)
__m256 v = _mm256_loadu_ps(a);
// ✅ 정렬된 할당 (C++11)
alignas(32) float a[1000];
__m256 v = _mm256_load_ps(a);
// ✅ aligned_alloc (C++17)
float* a = static_cast<float*>(std::aligned_alloc(32, 1000 * sizeof(float)));
에러 2: AVX 미지원 CPU에서 실행 (SIGILL)
증상: 최신 CPU에서는 동작하는데, 구형 PC에서 Illegal instruction으로 크래시.
원인: AVX 명령을 지원하지 않는 CPU에서 실행.
해결법: 런타임 CPU 기능 검사 후 분기.
#include <immintrin.h>
#include <cpuid.h> // GCC/Clang
bool has_avx() {
unsigned int eax, ebx, ecx, edx;
__get_cpuid_count(1, 0, &eax, &ebx, &ecx, &edx);
return (ecx & (1u << 28)) != 0; // CPUID.01H:ECX.AVX
}
void add_arrays_safe(const float* a, const float* b, float* out, size_t n) {
if (has_avx()) {
add_arrays_avx(a, b, out, n);
} else {
for (size_t i = 0; i < n; ++i)
out[i] = a[i] + b[i];
}
}
에러 3: par_unseq에서 동기화 사용 (UB)
증상: 간헐적 크래시, 데드락, 잘못된 결과.
원인: par_unseq 람다 내부에서 mutex, atomic 등 사용.
// ❌ UB
std::atomic<int> counter{0};
std::transform(std::execution::par_unseq, a.begin(), a.end(), out.begin(),
[&counter](int x) {
counter++; // UB: par_unseq에서 atomic 연산 제한
return x * 2;
});
해결법: par만 사용하거나, 동기화를 람다 밖으로 빼기.
// ✅ par 사용 (동기화 필요 시)
std::transform(std::execution::par, a.begin(), a.end(), out.begin(),
[&counter](int x) {
counter++; // par에서는 허용 (단, 성능 주의)
return x * 2;
});
에러 4: 나머지 요소 처리 누락
증상: 배열 끝 몇 개가 잘못된 값 또는 초기화되지 않은 값.
원인: SIMD 루프가 8개(또는 4개) 단위로만 처리하고, n이 8의 배수가 아닐 때 나머지를 처리하지 않음.
// ❌ 나머지 미처리
for (size_t i = 0; i + 8 <= n; i += 8) {
// ...
}
// n=10이면 i=8에서 끝나고, out[8], out[9]는 미처리
해결법: 스칼라 루프로 나머지 처리.
// ✅ 나머지 처리
size_t i = 0;
for (; i + 8 <= n; i += 8) {
// ...
}
for (; i < n; ++i)
out[i] = a[i] + b[i];
에러 5: 오버랩(overlap) 버퍼
증상: 결과가 잘못됨, 크래시.
원인: in과 out이 겹치는 영역(예: in_place 연산)을 loadu/storeu로 처리할 때, 로드 전에 스토어가 같은 주소를 덮어쓸 수 있음.
// ❌ in-place 시 out == in 이면 문제
void square_avx(float* in, float* out, size_t n) {
for (size_t i = 0; i + 8 <= n; i += 8) {
__m256 v = _mm256_loadu_ps(in + i);
_mm256_storeu_ps(out + i, _mm256_mul_ps(v, v)); // in==out이면?
}
}
해결법: in-place일 때는 임시 버퍼 사용 또는 다른 알고리즘. 또는 in != out임을 보장.
// ✅ in-place 시 별도 처리
void square_avx_safe(float* data, size_t n) {
alignas(32) float tmp[8];
for (size_t i = 0; i + 8 <= n; i += 8) {
__m256 v = _mm256_loadu_ps(data + i);
_mm256_store_ps(tmp, _mm256_mul_ps(v, v));
_mm256_storeu_ps(data + i, _mm256_load_ps(tmp));
}
for (size_t i = n & ~7u; i < n; ++i)
data[i] *= data[i];
}
에러 6: n이 0일 때
증상: n=0일 때 a[0] 접근으로 크래시 가능.
원인: 루프 조건을 i < n만 확인하고, n==0일 때 로드/스토어를 수행.
해결법: early return.
// ✅ n==0 처리
void add_arrays_avx_safe(const float* a, const float* b, float* out, size_t n) {
if (n == 0) return;
// ...
}
7. 성능 벤치마크
벤치마크 1: 스칼라 vs AVX vs par
#include <chrono>
#include <execution>
#include <immintrin.h>
#include <numeric>
#include <vector>
#include <iostream>
#include <algorithm>
void benchmark_add() {
constexpr size_t N = 1'000'000;
std::vector<float> a(N, 1.0f), b(N, 2.0f), c(N);
// 1. 스칼라
auto t1 = std::chrono::high_resolution_clock::now();
for (size_t i = 0; i < N; ++i)
c[i] = a[i] + b[i];
auto t2 = std::chrono::high_resolution_clock::now();
// 2. AVX
auto t3 = std::chrono::high_resolution_clock::now();
size_t i = 0;
for (; i + 8 <= N; i += 8) {
__m256 va = _mm256_loadu_ps(&a[i]);
__m256 vb = _mm256_loadu_ps(&b[i]);
_mm256_storeu_ps(&c[i], _mm256_add_ps(va, vb));
}
for (; i < N; ++i) c[i] = a[i] + b[i];
auto t4 = std::chrono::high_resolution_clock::now();
// 3. std::transform par
auto t5 = std::chrono::high_resolution_clock::now();
std::transform(std::execution::par, a.begin(), a.end(), b.begin(),
c.begin(), { return x + y; });
auto t6 = std::chrono::high_resolution_clock::now();
using namespace std::chrono;
auto d_scalar = duration_cast<microseconds>(t2 - t1).count();
auto d_avx = duration_cast<microseconds>(t4 - t3).count();
auto d_par = duration_cast<microseconds>(t6 - t5).count();
std::cout << "Scalar: " << d_scalar << " μs\n";
std::cout << "AVX: " << d_avx << " μs (" << (double)d_scalar / d_avx << "x)\n";
std::cout << "par: " << d_par << " μs (" << (double)d_scalar / d_par << "x)\n";
}
예상 결과 (환경에 따라 다름):
| 방식 | 100만 float 덧셈 (μs) | 상대 속도 |
|---|---|---|
| 스칼라 | 500~1500 | 1x |
| AVX | 150~400 | 2~4x |
| par (4코어) | 150~400 | 2~4x |
| par_unseq | 100~300 | 3~6x |
벤치마크 2: reduce (합계)
float sum_scalar(const float* a, size_t n) {
float s = 0;
for (size_t i = 0; i < n; ++i) s += a[i];
return s;
}
void benchmark_sum() {
constexpr size_t N = 10'000'000;
std::vector<float> a(N);
std::iota(a.begin(), a.end(), 1.0f);
auto t1 = std::chrono::high_resolution_clock::now();
volatile float r1 = sum_scalar(a.data(), N);
auto t2 = std::chrono::high_resolution_clock::now();
auto t3 = std::chrono::high_resolution_clock::now();
volatile float r2 = std::reduce(std::execution::par, a.begin(), a.end());
auto t4 = std::chrono::high_resolution_clock::now();
using namespace std::chrono;
std::cout << "Scalar sum: " << duration_cast<microseconds>(t2-t1).count() << " μs\n";
std::cout << "par reduce: " << duration_cast<microseconds>(t4-t3).count() << " μs\n";
}
벤치마크 요약 표
| 연산 | 스칼라 | AVX | par | par_unseq | 비고 |
|---|---|---|---|---|---|
| 100만 float 덧셈 | 1x | 2~4x | 2~4x | 3~6x | 코어 수에 따라 par 이득 |
| 1000만 float 합계 | 1x | 2~3x | 3~8x | 4~10x | reduce 병렬화 효과 큼 |
| 조건부 연산 | 1x | 2~3x | 2~4x | 3~5x | 분기 많으면 이득 감소 |
8. 프로덕션 패턴
패턴 1: CPU 디스패치 (AVX → SSE → 스칼라)
런타임에 CPU 기능을 검사해 적절한 경로를 선택합니다.
#include <immintrin.h>
using AddFunc = void (*)(const float*, const float*, float*, size_t);
void add_scalar(const float* a, const float* b, float* out, size_t n) {
for (size_t i = 0; i < n; ++i) out[i] = a[i] + b[i];
}
AddFunc get_add_func() {
#if defined(__AVX__)
if (__builtin_cpu_supports("avx"))
return add_arrays_avx;
#endif
return add_scalar;
}
void dispatch_add(const float* a, const float* b, float* out, size_t n) {
static AddFunc f = get_add_func();
f(a, b, out, n);
}
패턴 2: xsimd 라이브러리 활용
인트린직을 직접 쓰지 않고, 이식 가능한 SIMD 추상화 라이브러리를 사용할 수 있습니다.
// xsimd 사용 예 (설치 필요: vcpkg install xsimd)
#include <xsimd/xsimd.hpp>
void add_xsimd(const float* a, const float* b, float* out, size_t n) {
using batch_type = xsimd::batch<float>;
size_t i = 0;
for (; i + batch_type::size <= n; i += batch_type::size) {
auto va = batch_type::load_unaligned(a + i);
auto vb = batch_type::load_unaligned(b + i);
(va + vb).store_unaligned(out + i);
}
for (; i < n; ++i)
out[i] = a[i] + b[i];
}
장점: CPU별로 최적 경로 자동 선택, ARM NEON 등 다른 아키텍처 지원.
패턴 3: 정렬된 버퍼 할당
SIMD 성능을 극대화하려면 32바이트 정렬된 버퍼를 사용합니다.
#include <memory>
#include <vector>
std::unique_ptr<float[]> make_aligned_buffer(size_t n) {
return std::unique_ptr<float[]>(
static_cast<float*>(std::aligned_alloc(32, n * sizeof(float))));
}
// 또는 vector + custom allocator
std::vector<float, aligned_allocator<float, 32>> data(1000000);
패턴 4: 청크 단위 병렬 + SIMD
큰 배열을 청크로 나누고, 각 청크를 스레드에 할당한 뒤 청크 내부는 SIMD로 처리합니다.
void process_parallel_simd(const float* in, float* out, size_t n) {
constexpr size_t chunk = 65536;
std::vector<size_t> starts;
for (size_t i = 0; i < n; i += chunk)
starts.push_back(i);
std::for_each(std::execution::par, starts.begin(), starts.end(),
[&](size_t start) {
size_t end = std::min(start + chunk, n);
for (size_t i = start; i + 8 <= end; i += 8) {
__m256 v = _mm256_loadu_ps(in + i);
_mm256_storeu_ps(out + i, _mm256_mul_ps(v, v));
}
for (size_t i = (end & ~7u); i < end; ++i)
out[i] = in[i] * in[i];
});
}
패턴 5: 구현 체크리스트
- CPU 기능 런타임 검사 후 폴백 경로 제공
- 정렬:
load_ps사용 시 32바이트 정렬 보장 - 나머지 요소 스칼라 처리
-
n==0early return - par_unseq 사용 시 람다 내 동기화 금지
- 프로파일링으로 실제 이득 확인 후 적용
패턴 6: 점진적 도입 전략
// 1단계: std::execution::par만 적용 (가장 간단)
std::transform(std::execution::par, a.begin(), a.end(), b.begin(),
c.begin(), { return x + y; });
// 2단계: 프로파일로 이득 확인 후 par_unseq 시도
std::transform(std::execution::par_unseq, a.begin(), a.end(), b.begin(),
c.begin(), { return x + y; });
// 3단계: 여전히 병목이면 인트린직 수동 벡터화
// CPU 디스패치 + AVX/SSE/스칼라 경로
패턴 7: 벡터화 리포트로 자동 벡터화 확인
컴파일러가 벡터화를 적용했는지 확인하려면 리포트 옵션을 사용합니다.
# GCC: 벡터화된 루프 출력
g++ -O3 -march=native -fopt-info-vec-optimized -c program.cpp
# GCC: 벡터화 실패 원인 출력
g++ -O3 -march=native -fopt-info-vec-missed -c program.cpp
# Clang: 벡터화 분석
clang++ -O3 -march=native -Rpass=loop-vectorize -Rpass-missed=loop-vectorize -c program.cpp
# 예: 벡터화 성공 시
program.cpp:10:5: note: loop vectorized
# 예: 벡터화 실패 시
program.cpp:15:5: note: loop not vectorized: value that could not be identified as reduction is used outside the loop
패턴 8: SoA와 SIMD 조합
데이터 지향 설계(39-1)의 SoA(Structure of Arrays)와 SIMD는 궁합이 좋습니다. x, y, z가 각각 연속된 float 배열이면 벡터화가 쉽습니다.
// SoA: x, y, z가 각각 연속 배열
struct ParticleSoA {
std::vector<float> x, y, z; // 각 100만 개
};
void scale_velocity_avx(ParticleSoA& p, float scale) {
const size_t n = p.x.size();
__m256 s = _mm256_set1_ps(scale);
for (size_t i = 0; i + 8 <= n; i += 8) {
_mm256_storeu_ps(&p.x[i], _mm256_mul_ps(_mm256_loadu_ps(&p.x[i]), s));
_mm256_storeu_ps(&p.y[i], _mm256_mul_ps(_mm256_loadu_ps(&p.y[i]), s));
_mm256_storeu_ps(&p.z[i], _mm256_mul_ps(_mm256_loadu_ps(&p.z[i]), s));
}
for (size_t i = n & ~7u; i < n; ++i) {
p.x[i] *= scale;
p.y[i] *= scale;
p.z[i] *= scale;
}
}
9. 정리
| 주제 | 요약 |
|---|---|
| std::execution | seq/par/par_unseq/unseq로 알고리즘 병렬·벡터화 |
| SIMD | 한 명령으로 여러 데이터 — SoA·연속 접근이 유리 |
| 인트린직 | SSE/AVX 등 수동 벡터화 — 자동 벡터화 실패 시 보완 |
| 주의 | 정렬, CPU 검사, par_unseq 동기화 금지, 나머지 처리 |
39번 시리즈는 캐시(DoD) → 메모리(pmr) → 연산(SIMD·execution)으로 “압도적 성능”을 위한 하드웨어 레벨 기법을 다뤘습니다.
핵심 원칙:
- 먼저
std::execution::par로 멀티코어 활용 - 자동 벡터화 실패 시 인트린직 고려
- CPU 기능 검사 + 폴백 경로 필수
- 정렬·나머지·n==0 처리 주의
같이 보면 좋은 글 (내부 링크)
이 주제와 연결되는 다른 글입니다.
- C++ SIMD | “벡터 연산” 가이드
- C++26 핵심 기능 완벽 가이드 | 리플렉션 ^^· std::execution
- C++26 프리뷰: Reflection과 신규 표준 라이브러리 제안들 [#44-1]
실전 팁
실무에서 바로 적용할 수 있는 팁입니다.
디버깅 팁
- 문제가 발생하면 먼저 컴파일러 경고를 확인하세요
- 간단한 테스트 케이스로 문제를 재현하세요
성능 팁
- 프로파일링 없이 최적화하지 마세요
- 측정 가능한 지표를 먼저 설정하세요
코드 리뷰 팁
- 코드 리뷰에서 자주 지적받는 부분을 미리 체크하세요
- 팀의 코딩 컨벤션을 따르세요
실전 체크리스트
실무에서 이 개념을 적용할 때 확인해야 할 사항입니다.
코드 작성 전
- 이 기법이 현재 문제를 해결하는 최선의 방법인가?
- 팀원들이 이 코드를 이해하고 유지보수할 수 있는가?
- 성능 요구사항을 만족하는가?
코드 작성 중
- 컴파일러 경고를 모두 해결했는가?
- 엣지 케이스를 고려했는가?
- 에러 처리가 적절한가?
코드 리뷰 시
- 코드의 의도가 명확한가?
- 테스트 케이스가 충분한가?
- 문서화가 되어 있는가?
이 체크리스트를 활용하여 실수를 줄이고 코드 품질을 높이세요.
이 글에서 다루는 키워드 (관련 검색어)
SIMD, 인트린직, 벡터화, std::execution, AVX, SSE, 병렬 알고리즘, par_unseq 등으로 검색하시면 이 글이 도움이 됩니다.
자주 묻는 질문 (FAQ)
Q. 이 내용을 실무에서 언제 쓰나요?
A. 프로파일에서 루프가 병목일 때, 대량의 배열 연산(이미지 처리, 신호 처리, 수치 계산)이 있을 때 적용합니다. 먼저 std::execution::par를 시도하고, 이득이 있으면 par_unseq 또는 인트린직을 검토하세요.
Q. par와 par_unseq 중 뭘 써야 하나요?
A. 람다가 완전히 독립적이고 동기화가 없으면 par_unseq가 더 빠를 수 있습니다. 락이나 공유 상태가 있으면 par만 사용하세요.
Q. 인트린직 vs xsimd/highway?
A. 인트린직은 직접 제어가 가능하고 의존성이 없습니다. xsimd, highway는 ARM 등 다른 CPU 지원과 이식성이 좋습니다. 프로젝트 요구에 따라 선택하세요.
Q. 선행으로 읽으면 좋은 글은?
A. 각 글 하단의 이전 글 링크를 따라가면 순서대로 배울 수 있습니다. C++ 시리즈 목차에서 전체 흐름을 확인할 수 있습니다.
Q. 더 깊이 공부하려면?
A. Intel Intrinsics Guide, cppreference std::execution를 참고하세요.
한 줄 요약: SIMD·std::execution·인트린직으로 연산을 벡터화·병렬화할 수 있습니다. 다음으로 vcpkg·Conan(#40-1)를 읽어보면 좋습니다.
이전 글: 고성능 C++ #39-2: 커스텀 알로케이터·pmr
다음 글: [DevOps for C++ #40-1] C++ 패키지 관리 실무: vcpkg와 Conan으로 외부 라이브러리 의존성 지옥 탈출
관련 글
- C++ 캐시 효율적인 코드: 데이터 지향 설계 가이드
- C++ std::chrono 완벽 가이드 | duration·time_point·클럭·시간 측정 실전 활용
- C++ 현대적 메모리 관리: 커스텀 알로케이터 제작과 std::pmr 가이드
- C++ std::pmr 완벽 가이드 | Polymorphic Memory Resources로 메모리 풀
- C++26 핵심 기능 완벽 가이드 | 리플렉션 ^^· std::execution