
Flow lifecycle operations

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.

A flow can be imagined as a pipe in which requests for next values flow in one direction, and the corresponding produced values flow in the other direction. When a flow completes or an exception occurs, this information is also propagated, and it closes the intermediate steps on the way. Since all of this flows through the pipe, we can listen for values, exceptions, and other characteristic events (such as starting or completing). To do this, we use methods such as onEach, onStart, onCompletion, onEmpty and catch. Let's explain these one by one.

onEach

To react to each flowing value, we use the onEach function.

import kotlinx.coroutines.flow.*

suspend fun main() {
    flowOf(1, 2, 3, 4)
        .onEach { print(it) }
        .collect() // 1234
}

The onEach lambda expression is suspending, and elements are processed one after another in order (sequentially). So, if we add delay in onEach, we will delay each value as it flows.

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

suspend fun main() {
    flowOf(1, 2)
        .onEach { delay(1000) }
        .collect { println(it) }
}
// (1 sec)
// 1
// (1 sec)
// 2

onStart

The onStart function sets a listener that should be called immediately once the flow is started, i.e., once the terminal operation is called. It is important to note that onStart does not wait for the first element: it is called when we request the first element.

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

suspend fun main() {
    flowOf(1, 2)
        .onEach { delay(1000) }
        .onStart { println("Before") }
        .collect { println(it) }
}
// Before
// (1 sec)
// 1
// (1 sec)
// 2

It is good to know that in onStart (as well as in onCompletion, onEmpty and catch) we can emit elements. Such elements will flow downstream from this place.

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

suspend fun main() {
    flowOf(1, 2)
        .onEach { delay(1000) }
        .onStart { emit(0) }
        .collect { println(it) }
}
// 0
// (1 sec)
// 1
// (1 sec)
// 2

onCompletion

There are a few ways in which a flow can be completed. The most common one is when the flow builder is done (i.e., the last element has been sent), although this also happens in the case of an uncaught exception or a coroutine cancellation. In all these cases, we can add a listener for flow completion by using the onCompletion method.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*

suspend fun main(): Unit = coroutineScope {
    flowOf(1, 2)
        .onEach { delay(1000) }
        .onCompletion { println("Completed") }
        .collect { println(it) }
}
// (1 sec)
// 1
// (1 sec)
// 2
// Completed

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*

suspend fun main(): Unit = coroutineScope {
    val job = launch {
        flowOf(1, 2)
            .onEach { delay(1000) }
            .onCompletion { println("Completed") }
            .collect { println(it) }
    }
    delay(1100)
    job.cancel()
}
// (1 sec)
// 1
// (0.1 sec)
// Completed

In Android, we often use onStart to show a progress bar (the indicator that we are waiting for a network response), and we use onCompletion to hide it.

fun updateNews() {
    scope.launch {
        newsFlow()
            .onStart { showProgressBar() }
            .onCompletion { hideProgressBar() }
            .collect { view.showNews(it) }
    }
}

onEmpty

A flow might complete without emitting any value, which might be an indication of an unexpected event. For such cases, there is the onEmpty function, which invokes the given action when this flow completes without emitting any elements. onEmpty might then be used to emit some default value.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*

suspend fun main(): Unit = coroutineScope {
    flow<List<Int>> { delay(1000) }
        .onEmpty { emit(emptyList()) }
        .collect { println(it) }
}
// (1 sec)
// []

catch

At any point of flow building or processing, an exception might occur. Such an exception will flow down, closing each processing step on the way; however, it can be caught and managed. To do so, we can use the catch method. This listener receives the exception as an argument and allows you to perform recovering operations.

import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach

class MyError : Throwable("My error")

val flow = flow {
    emit(1)
    emit(2)
    throw MyError()
}

suspend fun main(): Unit {
    flow.onEach { println("Got $it") }
        .catch { println("Caught $it") }
        .collect { println("Collected $it") }
}
// Got 1
// Collected 1
// Got 2
// Collected 2
// Caught MyError: My error

In the example above, notice that onEach does not react to an exception. The same happens with other functions like map, filter etc. Only the onCompletion handler will be called.
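To make this concrete, here is a minimal sketch that records which steps observe a failure. SampleError and observeFailure are hypothetical names introduced only for this illustration. The onCompletion lambda receives the exception as its nullable cause parameter (it is null when the flow completes normally), and the exception then continues downstream until catch handles it.

```kotlin
import kotlinx.coroutines.flow.*

// Hypothetical exception type, used only in this illustration.
class SampleError : Throwable("Sample error")

// Records what each step observes, in order.
suspend fun observeFailure(): List<String> {
    val events = mutableListOf<String>()
    flow<Int> {
        emit(1)
        throw SampleError()
    }
        .onEach { events += "onEach $it" }
        // cause is the exception that ended the flow, or null on normal completion
        .onCompletion { cause -> events += "onCompletion ${cause?.message}" }
        .catch { events += "catch ${it.message}" }
        .collect { events += "collect $it" }
    return events
}

suspend fun main() {
    observeFailure().forEach(::println)
}
// onEach 1
// collect 1
// onCompletion Sample error
// catch Sample error
```

Note that onEach is never called for the exception itself; only onCompletion and catch see it.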

The catch method stops an exception by catching it. The previous steps have already been completed, but catch can still emit new values and keep the rest of the flow alive.

import kotlinx.coroutines.flow.*

class MyError : Throwable("My error")

val flow = flow {
    emit("Message1")
    throw MyError()
}

suspend fun main(): Unit {
    flow.catch { emit("Error") }
        .collect { println("Collected $it") }
}
// Collected Message1
// Collected Error

The catch will only react to the exceptions thrown in the function defined upstream (you can imagine that the exception needs to be caught as it flows down).
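The following sketch illustrates this placement rule under simple assumptions. SampleError and catchPlacement are hypothetical names used only here. The catch is declared before the step that fails, so the exception is thrown downstream of it and escapes to the caller instead of being handled.

```kotlin
import kotlinx.coroutines.flow.*

// Hypothetical exception type, used only in this illustration.
class SampleError : Throwable("Sample error")

suspend fun catchPlacement(): List<String> {
    val events = mutableListOf<String>()
    try {
        flowOf("A", "B")
            // Declared above the failing step, so it never sees the exception.
            .catch { events += "caught upstream: ${it.message}" }
            .onEach { if (it == "B") throw SampleError() }
            .collect { events += "collected $it" }
    } catch (e: SampleError) {
        // The exception was thrown downstream of catch, so it reaches the caller.
        events += "escaped to caller: ${e.message}"
    }
    return events
}

suspend fun main() {
    catchPlacement().forEach(::println)
}
// collected A
// escaped to caller: Sample error
```

Moving the catch call below the onEach step would let it handle the exception.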

In Android, we often use catch to show exceptions that happened in a flow.

fun updateNews() {
    scope.launch {
        newsFlow()
            .catch { view.handleError(it) }
            .onStart { showProgressBar() }
            .onCompletion { hideProgressBar() }
            .collect { view.showNews(it) }
    }
}

We could also use catch to emit default data to display on the screen, such as an empty list.

fun updateNews() {
    scope.launch {
        newsFlow()
            .catch {
                view.handleError(it)
                emit(emptyList())
            }
            .onStart { showProgressBar() }
            .onCompletion { hideProgressBar() }
            .collect { view.showNews(it) }
    }
}

Uncaught exceptions

Uncaught exceptions in a flow immediately cancel this flow, and collect rethrows this exception. This behavior is typical of suspending functions, and coroutineScope behaves the same way. Exceptions can be caught outside flow using the classic try-catch block.

import kotlinx.coroutines.flow.*

class MyError : Throwable("My error")

val flow = flow {
    emit("Message1")
    throw MyError()
}

suspend fun main(): Unit {
    try {
        flow.collect { println("Collected $it") }
    } catch (e: MyError) {
        println("Caught")
    }
}
// Collected Message1
// Caught

Notice that using catch does not protect us from an exception in the terminal operation (because catch cannot be placed after the last operation). So, if there is an exception in the collect, it won't be caught, and an error will be thrown.

import kotlinx.coroutines.flow.*

class MyError : Throwable("My error")

val flow = flow {
    emit("Message1")
    emit("Message2")
}

suspend fun main(): Unit {
    flow.onStart { println("Before") }
        .catch { println("Caught $it") }
        .collect { throw MyError() }
}
// Before
// Exception in thread "..." MyError: My error

Therefore, it is common practice to move the operation from collect to onEach and place it before the catch. This is especially useful if we suspect that the operation in collect might throw an exception. Once the operation is in onEach, above catch, any exception it throws is upstream of catch and will be caught.

import kotlinx.coroutines.flow.*

class MyError : Throwable("My error")

val flow = flow {
    emit("Message1")
    emit("Message2")
}

suspend fun main(): Unit {
    flow.onStart { println("Before") }
        .onEach { throw MyError() }
        .catch { println("Caught $it") }
        .collect()
}
// Before
// Caught MyError: My error

retry

An exception flows through a flow, closing each step one by one. These steps become inactive, so it is not possible to send messages after an exception, but each step gives you a reference to the previous ones, and this reference can be used to start this flow again. Based on this idea, Kotlin offers the retry and retryWhen functions. Here is a simplified implementation of retryWhen:

// Simplified implementation of retryWhen
fun <T> Flow<T>.retryWhen(
    predicate: suspend FlowCollector<T>.(
        cause: Throwable,
        attempt: Long,
    ) -> Boolean,
): Flow<T> = flow {
    var attempt = 0L
    do {
        val shallRetry = try {
            collect { emit(it) }
            false
        } catch (e: Throwable) {
            predicate(e, attempt++)
                .also { if (!it) throw e }
        }
    } while (shallRetry)
}

As you can see, retryWhen has a predicate which is checked whenever an exception occurs from the previous steps of the flow. This predicate decides if an exception should be ignored and previous steps started again, or if it should continue closing this flow. This is a universal retry function. In most cases, we want to specify that we want to retry a specific number of times and/or only when a certain class of exceptions occurs (like a network connection exception). For that, there is another function called retry, which uses retryWhen under the hood.

// Actual implementation of retry
fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
    require(retries > 0) {
        "Expected positive amount of retries, but had $retries"
    }
    return retryWhen { cause, attempt ->
        attempt < retries && predicate(cause)
    }
}

This is how retry works:

import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.flow.flow

suspend fun main() {
    flow {
        emit(1)
        emit(2)
        error("E")
        emit(3)
    }.retry(3) {
        print(it.message)
        true
    }.collect { print(it) }
    // 12E12E12E12(exception thrown)
}

Let's see a few popular usages of these functions. I often see retries that always retry. In such cases, the predicate is typically defined only to log the error and to introduce a delay between new connection attempts.

fun makeConnection(config: ConnectionConfig) = api
    .startConnection(config)
    .retry { e ->
        delay(1000)
        log.error(e) { "Error for $config" }
        true
    }

There is another popular practice which involves gradually increasing the delay between subsequent connection attempts. We can also implement a predicate that retries if an exception is or isn't of a certain type.

fun makeConnection(config: ConnectionConfig) = api
    .startConnection(config)
    .retryWhen { e, attempt ->
        delay(100 * attempt)
        log.error(e) { "Error for $config" }
        e is ApiException && e.code !in 400..499
    }
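Since the snippets above depend on an external api object, here is a self-contained sketch of the same idea that can be run directly. flakyFlow and connectWithBackoff are hypothetical helpers invented for this illustration, and the delays and retry limit are arbitrary assumptions. The source fails a given number of times before succeeding, and retryWhen waits a little longer before each new attempt.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

// Hypothetical flaky source: the first `failures` collections throw, later ones succeed.
fun flakyFlow(failures: Int): Flow<String> {
    var calls = 0
    return flow {
        if (calls++ < failures) throw IllegalStateException("Attempt $calls failed")
        emit("Connected")
    }
}

// Retries with a linearly growing delay and gives up after three retries.
suspend fun connectWithBackoff(failures: Int): List<String> {
    val log = mutableListOf<String>()
    flakyFlow(failures)
        .retryWhen { cause, attempt ->
            log += "retry #$attempt after: ${cause.message}"
            delay(10 * (attempt + 1)) // 10 ms, 20 ms, 30 ms, ...
            attempt < 3               // returning false rethrows the exception
        }
        .collect { log += it }
    return log
}

suspend fun main() {
    connectWithBackoff(failures = 2).forEach(::println)
}
// retry #0 after: Attempt 1 failed
// retry #1 after: Attempt 2 failed
// Connected
```

The attempt counter starts at 0, which is why the delay here is based on attempt + 1 rather than on attempt itself.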

flowOn

Lambda expressions used as arguments for flow operations (like onEach, onStart, onCompletion, etc.) and for flow builders (like flow or channelFlow) are all suspending in nature. Suspending functions need to have a context and should be in relation to their parent (for structured concurrency). So, you might be wondering where these functions take their context from. The answer is: from the context where collect is called.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun usersFlow(): Flow<String> = flow {
    repeat(2) {
        val ctx = currentCoroutineContext()
        val name = ctx[CoroutineName]?.name
        emit("User$it in $name")
    }
}

suspend fun main() {
    val users = usersFlow()
    withContext(CoroutineName("Name1")) {
        users.collect { println(it) }
    }
    withContext(CoroutineName("Name2")) {
        users.collect { println(it) }
    }
}
// User0 in Name1
// User1 in Name1
// User0 in Name2
// User1 in Name2

How does this code work? The terminal operation call requests elements from upstream, thereby propagating the coroutine context. However, this context can also be modified by the flowOn function.

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.withContext
import kotlin.coroutines.coroutineContext

suspend fun present(place: String, message: String) {
    val ctx = coroutineContext
    val name = ctx[CoroutineName]?.name
    println("[$name] $message on $place")
}

fun messagesFlow(): Flow<String> = flow {
    present("flow builder", "Message")
    emit("Message")
}

suspend fun main() {
    val users = messagesFlow()
    withContext(CoroutineName("Name1")) {
        users
            .flowOn(CoroutineName("Name3"))
            .onEach { present("onEach", it) }
            .flowOn(CoroutineName("Name2"))
            .collect { present("collect", it) }
    }
}
// [Name3] Message on flow builder
// [Name2] Message on onEach
// [Name1] Message on collect

Remember that flowOn works only for functions that are upstream in the flow.

launchIn

collect is a suspending operation that suspends a coroutine until the flow is completed. It is common to wrap it with a launch builder so that flow processing can start on another coroutine. To help with such cases, there is the launchIn function, which launches collect in a new coroutine on the scope object passed as the only argument.

fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job =
    scope.launch { collect() }

launchIn is often used to start flow processing in a separate coroutine.

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*

suspend fun main(): Unit = coroutineScope {
    flowOf("User1", "User2")
        .onStart { println("Users:") }
        .onEach { println(it) }
        .launchIn(this)
}
// Users:
// User1
// User2
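Because launchIn returns a Job, the caller can also wait for the collection or cancel it. The sketch below shows the waiting case. collectInBackground is a hypothetical helper written for this illustration, not part of the library.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Starts collection in a new coroutine and uses the returned Job to wait for it.
suspend fun collectInBackground(): List<String> = coroutineScope {
    val events = mutableListOf<String>()
    val job: Job = flowOf("User1", "User2")
        .onEach { events += it }
        .onCompletion { events += "done" }
        .launchIn(this) // returns immediately with a Job
    job.join()          // suspends until the flow has been fully collected
    events
}

suspend fun main() {
    println(collectInBackground())
}
// [User1, User2, done]
```

Calling job.cancel() instead of job.join() would stop the collection early, just like cancelling any other coroutine started with launch.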

Summary

In this chapter, we've learned about different Flow functionalities. Now we know how to do something when our flow starts, when it is closing, or on each element; we also know how to catch exceptions and how to launch a flow in a new coroutine. These are typical tools that are widely used, especially in Android development. For instance, here is how a flow might be used on Android:

fun updateNews() {
    newsFlow()
        .onStart { showProgressBar() }
        .onCompletion { hideProgressBar() }
        .onEach { view.showNews(it) }
        .catch { view.handleError(it) }
        .launchIn(viewModelScope)
}