A flow can be imagined as a pipe in which requests for next values flow in one direction, and the corresponding produced values flow in the other direction. When a flow is completed or an exception occurs, this information is also propagated, closing the intermediate steps on the way. Since all of these travel through the pipe, we can listen for values, exceptions, or other characteristic events (like starting or completing). To do this, we use functions such as onEach, onStart, onCompletion, onEmpty and catch. Let's explain these one by one.
onEach
To react to each flowing value, we use the onEach function.
The onEach lambda expression is suspending, and elements are processed one after another in order (sequentially). So, if we add delay in onEach, we will delay each value as it flows.
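As a minimal sketch of this behavior (the helper name slowNumbers and the 100 ms delay are chosen only for illustration), the snippet below delays every value inside onEach and prints it in a second onEach further downstream:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: each value is held back by 100 ms
// before it continues downstream.
fun slowNumbers(): Flow<Int> =
    flowOf(1, 2, 3)
        .onEach { delay(100) }

suspend fun main() {
    slowNumbers()
        .onEach { println("Got $it") }
        .collect()
}
// (about 100 ms)
// Got 1
// (about 100 ms)
// Got 2
// (about 100 ms)
// Got 3
```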
onStart
The onStart function sets a listener that should be called immediately once the flow is started, i.e., once the terminal operation is called. It is important to note that onStart does not wait for the first element: it is called when we request the first element.
It is good to know that in onStart (as well as in onCompletion, onEmpty and catch) we can emit elements. Such elements will flow downstream from this place.
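The following sketch illustrates both points, assuming a hypothetical helper called numbersWithStart: one onStart emits an extra element, and a second one, placed further downstream, only prints a message.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: 0 is emitted from onStart,
// so it flows downstream before any upstream value.
fun numbersWithStart(): Flow<Int> =
    flowOf(1, 2)
        .onStart { emit(0) }

suspend fun main() {
    numbersWithStart()
        .onStart { println("Before") }
        .collect { println(it) }
}
// Before
// 0
// 1
// 2
```

The onStart placed closer to collect runs first, because the request for the first element reaches it before it reaches the steps further upstream.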
onCompletion
There are a few ways in which a flow can be completed. The most common one is when the flow builder is done (i.e., the last element has been sent), although this also happens in the case of an uncaught exception or a coroutine cancellation. In all these cases, we can add a listener for flow completion by using the onCompletion method.
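Here is a small sketch of onCompletion, with a hypothetical helper named numbersWithEndMarker. The lambda receives the cause of completion, which is null when the flow completed successfully; in that case it is safe to emit one more element.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: appends "Done" when the upstream flow
// completes without an exception (cause == null).
fun numbersWithEndMarker(): Flow<String> =
    flowOf("1", "2")
        .onCompletion { cause ->
            if (cause == null) emit("Done")
        }

suspend fun main() {
    numbersWithEndMarker()
        .onCompletion { cause -> println("Completed, cause: $cause") }
        .collect { println(it) }
}
// 1
// 2
// Done
// Completed, cause: null
```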
In Android, we often use onStart to show a progress bar (the indicator that we are waiting for a network response), and we use onCompletion to hide it.
onEmpty
A flow might complete without emitting any value, which might be an indication of an unexpected event. For such cases, there is the onEmpty function, which invokes the given action when this flow completes without emitting any elements. onEmpty might then be used to emit some default value.
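A minimal sketch of the default-value idea could look as follows. The extension function orDefault is a hypothetical name introduced only for this example; it is not part of kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical extension: emits `default` only if the flow
// completes without emitting anything.
fun <T> Flow<T>.orDefault(default: T): Flow<T> =
    onEmpty { emit(default) }

suspend fun main() {
    emptyFlow<Int>().orDefault(0).collect { println(it) }
    flowOf(1, 2).orDefault(0).collect { println(it) }
}
// 0
// 1
// 2
```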
catch
At any point of flow building or processing, an exception might occur. Such an exception will flow down, closing each processing step on the way; however, it can be caught and managed. To do so, we can use the catch method. This listener receives the exception as an argument and allows you to perform recovery operations.
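To make this concrete, here is a sketch (not necessarily the example the text originally showed) in which a flow emits two values and then throws. The names MyError and failingFlow are used only for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MyError : Throwable("My error")

// Illustrative flow: two values, then an exception.
val failingFlow = flow {
    emit(1)
    emit(2)
    throw MyError()
}

suspend fun main() {
    failingFlow
        .onEach { println("Got $it") }
        .catch { println("Caught $it") }
        .collect { println("Collected $it") }
}
// Got 1
// Collected 1
// Got 2
// Collected 2
// Caught MyError: My error
```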
In the example above, notice that onEach does not react to an exception. The same happens with other functions like map, filter etc. Only the onCompletion handler will be called.
The catch method stops an exception by catching it. The previous steps have already been completed, but catch can still emit new values and keep the rest of the flow alive.
import kotlinx.coroutines.flow.*

class MyError : Throwable("My error")

val flow = flow {
    emit("Message1")
    throw MyError()
}

suspend fun main(): Unit {
    flow.catch { emit("Error") }
        .collect { println("Collected $it") }
}
// Collected Message1
// Collected Error
The catch will only react to the exceptions thrown in the function defined upstream (you can imagine that the exception needs to be caught as it flows down).
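The sketch below shows this limitation, assuming a hypothetical helper called messages. The exception is thrown in onEach, which is placed after catch, so catch never sees it and the exception reaches the caller of collect.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: catch guards only the steps above it,
// so the exception thrown in onEach is not caught.
fun messages(): Flow<String> =
    flowOf("Message1")
        .catch { emit("Error") }
        .onEach { throw IllegalStateException(it) }

suspend fun main() {
    messages().collect { println("Collected $it") }
}
// Exception in thread "..." java.lang.IllegalStateException: Message1
```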
In Android, we often use catch to show exceptions that happened in a flow.
Uncaught exceptions
Uncaught exceptions in a flow immediately cancel this flow, and collect rethrows this exception. This behavior is typical of suspending functions, and coroutineScope behaves the same way. Exceptions can be caught outside flow using the classic try-catch block.
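As a minimal sketch of catching such an exception outside the flow (the names failingFlow and collectWithTryCatch are illustrative), collect is wrapped in a try-catch block and the observed events are gathered in a list:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MyError : Throwable("My error")

val failingFlow = flow {
    emit("Message1")
    throw MyError()
}

// Hypothetical helper: collect rethrows the flow's exception,
// which is then handled by a regular try-catch.
suspend fun collectWithTryCatch(): List<String> {
    val log = mutableListOf<String>()
    try {
        failingFlow.collect { log += "Collected $it" }
    } catch (e: MyError) {
        log += "Caught"
    }
    return log
}

suspend fun main() {
    collectWithTryCatch().forEach { println(it) }
}
// Collected Message1
// Caught
```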
Notice that using catch does not protect us from an exception in the terminal operation (because catch cannot be placed after the last operation). So, if there is an exception in the collect, it won't be caught, and an error will be thrown.
import kotlinx.coroutines.flow.*

class MyError : Throwable("My error")

val flow = flow {
    emit("Message1")
    emit("Message2")
}

suspend fun main(): Unit {
    flow.onStart { println("Before") }
        .catch { println("Caught $it") }
        .collect { throw MyError() }
}
// Before
// Exception in thread "..." MyError: My error
Therefore, it is common practice to move the operation from collect to onEach and place it before the catch. This is especially useful if we suspect that the operation in collect might throw an exception. Once the operation is moved out of collect and placed before catch, we can be sure that catch will handle all exceptions.
import kotlinx.coroutines.flow.*

class MyError : Throwable("My error")

val flow = flow {
    emit("Message1")
    emit("Message2")
}

suspend fun main(): Unit {
    flow.onStart { println("Before") }
        .onEach { throw MyError() }
        .catch { println("Caught $it") }
        .collect()
}
// Before
// Caught MyError: My error
retry
An exception flows through a flow, closing each step one by one. These steps become inactive, so it is not possible to send messages after an exception, but each step gives you a reference to the previous ones, and this reference can be used to start this flow again. Based on this idea, Kotlin offers the retry and retryWhen functions. Here is a simplified implementation of retryWhen:
// Simplified implementation of retryWhen
fun <T> Flow<T>.retryWhen(
    predicate: suspend FlowCollector<T>.(
        cause: Throwable,
        attempt: Long,
    ) -> Boolean,
): Flow<T> = flow {
    var attempt = 0L
    do {
        val shallRetry = try {
            collect { emit(it) }
            false
        } catch (e: Throwable) {
            predicate(e, attempt++)
                .also { if (!it) throw e }
        }
    } while (shallRetry)
}
As you can see, retryWhen has a predicate which is checked whenever an exception occurs from the previous steps of the flow. This predicate decides if an exception should be ignored and previous steps started again, or if it should continue closing this flow. This is a universal retry function. In most cases, we want to specify that we want to retry a specific number of times and/or only when a certain class of exceptions occurs (like a network connection exception). For that, there is another function called retry, which uses retryWhen under the hood.
// Actual implementation of retry
fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
    require(retries > 0) {
        "Expected positive amount of retries, but had $retries"
    }
    return retryWhen { cause, attempt ->
        attempt < retries && predicate(cause)
    }
}
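To see retry in action, consider the following sketch. The flakyFlow function is a hypothetical source that fails during its first two collections and succeeds on the third one; the counter it uses is an assumption made only to simulate a transient failure.

```kotlin
import java.io.IOException
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical source: fails on the first two collections,
// succeeds on the third one.
fun flakyFlow(): Flow<Int> {
    var attempts = 0
    return flow {
        attempts++
        emit(attempts)
        if (attempts < 3) throw IOException("Attempt $attempts failed")
        emit(100)
    }
}

suspend fun main() {
    flakyFlow()
        .retry(3) { e ->
            println("Retrying after: ${e.message}")
            e is IOException // retry only for this class of exceptions
        }
        .collect { println(it) }
}
// 1
// Retrying after: Attempt 1 failed
// 2
// Retrying after: Attempt 2 failed
// 3
// 100
```

Notice that elements emitted before the failure (1 and 2) have already been delivered downstream; a retry starts the upstream flow again, it does not undo earlier emissions.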
Let's see a few popular usages of these functions. I often see flows that retry unconditionally. In such cases, a common reason to define a predicate is to add logging and to introduce a delay between connection attempts.
fun makeConnection(config: ConnectionConfig) = api
    .startConnection(config)
    .retry { e ->
        delay(1000)
        log.error(e) { "Error for $config" }
        true
    }
There is another popular practice which involves gradually increasing the delay between subsequent connection attempts. We can also implement a predicate that retries if an exception is or isn't of a certain type.
fun makeConnection(config: ConnectionConfig) = api
    .startConnection(config)
    .retryWhen { e, attempt ->
        delay(100 * attempt)
        log.error(e) { "Error for $config" }
        e is ApiException && e.code !in 400..499
    }
flowOn
Lambda expressions used as arguments for flow operations (like onEach, onStart, onCompletion, etc.) and for flow builders (like flow or channelFlow) are all suspending. Suspending functions need a context and should be related to their parent (for structured concurrency). So, you might be wondering where these functions take their context from. The answer is: from the context in which collect is called.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun usersFlow(): Flow<String> = flow {
    repeat(2) {
        val ctx = currentCoroutineContext()
        val name = ctx[CoroutineName]?.name
        emit("User$it in $name")
    }
}

suspend fun main() {
    val users = usersFlow()
    withContext(CoroutineName("Name1")) {
        users.collect { println(it) }
    }
    withContext(CoroutineName("Name2")) {
        users.collect { println(it) }
    }
}
// User0 in Name1
// User1 in Name1
// User0 in Name2
// User1 in Name2
How does this code work? The terminal operation call requests elements from upstream, thereby propagating the coroutine context. However, this context can also be modified by the flowOn function.
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.withContext
import kotlin.coroutines.coroutineContext

suspend fun present(place: String, message: String) {
    val ctx = coroutineContext
    val name = ctx[CoroutineName]?.name
    println("[$name] $message on $place")
}

fun messagesFlow(): Flow<String> = flow {
    present("flow builder", "Message")
    emit("Message")
}

suspend fun main() {
    val users = messagesFlow()
    withContext(CoroutineName("Name1")) {
        users
            .flowOn(CoroutineName("Name3"))
            .onEach { present("onEach", it) }
            .flowOn(CoroutineName("Name2"))
            .collect { present("collect", it) }
    }
}
// [Name3] Message on flow builder
// [Name2] Message on onEach
// [Name1] Message on collect
Remember that flowOn works only for functions that are upstream in the flow.
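A compact way to observe this is sketched below, with hypothetical helpers currentName and namesSeen. The builder is placed before flowOn and the map after it, so only the builder should see the modified context.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: reads the coroutine name from the current context.
suspend fun currentName(): String? =
    currentCoroutineContext()[CoroutineName]?.name

// The builder is upstream of flowOn, the map is downstream of it.
fun namesSeen(): Flow<String> =
    flow { emit("builder in ${currentName()}") }
        .flowOn(CoroutineName("Upstream"))
        .map { "$it, map in ${currentName()}" }

suspend fun main() {
    withContext(CoroutineName("Collector")) {
        namesSeen().collect { println(it) }
    }
}
// builder in Upstream, map in Collector
```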
launchIn
collect is a suspending operation that suspends a coroutine until the flow is completed. It is common to wrap it with a launch builder so that flow processing can start on another coroutine. To help with such cases, there is the launchIn function, which launches collect in a new coroutine on the scope object passed as the only argument.
fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job =
    scope.launch { collect() }
launchIn is often used to start flow processing in a separate coroutine.
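Here is a minimal usage sketch. The extension collectUsers and its log parameter are hypothetical; they exist only so that the effects of the launched flow can be inspected after the returned Job completes.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: all processing is defined in onStart and onEach,
// and launchIn starts the collection in a new coroutine on this scope.
fun CoroutineScope.collectUsers(log: MutableList<String>): Job =
    flowOf("User1", "User2")
        .onStart { log += "Users:" }
        .onEach { log += it }
        .launchIn(this)

suspend fun main(): Unit = coroutineScope {
    val log = mutableListOf<String>()
    collectUsers(log).join() // wait until the flow is completed
    log.forEach { println(it) }
}
// Users:
// User1
// User2
```

Since launchIn calls collect without any action, the element handling has to be placed in onEach or similar functions before it.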
Summary
In this chapter, we've learned about different Flow functionalities. Now we know how to do something when our flow starts, when it is closing, or on each element; we also know how to catch exceptions and how to launch a flow in a new coroutine. These are typical tools that are widely used, especially in Android development. For instance, here is how a flow might be used on Android:
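The sketch below combines the functions from this chapter in that spirit. It does not use real Android APIs: NewsView, NewsPresenter and ConsoleNewsView are hypothetical stand-ins so that the example can run on the JVM, and on Android the scope would typically be something like viewModelScope.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for an Android view; not a real Android API.
interface NewsView {
    fun showProgressBar()
    fun hideProgressBar()
    fun showNews(news: String)
    fun handleError(error: Throwable)
}

// Hypothetical presenter that wires the flow to the view.
class NewsPresenter(
    private val view: NewsView,
    private val scope: CoroutineScope,
    private val newsFlow: () -> Flow<String>,
) {
    fun updateNews(): Job =
        newsFlow()
            .onStart { view.showProgressBar() }
            .onCompletion { view.hideProgressBar() }
            .onEach { view.showNews(it) }
            .catch { view.handleError(it) }
            .launchIn(scope)
}

// Console implementation used only to make the sketch runnable.
class ConsoleNewsView : NewsView {
    val events = mutableListOf<String>()
    override fun showProgressBar() { events += "show progress" }
    override fun hideProgressBar() { events += "hide progress" }
    override fun showNews(news: String) { events += "news: $news" }
    override fun handleError(error: Throwable) { events += "error: ${error.message}" }
}

suspend fun main(): Unit = coroutineScope {
    val view = ConsoleNewsView()
    NewsPresenter(view, this) { flowOf("News1", "News2") }
        .updateNews()
        .join()
    view.events.forEach { println(it) }
}
// show progress
// news: News1
// news: News2
// hide progress
```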