Kotlin Coroutine: Channel vs Flow, 언제 쓰나 | 비교 실전 가이드
이 글의 핵심
Kotlin 코루틴 채널·플로우 차이—핫/콜드·백프레셔·수집 시점을 기준으로 Channel과 Flow를 고르는 실무 기준을 한 줄로 압축했습니다.
들어가며
Kotlin에서 여러 값을 시간에 따라 다룰 때 후보는 크게 둘입니다. Channel은 보통 핫(hot)에 가깝고, 생산자가 소비자와 독립적으로(또는 강하게 결합해) 이벤트를 밀어 넣습니다. Flow는 기본적으로 콜드(cold)이며, 수집(collect)이 시작될 때 업스트림이 실행됩니다.
이 글은 “둘 다 스트림 같은데 뭐가 다르냐”는 질문에 백프레셔·소유권·테스트 관점에서 답을 정리합니다. Kotlin 코루틴에서 채널과 플로우의 차이를 핫/콜드와 수집 시점으로 나누면 선택이 단순해집니다. 코루틴과 스레드의 큰 그림은 코루틴 vs 스레드, 기본기는 코루틴 가이드와 함께 보면 좋습니다.
목차
개념 설명
- Channel:
send/receive로 동시에 실행 중인 생산자·소비자를 연결합니다. 버퍼가 없으면 한쪽이 준비될 때까지 서로를 맞춥니다(동기화 채널). “이미 돌아가는 파이프라인”을 표현하기 좋습니다. - Flow: 중단 가능한 일련의 값을 표현합니다.
collect가 호출되기 전까지는 업스트림 로직이 필요할 때만 돌아갑니다(콜드). “같은 소스를 여러 번 구독할 수 있는” 리액티브 시퀀스에 가깝습니다. - SharedFlow / StateFlow: Flow 계열이지만 핫에 가깝게 동작합니다. “Channel vs Flow” 질문은 종종 콜드 Flow vs SharedFlow까지 확장됩니다.
실전 구현 (단계별 코드)
1) 의존성(2026년 기준 Kotlin 2.x + Coroutines 1.10+)
// build.gradle.kts
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.2")
}
2) Channel: 워커가 결과를 모아 전달
기본 Channel 사용
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
fun main() = runBlocking {
val ch = Channel<Int>(capacity = 4)
val producer = launch {
repeat(5) { i ->
println("Sending $i")
ch.send(i) // 버퍼 가득 차면 중단
}
ch.close()
println("Producer done")
}
val consumer = launch {
for (x in ch) {
println("Received $x")
delay(100) // 느린 소비자 시뮬레이션
}
println("Consumer done")
}
producer.join()
consumer.join()
}
출력:
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
Received 0
Producer done
Received 1
Received 2
Received 3
Received 4
Consumer done
무버퍼 Channel (Rendezvous)
fun main() = runBlocking {
val ch = Channel<Int>() // capacity = 0 (기본값)
val producer = launch {
repeat(3) { i ->
println("Sending $i")
ch.send(i) // 소비자가 받을 때까지 중단
println("Sent $i")
}
ch.close()
}
val consumer = launch {
delay(200) // 소비자가 늦게 시작
for (x in ch) {
println("Received $x")
delay(100)
}
}
producer.join()
consumer.join()
}
출력:
Sending 0
(200ms 대기)
Received 0
Sent 0
Sending 1
(100ms 대기)
Received 1
Sent 1
...
다중 생산자/소비자
fun main() = runBlocking {
val ch = Channel<Int>(capacity = 10)
// 생산자 3개
repeat(3) { producerId ->
launch {
repeat(5) { i ->
ch.send(producerId * 100 + i)
}
}
}
// 소비자 2개
repeat(2) { consumerId ->
launch {
for (x in ch) {
println("Consumer $consumerId received $x")
}
}
}
delay(500) // 모든 작업 완료 대기
ch.close()
}
3) Flow: 수집할 때마다(또는 각 collect마다) 로직 실행
기본 Flow (콜드)
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*
fun numbers(): Flow<Int> = flow {
println("Flow started")
repeat(5) { i ->
emit(i)
delay(100)
}
println("Flow completed")
}
fun main() = runBlocking {
println("=== First collect ===")
numbers().collect { println("a: $it") }
println("\n=== Second collect ===")
numbers().collect { println("b: $it") } // 콜드: 다시 실행됨
}
출력:
=== First collect ===
Flow started
a: 0
a: 1
a: 2
a: 3
a: 4
Flow completed
=== Second collect ===
Flow started
b: 0
b: 1
b: 2
b: 3
b: 4
Flow completed
Flow 연산자 체이닝
fun main() = runBlocking {
flow {
repeat(10) { emit(it) }
}
.filter { it % 2 == 0 } // 짝수만
.map { it * it } // 제곱
.take(3) // 처음 3개
.collect { println(it) }
}
출력:
0
4
16
Flow 백프레셔 (buffer, conflate, collectLatest)
fun main() = runBlocking {
// 빠른 생산자
val fastFlow = flow {
repeat(10) { i ->
emit(i)
delay(10) // 빠름
}
}
// 느린 소비자
println("=== No buffer ===")
fastFlow.collect {
delay(100) // 느림
println(it)
}
println("\n=== With buffer ===")
fastFlow.buffer(5).collect {
delay(100)
println(it)
}
println("\n=== Conflate (latest only) ===")
fastFlow.conflate().collect {
delay(100)
println(it)
}
println("\n=== collectLatest (cancel previous) ===")
fastFlow.collectLatest {
delay(100)
println(it)
}
}
4) 핫에 가까운 이벤트: SharedFlow
기본 SharedFlow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
val hot = MutableSharedFlow<Int>(
replay = 0, // 새 구독자에게 재전송할 개수
extraBufferCapacity = 16 // 버퍼 크기
)
// 구독자 1
val sub1 = scope.launch {
hot.collect { println("Subscriber 1: $it") }
}
// 구독자 2
val sub2 = scope.launch {
hot.collect { println("Subscriber 2: $it") }
}
delay(100) // 구독자 준비 대기
// 이벤트 발행
repeat(3) {
hot.emit(it)
delay(50)
}
delay(100)
sub1.cancel()
sub2.cancel()
scope.cancel()
}
출력:
Subscriber 1: 0
Subscriber 2: 0
Subscriber 1: 1
Subscriber 2: 1
Subscriber 1: 2
Subscriber 2: 2
StateFlow (상태 관리)
data class UiState(val count: Int, val message: String)
class ViewModel {
private val _state = MutableStateFlow(UiState(0, "Initial"))
val state: StateFlow<UiState> = _state.asStateFlow()
fun increment() {
_state.update { it.copy(count = it.count + 1) }
}
fun setMessage(msg: String) {
_state.update { it.copy(message = msg) }
}
}
fun main() = runBlocking {
val vm = ViewModel()
// 상태 구독
val job = launch {
vm.state.collect { state ->
println("State: count=${state.count}, message=${state.message}")
}
}
delay(100)
// 상태 변경
vm.increment()
delay(50)
vm.increment()
delay(50)
vm.setMessage("Updated")
delay(100)
job.cancel()
}
출력:
State: count=0, message=Initial
State: count=1, message=Initial
State: count=2, message=Initial
State: count=2, message=Updated
SharedFlow vs StateFlow
fun main() = runBlocking {
// SharedFlow: 이벤트 스트림
val events = MutableSharedFlow<String>()
launch {
events.collect { println("Event: $it") }
}
events.emit("Click")
events.emit("Scroll")
// StateFlow: 상태 (항상 최신 값 유지)
val state = MutableStateFlow("Initial")
launch {
state.collect { println("State: $it") }
}
state.value = "Loading"
state.value = "Success"
delay(100)
}
고급 활용: 백프레셔와 버퍼
1) Channel 백프레셔 전략
버퍼 용량 제한
fun main() = runBlocking {
// 버퍼 크기 2
val ch = Channel<Int>(capacity = 2)
val producer = launch {
repeat(5) { i ->
println("Sending $i")
ch.send(i) // 버퍼 가득 차면 여기서 중단
println("Sent $i")
}
ch.close()
}
delay(500) // 소비자 늦게 시작
val consumer = launch {
for (x in ch) {
println("Received $x")
delay(200)
}
}
producer.join()
consumer.join()
}
출력:
Sending 0
Sent 0
Sending 1
Sent 1
Sending 2
(버퍼 가득, 대기)
(500ms 후 소비자 시작)
Received 0
Sent 2
Sending 3
Received 1
...
무제한 버퍼 (위험)
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
fun main() = runBlocking {
val ch = Channel<Int>(capacity = UNLIMITED)
val producer = launch {
repeat(1_000_000) { i ->
ch.send(i) // 절대 중단 안 됨 → 메모리 폭증 위험
}
ch.close()
}
val consumer = launch {
for (x in ch) {
delay(10) // 느린 소비
}
}
producer.join()
consumer.join()
}
문제점: 생산자가 빠르면 메모리 무한 증가
Conflated Channel (최신 값만)
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
fun main() = runBlocking {
val ch = Channel<Int>(capacity = CONFLATED)
val producer = launch {
repeat(10) { i ->
ch.send(i) // 이전 값 덮어씀
delay(10)
}
ch.close()
}
delay(150) // 소비자 늦게 시작
val consumer = launch {
for (x in ch) {
println("Received $x")
}
}
producer.join()
consumer.join()
}
출력: 최신 값만 받음 (중간 값 손실)
2) Flow 백프레셔 전략
buffer() - 버퍼 추가
fun main() = runBlocking {
flow {
repeat(5) { i ->
emit(i)
println("Emitted $i")
}
}
.buffer(2) // 버퍼 크기 2
.collect {
delay(100) // 느린 소비자
println("Collected $it")
}
}
conflate() - 최신 값만
fun main() = runBlocking {
flow {
repeat(10) { i ->
emit(i)
delay(10)
}
}
.conflate() // 소비자가 바쁘면 중간 값 스킵
.collect {
delay(100)
println("Collected $it")
}
}
출력: 0, 9 등 일부만 출력 (중간 값 스킵)
collectLatest() - 이전 수집 취소
fun main() = runBlocking {
flow {
repeat(5) { i ->
emit(i)
delay(50)
}
}
.collectLatest { value ->
println("Collecting $value")
delay(200) // 느린 처리
println("Processed $value")
}
}
출력:
Collecting 0
Collecting 1 (0 처리 취소)
Collecting 2 (1 처리 취소)
Collecting 3 (2 처리 취소)
Collecting 4 (3 처리 취소)
Processed 4 (마지막만 완료)
3) 구조화된 동시성 (Structured Concurrency)
coroutineScope로 자식 묶기
suspend fun fetchUserData(userId: Int): UserData = coroutineScope {
// 병렬로 3개 API 호출
val nameDeferred = async { fetchName(userId) }
val ordersDeferred = async { fetchOrders(userId) }
val profileDeferred = async { fetchProfile(userId) }
// 하나라도 실패하면 나머지 자동 취소
UserData(
name = nameDeferred.await(),
orders = ordersDeferred.await(),
profile = profileDeferred.await()
)
}
suspend fun fetchName(userId: Int): String {
delay(100)
return "User-$userId"
}
suspend fun fetchOrders(userId: Int): List<String> {
delay(150)
return listOf("Order1", "Order2")
}
suspend fun fetchProfile(userId: Int): String {
delay(80)
return "Profile-$userId"
}
fun main() = runBlocking {
try {
val data = fetchUserData(123)
println("Fetched: $data")
} catch (e: Exception) {
println("Failed: ${e.message}")
}
}
Channel 파이프라인 + 구조화된 동시성
fun main() = runBlocking {
coroutineScope {
val numbers = Channel<Int>(capacity = 10)
val squares = Channel<Int>(capacity = 10)
// Stage 1: 숫자 생성
launch {
repeat(10) { i ->
numbers.send(i)
}
numbers.close()
}
// Stage 2: 제곱 계산
launch {
for (n in numbers) {
squares.send(n * n)
}
squares.close()
}
// Stage 3: 출력
launch {
for (s in squares) {
println("Square: $s")
}
}
} // 모든 자식 작업 완료 대기
println("Pipeline completed")
}
4) 핫 Flow: SharedFlow와 StateFlow
SharedFlow (이벤트 스트림)
class EventBus {
private val _events = MutableSharedFlow<String>(
replay = 0, // 새 구독자에게 재전송 안 함
extraBufferCapacity = 64 // 버퍼 크기
)
val events: SharedFlow<String> = _events.asSharedFlow()
suspend fun emit(event: String) {
_events.emit(event)
}
}
fun main() = runBlocking {
val bus = EventBus()
// 구독자 1
val job1 = launch {
bus.events.collect { println("Sub1: $it") }
}
// 구독자 2
val job2 = launch {
bus.events.collect { println("Sub2: $it") }
}
delay(100)
// 이벤트 발행
bus.emit("Event1")
bus.emit("Event2")
bus.emit("Event3")
delay(100)
job1.cancel()
job2.cancel()
}
StateFlow (상태 관리)
class Counter {
private val _count = MutableStateFlow(0)
val count: StateFlow<Int> = _count.asStateFlow()
fun increment() {
_count.update { it + 1 }
}
fun decrement() {
_count.update { it - 1 }
}
}
fun main() = runBlocking {
val counter = Counter()
// 상태 구독
val job = launch {
counter.count.collect { count ->
println("Count: $count")
}
}
delay(100)
counter.increment() // Count: 1
delay(50)
counter.increment() // Count: 2
delay(50)
counter.decrement() // Count: 1
delay(100)
job.cancel()
}
replay 옵션 비교
fun main() = runBlocking {
// replay = 0 (기본)
val noReplay = MutableSharedFlow<Int>(replay = 0)
noReplay.emit(1)
noReplay.emit(2)
launch {
noReplay.collect { println("No replay: $it") }
}
// 출력: 없음 (이미 발행된 값)
delay(100)
// replay = 2
val withReplay = MutableSharedFlow<Int>(replay = 2)
withReplay.emit(1)
withReplay.emit(2)
withReplay.emit(3)
launch {
withReplay.collect { println("With replay: $it") }
}
// 출력: With replay: 2, With replay: 3 (마지막 2개)
delay(100)
}
성능·비교 요약
| 관점 | Channel | Flow (콜드) |
|---|---|---|
| 시작 시점 | 보통 생산자·소비자가 동시에 돌아감 | collect 시 업스트림 실행 |
| 재사용 | 동일 채널 인스턴스를 여러 소비자와? 설계 필요 | collect마다 새 실행(기본) |
| 백프레셔 | 버퍼·블로킹 send | 연산자로 조절 |
| 테스트 | send/receive로 단계 분리 | runTest + 가상 시간 |
| UI/상태 | 채널만으로는 UI 모델이 부족한 경우 많음 | StateFlow로 상태 표현에 강함 |
실무 사례
- 백그라운드 워커 풀 → 단일 집계기:
Channel로 작업 큐를 두고, 소비자가 DB에 배치 기록. - REST 응답 스트리밍: 서버에서 청크를 밀 때는 프레임워크별로 다르지만, 동시성 경계는 Channel로 두기 쉽습니다.
- UI 이벤트: 사용자 입력·네트워크 결과를 상태 한 방향으로 줄이려면
StateFlow/SharedFlow가 많이 쓰입니다. - 리포지토리 레이어: “한 번 호출해 여러 값”은 Flow, “동시에 실행되는 파이프라인”은 Channel 후보입니다.
트러블슈팅
증상: Flow가 두 번 실행된다
→ 콜드 Flow는 collect마다 처음부터 다시 돕니다. shareIn으로 핫으로 바꿀지, 의도인지 확인하세요.
증상: Channel에서 ClosedSendChannelException
→ 닫힌 채널에 send했는지, close() 순서가 맞는지 확인하세요.
증상: 생산이 너무 빨라 메모리가 급증
→ 무제한 버퍼·Dispatchers.IO 남용을 의심하세요. 용량·드롭 정책을 명시하세요.
증상: 테스트가 불안정하다
→ runTest, StandardTestDispatcher, 가상 시간으로 결정적으로 만드세요.
마무리
Channel은 동시에 도는 생산·소비를 잇는 도구이고, Flow는 지연·중단 가능한 값의 나열을 표현하는 도구입니다. “핫/콜드·백프레셔·수집 시점”만 명확히 해도 선택이 단순해집니다. 팀 내 용어를 코루틴 vs 스레드와 맞춰 두면 리뷰 비용도 줄어듭니다.