Table of contents
- Continuing from Part 9
- withContext — Switching Threads
- Flow — Asynchronous Data Streams
- Flow Operators
- Flow Context and flowOn
- Channel — Inter-Coroutine Communication
- Error Handling
- Structured Concurrency
Continuing from Part 9
In Part 9, we covered the basic coroutine tools: suspend, launch, async, and dispatchers. This part goes one level deeper. We’ll look at Flow for handling data streams, Channel for inter-coroutine communication, and how the system should respond when errors occur.
withContext — Switching Threads
In Part 9, we saw how to pass a dispatcher as an argument to launch. When you need to switch dispatchers within a single coroutine, use withContext.
import kotlinx.coroutines.*
suspend fun fetchFromNetwork(): String = withContext(Dispatchers.IO) {
println("Network call: ${Thread.currentThread().name}")
delay(1000)
"Server data"
}
suspend fun processData(data: String): String = withContext(Dispatchers.Default) {
println("Data processing: ${Thread.currentThread().name}")
data.uppercase()
}
fun main() = runBlocking {
val raw = fetchFromNetwork()
val processed = processData(raw)
println("Result: $processed")
}
Network call: DefaultDispatcher-worker-1
Data processing: DefaultDispatcher-worker-1
Result: SERVER DATA
withContext executes the block on the specified dispatcher and returns to the original dispatcher when done. It doesn’t launch a new concurrent coroutine — it suspends the current one and resumes it in the new context. Unlike launch(Dispatchers.IO), it directly returns a result, so it reads like synchronous code.
The most common pattern in practice is fetching data on IO and then processing it on Default or Main.
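That pattern can be sketched as a single suspend function whose stages each run on the dispatcher suited to them. This is a minimal sketch; loadUserName and its fake payload are made up for illustration.

```kotlin
import kotlinx.coroutines.*

// Hypothetical repository-style function: each stage runs on the dispatcher
// suited to it, but callers see one suspend function that simply returns a value.
suspend fun loadUserName(id: Int): String {
    val raw = withContext(Dispatchers.IO) {
        delay(100) // stand-in for a network or database call
        "user-$id:alice"
    }
    return withContext(Dispatchers.Default) {
        // CPU-side parsing of the raw payload
        raw.substringAfter(":").replaceFirstChar { it.uppercase() }
    }
}

fun main() = runBlocking {
    println(loadUserName(7)) // Alice
}
```

The caller never sees a dispatcher; the function decides internally where each stage belongs.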
Flow — Asynchronous Data Streams
A suspend function returns only a single value. For cases where multiple values arrive sequentially — real-time sensor data, stock price updates, search autocomplete — you need Flow.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun numberFlow(): Flow<Int> = flow {
for (i in 1..5) {
delay(300)
emit(i) // Emit values one by one
}
}
fun main() = runBlocking {
println("Flow collection started")
numberFlow().collect { value ->
println("Received: $value")
}
println("Flow collection ended")
}
Flow collection started
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Flow collection ended
The flow { } builder emits values with emit(), and collect receives them. The key characteristic is that it’s a cold stream: no code executes until collect is called.
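The cold property is easy to verify: every call to collect restarts the builder block from the top. A short sketch (coldFlow is a made-up name):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun coldFlow(): Flow<Int> = flow {
    println("Builder started") // runs once per collect, not once per Flow
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    val f = coldFlow()
    println("Flow created, nothing has run yet")
    f.collect { println("First collect: $it") }
    f.collect { println("Second collect: $it") } // builder restarts from the top
}
```

"Builder started" prints twice, once per collect; creating the Flow alone prints nothing.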
The diagram below visualizes data passing through a Flow pipeline.
flowchart LR
emit["emit()\n1, 2, 3, ..., 10"]
filter["filter\n{ it % 2 == 0 }"]
map["map\n{ it * it }"]
take["take(3)"]
collect["collect\n{ println(it) }"]
emit -->|"1,2,3,...,10"| filter
filter -->|"2,4,6,8,10"| map
map -->|"4,16,36,64,100"| take
take -->|"4,16,36"| collect
Flow Operators
In Part 5, we learned filter and map for collections — Flow has the same operators.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val numbers = (1..10).asFlow() // Range -> Flow conversion
numbers
.filter { it % 2 == 0 } // Even numbers only
.map { it * it } // Square
.take(3) // First 3 only
.collect { println(it) }
}
4
16
36
They feel the same as collection APIs, but there’s a difference. Flow operators don’t create intermediate lists — values pass through the pipeline one at a time. This makes them memory-efficient when processing large amounts of data.
Here are a few more commonly used operators.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// transform — more flexible than map
flowOf(1, 2, 3)
.transform { value ->
emit("$value start")
emit("$value end")
}
.collect { println(it) }
// 1 start, 1 end, 2 start, 2 end, 3 start, 3 end
println("---")
// onEach — side effects (logging, etc.)
flowOf("a", "b", "c")
.onEach { println("Processing: $it") }
.map { it.uppercase() }
.collect { println("Result: $it") }
}
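Besides collect, Flow has other terminal operators that consume the stream and return a plain value. A quick sketch of three standard ones:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val numbers = (1..5).asFlow()

    println(numbers.toList())                  // [1, 2, 3, 4, 5]
    println(numbers.reduce { a, b -> a + b })  // 15 (accumulates from the first element)
    println(numbers.first { it > 3 })          // 4 (cancels the flow after the first match)
}
```

Because the flow is cold, each terminal operator re-runs it from the start, so numbers can be consumed three times here.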
Flow Context and flowOn
Sometimes the emit side and the collect side of a Flow need to run on different dispatchers — for example, fetching data on an IO thread while collecting on the main thread.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun heavyComputation(): Flow<Int> = flow {
for (i in 1..3) {
println("Emit thread: ${Thread.currentThread().name}")
delay(100)
emit(i)
}
}.flowOn(Dispatchers.Default) // Specify dispatcher for emit code
fun main() = runBlocking {
heavyComputation().collect { value ->
println("Collect thread: ${Thread.currentThread().name}, value: $value")
}
}
Emit thread: DefaultDispatcher-worker-1
Collect thread: main, value: 1
Emit thread: DefaultDispatcher-worker-1
Collect thread: main, value: 2
Emit thread: DefaultDispatcher-worker-1
Collect thread: main, value: 3
flowOn changes the dispatcher of everything above it (upstream). It doesn’t affect the collector’s dispatcher. Just remember this rule.
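When a pipeline contains several flowOn calls, each one applies only to the segment above it, up to the previous flowOn. A sketch to illustrate (the operator values are arbitrary):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    (1..3).asFlow()
        .map {
            println("map 1 on: ${Thread.currentThread().name}")
            it * 10
        }
        .flowOn(Dispatchers.Default) // applies to asFlow and the first map
        .map {
            println("map 2 on: ${Thread.currentThread().name}")
            it + 1
        }
        .flowOn(Dispatchers.IO)      // applies to the second map
        .collect { println("collect on ${Thread.currentThread().name}: $it") }
}
```

The first map runs on Default, the second on IO, and collect stays on the collector's dispatcher (main here).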
Channel — Inter-Coroutine Communication
If Flow is a unidirectional stream, Channel is a pipeline between coroutines. One side sends values and the other receives them.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<String>()
// Producer
launch {
val items = listOf("Apple", "Banana", "Strawberry")
for (item in items) {
println("Sending: $item")
channel.send(item)
delay(200)
}
channel.close() // Nothing more to send
}
// Consumer
for (item in channel) {
println("Received: $item")
}
println("Channel closed")
}
Sending: Apple
Received: Apple
Sending: Banana
Received: Banana
Sending: Strawberry
Received: Strawberry
Channel closed
Both send and receive are suspend functions. A default channel has no buffer (a rendezvous channel), so send suspends until a receiver is ready; likewise, receive suspends until a value arrives. This backpressure mechanism naturally handles speed differences between producers and consumers.
You can specify a buffer size for the channel.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
// Buffer size 2 — send returns immediately for up to 2 items
val buffered = Channel<Int>(capacity = 2)
launch {
for (i in 1..5) {
println("Before send: $i")
buffered.send(i)
println("After send: $i")
}
buffered.close()
}
delay(1000) // Consumer starts slowly
for (value in buffered) {
println("Received: $value")
delay(300)
}
}
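The producer-plus-channel pattern above is common enough that kotlinx.coroutines ships a produce builder (still marked @ExperimentalCoroutinesApi at the time of writing), which creates the channel, launches the producer coroutine, and closes the channel when the block finishes. A sketch:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

@OptIn(ExperimentalCoroutinesApi::class) // produce is still an experimental API
fun main() = runBlocking {
    // produce launches a producer coroutine and returns a ReceiveChannel;
    // the channel is closed automatically when the block completes.
    val fruits: ReceiveChannel<String> = produce {
        for (item in listOf("Apple", "Banana", "Strawberry")) {
            send(item)
        }
    }
    for (item in fruits) {
        println("Received: $item")
    }
}
```

Compared to the manual version, there is no explicit Channel() or close() call to forget.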
Error Handling
What happens when an error occurs in a coroutine? By default, it propagates up to the parent coroutine.
import kotlinx.coroutines.*
fun main() = runBlocking {
// A failed async also cancels its parent scope, so a try-catch around
// await() alone is not enough; wrap the enclosing coroutineScope instead
try {
coroutineScope {
val deferred = async {
throw RuntimeException("Async error occurred")
}
deferred.await()
}
} catch (e: RuntimeException) {
println("Caught: ${e.message}") // Caught: Async error occurred
}
}
Exceptions thrown in async are re-thrown when await() is called. Be careful, though: a failing async also cancels its parent scope, so inside a regular scope a try-catch around await() alone prints the message and then still crashes the program. Wrapping the enclosing coroutineScope in try-catch handles the exception exactly once.
CoroutineExceptionHandler
If you want to handle exceptions centrally at the top level, use CoroutineExceptionHandler.
import kotlinx.coroutines.*
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("Exception handler: ${exception.message}")
}
val scope = CoroutineScope(Dispatchers.Default + handler)
scope.launch {
throw IllegalStateException("Something went wrong")
}
delay(500) // Give time for the exception handler to run
println("Main continues")
}
Exception handler: Something went wrong
Main continues
The handler catches unhandled exceptions from launch. It doesn’t work with async, because async exceptions are designed to be handled at await().
Flow Error Handling
In Flow, use the catch operator.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun riskyFlow(): Flow<Int> = flow {
emit(1)
emit(2)
throw RuntimeException("Data source error")
emit(3) // Not executed
}
fun main() = runBlocking {
riskyFlow()
.catch { e -> println("Error caught: ${e.message}") }
.collect { println("Received: $it") }
}
Received: 1
Received: 2
Error caught: Data source error
catch only catches exceptions from upstream (above it). It doesn’t catch exceptions from inside collect, so you need a separate try-catch for collector-side errors.
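A common workaround is to move the collector's logic into onEach so it sits upstream of catch, and finish the pipeline with a parameterless collect(). A sketch (the failure on value 2 is contrived):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    flowOf(1, 2, 3)
        .onEach { value ->
            // the former collect body now runs upstream of catch
            check(value != 2) { "Cannot process 2" }
            println("Processed: $value")
        }
        .catch { e -> println("Error caught: ${e.message}") }
        .collect() // terminal operator with an empty body
}
```

Now a failure while processing a value is caught by catch like any other upstream error, at the cost of the flow stopping after the failing element.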
Structured Concurrency
This is the most important concept in coroutine design philosophy. The core principle is simple: coroutines always form parent-child relationships, and a parent doesn’t finish until all its children have finished.
import kotlinx.coroutines.*
fun main() = runBlocking {
try {
coroutineScope {
launch {
delay(500)
println("Task 1 complete")
}
launch {
delay(200)
throw RuntimeException("Task 2 failed")
}
launch {
delay(1000)
println("Task 3 complete") // Not executed
}
}
} catch (e: RuntimeException) {
println("Error: ${e.message}")
}
println("Continuing")
}
Error: Task 2 failed
Continuing
When Task 2 fails, Tasks 1 and 3 in the same scope are also cancelled. This is the essence of structured concurrency. If one child fails, siblings are cancelled, and the exception propagates to the parent.
supervisorScope — Independent Children
If you want a child’s failure not to affect other children, use supervisorScope.
import kotlinx.coroutines.*
fun main() = runBlocking {
supervisorScope {
val job1 = launch {
delay(500)
println("Task 1 complete")
}
val job2 = launch {
delay(200)
throw RuntimeException("Task 2 failed")
}
val job3 = launch {
delay(1000)
println("Task 3 complete")
}
}
}
Task 1 complete
Task 3 complete
Even though Task 2 fails, Tasks 1 and 3 complete normally. This is useful when running multiple independent tasks concurrently. Note that the failed coroutine’s exception isn’t ignored — it still shows up in logs and should be handled separately.
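The simplest way to handle each child's failure separately is a try-catch inside the child itself (a CoroutineExceptionHandler in the child's context also works for launch children). A minimal sketch:

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    supervisorScope {
        launch {
            try {
                delay(200)
                throw RuntimeException("Task 2 failed")
            } catch (e: RuntimeException) {
                println("Handled inside child: ${e.message}") // sibling keeps running
            }
        }
        launch {
            delay(500)
            println("Task 1 complete")
        }
    }
    println("All children finished")
}
```

Because the exception is handled inside the child, nothing reaches the default uncaught-exception handler, so no stray stack trace appears in the logs.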
We’ve reached advanced coroutines. Use Flow for data streams, Channel for inter-coroutine communication, handle errors systematically, and manage lifecycles with structured concurrency. Combining these tools lets you write complex asynchronous logic as readable code.
The next part covers Kotlin DSL and advanced function features. We’ll explore expressive features like lambdas with receivers, infix functions, and operator overloading.