ioob.dev

Kotlin Part 10 — Coroutines Advanced

Kotlin Series (10/12)
  1. Kotlin Beginner Part 1 — Variables and Types
  2. Kotlin Beginner Part 2 — Conditionals and Loops
  3. Kotlin Beginner Part 3 — Functions
  4. Kotlin Beginner Part 4 — Classes and Objects
  5. Kotlin Beginner Part 5 — Collections and Lambdas
  6. Kotlin Part 6 — Null Safety Advanced
  7. Kotlin Part 7 — Generics
  8. Kotlin Part 8 — sealed class and enum
  9. Kotlin Part 9 — Coroutines Basics
  10. Kotlin Part 10 — Coroutines Advanced
  11. Kotlin Part 11 — DSL and Advanced Functions
  12. Kotlin Part 12 — Practical Patterns
Continuing from Part 9

In Part 9, we covered the basic coroutine tools: suspend, launch, async, and dispatchers. This part goes one level deeper. We’ll look at Flow for handling data streams, Channel for inter-coroutine communication, and how the system should respond when errors occur.

withContext — Switching Threads

In Part 9, we saw how to pass a dispatcher as an argument to launch. When you need to switch dispatchers within a single coroutine, use withContext.

import kotlinx.coroutines.*

suspend fun fetchFromNetwork(): String = withContext(Dispatchers.IO) {
    println("Network call: ${Thread.currentThread().name}")
    delay(1000)
    "Server data"
}

suspend fun processData(data: String): String = withContext(Dispatchers.Default) {
    println("Data processing: ${Thread.currentThread().name}")
    data.uppercase()
}

fun main() = runBlocking {
    val raw = fetchFromNetwork()
    val processed = processData(raw)
    println("Result: $processed")
}
Network call: DefaultDispatcher-worker-1
Data processing: DefaultDispatcher-worker-1
Result: SERVER DATA

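withContext executes the block on the specified dispatcher and returns to the original dispatcher when done. It doesn’t create a new coroutine; it only changes the execution context of the current one. Unlike launch(Dispatchers.IO), it directly returns a result, so it reads like synchronous code. Both lines in the output show a DefaultDispatcher-worker thread because Dispatchers.IO and Dispatchers.Default share a thread pool; the exact worker number varies between runs.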

The most common pattern in practice is fetching data on IO and then processing it on Default or Main.
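
To make the "returns to the original dispatcher" behavior visible, here is a minimal sketch. The helper name traceThreads is hypothetical and exists only for this illustration; it records the thread name before, inside, and after a withContext block.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper (not part of the original article): records the thread
// name before, inside, and after a withContext block.
suspend fun traceThreads(): List<String> {
    val names = mutableListOf<String>()
    names += Thread.currentThread().name          // caller's thread
    val inside = withContext(Dispatchers.Default) {
        Thread.currentThread().name               // a Default dispatcher worker
    }
    names += inside
    names += Thread.currentThread().name          // back on the caller's dispatcher
    return names
}

fun main() = runBlocking {
    traceThreads().forEach(::println)
}
```

When called from runBlocking, the first and last entries should name the same thread, while the middle entry names a DefaultDispatcher worker. The exact names depend on the runtime and on whether coroutine debug mode is enabled.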

Flow — Asynchronous Data Streams

A suspend function returns only a single value. For cases where multiple values arrive sequentially — real-time sensor data, stock price updates, search autocomplete — you need Flow.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun numberFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(300)
        emit(i)  // Emit values one by one
    }
}

fun main() = runBlocking {
    println("Flow collection started")

    numberFlow().collect { value ->
        println("Received: $value")
    }

    println("Flow collection ended")
}
Flow collection started
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Flow collection ended

The flow { } builder emits values with emit(), and collect receives them. The key characteristic is that it’s a cold stream: no code executes until collect is called.
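
The cold-stream property can be checked with a small sketch. The builderRuns counter and countedFlow function are assumptions introduced only for this demonstration: the counter is incremented inside the builder, so it shows when the builder code actually runs.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical counter used only to observe how often the builder block runs.
var builderRuns = 0

fun countedFlow(): Flow<Int> = flow {
    builderRuns++      // executed once per collection, not when the flow is created
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    val f = countedFlow()
    println("After creation: $builderRuns")      // After creation: 0
    f.collect()                                  // first collection runs the builder
    f.collect()                                  // second collection runs it again
    println("After two collects: $builderRuns")  // After two collects: 2
}
```

Creating the flow runs nothing, and every collect call restarts the builder from the beginning.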

Visualizing data passing through a Flow pipeline makes it intuitive.

flowchart LR
    emit["emit()\n1, 2, 3, ..., 10"]
    filter["filter\n{ it % 2 == 0 }"]
    map["map\n{ it * it }"]
    take["take(3)"]
    collect["collect\n{ println(it) }"]

    emit -->|"1,2,3,...,10"| filter
    filter -->|"2,4,6,8,10"| map
    map -->|"4,16,36,64,100"| take
    take -->|"4,16,36"| collect

Flow Operators

In Part 5, we learned filter and map for collections — Flow has the same operators.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val numbers = (1..10).asFlow()  // IntRange -> Flow conversion

    numbers
        .filter { it % 2 == 0 }       // Even numbers only
        .map { it * it }              // Square
        .take(3)                       // First 3 only
        .collect { println(it) }
}
4
16
36

They feel the same as collection APIs, but there’s a difference. Flow operators don’t create intermediate lists — values pass through the pipeline one at a time. This makes them memory-efficient when processing large amounts of data.
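
The one-at-a-time behavior shows up when each stage logs what it sees. This is a sketch with a hypothetical helper, traceFlowPipeline, that records the order of calls in a plain list.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records the order in which each stage sees each value.
suspend fun traceFlowPipeline(): List<String> {
    val log = mutableListOf<String>()
    (1..4).asFlow()
        .filter { log += "filter $it"; it % 2 == 0 }
        .map { log += "map $it"; it * it }
        .collect { log += "collect $it" }
    return log
}

fun main() = runBlocking {
    println(traceFlowPipeline())
    // [filter 1, filter 2, map 2, collect 4, filter 3, filter 4, map 4, collect 16]
}
```

Each value travels through the whole pipeline before the next one is requested. The same chain on a List would run filter over every element first and build an intermediate list before map starts.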

Here are a few more commonly used operators.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // transform — more flexible than map
    flowOf(1, 2, 3)
        .transform { value ->
            emit("$value start")
            emit("$value end")
        }
        .collect { println(it) }
    // 1 start, 1 end, 2 start, 2 end, 3 start, 3 end

    println("---")

    // onEach — side effects (logging, etc.)
    flowOf("a", "b", "c")
        .onEach { println("Processing: $it") }
        .map { it.uppercase() }
        .collect { println("Result: $it") }
}

Flow Context and flowOn

Sometimes the emit code and collect code of a Flow need to run on different dispatchers. Fetching data on an IO thread and collecting on the main thread, for example.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun heavyComputation(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emit thread: ${Thread.currentThread().name}")
        delay(100)
        emit(i)
    }
}.flowOn(Dispatchers.Default)  // Specify dispatcher for emit code

fun main() = runBlocking {
    heavyComputation().collect { value ->
        println("Collect thread: ${Thread.currentThread().name}, value: $value")
    }
}
Emit thread: DefaultDispatcher-worker-1
Collect thread: main, value: 1
Emit thread: DefaultDispatcher-worker-1
Collect thread: main, value: 2
Emit thread: DefaultDispatcher-worker-1
Collect thread: main, value: 3

flowOn changes the dispatcher of everything above it (upstream). It doesn’t affect the collector’s dispatcher. Just remember this rule.
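
The upstream-only rule can be sketched by placing one map above flowOn and one below it. The helper traceFlowOn is hypothetical and only captures the thread name seen by each operator.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns (thread of the map above flowOn,
// thread of the map below flowOn).
suspend fun traceFlowOn(): Pair<String, String> {
    var upstream = ""
    var downstream = ""
    flowOf(1)
        .map { upstream = Thread.currentThread().name; it }    // above flowOn
        .flowOn(Dispatchers.Default)
        .map { downstream = Thread.currentThread().name; it }  // below flowOn
        .collect()
    return upstream to downstream
}

fun main() = runBlocking {
    val (up, down) = traceFlowOn()
    println("Upstream map ran on: $up")
    println("Downstream map ran on: $down")
}
```

When collected from runBlocking, the first map should report a DefaultDispatcher worker, while the second map runs on the collector's own thread.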

Channel — Inter-Coroutine Communication

If Flow is a unidirectional stream, Channel is a pipeline between coroutines. One side sends values and the other receives them.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val channel = Channel<String>()

    // Producer
    launch {
        val items = listOf("Apple", "Banana", "Strawberry")
        for (item in items) {
            println("Sending: $item")
            channel.send(item)
            delay(200)
        }
        channel.close()  // Nothing more to send
    }

    // Consumer
    for (item in channel) {
        println("Received: $item")
    }
    println("Channel closed")
}
Sending: Apple
Received: Apple
Sending: Banana
Received: Banana
Sending: Strawberry
Received: Strawberry
Channel closed

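Both send and receive are suspend functions. When the channel is full, send suspends; when the channel is empty, receive suspends. The channel above is created without a capacity argument, so it has no buffer (a rendezvous channel) and every send waits until the consumer takes the value. This backpressure mechanism naturally handles speed differences between producers and consumers.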

You can specify a buffer size for the channel.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    // Buffer size 2 — send returns immediately for up to 2 items
    val buffered = Channel<Int>(capacity = 2)

    launch {
        for (i in 1..5) {
            println("Before send: $i")
            buffered.send(i)
            println("After send: $i")
        }
        buffered.close()
    }

    delay(1000)  // Consumer starts slowly
    for (value in buffered) {
        println("Received: $value")
        delay(300)
    }
}
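
The effect of the capacity argument can also be checked without relying on timing. The following sketch uses trySend, the non-suspending counterpart of send, which reports failure instead of suspending when the channel cannot accept a value. The helper fillWithoutSuspending is hypothetical.

```kotlin
import kotlinx.coroutines.channels.*

// Hypothetical helper: tries to push `count` items without suspending and
// reports which attempts the channel accepted.
fun fillWithoutSuspending(capacity: Int, count: Int): List<Boolean> {
    val channel = Channel<Int>(capacity)
    return (1..count).map { channel.trySend(it).isSuccess }
}

fun main() {
    // Buffer of 2: the first two items fit, the rest would have to wait.
    println(fillWithoutSuspending(capacity = 2, count = 4))
    // [true, true, false, false]

    // Rendezvous channel (the default): nothing fits without a waiting receiver.
    println(fillWithoutSuspending(capacity = Channel.RENDEZVOUS, count = 2))
    // [false, false]
}
```

Every false here marks a point where a regular send call would suspend until the consumer makes room.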

Error Handling

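What happens when an error occurs in a coroutine? By default, it propagates up to the parent coroutine and cancels it.

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    // supervisorScope keeps the failure from cancelling runBlocking itself
    supervisorScope {
        val deferred = async {
            throw RuntimeException("Async error occurred")
        }

        // Handle with try-catch around await()
        try {
            deferred.await()
        } catch (e: RuntimeException) {
            println("Caught: ${e.message}")  // Caught: Async error occurred
        }
    }
}

Exceptions thrown in async are re-thrown when await() is called, so wrapping the await() call in try-catch handles them. The supervisorScope matters here: an async started directly inside runBlocking would also report the failure to its parent, and the program would still end with the exception even though the catch block ran.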

CoroutineExceptionHandler

If you want to handle exceptions centrally at the top level, use CoroutineExceptionHandler.

import kotlinx.coroutines.*

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Exception handler: ${exception.message}")
    }

    val scope = CoroutineScope(Dispatchers.Default + handler)

    scope.launch {
        throw IllegalStateException("Something went wrong")
    }

    delay(500)  // Give time for the exception handler to run
    println("Main continues")
}
Exception handler: Something went wrong
Main continues

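The handler catches unhandled exceptions from launch. It doesn’t work with async, because async exceptions are designed to be handled at await(). It also has to be installed on the scope or on a root coroutine, as above: in a regular scope, a handler passed to a child launch is never invoked, because the child hands its exception to the parent.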
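
The difference between launch and async can be sketched as follows. The helper asyncIgnoresHandler is hypothetical: it starts a failing async in a scope that has a handler installed, then reports whether the handler ran and what await() threw.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns (handlerInvoked, message seen at await()).
suspend fun asyncIgnoresHandler(): Pair<Boolean, String?> {
    var handlerInvoked = false
    val handler = CoroutineExceptionHandler { _, _ -> handlerInvoked = true }
    val scope = CoroutineScope(Dispatchers.Default + handler)

    val deferred = scope.async<Int> {
        throw IllegalStateException("Async failure")
    }

    // The exception is stored in the Deferred and surfaces here instead.
    val message = try {
        deferred.await()
        null
    } catch (e: IllegalStateException) {
        e.message
    }
    return handlerInvoked to message
}

fun main() = runBlocking {
    println(asyncIgnoresHandler())  // (false, Async failure)
}
```

The handler stays silent and the failure reaches the caller only through await(), so an async result that is never awaited can hide its exception.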

Flow Error Handling

In Flow, use the catch operator.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun riskyFlow(): Flow<Int> = flow {
    emit(1)
    emit(2)
    throw RuntimeException("Data source error")
    emit(3)  // Not executed
}

fun main() = runBlocking {
    riskyFlow()
        .catch { e -> println("Error caught: ${e.message}") }
        .collect { println("Received: $it") }
}
Received: 1
Received: 2
Error caught: Data source error

catch only catches exceptions from upstream (above it). It doesn’t catch exceptions from inside collect, so you need a separate try-catch for collector-side errors.
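
One alternative to a separate try-catch is to move the collector logic into onEach, place catch after it, and finish with a collect() call that has no body. The sketch below uses a hypothetical helper, collectDeclaratively, and an artificial failure on the third value.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the log produced while collecting.
suspend fun collectDeclaratively(): List<String> {
    val log = mutableListOf<String>()
    flowOf(1, 2, 3)
        .onEach { value ->
            // Former collect body; it is now upstream of catch.
            check(value < 3) { "Collector rejected $value" }
            log += "Received: $value"
        }
        .catch { e -> log += "Error caught: ${e.message}" }
        .collect()  // no body, so nothing can fail below catch
    return log
}

fun main() = runBlocking {
    collectDeclaratively().forEach(::println)
    // Received: 1
    // Received: 2
    // Error caught: Collector rejected 3
}
```

Because the processing code sits above catch, its exception is handled by the same operator that handles data source errors.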

Structured Concurrency

This is the most important concept in coroutine design philosophy. The core principle is simple: coroutines always form parent-child relationships, and a parent doesn’t finish until all its children have finished.

import kotlinx.coroutines.*

fun main() = runBlocking {
    try {
        coroutineScope {
            launch {
                delay(500)
                println("Task 1 complete")  // Not executed (cancelled at 200 ms)
            }
            launch {
                delay(200)
                throw RuntimeException("Task 2 failed")
            }
            launch {
                delay(1000)
                println("Task 3 complete")  // Not executed
            }
        }
    } catch (e: RuntimeException) {
        println("Error: ${e.message}")
    }

    println("Continuing")
}
Error: Task 2 failed
Continuing

When Task 2 fails, Tasks 1 and 3 in the same scope are also cancelled. This is the essence of structured concurrency. If one child fails, siblings are cancelled, and the exception propagates to the parent.
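
The other half of the principle, that a parent doesn’t finish until its children have finished, can be sketched without any failure. The helper runChildren is hypothetical, and the plain list assumes a single-threaded caller such as runBlocking.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: logs when the scope body, the children, and the code
// after the scope run. Assumes a single-threaded caller such as runBlocking.
suspend fun runChildren(): List<String> {
    val log = mutableListOf<String>()
    coroutineScope {
        launch { delay(200); log += "child A done" }
        launch { delay(100); log += "child B done" }
        log += "scope body done"   // the body finishes before either child
    }
    log += "after scope"           // reached only once both children are done
    return log
}

fun main() = runBlocking {
    println(runChildren())
    // [scope body done, child B done, child A done, after scope]
}
```

The coroutineScope call returns only after both launched children complete, even though its own body finishes immediately.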

supervisorScope — Independent Children

If you want a child’s failure not to affect other children, use supervisorScope.

import kotlinx.coroutines.*

fun main() = runBlocking {
    supervisorScope {
        val job1 = launch {
            delay(500)
            println("Task 1 complete")
        }
        val job2 = launch {
            delay(200)
            throw RuntimeException("Task 2 failed")
        }
        val job3 = launch {
            delay(1000)
            println("Task 3 complete")
        }
    }
}
Task 1 complete
Task 3 complete

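Even though Task 2 fails, Tasks 1 and 3 complete normally. This is useful when running multiple independent tasks concurrently. Note that the failed coroutine’s exception isn’t ignored: with no handler installed it goes to the thread’s uncaught exception handler, which on the JVM prints a stack trace to stderr alongside the output above, so it should still be handled separately.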
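
One way to handle the failed child separately is to pass a CoroutineExceptionHandler to the launch call itself. Inside supervisorScope, children are responsible for their own exceptions, so the handler is used. This is a sketch with a hypothetical helper, superviseWithHandler, and shortened delays; it assumes a single-threaded caller such as runBlocking.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns what the handler and the surviving sibling logged.
// Assumes a single-threaded caller such as runBlocking.
suspend fun superviseWithHandler(): List<String> {
    val log = mutableListOf<String>()
    val handler = CoroutineExceptionHandler { _, e ->
        log += "Handled: ${e.message}"
    }
    supervisorScope {
        launch(handler) {
            delay(100)
            throw RuntimeException("Task 2 failed")
        }
        launch {
            delay(300)
            log += "Task 3 complete"   // still runs after the sibling failed
        }
    }
    return log
}

fun main() = runBlocking {
    println(superviseWithHandler())
    // [Handled: Task 2 failed, Task 3 complete]
}
```

The failure is reported through the handler instead of the uncaught exception handler, and the sibling task is unaffected.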


We’ve reached advanced coroutines. Use Flow for data streams, Channel for inter-coroutine communication, handle errors systematically, and manage lifecycles with structured concurrency. Combining these tools lets you write complex asynchronous logic as readable code.

The next part covers Kotlin DSL and advanced function features. We’ll explore expressive features like lambdas with receivers, infix functions, and operator overloading.

-> Part 11: DSL and Advanced Functions

