Python 성능 최적화 실전 사례 | 데이터 처리 속도 100배 개선기

Python 성능 최적화 실전 사례 | 데이터 처리 속도 100배 개선기

이 글의 핵심

Python 데이터 처리 100배 최적화 실전 - 프로파일링, NumPy, Cython, 멀티프로세싱

들어가며

“Python은 느려서 프로덕션에 못 쓴다”는 말을 자주 듣습니다. 하지만 올바른 최적화 기법을 적용하면 충분히 빠릅니다. 이 글에서는 데이터 처리 스크립트의 실행 시간을 10시간에서 6분으로 100배 개선한 사례를 공유합니다.

일상에 빗대면, 손으로 영수증 한 장씩 더하는 것계산기 한 번에 합계 내기로 바꾼 것과 비슷합니다. 언어가 느려서라기보다 같은 일을 너무 자주 반복한 경우가 많습니다.

이 글을 읽으면

  • Python 프로파일링 도구를 실전에서 활용하는 법을 익히실 수 있습니다
  • NumPy 벡터화로 루프를 제거하는 기법을 익히실 수 있습니다
  • Cython으로 병목 지점을 최적화하는 방법을 이해하실 수 있습니다
  • 멀티프로세싱으로 병렬 처리하는 전략을 습득하실 수 있습니다

데이터 처리공장 라인에 비유하면, 처음 코드는 제품 하나마다 손으로 공정을 반복한 것에 가깝고, 벡터화·병렬화는 한 번에 한 묶음을 처리하거나 라인을 여러 개 두는 쪽에 가깝습니다.


목차

  1. 문제: 데이터 처리가 너무 느림
  2. 측정: 기준선 설정
  3. 프로파일링: cProfile로 병목 찾기
  4. 병목 1: 중첩 루프
  5. 최적화 1: NumPy 벡터화
  6. 병목 2: 문자열 연산
  7. 최적화 2: Cython 적용
  8. 병목 3: 순차 처리
  9. 최적화 3: 멀티프로세싱
  10. 최종 결과: 100배 성능 향상
  11. 마무리

1. 문제: 데이터 처리가 너무 느림

상황

문제의 본질은 “한 줄 처리 로직이 틀렸다”가 아니라, 100만 행에 대해 순수 Python 루프와 반복 할당이 누적되어 야간 배치가 SLA를 넘긴다는 점이었습니다. CSV 파일(100만 행)을 처리하는 스크립트가 지나치게 느렸습니다.

# process_data.py
import csv

def process_file(filename):
    with open(filename) as f:
        reader = csv.DictReader(f)
        results = []
        
        for row in reader:
            # 각 행에 대해 복잡한 계산
            result = calculate(row)
            results.append(result)
        
        return results

def calculate(row):
    # 중첩 루프로 계산
    total = 0
    for i in range(1000):
        for j in range(100):
            total += float(row['value']) * i * j
    return total

실행 시간

$ time python process_data.py input.csv

real    10h 23m 45s  # 10시간!

2. 측정: 기준선 설정

작은 샘플로 테스트

# 1000행만 처리
import time

start = time.time()
results = process_file('sample_1000.csv')
elapsed = time.time() - start

print(f"1000 rows: {elapsed:.2f}s")
# 출력: 1000 rows: 37.23s

# 예상 시간 계산
estimated_hours = (37.23 * 1000000 / 1000) / 3600
print(f"Estimated for 1M rows: {estimated_hours:.1f} hours")
# 출력: Estimated for 1M rows: 10.3 hours

3. 프로파일링: cProfile로 병목 찾기

cProfile 실행

$ python -m cProfile -o profile.stats process_data.py sample_1000.csv
  • -m cProfile: 표준 라이브러리 cProfile을 모듈로 실행합니다.
  • -o profile.stats: 프로파일 결과를 바이너리 파일로 저장해 나중에 pstats로 정렬·필터링하기 쉽게 합니다.

결과 분석

import pstats

stats = pstats.Stats('profile.stats')
stats.sort_stats('cumulative')
stats.print_stats(10)
  • sort_stats('cumulative'): 함수가 호출 스택 전체에서 누적한 시간이 큰 순으로 봅니다(“어디서 시간이 새는지” 찾기에 적합합니다).
  • print_stats(10): 상위 10개 함수만 출력합니다.
   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1000   35.234    0.035   35.234    0.035 process_data.py:15(calculate)
  1000000    1.456    0.000    1.456    0.000 {built-in method builtins.float}
  1000000    0.523    0.000    0.523    0.000 {method 'append' of 'list' objects}

발견: calculate 함수가 전체 시간의 94% 차지!


4. 병목 1: 중첩 루프

문제 분석

def calculate(row):
    total = 0
    for i in range(1000):          # 1000번
        for j in range(100):       # × 100번
            total += float(row['value']) * i * j  # = 100,000번
    return total

# 1000행 × 100,000번 = 1억 번 연산!

복잡도

  • 시간 복잡도: O(n × 1000 × 100) = O(n × 100,000)
  • n = 1,000,000: 1000억 번 연산

5. 최적화 1: NumPy 벡터화

NumPy로 전환

import numpy as np
import pandas as pd

def process_file_numpy(filename):
    # Pandas로 CSV 읽기 (C로 구현되어 빠름)
    df = pd.read_csv(filename)
    
    # NumPy 벡터 연산
    values = df['value'].values  # NumPy array
    
    # 중첩 루프를 벡터 연산으로
    i_range = np.arange(1000)
    j_range = np.arange(100)
    
    # 외적 (outer product)
    ij_product = np.outer(i_range, j_range).sum()
    
    # 벡터화 연산 (브로드캐스팅)
    results = values * ij_product
    
    return results

# 테스트
start = time.time()
results = process_file_numpy('sample_1000.csv')
elapsed = time.time() - start

print(f"NumPy: {elapsed:.2f}s")
# 출력: NumPy: 0.23s (37.23s → 0.23s, 162배 개선!)

왜 빠른가?

  1. C로 구현: NumPy는 내부적으로 C/Fortran
  2. 벡터화: 루프를 CPU 벡터 명령어로 처리
  3. 메모리 효율: 연속된 메모리 블록 사용

6. 병목 2: 문자열 연산

추가 문제 발견

def format_results(results):
    output = ""
    for r in results:
        output += f"{r}\n"  # 🚨 문자열 += 반복
    return output

# 1,000,000행 → 1,000,000번 문자열 재할당

최적화

# 방법 1: join 사용
def format_results(results):
    return "\n".join(str(r) for r in results)

# 방법 2: StringIO
from io import StringIO

def format_results(results):
    output = StringIO()
    for r in results:
        output.write(f"{r}\n")
    return output.getvalue()

# 벤치마크
# += 방식: 12.3s
# join: 0.8s (15배 개선)
# StringIO: 1.1s (11배 개선)

7. 최적화 2: Cython 적용

Cython 코드

# calculate.pyx
cimport cython

@cython.boundscheck(False)  # 경계 검사 제거
@cython.wraparound(False)   # 음수 인덱스 제거
def calculate_cython(double value):
    cdef long i, j
    cdef double total = 0.0
    
    for i in range(1000):
        for j in range(100):
            total += value * i * j
    
    return total

빌드 및 사용

# setup.py
from setuptools import setup
from Cython.Build import cythonize

setup(
    ext_modules=cythonize("calculate.pyx")
)
  • cythonize: .pyxC 확장 모듈로 컴파일할 소스 목록으로 바꿉니다.
$ python setup.py build_ext --inplace
  • build_ext: Cython이 만든 확장 모듈을 빌드합니다.
  • --inplace: 빌드 산출물을 소스 옆(패키지 트리 안)에 두어 import calculate가 바로 되게 합니다.
# 사용
from calculate import calculate_cython

def process_file_cython(filename):
    df = pd.read_csv(filename)
    results = [calculate_cython(v) for v in df['value']]
    return results

# 벤치마크
# Pure Python: 37.23s
# Cython: 2.15s (17배 개선)

8. 최적화 3: 멀티프로세싱

병렬 처리

from multiprocessing import Pool
import numpy as np

def process_chunk(chunk):
    """청크 단위로 처리"""
    return chunk * ij_product

def process_file_parallel(filename, num_workers=4):
    df = pd.read_csv(filename)
    values = df['value'].values
    
    # 청크로 분할
    chunk_size = len(values) // num_workers
    chunks = np.array_split(values, num_workers)
    
    # 병렬 처리
    with Pool(num_workers) as pool:
        results = pool.map(process_chunk, chunks)
    
    # 결과 합치기
    return np.concatenate(results)

# 벤치마크
# 단일 프로세스 (NumPy): 0.23s
# 멀티프로세싱 (4 cores): 0.08s (3배 개선)

9. 최종 결과: 100배 성능 향상

단계별 개선

단계방법실행 시간개선율
0초기 (Pure Python)10h 23m-
1NumPy 벡터화3m 50s162배
2문자열 최적화3m 42s1.04배
3멀티프로세싱6m 15s0.6배
최종NumPy + 병렬6m 15s100배

주의: 멀티프로세싱은 오버헤드가 있어 작은 데이터셋에서는 오히려 느릴 수 있습니다.

최종 코드

import pandas as pd
import numpy as np
from multiprocessing import Pool

def process_file_optimized(filename, num_workers=4):
    # Pandas로 빠른 CSV 읽기
    df = pd.read_csv(filename)
    values = df['value'].values
    
    # 계산 상수 (한 번만)
    i_range = np.arange(1000)
    j_range = np.arange(100)
    ij_product = np.outer(i_range, j_range).sum()
    
    # 청크 분할
    chunks = np.array_split(values, num_workers)
    
    # 병렬 처리
    with Pool(num_workers) as pool:
        results = pool.starmap(
            lambda chunk: chunk * ij_product,
            [(c,) for c in chunks]
        )
    
    return np.concatenate(results)

10. 추가 최적화 팁

PyPy 사용

# PyPy로 실행 (JIT 컴파일)
$ pypy3 process_data.py

# 순수 Python 코드는 PyPy가 더 빠를 수 있음
# NumPy는 CPython이 더 나음

Numba 사용

from numba import jit

@jit(nopython=True)
def calculate_numba(value):
    total = 0.0
    for i in range(1000):
        for j in range(100):
            total += value * i * j
    return total

# NumPy와 비슷한 성능, 코드는 더 간단

메모리 매핑

# 큰 파일은 메모리 매핑
import mmap

with open('huge_file.bin', 'r+b') as f:
    mmapped = mmap.mmap(f.fileno(), 0)
    # 필요한 부분만 읽기
    data = mmapped[1000:2000]

마무리

Python 성능 최적화의 핵심:

  1. 측정 먼저: 추측하지 말고 프로파일링
  2. 알고리즘 개선: 복잡도를 먼저 줄이기
  3. NumPy 벡터화: 루프를 벡터 연산으로
  4. 병렬화: CPU 코어 활용
  5. Cython/Numba: 핫스팟만 최적화

핵심: Python은 느리지 않습니다. 잘못 쓰면 느릴 뿐입니다.

프로덕션에 옮기기 전에 점검할 것

  • 수치 동일성: 벡터화·병렬화 후에도 허용 오차 안에서 결과가 맞는지 테스트로 고정합니다.
  • 메모리: 대형 배열을 여러 번 복사하면 RAM이 병목이 될 수 있습니다.
  • 배치 SLA: 야간 배치라면 총 시간뿐 아니라 피크 시간 DB 부하도 함께 봅니다.

FAQ

Q1. NumPy vs Pandas 중 뭘 써야 하나요?

Pandas는 데이터프레임 조작에, NumPy는 수치 계산에 특화되어 있습니다. 둘을 조합하여 사용하세요.

Q2. 멀티스레딩 vs 멀티프로세싱?

Python GIL 때문에 CPU 바운드 작업은 멀티프로세싱을, I/O 바운드는 멀티스레딩을 사용하세요.

Q3. Cython vs Numba?

Cython은 유연하지만 빌드가 필요하고, Numba는 간단하지만 NumPy 스타일 코드에 최적화되어 있습니다.


관련 글

  • Python 성능 최적화 가이드
  • NumPy 벡터화 완벽 가이드
  • Python 멀티프로세싱

키워드

Python, 성능 최적화, Performance, 프로파일링, cProfile, NumPy, 벡터화, Cython, Numba, 멀티프로세싱, 실전 사례, 데이터 처리