Skip to content
ioob.dev
Go back

Kotlin 10편 — 코루틴 심화

· 5분 읽기

Table of contents

9편에서 이어서

9편에서 코루틴의 기본 도구인 suspend, launch, async, 디스패처를 다뤘다. 이번 편에서는 한 단계 더 들어간다. 데이터 스트림을 다루는 Flow, 코루틴 간 통신을 위한 Channel, 그리고 에러가 터졌을 때 시스템이 어떻게 반응해야 하는지를 살펴본다.

withContext — 스레드 전환

9편에서 디스패처를 launch의 인자로 넘기는 방식을 봤다. 하나의 코루틴 안에서 디스패처를 바꿔야 할 때는 withContext를 쓴다.

import kotlinx.coroutines.*

suspend fun fetchFromNetwork(): String = withContext(Dispatchers.IO) {
    println("네트워크 호출: ${Thread.currentThread().name}")
    delay(1000)
    "서버 데이터"
}

suspend fun processData(data: String): String = withContext(Dispatchers.Default) {
    println("데이터 처리: ${Thread.currentThread().name}")
    data.uppercase()
}

fun main() = runBlocking {
    val raw = fetchFromNetwork()
    val processed = processData(raw)
    println("결과: $processed")
}
네트워크 호출: DefaultDispatcher-worker-1
데이터 처리: DefaultDispatcher-worker-1
결과: 서버 데이터

withContext는 지정한 디스패처에서 블록을 실행하고, 끝나면 원래 디스패처로 돌아온다. 새 코루틴을 만드는 게 아니라 현재 코루틴의 실행 컨텍스트만 바꾸는 것이다. launch(Dispatchers.IO)와 달리 결과를 직접 반환하기 때문에 동기 코드처럼 읽힌다.

실무에서 가장 흔한 패턴은 IO에서 데이터를 가져온 뒤 Default나 Main에서 처리하는 구조다.

Flow — 비동기 데이터 스트림

suspend 함수는 값을 하나만 반환한다. 값이 여러 개 순차적으로 나오는 경우 — 실시간 센서 데이터, 주가 업데이트, 검색어 자동완성 — 에는 Flow가 필요하다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun numberFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(300)
        emit(i)  // 값을 하나씩 방출
    }
}

fun main() = runBlocking {
    println("Flow 수집 시작")

    numberFlow().collect { value ->
        println("받음: $value")
    }

    println("Flow 수집 끝")
}
Flow 수집 시작
받음: 1
받음: 2
받음: 3
받음: 4
받음: 5
Flow 수집 끝

flow { } 빌더 안에서 emit()으로 값을 내보내고, collect로 값을 받는다. 핵심적인 특성은 콜드(cold) 스트림이라는 점이다. collect가 호출되기 전까지는 아무 코드도 실행되지 않는다.

데이터가 Flow 파이프라인을 통과하는 과정을 그림으로 보면 직관적이다.

flowchart LR
    emit["emit()\n1, 2, 3, …, 10"]
    filter["filter\n{ it % 2 == 0 }"]
    map["map\n{ it * it }"]
    take["take(3)"]
    collect["collect\n{ println(it) }"]

    emit -->|"1,2,3,…,10"| filter
    filter -->|"2,4,6,8,10"| map
    map -->|"4,16,36,64,100"| take
    take -->|"4,16,36"| collect

Flow 연산자

5편에서 컬렉션의 filter, map을 배웠는데, Flow에도 같은 연산자가 있다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val numbers = (1..10).asFlow()  // List → Flow 변환

    numbers
        .filter { it % 2 == 0 }       // 짝수만
        .map { it * it }              // 제곱
        .take(3)                       // 처음 3개만
        .collect { println(it) }
}
4
16
36

컬렉션 API와 동일한 느낌으로 쓸 수 있지만, 차이점이 있다. Flow 연산자는 중간 리스트를 만들지 않고 값이 하나씩 파이프라인을 통과한다. 대량의 데이터를 처리할 때 메모리 효율이 좋다.

자주 쓰이는 연산자를 좀 더 보자.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // transform — map보다 유연한 변환
    flowOf(1, 2, 3)
        .transform { value ->
            emit("$value 시작")
            emit("$value 끝")
        }
        .collect { println(it) }
    // 1 시작, 1 끝, 2 시작, 2 끝, 3 시작, 3 끝

    println("---")

    // onEach — 사이드 이펙트 (로깅 등)
    flowOf("a", "b", "c")
        .onEach { println("처리 중: $it") }
        .map { it.uppercase() }
        .collect { println("결과: $it") }
}

Flow 컨텍스트와 flowOn

Flow의 방출(emit) 코드와 수집(collect) 코드가 서로 다른 디스패처에서 돌아야 할 때가 있다. 데이터는 IO 스레드에서 가져오고, 수집은 메인 스레드에서 하는 식이다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun heavyComputation(): Flow<Int> = flow {
    for (i in 1..3) {
        println("방출 스레드: ${Thread.currentThread().name}")
        delay(100)
        emit(i)
    }
}.flowOn(Dispatchers.Default)  // 방출 코드의 디스패처 지정

fun main() = runBlocking {
    heavyComputation().collect { value ->
        println("수집 스레드: ${Thread.currentThread().name}, 값: $value")
    }
}
방출 스레드: DefaultDispatcher-worker-1
수집 스레드: main, 값: 1
방출 스레드: DefaultDispatcher-worker-1
수집 스레드: main, 값: 2
방출 스레드: DefaultDispatcher-worker-1
수집 스레드: main, 값: 3

flowOn은 그 위쪽(업스트림)의 디스패처를 바꾼다. 수집 쪽의 디스패처에는 영향을 주지 않는다. 이 규칙만 기억하면 된다.

Channel — 코루틴 간 통신

Flow가 단방향 스트림이라면, Channel은 코루틴 사이의 파이프라인이다. 한쪽에서 값을 보내고 다른 쪽에서 받는 구조다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val channel = Channel<String>()

    // 생산자
    launch {
        val items = listOf("사과", "바나나", "딸기")
        for (item in items) {
            println("보내는 중: $item")
            channel.send(item)
            delay(200)
        }
        channel.close()  // 더 이상 보낼 게 없음
    }

    // 소비자
    for (item in channel) {
        println("받음: $item")
    }
    println("채널 닫힘")
}
보내는 중: 사과
받음: 사과
보내는 중: 바나나
받음: 바나나
보내는 중: 딸기
받음: 딸기
채널 닫힘

sendreceive는 모두 suspend 함수다. 채널이 가득 차면 send가 일시 정지하고, 채널이 비어 있으면 receive가 일시 정지한다. 이 배압(backpressure) 메커니즘 덕분에 생산자와 소비자의 속도 차이를 자연스럽게 조절할 수 있다.

채널에는 버퍼 크기를 지정할 수 있다.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    // 버퍼 크기 2 — 2개까지는 send가 바로 리턴
    val buffered = Channel<Int>(capacity = 2)

    launch {
        for (i in 1..5) {
            println("보내기 전: $i")
            buffered.send(i)
            println("보낸 후: $i")
        }
        buffered.close()
    }

    delay(1000)  // 소비자가 느리게 시작
    for (value in buffered) {
        println("받음: $value")
        delay(300)
    }
}

에러 처리

코루틴에서 에러가 발생하면 어떻게 될까? 기본적으로 부모 코루틴까지 전파된다.

import kotlinx.coroutines.*

fun main() = runBlocking {
    // try-catch로 처리
    val deferred = async {
        throw RuntimeException("비동기 에러 발생")
    }

    try {
        deferred.await()
    } catch (e: RuntimeException) {
        println("잡음: ${e.message}")  // 잡음: 비동기 에러 발생
    }
}

async에서 발생한 예외는 await()을 호출할 때 다시 던져진다. 그래서 await() 호출을 try-catch로 감싸면 된다.

CoroutineExceptionHandler

최상위에서 예외를 일괄 처리하고 싶다면 CoroutineExceptionHandler를 쓴다.

import kotlinx.coroutines.*

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("예외 처리기: ${exception.message}")
    }

    val scope = CoroutineScope(Dispatchers.Default + handler)

    scope.launch {
        throw IllegalStateException("뭔가 잘못됐다")
    }

    delay(500)  // 예외 처리기가 동작할 시간
    println("메인 계속 진행")
}
예외 처리기: 뭔가 잘못됐다
메인 계속 진행

핸들러는 launch에서 발생한 처리되지 않은 예외를 잡는다. async에는 동작하지 않는데, async의 예외는 await()에서 처리하도록 설계되어 있기 때문이다.

Flow 에러 처리

Flow에서는 catch 연산자를 쓴다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun riskyFlow(): Flow<Int> = flow {
    emit(1)
    emit(2)
    throw RuntimeException("데이터 소스 에러")
    emit(3)  // 실행되지 않음
}

fun main() = runBlocking {
    riskyFlow()
        .catch { e -> println("에러 잡음: ${e.message}") }
        .collect { println("받음: $it") }
}
받음: 1
받음: 2
에러 잡음: 데이터 소스 에러

catch는 그 위쪽(업스트림)에서 발생한 예외만 잡는다. collect 안에서 발생한 예외는 잡지 않으므로, collect 쪽 에러는 별도의 try-catch가 필요하다.

구조화된 동시성

코루틴의 설계 철학 중 가장 중요한 개념이다. 핵심 원칙은 단순하다: 코루틴은 항상 부모-자식 관계를 형성하고, 부모는 모든 자식이 끝나야 끝난다.

import kotlinx.coroutines.*

fun main() = runBlocking {
    try {
        coroutineScope {
            launch {
                delay(500)
                println("작업 1 완료")
            }
            launch {
                delay(200)
                throw RuntimeException("작업 2 실패")
            }
            launch {
                delay(1000)
                println("작업 3 완료")  // 실행되지 않음
            }
        }
    } catch (e: RuntimeException) {
        println("에러: ${e.message}")
    }

    println("계속 진행")
}
에러: 작업 2 실패
계속 진행

작업 2가 실패하면 같은 스코프의 작업 1과 작업 3도 취소된다. 이게 구조화된 동시성의 핵심이다. 자식 하나가 실패하면 형제들도 취소되고, 부모에게 예외가 전파된다.

supervisorScope — 독립적인 자식

자식의 실패가 다른 자식에게 영향을 주지 않게 하고 싶다면 supervisorScope를 쓴다.

import kotlinx.coroutines.*

fun main() = runBlocking {
    supervisorScope {
        val job1 = launch {
            delay(500)
            println("작업 1 완료")
        }
        val job2 = launch {
            delay(200)
            throw RuntimeException("작업 2 실패")
        }
        val job3 = launch {
            delay(1000)
            println("작업 3 완료")
        }
    }
}
작업 1 완료
작업 3 완료

작업 2가 실패해도 작업 1과 3은 정상적으로 완료된다. 여러 독립적인 작업을 동시에 실행할 때 유용하다. 다만 실패한 코루틴의 예외가 무시되는 건 아니다. 로그에는 찍히므로 별도로 처리해야 한다.


코루틴 심화까지 왔다. Flow로 데이터 스트림을 다루고, Channel로 코루틴 간 통신을 하고, 에러를 체계적으로 처리하고, 구조화된 동시성으로 생명주기를 관리한다. 이 도구들을 조합하면 복잡한 비동기 로직도 읽기 좋은 코드로 작성할 수 있다.

다음 편에서는 Kotlin DSL과 고급 함수 기능을 다룬다. 수신 객체 람다, infix, operator overloading 같은 표현력 높은 기능을 살펴보자.

11편: DSL과 고급 함수


Share this post on:

Comments

Loading comments...


Previous Post
Kotlin 11편 — DSL과 고급 함수
Next Post
Kotlin 9편 — 코루틴 기초