SharedFlow and StateFlow
This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.
Flow is typically cold, so its values are calculated on demand. However, there are cases in which we want multiple receivers to be subscribed to one source of changes. This is where we use SharedFlow, which is conceptually similar to a mailing list. We also have StateFlow, which is similar to an observable value. Let's explain them both step by step.
SharedFlow
Let's start with MutableSharedFlow
, which is like a broadcast channel: everyone can send (emit) messages which will be received by every coroutine that is listening (collecting).
The above program never ends because the
coroutineScope
is waiting for the coroutines that were started withlaunch
and which keep listening onMutableSharedFlow
. Apparently,MutableSharedFlow
is not closable, so the only way to fix this problem is to cancel the whole scope.
MutableSharedFlow
can also keep sending messages. If we set the replay
parameter (it defaults to 0), the defined number of last values will be kept. If a coroutine now starts observing, it will receive these values first. This cache can also be reset with resetReplayCache
.
MutableSharedFlow
is conceptually similar to RxJava Subjects. When thereplay
parameter is set to 0, it is similar to aPublishSubject
. Whenreplay
is 1, it is similar to aBehaviorSubject
. Whenreplay
isInt.MAX_VALUE
, it is similar toReplaySubject
.
In Kotlin, we like to have a distinction between interfaces that are used to only listen and those that are used to modify. For instance, we've already seen the distinction between SendChannel
, ReceiveChannel
and just Channel
. The same rule applies here. MutableSharedFlow
inherits from both SharedFlow
and FlowCollector
. The former inherits from Flow
and is used to observe, while FlowCollector
is used to emit values.
These interfaces are often used to expose only functions, to emit, or only to collect.
Here is an example of typical usage on Android:
shareIn
Flow is often used to observe changes, like user actions, database modifications, or new messages. We already know the different ways in which these events can be processed and handled. We've learned how to merge multiple flows into one. But what if multiple classes are interested in these changes and we would like to turn one flow into multiple flows? The solution is SharedFlow
, and the easiest way to turn a Flow
into a SharedFlow
is by using the shareIn
function.
The shareIn
function creates a SharedFlow
and sends elements from its Flow
. Since we need to start a coroutine to collect elements on flow, shareIn
expects a coroutine scope as the first argument. The third argument is replay
, which is 0 by default. The second argument is interesting: started
determines when listening for values should start, depending on the number of listeners. The following options are supported:
SharingStarted.Eagerly
- immediately starts listening for values and sending them to a flow. Notice that if you have a limitedreplay
value and your values appear before you start subscribing, you might lose some values (if your replay is 0, you will lose all such values).
SharingStarted.Lazily
- starts listening when the first subscriber appears. This guarantees that this first subscriber gets all the emitted values, while subsequent subscribers are only guaranteed to get the most recent replay values. The upstream flow continues to be active even when all subscribers disappear, but only the most recent replay values are cached without subscribers.
WhileSubscribed()
- starts listening on the flow when the first subscriber appears; it stops when the last subscriber disappears. If a new subscriber appears when ourSharedFlow
is stopped, it will start again.WhileSubscribed
has additional optional configuration parameters:stopTimeoutMillis
(how long to listen after the last subscriber disappears, 0 by default) andreplayExpirationMillis
(how long to keep replay after stopping,Long.MAX_VALUE
by default).
- It is also possible to define a custom strategy by implementing the
SharingStarted
interface.
Using shareIn
is very convenient when multiple services are interested in the same changes. Let's say that you need to observe how stored locations change over time. This is how a DTO (Data Transfer Object) could be implemented on Android using the Room library:
The problem is that if multiple services need to depend on these locations, then it would not be optimal for each of them to observe the database separately. Instead, we could make a service that listens to these changes and shares them into SharedFlow
. This is where we will use shareIn
. But how should we configure it? You need to decide for yourself. Do you want your subscribers to immediately receive the last list of locations? If so, set replay
to 1. If you only want to react to change, set it to 0. How about started
? WhileSubscribed()
sounds best for this use case.
Beware! Do not create a new SharedFlow for each call. Create one, and store it in a property.
StateFlow
StateFlow is an extension of the SharedFlow concept. It works similarly to SharedFlow when the replay
parameter is set to 1. It always stores one value, which can be accessed using the value
property.
Please note how the
value
property is overridden insideMutableStateFlow
. In Kotlin, anopen val
property can be overridden with avar
property.val
only allows getting a value (getter), whilevar
also supports setting a new value (setter).
The initial value needs to be passed to the constructor. We both access and set the value using the value
property. As you can see, MutableStateFlow
is like an observable holder for a value.
On Android, StateFlow is used as a modern alternative to LiveData. First, it has full support for coroutines. Second, it has an initial value, so it does not need to be nullable. So, StateFlow is often used on ViewModels to represent its state. This state is observed, and a view is displayed and updated on this basis.
Beware that StateFlow is conflated, so slower observers might not receive some intermediate state changes. To receive all events, use SharedFlow.
This behavior is by design. StateFlow represents the current state, and we might assume that nobody is interested in an outdated state.
stateIn
stateIn
is a function that transforms Flow<T>
into StateFlow<T>
. It can only be called with a scope, but it is a suspending function. Remember that StateFlow needs to always have a value; so, if you don't specify it, then you need to wait until the first value is calculated.
The second variant of stateIn
is not suspending but it requires an initial value and a started
mode. This mode has the same options as shareIn
(as previously explained).
We typically use stateIn
when we want to observe a value from one source of changes. On the way, these changes can be processed, and in the end they can be observed by our views.
Summary
In this chapter, we've learned about SharedFlow
and StateFlow
, both of which are especially important for Android developers as they are commonly used as a part of the MVVM pattern. Remember them and consider using them, especially if you use view models in Android development.