Flow lifecycle operations
This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.
Flow can be imagined as a pipe in which requests for next values flow in one direction, and the corresponding produced values flow in the other direction. When a flow is completed or an exception occurs, this information is also propagated, and it closes the intermediate steps on the way. So, as they all flow, we can listen for values, exceptions, or other characteristic events (like starting or completing). To do this, we use methods such as onEach, onStart, onCompletion, onEmpty and catch. Let's explain these one by one.
onEach
To react to each flowing value, we use the onEach function.
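A minimal sketch of onEach, assuming kotlinx-coroutines-core is on the classpath. The helper name observeValues is made up for this illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Records every value that passes through onEach, then returns the log.
// observeValues is a hypothetical helper, not a library function.
suspend fun observeValues(): List<Int> {
    val seen = mutableListOf<Int>()
    flowOf(1, 2, 3, 4)
        .onEach { seen += it } // called once per value, in order
        .collect()             // terminal operation that starts the flow
    return seen
}

fun main(): Unit = runBlocking {
    println(observeValues()) // [1, 2, 3, 4]
}
```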
The onEach lambda expression is suspending, and elements are processed one after another in order (sequentially). So, if we add delay in onEach, we will delay each value as it flows.
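As an illustration of this sequential processing, the sketch below suspends in onEach before each value is passed on. The helper collectDelayed and the delay values are assumptions chosen for the example.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Collects two values, suspending for delayMillis in onEach before each one.
// collectDelayed is a hypothetical helper.
suspend fun collectDelayed(delayMillis: Long): List<Int> =
    flowOf(1, 2)
        .onEach { delay(delayMillis) } // each value waits here before flowing on
        .toList()

fun main(): Unit = runBlocking {
    val elapsed = measureTimeMillis {
        println(collectDelayed(100)) // [1, 2]
    }
    // Roughly 200 ms, since the two delays happen one after another.
    println("Took about $elapsed ms")
}
```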
onStart
The onStart
function sets a listener that should be called immediately once the flow is started, i.e., once the terminal operation is called. It is important to note that onStart
does not wait for the first element: it is called when we request the first element.
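The following sketch shows this ordering: the onStart action runs before the first (delayed) element arrives. The helper name startOrder is hypothetical.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Logs the order in which onStart and the collected values are observed.
suspend fun startOrder(): List<String> {
    val log = mutableListOf<String>()
    flowOf(1, 2)
        .onEach { delay(50) }        // elements arrive with a delay
        .onStart { log += "Before" } // runs as soon as collection starts
        .collect { log += "Got $it" }
    return log
}

fun main(): Unit = runBlocking {
    println(startOrder()) // [Before, Got 1, Got 2]
}
```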
It is good to know that in onStart (as well as in onCompletion, onEmpty and catch) we can emit elements. Such elements will flow downstream from this place.
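For instance, a value emitted from onStart appears before the values of the original flow. This is a small sketch; withInitialValue is a name invented for the example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Emits an extra element from onStart; it flows downstream first.
suspend fun withInitialValue(): List<Int> =
    flowOf(1, 2)
        .onStart { emit(0) }
        .toList()

fun main(): Unit = runBlocking {
    println(withInitialValue()) // [0, 1, 2]
}
```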
onCompletion
There are a few ways in which a flow can be completed. The most common one is when the flow builder is done (i.e., the last element has been sent), although this also happens in the case of an uncaught exception or a coroutine cancellation. In all these cases, we can add a listener for flow completion by using the onCompletion method.
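A minimal sketch of onCompletion for the normal completion case. The lambda receives the cause of completion, which is null when the flow finishes without an exception. The helper completionLog is hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Logs collected values and the moment the flow completes.
suspend fun completionLog(): List<String> {
    val log = mutableListOf<String>()
    flowOf(1, 2)
        .onCompletion { cause -> log += "Completed, cause = $cause" }
        .collect { log += "Got $it" }
    return log
}

fun main(): Unit = runBlocking {
    println(completionLog()) // [Got 1, Got 2, Completed, cause = null]
}
```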
In Android, we often use onStart to show a progress bar (the indicator that we are waiting for a network response), and we use onCompletion to hide it.
onEmpty
A flow might complete without emitting any value, which might be an indication of an unexpected event. For such cases, there is the onEmpty function, which invokes the given action when this flow completes without emitting any elements. onEmpty might then be used to emit some default value.
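Here is a short sketch of emitting a default value from onEmpty. The helper withDefault and the default value 0 are assumptions made for the example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Falls back to a default element only when the source emits nothing.
suspend fun withDefault(source: Flow<Int>): List<Int> =
    source
        .onEmpty { emit(0) }
        .toList()

fun main(): Unit = runBlocking {
    println(withDefault(emptyFlow()))  // [0]
    println(withDefault(flowOf(1, 2))) // [1, 2]
}
```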
catch
At any point of flow building or processing, an exception might occur. Such an exception will flow down, closing each processing step on the way; however, it can be caught and managed. To do so, we can use the catch method. This listener receives the exception as an argument and allows you to perform recovering operations.
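A sketch of catch in action, with an onCompletion step added upstream to make the order of events visible. The names MyError, failingFlow and catchLog are invented for this illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical error type used only in this example.
class MyError : Throwable("My error")

// Emits two values and then fails.
val failingFlow = flow<Int> {
    emit(1)
    emit(2)
    throw MyError()
}

suspend fun catchLog(): List<String> {
    val log = mutableListOf<String>()
    failingFlow
        .onEach { log += "Got $it" } // sees values only, not the exception
        .onCompletion { cause -> log += "Completed with ${cause?.message}" }
        .catch { e -> log += "Caught ${e.message}" }
        .collect { log += "Collected $it" }
    return log
}

fun main(): Unit = runBlocking {
    catchLog().forEach(::println)
    // Got 1
    // Collected 1
    // Got 2
    // Collected 2
    // Completed with My error
    // Caught My error
}
```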
In the example above, notice that onEach does not react to an exception. The same happens with other functions like map, filter, etc. Only the onCompletion handler will be called.
The catch method stops an exception by catching it. The previous steps have already been completed, but catch can still emit new values and keep the rest of the flow alive.
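The sketch below illustrates this: the upstream flow fails after one element, and catch emits a replacement value that continues downstream. The helper recover and the message texts are hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// The builder fails after one element; catch emits a fallback value.
suspend fun recover(): List<String> =
    flow<String> {
        emit("Message1")
        throw IllegalStateException("boom")
    }
        .catch { emit("Error") }
        .toList()

fun main(): Unit = runBlocking {
    println(recover()) // [Message1, Error]
}
```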
The catch function will only react to exceptions thrown in functions defined upstream (you can imagine that the exception needs to be caught as it flows down).
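To illustrate, in the sketch below the exception is thrown in an onEach placed after catch, so catch never sees it and the exception escapes to the caller. The helper downstreamFailure is hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// The failing step is downstream of catch, so the exception is not caught there.
suspend fun downstreamFailure(): String =
    try {
        flowOf("Message1")
            .catch { emit("Error") }                    // nothing upstream throws
            .onEach { throw IllegalStateException(it) } // thrown after catch
            .collect()
        "completed"
    } catch (e: IllegalStateException) {
        "escaped: ${e.message}"
    }

fun main(): Unit = runBlocking {
    println(downstreamFailure()) // escaped: Message1
}
```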
In Android, we often use catch to show exceptions that happened in a flow.
We could also use catch to emit default data to display on the screen, such as an empty list.
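A rough sketch of both ideas without any Android dependency. FakeView, failingNewsFlow and updateNews are hypothetical stand-ins invented for this example; a real application would call its own view or view model instead.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a screen; it only records what it was asked to show.
class FakeView {
    val shown = mutableListOf<String>()
    fun showError(message: String?) { shown += "error: $message" }
    fun showNews(news: List<String>) { shown += "news: $news" }
}

// Hypothetical data source that always fails.
fun failingNewsFlow(): Flow<List<String>> = flow {
    throw IllegalStateException("No connection")
}

suspend fun updateNews(view: FakeView) {
    failingNewsFlow()
        .catch { e ->
            view.showError(e.message) // show the problem to the user
            emit(emptyList())         // and fall back to an empty list
        }
        .collect { view.showNews(it) }
}

fun main(): Unit = runBlocking {
    val view = FakeView()
    updateNews(view)
    println(view.shown) // [error: No connection, news: []]
}
```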
Uncaught exceptions
Uncaught exceptions in a flow immediately cancel this flow, and collect rethrows this exception. This behavior is typical of suspending functions, and coroutineScope behaves the same way. Exceptions can be caught outside a flow using the classic try-catch block.
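A minimal sketch of catching such an exception with try-catch around collect. MyError and collectUnsafe are names made up for the example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical error type used only in this example.
class MyError : Throwable("My error")

suspend fun collectUnsafe(): List<String> {
    val log = mutableListOf<String>()
    try {
        flow<Int> {
            emit(1)
            throw MyError()
        }.collect { log += "Collected $it" } // collect rethrows MyError
    } catch (e: MyError) {
        log += "Caught outside"
    }
    return log
}

fun main(): Unit = runBlocking {
    println(collectUnsafe()) // [Collected 1, Caught outside]
}
```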
Notice that using catch does not protect us from an exception in the terminal operation (because catch cannot be placed after the last operation). So, if there is an exception in collect, it won't be caught, and an error will be thrown.
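The sketch below shows this limitation: the lambda passed to collect throws, and the catch step placed before it does not intercept the exception. The helper failInCollect is hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// catch is upstream of collect, so a failure inside collect is not handled by it.
suspend fun failInCollect(): String =
    try {
        flowOf("Message1")
            .catch { emit("Recovered") }
            .collect { throw IllegalStateException("collect failed on $it") }
        "completed"
    } catch (e: IllegalStateException) {
        "escaped: ${e.message}"
    }

fun main(): Unit = runBlocking {
    println(failInCollect()) // escaped: collect failed on Message1
}
```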
Therefore, it is common practice to move the operation from collect to onEach and place it before the catch. This is especially useful if we suspect that collect might raise an exception. If we move the operation from collect, we can be sure that catch will catch all exceptions.
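As a sketch of this practice, the processing that might fail is done in onEach, catch follows it, and the terminal collect takes no lambda. The helper safeCollect is hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// The risky operation lives in onEach, upstream of catch.
suspend fun safeCollect(): List<String> {
    val log = mutableListOf<String>()
    flowOf("Message1")
        .onEach { throw IllegalStateException("processing failed on $it") }
        .catch { e -> log += "Caught ${e.message}" }
        .collect() // nothing left here that could throw
    return log
}

fun main(): Unit = runBlocking {
    println(safeCollect()) // [Caught processing failed on Message1]
}
```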
retry
An exception flows through a flow, closing each step one by one. These steps become inactive, so it is not possible to send messages after an exception, but each step gives you a reference to the previous ones, and this reference can be used to start this flow again. Based on this idea, Kotlin offers the retry and retryWhen functions. Here is a simplified implementation of retryWhen:
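The sketch below is one way such a simplified implementation could look. It is named simpleRetryWhen to avoid any confusion with the library function, and it is not the library's actual code: among other simplifications, it does not distinguish upstream failures from exceptions thrown by downstream collectors.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Simplified sketch of the idea behind retryWhen (hypothetical name).
fun <T> Flow<T>.simpleRetryWhen(
    predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean,
): Flow<T> = flow {
    var attempt = 0L
    do {
        val shallRetry = try {
            collect { emit(it) } // collect the upstream flow from the beginning
            false                // completed normally, no retry needed
        } catch (e: CancellationException) {
            throw e              // never retry on cancellation
        } catch (e: Throwable) {
            if (!predicate(e, attempt++)) throw e
            true
        }
    } while (shallRetry)
}

// Usage: the upstream fails on its first two runs, then succeeds.
suspend fun retryTwice(): List<Int> {
    var failures = 0
    return flow<Int> {
        emit(1)
        if (failures++ < 2) throw IllegalStateException("fail")
        emit(2)
    }.simpleRetryWhen { _, attempt -> attempt < 2 }.toList()
}

fun main(): Unit = runBlocking {
    println(retryTwice()) // [1, 1, 1, 2]
}
```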
As you can see, retryWhen has a predicate which is checked whenever an exception occurs from the previous steps of the flow. This predicate decides if an exception should be ignored and previous steps started again, or if it should continue closing this flow. This is a universal retry function. In most cases, we want to specify that we want to retry a specific number of times and/or only when a certain class of exceptions occurs (like a network connection exception). For that, there is another function called retry, which uses retryWhen under the hood.
This is how retry works:
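The sketch below expresses retry in terms of the library's retryWhen, followed by a usage example. The name simpleRetry is made up here; the library's own retry accepts the same kind of arguments (a number of retries and a predicate).

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Sketch: retry at most `retries` times, and only when the predicate agrees.
fun <T> Flow<T>.simpleRetry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (cause: Throwable) -> Boolean = { true },
): Flow<T> {
    require(retries > 0) { "Expected positive amount of retries, but had $retries" }
    return retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
}

// Usage: the flow always fails after emitting 1 and 2.
suspend fun retryOutput(): String {
    val out = StringBuilder()
    try {
        flow<Int> {
            emit(1)
            emit(2)
            error("E")
        }.simpleRetry(3) {
            out.append(it.message) // the predicate runs once per retry
            true
        }.collect { out.append(it) }
    } catch (e: IllegalStateException) {
        out.append("(exception thrown)") // retries exhausted
    }
    return out.toString()
}

fun main(): Unit = runBlocking {
    println(retryOutput()) // 12E12E12E12(exception thrown)
}
```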
Let's see a few popular usages of these functions. I often see retries that always retry. For them, a popular reason to define a predicate is to specify some logging and introduce some delay between new connection attempts.
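A sketch of such an always-retrying predicate, with logging and a fixed delay between attempts. The helper names retryWithLogging and connect, the log callback, and the delay values are assumptions made for the example.

```kotlin
import java.io.IOException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Always retries; the predicate exists only to log and to wait between attempts.
fun <T> Flow<T>.retryWithLogging(delayMillis: Long, log: (String) -> Unit): Flow<T> =
    retry { cause ->
        log("Retrying after: ${cause.message}")
        delay(delayMillis)
        true // always retry
    }

// Usage: a hypothetical connection that fails twice before succeeding.
suspend fun connect(logs: MutableList<String>): List<String> {
    var attempts = 0
    return flow<String> {
        if (attempts++ < 2) throw IOException("Connection lost")
        emit("Connected")
    }.retryWithLogging(delayMillis = 10) { logs += it }.toList()
}

fun main(): Unit = runBlocking {
    val logs = mutableListOf<String>()
    println(connect(logs)) // [Connected]
    println(logs.size)     // 2
}
```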
There is another popular practice which involves gradually increasing the delay between subsequent connection attempts. We can also implement a predicate that retries if an exception is or isn't of a certain type.
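Both ideas can be combined in a retryWhen predicate, as in the sketch below. The helper names, the choice of IOException as the retryable type, and the doubling strategy are assumptions made for illustration.

```kotlin
import java.io.IOException
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Retries only IOException, doubling the delay before each new attempt.
fun <T> Flow<T>.retryWithBackoff(
    maxAttempts: Long,
    initialDelayMillis: Long,
): Flow<T> = retryWhen { cause, attempt ->
    if (cause is IOException && attempt < maxAttempts) {
        delay(initialDelayMillis shl attempt.toInt()) // 1x, 2x, 4x, ...
        true
    } else {
        false
    }
}

// Counts how many times a flow that always fails with `error` is started.
suspend fun countRuns(error: Exception): Int {
    var runs = 0
    flow<Int> {
        runs++
        throw error
    }
        .retryWithBackoff(maxAttempts = 2, initialDelayMillis = 1)
        .catch { } // swallow the final failure; we only count runs here
        .collect()
    return runs
}

fun main(): Unit = runBlocking {
    println(countRuns(IOException("network")))        // 3 (first run + 2 retries)
    println(countRuns(IllegalStateException("bug")))  // 1 (not retried)
}
```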
flowOn
Lambda expressions used as arguments for flow operations (like onEach, onStart, onCompletion, etc.) and flow builders (like flow or channelFlow) are all suspending in nature. Suspending functions need to have a context and should be in relation to their parent (for structured concurrency). So, you might be wondering where these functions take their context from. The answer is: from the context where collect is called.
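The sketch below makes this visible by reading the coroutine name inside a flow builder and collecting the same flow from two differently named contexts. The names usersFlow and collectAs are invented for the example.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// The builder reports the name found in the context it runs in.
fun usersFlow(): Flow<String> = flow {
    repeat(2) {
        val name = currentCoroutineContext()[CoroutineName]?.name
        emit("User$it in $name")
    }
}

// Collects the flow inside a context that carries the given name.
suspend fun collectAs(name: String): List<String> =
    withContext(CoroutineName(name)) { usersFlow().toList() }

fun main(): Unit = runBlocking {
    println(collectAs("Name1")) // [User0 in Name1, User1 in Name1]
    println(collectAs("Name2")) // [User0 in Name2, User1 in Name2]
}
```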
How does this code work? The terminal operation call requests elements from upstream, thereby propagating the coroutine context. However, it can also be modified by the flowOn function.
Remember that flowOn works only for functions that are upstream in the flow.
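A sketch of this upstream-only effect, using coroutine names instead of dispatchers so that the result is easy to read. The helpers describe and contextLog are hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Reports the coroutine name visible at the place it is called from.
suspend fun describe(place: String): String {
    val name = currentCoroutineContext()[CoroutineName]?.name
    return "$place runs in $name"
}

suspend fun contextLog(): List<String> {
    val log = mutableListOf<String>()
    withContext(CoroutineName("Name1")) {
        flow<String> {
            log += describe("flow builder")
            emit("Message")
        }
            .flowOn(CoroutineName("Name3")) // affects the builder above
            .onEach { log += describe("onEach") }
            .flowOn(CoroutineName("Name2")) // affects onEach above
            .collect { log += describe("collect") }
    }
    return log
}

fun main(): Unit = runBlocking {
    contextLog().forEach(::println)
    // flow builder runs in Name3
    // onEach runs in Name2
    // collect runs in Name1
}
```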
launchIn
collect is a suspending operation that suspends a coroutine until the flow is completed. It is common to wrap it with a launch builder so that flow processing can start on another coroutine. To help with such cases, there is the launchIn function, which launches collect in a new coroutine on the scope object passed as the only argument.
launchIn is often used to start flow processing in a separate coroutine.
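The sketch below first shows what launchIn roughly amounts to (under the made-up name simpleLaunchIn), and then uses the library's launchIn to process a flow in a separate coroutine. The helper launchDemo is hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Sketch of the idea behind launchIn: start collect in a new coroutine.
fun <T> Flow<T>.simpleLaunchIn(scope: CoroutineScope): Job = scope.launch {
    collect()
}

// Usage of the library function: processing happens in onStart and onEach.
suspend fun launchDemo(): List<String> = coroutineScope {
    val log = mutableListOf<String>()
    val job = flowOf("User1", "User2")
        .onStart { log += "Users:" }
        .onEach { log += it }
        .launchIn(this) // returns a Job immediately
    job.join()          // wait here only to read the log afterwards
    log
}

fun main(): Unit = runBlocking {
    println(launchDemo()) // [Users:, User1, User2]
}
```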
Summary
In this chapter, we've learned about different Flow functionalities. Now we know how to do something when our flow starts, when it is closing, or on each element; we also know how to catch exceptions and how to launch a flow in a new coroutine. These are typical tools that are widely used, especially in Android development. For instance, here is how a flow might be used on Android:
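A rough sketch of such usage, written without Android dependencies so that it can run anywhere. NewsPresenter, newsFlow and the recorded event strings are hypothetical stand-ins; in a real application the scope would typically be something like a view model scope, and the lambdas would call UI functions.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical presenter; it records UI events instead of touching a real UI.
class NewsPresenter(private val scope: CoroutineScope) {
    val events = mutableListOf<String>()

    // Hypothetical data source.
    private fun newsFlow(): Flow<String> = flowOf("News 1", "News 2")

    fun updateNews(): Job =
        newsFlow()
            .onStart { events += "show progress bar" }
            .onCompletion { events += "hide progress bar" }
            .onEach { events += "show $it" }
            .catch { events += "show error: ${it.message}" }
            .launchIn(scope)
}

fun main(): Unit = runBlocking {
    val presenter = NewsPresenter(this)
    presenter.updateNews().join()
    presenter.events.forEach(::println)
    // show progress bar
    // show News 1
    // show News 2
    // hide progress bar
}
```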