Table of contents
- 9편에서 이어서
- withContext — 스레드 전환
- Flow — 비동기 데이터 스트림
- Flow 연산자
- Flow 컨텍스트와 flowOn
- Channel — 코루틴 간 통신
- 에러 처리
- 구조화된 동시성
9편에서 이어서
9편에서 코루틴의 기본 도구인 suspend, launch, async, 디스패처를 다뤘다. 이번 편에서는 한 단계 더 들어간다. 데이터 스트림을 다루는 Flow, 코루틴 간 통신을 위한 Channel, 그리고 에러가 터졌을 때 시스템이 어떻게 반응해야 하는지를 살펴본다.
withContext — 스레드 전환
9편에서 디스패처를 launch의 인자로 넘기는 방식을 봤다. 하나의 코루틴 안에서 디스패처를 바꿔야 할 때는 withContext를 쓴다.
import kotlinx.coroutines.*
suspend fun fetchFromNetwork(): String = withContext(Dispatchers.IO) {
println("네트워크 호출: ${Thread.currentThread().name}")
delay(1000)
"서버 데이터"
}
suspend fun processData(data: String): String = withContext(Dispatchers.Default) {
println("데이터 처리: ${Thread.currentThread().name}")
data.uppercase()
}
fun main() = runBlocking {
val raw = fetchFromNetwork()
val processed = processData(raw)
println("결과: $processed")
}
네트워크 호출: DefaultDispatcher-worker-1
데이터 처리: DefaultDispatcher-worker-1
결과: 서버 데이터
withContext는 지정한 디스패처에서 블록을 실행하고, 끝나면 원래 디스패처로 돌아온다. 새 코루틴을 만드는 게 아니라 현재 코루틴의 실행 컨텍스트만 바꾸는 것이다. launch(Dispatchers.IO)와 달리 결과를 직접 반환하기 때문에 동기 코드처럼 읽힌다.
실무에서 가장 흔한 패턴은 IO에서 데이터를 가져온 뒤 Default나 Main에서 처리하는 구조다.
Flow — 비동기 데이터 스트림
suspend 함수는 값을 하나만 반환한다. 값이 여러 개 순차적으로 나오는 경우 — 실시간 센서 데이터, 주가 업데이트, 검색어 자동완성 — 에는 Flow가 필요하다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun numberFlow(): Flow<Int> = flow {
for (i in 1..5) {
delay(300)
emit(i) // 값을 하나씩 방출
}
}
fun main() = runBlocking {
println("Flow 수집 시작")
numberFlow().collect { value ->
println("받음: $value")
}
println("Flow 수집 끝")
}
Flow 수집 시작
받음: 1
받음: 2
받음: 3
받음: 4
받음: 5
Flow 수집 끝
flow { } 빌더 안에서 emit()으로 값을 내보내고, collect로 값을 받는다. 핵심적인 특성은 콜드(cold) 스트림이라는 점이다. collect가 호출되기 전까지는 아무 코드도 실행되지 않는다.
데이터가 Flow 파이프라인을 통과하는 과정을 그림으로 보면 직관적이다.
flowchart LR
emit["emit()\n1, 2, 3, …, 10"]
filter["filter\n{ it % 2 == 0 }"]
map["map\n{ it * it }"]
take["take(3)"]
collect["collect\n{ println(it) }"]
emit -->|"1,2,3,…,10"| filter
filter -->|"2,4,6,8,10"| map
map -->|"4,16,36,64,100"| take
take -->|"4,16,36"| collect
Flow 연산자
5편에서 컬렉션의 filter, map을 배웠는데, Flow에도 같은 연산자가 있다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val numbers = (1..10).asFlow() // List → Flow 변환
numbers
.filter { it % 2 == 0 } // 짝수만
.map { it * it } // 제곱
.take(3) // 처음 3개만
.collect { println(it) }
}
4
16
36
컬렉션 API와 동일한 느낌으로 쓸 수 있지만, 차이점이 있다. Flow 연산자는 중간 리스트를 만들지 않고 값이 하나씩 파이프라인을 통과한다. 대량의 데이터를 처리할 때 메모리 효율이 좋다.
자주 쓰이는 연산자를 좀 더 보자.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// transform — map보다 유연한 변환
flowOf(1, 2, 3)
.transform { value ->
emit("$value 시작")
emit("$value 끝")
}
.collect { println(it) }
// 1 시작, 1 끝, 2 시작, 2 끝, 3 시작, 3 끝
println("---")
// onEach — 사이드 이펙트 (로깅 등)
flowOf("a", "b", "c")
.onEach { println("처리 중: $it") }
.map { it.uppercase() }
.collect { println("결과: $it") }
}
Flow 컨텍스트와 flowOn
Flow의 방출(emit) 코드와 수집(collect) 코드가 서로 다른 디스패처에서 돌아야 할 때가 있다. 데이터는 IO 스레드에서 가져오고, 수집은 메인 스레드에서 하는 식이다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun heavyComputation(): Flow<Int> = flow {
for (i in 1..3) {
println("방출 스레드: ${Thread.currentThread().name}")
delay(100)
emit(i)
}
}.flowOn(Dispatchers.Default) // 방출 코드의 디스패처 지정
fun main() = runBlocking {
heavyComputation().collect { value ->
println("수집 스레드: ${Thread.currentThread().name}, 값: $value")
}
}
방출 스레드: DefaultDispatcher-worker-1
수집 스레드: main, 값: 1
방출 스레드: DefaultDispatcher-worker-1
수집 스레드: main, 값: 2
방출 스레드: DefaultDispatcher-worker-1
수집 스레드: main, 값: 3
flowOn은 그 위쪽(업스트림)의 디스패처를 바꾼다. 수집 쪽의 디스패처에는 영향을 주지 않는다. 이 규칙만 기억하면 된다.
Channel — 코루틴 간 통신
Flow가 단방향 스트림이라면, Channel은 코루틴 사이의 파이프라인이다. 한쪽에서 값을 보내고 다른 쪽에서 받는 구조다.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<String>()
// 생산자
launch {
val items = listOf("사과", "바나나", "딸기")
for (item in items) {
println("보내는 중: $item")
channel.send(item)
delay(200)
}
channel.close() // 더 이상 보낼 게 없음
}
// 소비자
for (item in channel) {
println("받음: $item")
}
println("채널 닫힘")
}
보내는 중: 사과
받음: 사과
보내는 중: 바나나
받음: 바나나
보내는 중: 딸기
받음: 딸기
채널 닫힘
send와 receive는 모두 suspend 함수다. 채널이 가득 차면 send가 일시 정지하고, 채널이 비어 있으면 receive가 일시 정지한다. 이 배압(backpressure) 메커니즘 덕분에 생산자와 소비자의 속도 차이를 자연스럽게 조절할 수 있다.
채널에는 버퍼 크기를 지정할 수 있다.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
// 버퍼 크기 2 — 2개까지는 send가 바로 리턴
val buffered = Channel<Int>(capacity = 2)
launch {
for (i in 1..5) {
println("보내기 전: $i")
buffered.send(i)
println("보낸 후: $i")
}
buffered.close()
}
delay(1000) // 소비자가 느리게 시작
for (value in buffered) {
println("받음: $value")
delay(300)
}
}
에러 처리
코루틴에서 에러가 발생하면 어떻게 될까? 기본적으로 부모 코루틴까지 전파된다.
import kotlinx.coroutines.*
fun main() = runBlocking {
// try-catch로 처리
val deferred = async {
throw RuntimeException("비동기 에러 발생")
}
try {
deferred.await()
} catch (e: RuntimeException) {
println("잡음: ${e.message}") // 잡음: 비동기 에러 발생
}
}
async에서 발생한 예외는 await()을 호출할 때 다시 던져진다. 그래서 await() 호출을 try-catch로 감싸면 된다.
CoroutineExceptionHandler
최상위에서 예외를 일괄 처리하고 싶다면 CoroutineExceptionHandler를 쓴다.
import kotlinx.coroutines.*
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("예외 처리기: ${exception.message}")
}
val scope = CoroutineScope(Dispatchers.Default + handler)
scope.launch {
throw IllegalStateException("뭔가 잘못됐다")
}
delay(500) // 예외 처리기가 동작할 시간
println("메인 계속 진행")
}
예외 처리기: 뭔가 잘못됐다
메인 계속 진행
핸들러는 launch에서 발생한 처리되지 않은 예외를 잡는다. async에는 동작하지 않는데, async의 예외는 await()에서 처리하도록 설계되어 있기 때문이다.
Flow 에러 처리
Flow에서는 catch 연산자를 쓴다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun riskyFlow(): Flow<Int> = flow {
emit(1)
emit(2)
throw RuntimeException("데이터 소스 에러")
emit(3) // 실행되지 않음
}
fun main() = runBlocking {
riskyFlow()
.catch { e -> println("에러 잡음: ${e.message}") }
.collect { println("받음: $it") }
}
받음: 1
받음: 2
에러 잡음: 데이터 소스 에러
catch는 그 위쪽(업스트림)에서 발생한 예외만 잡는다. collect 안에서 발생한 예외는 잡지 않으므로, collect 쪽 에러는 별도의 try-catch가 필요하다.
구조화된 동시성
코루틴의 설계 철학 중 가장 중요한 개념이다. 핵심 원칙은 단순하다: 코루틴은 항상 부모-자식 관계를 형성하고, 부모는 모든 자식이 끝나야 끝난다.
import kotlinx.coroutines.*
fun main() = runBlocking {
try {
coroutineScope {
launch {
delay(500)
println("작업 1 완료")
}
launch {
delay(200)
throw RuntimeException("작업 2 실패")
}
launch {
delay(1000)
println("작업 3 완료") // 실행되지 않음
}
}
} catch (e: RuntimeException) {
println("에러: ${e.message}")
}
println("계속 진행")
}
에러: 작업 2 실패
계속 진행
작업 2가 실패하면 같은 스코프의 작업 1과 작업 3도 취소된다. 이게 구조화된 동시성의 핵심이다. 자식 하나가 실패하면 형제들도 취소되고, 부모에게 예외가 전파된다.
supervisorScope — 독립적인 자식
자식의 실패가 다른 자식에게 영향을 주지 않게 하고 싶다면 supervisorScope를 쓴다.
import kotlinx.coroutines.*
fun main() = runBlocking {
supervisorScope {
val job1 = launch {
delay(500)
println("작업 1 완료")
}
val job2 = launch {
delay(200)
throw RuntimeException("작업 2 실패")
}
val job3 = launch {
delay(1000)
println("작업 3 완료")
}
}
}
작업 1 완료
작업 3 완료
작업 2가 실패해도 작업 1과 3은 정상적으로 완료된다. 여러 독립적인 작업을 동시에 실행할 때 유용하다. 다만 실패한 코루틴의 예외가 무시되는 건 아니다. 로그에는 찍히므로 별도로 처리해야 한다.
코루틴 심화까지 왔다. Flow로 데이터 스트림을 다루고, Channel로 코루틴 간 통신을 하고, 에러를 체계적으로 처리하고, 구조화된 동시성으로 생명주기를 관리한다. 이 도구들을 조합하면 복잡한 비동기 로직도 읽기 좋은 코드로 작성할 수 있다.
다음 편에서는 Kotlin DSL과 고급 함수 기능을 다룬다. 수신 객체 람다, infix, operator overloading 같은 표현력 높은 기능을 살펴보자.
Loading comments...