Kotlin Coroutine: Channel vs Flow, 언제 쓰나 | 비교 실전 가이드

Kotlin Coroutine: Channel vs Flow, 언제 쓰나 | 비교 실전 가이드

이 글의 핵심

Kotlin 코루틴 채널·플로우 차이—핫/콜드·백프레셔·수집 시점을 기준으로 Channel과 Flow를 고르는 실무 기준을 한 줄로 압축했습니다.

들어가며

Kotlin에서 여러 값을 시간에 따라 다룰 때 후보는 크게 둘입니다. Channel은 보통 핫(hot)에 가깝고, 생산자가 소비자와 독립적으로(또는 강하게 결합해) 이벤트를 밀어 넣습니다. Flow는 기본적으로 콜드(cold)이며, 수집(collect)이 시작될 때 업스트림이 실행됩니다.

이 글은 “둘 다 스트림 같은데 뭐가 다르냐”는 질문에 백프레셔·소유권·테스트 관점에서 답을 정리합니다. Kotlin 코루틴에서 채널과 플로우의 차이를 핫/콜드와 수집 시점으로 나누면 선택이 단순해집니다. 코루틴과 스레드의 큰 그림은 코루틴 vs 스레드, 기본기는 코루틴 가이드와 함께 보면 좋습니다.


목차

  1. 개념 설명
  2. 실전 구현 (단계별 코드)
  3. 고급 활용
  4. 성능·비교
  5. 실무 사례
  6. 트러블슈팅
  7. 마무리

개념 설명

  • Channel: send/receive동시에 실행 중인 생산자·소비자를 연결합니다. 버퍼가 없으면 한쪽이 준비될 때까지 서로를 맞춥니다(동기화 채널). “이미 돌아가는 파이프라인”을 표현하기 좋습니다.
  • Flow: 중단 가능한 일련의 값을 표현합니다. collect가 호출되기 전까지는 업스트림 로직이 필요할 때만 돌아갑니다(콜드). “같은 소스를 여러 번 구독할 수 있는” 리액티브 시퀀스에 가깝습니다.
  • SharedFlow / StateFlow: Flow 계열이지만 에 가깝게 동작합니다. “Channel vs Flow” 질문은 종종 콜드 Flow vs SharedFlow까지 확장됩니다.

실전 구현 (단계별 코드)

1) 의존성(2026년 기준 Kotlin 2.x + Coroutines 1.10+)

// build.gradle.kts
dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.2")
}

2) Channel: 워커가 결과를 모아 전달

기본 Channel 사용

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
    val ch = Channel<Int>(capacity = 4)

    val producer = launch {
        repeat(5) { i ->
            println("Sending $i")
            ch.send(i)  // 버퍼 가득 차면 중단
        }
        ch.close()
        println("Producer done")
    }

    val consumer = launch {
        for (x in ch) {
            println("Received $x")
            delay(100)  // 느린 소비자 시뮬레이션
        }
        println("Consumer done")
    }

    producer.join()
    consumer.join()
}

출력:

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
Received 0
Producer done
Received 1
Received 2
Received 3
Received 4
Consumer done

무버퍼 Channel (Rendezvous)

fun main() = runBlocking {
    val ch = Channel<Int>()  // capacity = 0 (기본값)

    val producer = launch {
        repeat(3) { i ->
            println("Sending $i")
            ch.send(i)  // 소비자가 받을 때까지 중단
            println("Sent $i")
        }
        ch.close()
    }

    val consumer = launch {
        delay(200)  // 소비자가 늦게 시작
        for (x in ch) {
            println("Received $x")
            delay(100)
        }
    }

    producer.join()
    consumer.join()
}

출력:

Sending 0
(200ms 대기)
Received 0
Sent 0
Sending 1
(100ms 대기)
Received 1
Sent 1
...

다중 생산자/소비자

fun main() = runBlocking {
    val ch = Channel<Int>(capacity = 10)

    // 생산자 3개
    repeat(3) { producerId ->
        launch {
            repeat(5) { i ->
                ch.send(producerId * 100 + i)
            }
        }
    }

    // 소비자 2개
    repeat(2) { consumerId ->
        launch {
            for (x in ch) {
                println("Consumer $consumerId received $x")
            }
        }
    }

    delay(500)  // 모든 작업 완료 대기
    ch.close()
}

3) Flow: 수집할 때마다(또는 각 collect마다) 로직 실행

기본 Flow (콜드)

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*

fun numbers(): Flow<Int> = flow {
    println("Flow started")
    repeat(5) { i ->
        emit(i)
        delay(100)
    }
    println("Flow completed")
}

fun main() = runBlocking {
    println("=== First collect ===")
    numbers().collect { println("a: $it") }
    
    println("\n=== Second collect ===")
    numbers().collect { println("b: $it") }  // 콜드: 다시 실행됨
}

출력:

=== First collect ===
Flow started
a: 0
a: 1
a: 2
a: 3
a: 4
Flow completed

=== Second collect ===
Flow started
b: 0
b: 1
b: 2
b: 3
b: 4
Flow completed

Flow 연산자 체이닝

fun main() = runBlocking {
    flow {
        repeat(10) { emit(it) }
    }
    .filter { it % 2 == 0 }  // 짝수만
    .map { it * it }         // 제곱
    .take(3)                 // 처음 3개
    .collect { println(it) }
}

출력:

0
4
16

Flow 백프레셔 (buffer, conflate, collectLatest)

fun main() = runBlocking {
    // 빠른 생산자
    val fastFlow = flow {
        repeat(10) { i ->
            emit(i)
            delay(10)  // 빠름
        }
    }

    // 느린 소비자
    println("=== No buffer ===")
    fastFlow.collect { 
        delay(100)  // 느림
        println(it) 
    }

    println("\n=== With buffer ===")
    fastFlow.buffer(5).collect { 
        delay(100)
        println(it) 
    }

    println("\n=== Conflate (latest only) ===")
    fastFlow.conflate().collect { 
        delay(100)
        println(it) 
    }

    println("\n=== collectLatest (cancel previous) ===")
    fastFlow.collectLatest { 
        delay(100)
        println(it) 
    }
}

4) 핫에 가까운 이벤트: SharedFlow

기본 SharedFlow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
    val hot = MutableSharedFlow<Int>(
        replay = 0,              // 새 구독자에게 재전송할 개수
        extraBufferCapacity = 16 // 버퍼 크기
    )

    // 구독자 1
    val sub1 = scope.launch { 
        hot.collect { println("Subscriber 1: $it") } 
    }

    // 구독자 2
    val sub2 = scope.launch { 
        hot.collect { println("Subscriber 2: $it") } 
    }

    delay(100)  // 구독자 준비 대기

    // 이벤트 발행
    repeat(3) { 
        hot.emit(it)
        delay(50)
    }

    delay(100)
    sub1.cancel()
    sub2.cancel()
    scope.cancel()
}

출력:

Subscriber 1: 0
Subscriber 2: 0
Subscriber 1: 1
Subscriber 2: 1
Subscriber 1: 2
Subscriber 2: 2

StateFlow (상태 관리)

data class UiState(val count: Int, val message: String)

class ViewModel {
    private val _state = MutableStateFlow(UiState(0, "Initial"))
    val state: StateFlow<UiState> = _state.asStateFlow()

    fun increment() {
        _state.update { it.copy(count = it.count + 1) }
    }

    fun setMessage(msg: String) {
        _state.update { it.copy(message = msg) }
    }
}

fun main() = runBlocking {
    val vm = ViewModel()

    // 상태 구독
    val job = launch {
        vm.state.collect { state ->
            println("State: count=${state.count}, message=${state.message}")
        }
    }

    delay(100)

    // 상태 변경
    vm.increment()
    delay(50)
    vm.increment()
    delay(50)
    vm.setMessage("Updated")

    delay(100)
    job.cancel()
}

출력:

State: count=0, message=Initial
State: count=1, message=Initial
State: count=2, message=Initial
State: count=2, message=Updated

SharedFlow vs StateFlow

fun main() = runBlocking {
    // SharedFlow: 이벤트 스트림
    val events = MutableSharedFlow<String>()
    launch {
        events.collect { println("Event: $it") }
    }
    events.emit("Click")
    events.emit("Scroll")

    // StateFlow: 상태 (항상 최신 값 유지)
    val state = MutableStateFlow("Initial")
    launch {
        state.collect { println("State: $it") }
    }
    state.value = "Loading"
    state.value = "Success"

    delay(100)
}

고급 활용: 백프레셔와 버퍼

1) Channel 백프레셔 전략

버퍼 용량 제한

fun main() = runBlocking {
    // 버퍼 크기 2
    val ch = Channel<Int>(capacity = 2)

    val producer = launch {
        repeat(5) { i ->
            println("Sending $i")
            ch.send(i)  // 버퍼 가득 차면 여기서 중단
            println("Sent $i")
        }
        ch.close()
    }

    delay(500)  // 소비자 늦게 시작

    val consumer = launch {
        for (x in ch) {
            println("Received $x")
            delay(200)
        }
    }

    producer.join()
    consumer.join()
}

출력:

Sending 0
Sent 0
Sending 1
Sent 1
Sending 2
(버퍼 가득, 대기)
(500ms 후 소비자 시작)
Received 0
Sent 2
Sending 3
Received 1
...

무제한 버퍼 (위험)

import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED

fun main() = runBlocking {
    val ch = Channel<Int>(capacity = UNLIMITED)

    val producer = launch {
        repeat(1_000_000) { i ->
            ch.send(i)  // 절대 중단 안 됨 → 메모리 폭증 위험
        }
        ch.close()
    }

    val consumer = launch {
        for (x in ch) {
            delay(10)  // 느린 소비
        }
    }

    producer.join()
    consumer.join()
}

문제점: 생산자가 빠르면 메모리 무한 증가

Conflated Channel (최신 값만)

import kotlinx.coroutines.channels.Channel.Factory.CONFLATED

fun main() = runBlocking {
    val ch = Channel<Int>(capacity = CONFLATED)

    val producer = launch {
        repeat(10) { i ->
            ch.send(i)  // 이전 값 덮어씀
            delay(10)
        }
        ch.close()
    }

    delay(150)  // 소비자 늦게 시작

    val consumer = launch {
        for (x in ch) {
            println("Received $x")
        }
    }

    producer.join()
    consumer.join()
}

출력: 최신 값만 받음 (중간 값 손실)

2) Flow 백프레셔 전략

buffer() - 버퍼 추가

fun main() = runBlocking {
    flow {
        repeat(5) { i ->
            emit(i)
            println("Emitted $i")
        }
    }
    .buffer(2)  // 버퍼 크기 2
    .collect { 
        delay(100)  // 느린 소비자
        println("Collected $it") 
    }
}

conflate() - 최신 값만

fun main() = runBlocking {
    flow {
        repeat(10) { i ->
            emit(i)
            delay(10)
        }
    }
    .conflate()  // 소비자가 바쁘면 중간 값 스킵
    .collect { 
        delay(100)
        println("Collected $it") 
    }
}

출력: 0, 9 등 일부만 출력 (중간 값 스킵)

collectLatest() - 이전 수집 취소

fun main() = runBlocking {
    flow {
        repeat(5) { i ->
            emit(i)
            delay(50)
        }
    }
    .collectLatest { value ->
        println("Collecting $value")
        delay(200)  // 느린 처리
        println("Processed $value")
    }
}

출력:

Collecting 0
Collecting 1  (0 처리 취소)
Collecting 2  (1 처리 취소)
Collecting 3  (2 처리 취소)
Collecting 4  (3 처리 취소)
Processed 4   (마지막만 완료)

3) 구조화된 동시성 (Structured Concurrency)

coroutineScope로 자식 묶기

suspend fun fetchUserData(userId: Int): UserData = coroutineScope {
    // 병렬로 3개 API 호출
    val nameDeferred = async { fetchName(userId) }
    val ordersDeferred = async { fetchOrders(userId) }
    val profileDeferred = async { fetchProfile(userId) }

    // 하나라도 실패하면 나머지 자동 취소
    UserData(
        name = nameDeferred.await(),
        orders = ordersDeferred.await(),
        profile = profileDeferred.await()
    )
}

suspend fun fetchName(userId: Int): String {
    delay(100)
    return "User-$userId"
}

suspend fun fetchOrders(userId: Int): List<String> {
    delay(150)
    return listOf("Order1", "Order2")
}

suspend fun fetchProfile(userId: Int): String {
    delay(80)
    return "Profile-$userId"
}

fun main() = runBlocking {
    try {
        val data = fetchUserData(123)
        println("Fetched: $data")
    } catch (e: Exception) {
        println("Failed: ${e.message}")
    }
}

Channel 파이프라인 + 구조화된 동시성

fun main() = runBlocking {
    coroutineScope {
        val numbers = Channel<Int>(capacity = 10)
        val squares = Channel<Int>(capacity = 10)

        // Stage 1: 숫자 생성
        launch {
            repeat(10) { i ->
                numbers.send(i)
            }
            numbers.close()
        }

        // Stage 2: 제곱 계산
        launch {
            for (n in numbers) {
                squares.send(n * n)
            }
            squares.close()
        }

        // Stage 3: 출력
        launch {
            for (s in squares) {
                println("Square: $s")
            }
        }
    }  // 모든 자식 작업 완료 대기
    println("Pipeline completed")
}

4) 핫 Flow: SharedFlow와 StateFlow

SharedFlow (이벤트 스트림)

class EventBus {
    private val _events = MutableSharedFlow<String>(
        replay = 0,              // 새 구독자에게 재전송 안 함
        extraBufferCapacity = 64 // 버퍼 크기
    )
    val events: SharedFlow<String> = _events.asSharedFlow()

    suspend fun emit(event: String) {
        _events.emit(event)
    }
}

fun main() = runBlocking {
    val bus = EventBus()

    // 구독자 1
    val job1 = launch {
        bus.events.collect { println("Sub1: $it") }
    }

    // 구독자 2
    val job2 = launch {
        bus.events.collect { println("Sub2: $it") }
    }

    delay(100)

    // 이벤트 발행
    bus.emit("Event1")
    bus.emit("Event2")
    bus.emit("Event3")

    delay(100)
    job1.cancel()
    job2.cancel()
}

StateFlow (상태 관리)

class Counter {
    private val _count = MutableStateFlow(0)
    val count: StateFlow<Int> = _count.asStateFlow()

    fun increment() {
        _count.update { it + 1 }
    }

    fun decrement() {
        _count.update { it - 1 }
    }
}

fun main() = runBlocking {
    val counter = Counter()

    // 상태 구독
    val job = launch {
        counter.count.collect { count ->
            println("Count: $count")
        }
    }

    delay(100)

    counter.increment()  // Count: 1
    delay(50)
    counter.increment()  // Count: 2
    delay(50)
    counter.decrement()  // Count: 1

    delay(100)
    job.cancel()
}

replay 옵션 비교

fun main() = runBlocking {
    // replay = 0 (기본)
    val noReplay = MutableSharedFlow<Int>(replay = 0)
    noReplay.emit(1)
    noReplay.emit(2)
    
    launch {
        noReplay.collect { println("No replay: $it") }
    }
    // 출력: 없음 (이미 발행된 값)

    delay(100)

    // replay = 2
    val withReplay = MutableSharedFlow<Int>(replay = 2)
    withReplay.emit(1)
    withReplay.emit(2)
    withReplay.emit(3)
    
    launch {
        withReplay.collect { println("With replay: $it") }
    }
    // 출력: With replay: 2, With replay: 3 (마지막 2개)

    delay(100)
}

성능·비교 요약

관점ChannelFlow (콜드)
시작 시점보통 생산자·소비자가 동시에 돌아감collect 시 업스트림 실행
재사용동일 채널 인스턴스를 여러 소비자와? 설계 필요collect마다 새 실행(기본)
백프레셔버퍼·블로킹 send연산자로 조절
테스트send/receive로 단계 분리runTest + 가상 시간
UI/상태채널만으로는 UI 모델이 부족한 경우 많음StateFlow상태 표현에 강함

실무 사례

  • 백그라운드 워커 풀 → 단일 집계기: Channel로 작업 큐를 두고, 소비자가 DB에 배치 기록.
  • REST 응답 스트리밍: 서버에서 청크를 밀 때는 프레임워크별로 다르지만, 동시성 경계는 Channel로 두기 쉽습니다.
  • UI 이벤트: 사용자 입력·네트워크 결과를 상태 한 방향으로 줄이려면 StateFlow/SharedFlow가 많이 쓰입니다.
  • 리포지토리 레이어: “한 번 호출해 여러 값”은 Flow, “동시에 실행되는 파이프라인”은 Channel 후보입니다.

트러블슈팅

증상: Flow가 두 번 실행된다
→ 콜드 Flow는 collect마다 처음부터 다시 돕니다. shareIn으로 으로 바꿀지, 의도인지 확인하세요.

증상: Channel에서 ClosedSendChannelException
→ 닫힌 채널에 send했는지, close() 순서가 맞는지 확인하세요.

증상: 생산이 너무 빨라 메모리가 급증
→ 무제한 버퍼·Dispatchers.IO 남용을 의심하세요. 용량·드롭 정책을 명시하세요.

증상: 테스트가 불안정하다
runTest, StandardTestDispatcher, 가상 시간으로 결정적으로 만드세요.


마무리

Channel동시에 도는 생산·소비를 잇는 도구이고, Flow지연·중단 가능한 값의 나열을 표현하는 도구입니다. “핫/콜드·백프레셔·수집 시점”만 명확히 해도 선택이 단순해집니다. 팀 내 용어를 코루틴 vs 스레드와 맞춰 두면 리뷰 비용도 줄어듭니다.