SharedFlow and StateFlow
Flow is typically cold, so its values are calculated on demand. However, there are cases in which we want multiple receivers to be subscribed to one source of changes. This is where we use SharedFlow, which is conceptually similar to a mailing list. We also have StateFlow, which is similar to an observable value. Let's explain them both step by step.
Let's start with MutableSharedFlow, which is like a broadcast channel: everyone can send (emit) messages, which will be received by every coroutine that is listening (collecting).
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
suspend fun main(): Unit = coroutineScope {
    val mutableSharedFlow =
        MutableSharedFlow<String>(replay = 0)
    // or MutableSharedFlow<String>()
    launch {
        mutableSharedFlow.collect {
            println("#1 received $it")
        }
    }
    launch {
        mutableSharedFlow.collect {
            println("#2 received $it")
        }
    }
    delay(1000)
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")
}
// (1 sec)
// #1 received Message1
// #2 received Message1
// #1 received Message2
// #2 received Message2
// (program never ends)
The above program never ends because coroutineScope is waiting for the coroutines that were started with launch, and these keep listening on MutableSharedFlow. A MutableSharedFlow cannot be closed, so the only way to fix this problem is to cancel the collecting coroutines, for example by cancelling the whole scope.
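To make this concrete, here is a minimal sketch of a variant of the program above that does finish, because the collecting coroutine is cancelled explicitly. The function name collectUntilCancelled is made up for this illustration, and the short delay before cancellation is only there to let the collector finish handling the last message.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns what the collector received before it was cancelled.
suspend fun collectUntilCancelled(): List<String> = coroutineScope {
    val mutableSharedFlow = MutableSharedFlow<String>()
    val received = CopyOnWriteArrayList<String>()

    val collector = launch {
        mutableSharedFlow.collect { received.add(it) }
    }
    // Wait until the collector has actually subscribed.
    mutableSharedFlow.subscriptionCount.first { it == 1 }

    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")

    // Give the collector a moment to handle the last message.
    delay(100)
    // The shared flow never completes, so we cancel the collector ourselves.
    collector.cancelAndJoin()
    received.toList()
}

suspend fun main() {
    println(collectUntilCancelled()) // [Message1, Message2]
}
```

Once the only child coroutine is cancelled, coroutineScope has nothing left to wait for, so the function returns.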
MutableSharedFlow can also keep messages that have already been sent. If we set the replay parameter (it defaults to 0), the defined number of last values will be kept. If a coroutine now starts observing, it will receive these values first. This cache can also be reset with resetReplayCache.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
suspend fun main(): Unit = coroutineScope {
    val mutableSharedFlow = MutableSharedFlow<String>(
        replay = 2,
    )
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")
    mutableSharedFlow.emit("Message3")
    println(mutableSharedFlow.replayCache)
    // [Message2, Message3]
    launch {
        mutableSharedFlow.collect {
            println("#1 received $it")
        }
        // #1 received Message2
        // #1 received Message3
    }
    delay(100)
    mutableSharedFlow.resetReplayCache()
    println(mutableSharedFlow.replayCache) // []
}
MutableSharedFlow is conceptually similar to RxJava Subjects. When the replay parameter is set to 0, it is similar to a PublishSubject. When replay is 1, it is similar to a BehaviorSubject. When replay is Int.MAX_VALUE, it is similar to a ReplaySubject.
In Kotlin, we like to have a distinction between interfaces that are used to only listen and those that are used to modify. For instance, we've already seen the distinction between SendChannel, ReceiveChannel and just Channel. The same rule applies here. MutableSharedFlow inherits from both SharedFlow and FlowCollector. The former inherits from Flow and is used to observe, while FlowCollector is used to emit values.
interface MutableSharedFlow<T> :
    SharedFlow<T>, FlowCollector<T> {
    fun tryEmit(value: T): Boolean
    val subscriptionCount: StateFlow<Int>
    fun resetReplayCache()
}
interface SharedFlow<out T> : Flow<T> {
    val replayCache: List<T>
}
interface FlowCollector<in T> {
    suspend fun emit(value: T)
}
These interfaces are often used to expose only the functions needed to emit, or only those needed to collect.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main(): Unit = coroutineScope {
    val mutableSharedFlow = MutableSharedFlow<String>()
    val sharedFlow: SharedFlow<String> = mutableSharedFlow
    val collector: FlowCollector<String> = mutableSharedFlow
    launch {
        mutableSharedFlow.collect {
            println("#1 received $it")
        }
    }
    launch {
        sharedFlow.collect {
            println("#2 received $it")
        }
    }
    delay(1000)
    mutableSharedFlow.emit("Message1")
    collector.emit("Message2")
}
// (1 sec)
// #1 received Message1
// #2 received Message1
// #1 received Message2
// #2 received Message2
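Note that assigning a MutableSharedFlow to a SharedFlow variable hides the emitting functions only at the type level. If we want a read-only view that cannot be cast back, kotlinx.coroutines offers the asSharedFlow function. The sketch below is only an illustration: the MessageBus class and its members are made-up names.

```kotlin
import kotlinx.coroutines.flow.*

// Hypothetical class, used only to compare the two ways of exposing a flow.
class MessageBus {
    private val _messages = MutableSharedFlow<String>(replay = 1)

    // Upcast only: a caller can still cast this back to MutableSharedFlow.
    val upcast: SharedFlow<String> = _messages

    // Read-only wrapper: it does not implement MutableSharedFlow.
    val readOnly: SharedFlow<String> = _messages.asSharedFlow()

    // tryEmit is the non-suspending counterpart of emit.
    fun send(message: String): Boolean = _messages.tryEmit(message)
}

fun main() {
    val bus = MessageBus()
    println(bus.upcast is MutableSharedFlow<*>)   // true
    println(bus.readOnly is MutableSharedFlow<*>) // false
    println(bus.send("Hello"))                    // true
    println(bus.readOnly.replayCache)             // [Hello]
}
```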
Here is an example of typical usage on Android:
class UserProfileViewModel : ViewModel() {
    private val _userChanges =
        MutableSharedFlow<UserChange>()
    val userChanges: SharedFlow<UserChange> = _userChanges
    fun onCreate() {
        viewModelScope.launch {
            userChanges.collect(::applyUserChange)
        }
    }
    fun onNameChanged(newName: String) {
        // ...
        // emit is a suspending function, so it needs a coroutine
        viewModelScope.launch {
            _userChanges.emit(NameChange(newName))
        }
    }
    fun onPublicKeyChanged(newPublicKey: String) {
        // ...
        viewModelScope.launch {
            _userChanges.emit(PublicKeyChange(newPublicKey))
        }
    }
}
Flow is often used to observe changes, like user actions, database modifications, or new messages. We already know the different ways in which these events can be processed and handled. We've learned how to merge multiple flows into one. But what if multiple classes are interested in these changes and we would like to turn one flow into multiple flows? The solution is SharedFlow, and the easiest way to turn a Flow into a SharedFlow is by using the shareIn function.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B", "C")
        .onEach { delay(1000) }
    val sharedFlow: SharedFlow<String> = flow.shareIn(
        scope = this,
        started = SharingStarted.Eagerly,
        // replay = 0 (default)
    )
    delay(500)
    launch {
        sharedFlow.collect { println("#1 $it") }
    }
    delay(1000)
    launch {
        sharedFlow.collect { println("#2 $it") }
    }
    delay(1000)
    launch {
        sharedFlow.collect { println("#3 $it") }
    }
}
// (1 sec)
// #1 A
// (1 sec)
// #1 B
// #2 B
// (1 sec)
// #1 C
// #2 C
// #3 C
The shareIn function creates a SharedFlow and sends elements from the source Flow to it. Since we need to start a coroutine to collect elements from the source flow, shareIn expects a coroutine scope as the first argument. The third argument is replay, which is 0 by default. The second argument is interesting: started determines when listening for values should start, depending on the number of listeners. The following options are supported:
SharingStarted.Eagerly - immediately starts listening for values and sending them to a flow. Notice that if you have a limited replay value and your values appear before you start subscribing, you might lose some values (if your replay is 0, you will lose all such values).
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B", "C")
    val sharedFlow: SharedFlow<String> = flow.shareIn(
        scope = this,
        started = SharingStarted.Eagerly,
    )
    delay(100)
    launch {
        sharedFlow.collect { println("#1 $it") }
    }
    print("Done")
}
// (0.1 sec)
// Done
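If late subscribers should still see the most recent values, we can set replay. Here is a minimal sketch of the same situation with replay = 2. The function name lateSubscriberWithReplay is made up for illustration, and take(2) is used only so that the collection ends and the function can return.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: subscribes late and returns the replayed values.
suspend fun lateSubscriberWithReplay(): List<String> = coroutineScope {
    val sharedFlow = flowOf("A", "B", "C").shareIn(
        scope = this,
        started = SharingStarted.Eagerly,
        replay = 2,
    )
    // By now the source flow has already emitted all its values.
    delay(100)
    // A late subscriber first receives the two most recent values.
    sharedFlow.take(2).toList()
}

suspend fun main() {
    println(lateSubscriberWithReplay()) // [B, C]
}
```

"A" is lost because it was emitted before anyone subscribed and it no longer fits in the replay cache.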
SharingStarted.Lazily - starts listening when the first subscriber appears. This guarantees that this first subscriber gets all the emitted values, while subsequent subscribers are only guaranteed to get the most recent replay values. The upstream flow continues to be active even when all subscribers disappear, but only the most recent replay values are cached without subscribers.
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
suspend fun main(): Unit = coroutineScope {
    val flow1 = flowOf("A", "B", "C")
    val flow2 = flowOf("D")
        .onEach { delay(1000) }
    val sharedFlow = merge(flow1, flow2).shareIn(
        scope = this,
        started = SharingStarted.Lazily,
    )
    delay(100)
    launch {
        sharedFlow.collect { println("#1 $it") }
    }
    delay(1000)
    launch {
        sharedFlow.collect { println("#2 $it") }
    }
}
// (0.1 sec)
// #1 A
// #1 B
// #1 C
// (1 sec)
// #2 D
// #1 D
SharingStarted.WhileSubscribed() - starts listening on the flow when the first subscriber appears; it stops when the last subscriber disappears. If a new subscriber appears when our SharedFlow is stopped, it will start again. WhileSubscribed has additional optional configuration parameters: stopTimeoutMillis (how long to listen after the last subscriber disappears, 0 by default) and replayExpirationMillis (how long to keep replay after stopping, Long.MAX_VALUE by default).
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B", "C", "D")
        .onStart { println("Started") }
        .onCompletion { println("Finished") }
        .onEach { delay(1000) }
    val sharedFlow = flow.shareIn(
        scope = this,
        started = SharingStarted.WhileSubscribed(),
    )
    delay(3000)
    launch {
        println("#1 ${sharedFlow.first()}")
    }
    launch {
        println("#2 ${sharedFlow.take(2).toList()}")
    }
    delay(3000)
    launch {
        println("#3 ${sharedFlow.first()}")
    }
}
// (3 sec)
// Started
// (1 sec)
// #1 A
// (1 sec)
// #2 [A, B]
// Finished
// (1 sec)
// Started
// (1 sec)
// #3 A
// Finished
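The stopTimeoutMillis parameter is useful when subscribers come and go quickly, because it keeps the source flow running through short gaps. Below is a minimal sketch, assuming a source flow that counts how many times it has been started; the function name countUpstreamStarts and the 5 second timeout are arbitrary choices made for this illustration.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns how many times the source flow was started.
suspend fun countUpstreamStarts(): Int = coroutineScope {
    val starts = AtomicInteger()
    val source = flow {
        starts.incrementAndGet()
        emit("A")
        awaitCancellation() // stay active until sharing is stopped
    }
    val sharedFlow = source.shareIn(
        scope = this,
        started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5_000),
        replay = 1,
    )
    sharedFlow.first() // the first subscriber starts the source flow, then leaves
    sharedFlow.first() // the second one arrives long before the timeout passes
    // Stop the sharing coroutine so that coroutineScope can finish.
    coroutineContext.cancelChildren()
    starts.get()
}

suspend fun main() {
    println(countUpstreamStarts()) // 1
}
```

With the default stopTimeoutMillis of 0, the source flow is stopped as soon as the first subscriber leaves, so it may need to be started again for later subscribers.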
Using shareIn is very convenient when multiple services are interested in the same changes. Let's say that you need to observe how stored locations change over time. This is how a DAO (Data Access Object) could be implemented on Android using the Room library:
@Dao
interface LocationDao {
    @Insert(onConflict = OnConflictStrategy.IGNORE)
    suspend fun insertLocation(location: Location)
    @Query("DELETE FROM location_table")
    suspend fun deleteLocations()
    @Query("SELECT * FROM location_table ORDER BY time")
    fun observeLocations(): Flow<List<Location>>
}
The problem is that if multiple services need to depend on these locations, then it would not be optimal for each of them to observe the database separately. Instead, we could make a service that listens to these changes and shares them into SharedFlow. This is where we will use shareIn. But how should we configure it? You need to decide for yourself. Do you want your subscribers to immediately receive the last list of locations? If so, set replay to 1. If you only want to react to change, set it to 0. How about started? WhileSubscribed() sounds best for this use case.
class LocationService(
    locationDao: LocationDao,
    scope: CoroutineScope
) {
    private val locations = locationDao.observeLocations()
        .shareIn(
            scope = scope,
            started = SharingStarted.WhileSubscribed(),
        )
    fun observeLocations(): Flow<List<Location>> = locations
}
Beware! Do not create a new SharedFlow for each call. Create one, and store it in a property.
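To see why, here is a small sketch that counts the sharing coroutines started in a scope. The classes PerCallSharing and SharedOnce and the helper sharingCoroutines are hypothetical names used only for this comparison.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Anti-pattern: every call to observe() starts another sharing coroutine.
class PerCallSharing(
    private val source: Flow<Int>,
    private val scope: CoroutineScope
) {
    fun observe(): Flow<Int> =
        source.shareIn(scope, SharingStarted.WhileSubscribed())
}

// Recommended: the SharedFlow is created once and stored in a property.
class SharedOnce(source: Flow<Int>, scope: CoroutineScope) {
    private val shared =
        source.shareIn(scope, SharingStarted.WhileSubscribed())

    fun observe(): Flow<Int> = shared
}

// Counts the active children of the scope's job, here the sharing coroutines.
fun sharingCoroutines(scope: CoroutineScope): Int =
    scope.coroutineContext.job.children.count()

fun main() {
    val source = flowOf(1, 2, 3)

    val perCallScope = CoroutineScope(Job())
    val perCall = PerCallSharing(source, perCallScope)
    repeat(3) { perCall.observe() }
    println(sharingCoroutines(perCallScope)) // 3

    val onceScope = CoroutineScope(Job())
    val once = SharedOnce(source, onceScope)
    repeat(3) { once.observe() }
    println(sharingCoroutines(onceScope)) // 1

    perCallScope.cancel()
    onceScope.cancel()
}
```

In the first class, each returned flow also observes the source separately, which is exactly what sharing was supposed to avoid.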
StateFlow is an extension of the SharedFlow concept. It works similarly to SharedFlow when the replay parameter is set to 1. It always stores one value, which can be accessed using the value property.
interface StateFlow<out T> : SharedFlow<T> {
    val value: T
}
interface MutableStateFlow<T> :
    StateFlow<T>, MutableSharedFlow<T> {
    override var value: T
    fun compareAndSet(expect: T, update: T): Boolean
}
Please note how the value property is overridden inside MutableStateFlow. In Kotlin, an open val property can be overridden with a var property. val only allows getting a value (getter), while var also supports setting a new value (setter).
The initial value needs to be passed when a MutableStateFlow is created. We both access and set the value using the value property. As you can see, MutableStateFlow is like an observable holder for a value.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main(): Unit = coroutineScope {
    val state = MutableStateFlow("A")
    println(state.value) // A
    launch {
        state.collect { println("Value changed to $it") }
        // Value changed to A
    }
    delay(1000)
    state.value = "B" // Value changed to B
    delay(1000)
    launch {
        state.collect { println("and now it is $it") }
        // and now it is B
    }
    delay(1000)
    state.value = "C" // Value changed to C and now it is C
}
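The MutableStateFlow interface also declares compareAndSet, which sets a new value only if the current one equals the expected one. The update extension function from kotlinx.coroutines.flow builds on this idea to modify the value atomically. A short sketch, in which the counter variable is just an illustration:

```kotlin
import kotlinx.coroutines.flow.*

fun main() {
    val counter = MutableStateFlow(0)

    // Succeeds, because the current value equals the expected one.
    println(counter.compareAndSet(expect = 0, update = 1)) // true
    // Fails, because the current value is now 1, not 0.
    println(counter.compareAndSet(expect = 0, update = 2)) // false
    println(counter.value) // 1

    // Atomically calculates the new value from the current one.
    counter.update { it + 1 }
    println(counter.value) // 2
}
```

This matters when a state is modified from many coroutines at once, where a separate read followed by a write to value could lose updates.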
On Android, StateFlow is used as a modern alternative to LiveData. First, it has full support for coroutines. Second, it has an initial value, so it does not need to be nullable. For these reasons, StateFlow is often used in view models to represent their state. This state is observed, and a view is displayed and updated on this basis.
class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {
    private val _uiState =
        MutableStateFlow<NewsState>(LoadingNews)
    val uiState: StateFlow<NewsState> = _uiState
    fun onCreate() {
        viewModelScope.launch {
            _uiState.value =
                NewsLoaded(newsRepository.getNews())
        }
    }
}
StateFlow emits a new value only when it differs from the current one (values are compared using equals).
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main(): Unit = coroutineScope {
    val state = MutableStateFlow("A")
    state.onEach { println("Updated to $it") }
        .stateIn(this) // Updated to A
    state.value = "B" // Updated to B
    state.value = "B" // (nothing printed)
    state.emit("B") // (nothing printed)
}
StateFlow is also conflated, so slower observers might not receive some intermediate state changes. To receive all events, use SharedFlow.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main(): Unit = coroutineScope {
    val state = MutableStateFlow('X')
    launch {
        for (c in 'A'..'E') {
            delay(300)
            state.value = c
            // or state.emit(c)
        }
    }
    state.collect {
        delay(1000)
        println(it)
    }
}
// X
// C
// E
This behavior is by design. StateFlow represents the current state, and we might assume that nobody is interested in repeating or outdated state.
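For comparison, here is a minimal sketch of a slow observer on a MutableSharedFlow, which receives every event because emit suspends until the observer is ready for the next value. The function name collectSlowly is made up, and take(5) is used only so that the example ends.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: a slow observer that still receives every event.
suspend fun collectSlowly(): List<Char> = coroutineScope {
    val events = MutableSharedFlow<Char>()
    val received = async {
        events
            .take(5)               // stop after five events
            .onEach { delay(100) } // simulate slow processing
            .toList()
    }
    // Wait until the observer has subscribed, so that no event is lost.
    events.subscriptionCount.first { it == 1 }
    for (c in 'A'..'E') {
        // With no buffer, emit suspends until the observer takes the value.
        events.emit(c)
    }
    received.await()
}

suspend fun main() {
    println(collectSlowly()) // [A, B, C, D, E]
}
```

The price is that a slow observer slows down the emitter, which is why a SharedFlow is often created with some extra buffer capacity.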
stateIn is a function that transforms Flow<T> into StateFlow<T>. It always needs a scope. The variant that takes only a scope is a suspending function: StateFlow needs to always have a value, so if you don't specify an initial one, you need to wait until the first value is calculated.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B", "C")
        .onEach { delay(1000) }
        .onEach { println("Produced $it") }
    val stateFlow: StateFlow<String> = flow.stateIn(this)
    println("Listening")
    println(stateFlow.value)
    stateFlow.collect { println("Received $it") }
}
// (1 sec)
// Produced A
// Listening
// A
// Received A
// (1 sec)
// Produced B
// Received B
// (1 sec)
// Produced C
// Received C
The second variant of stateIn is not suspending, but it requires an initial value and a started mode. This mode has the same options as shareIn (as previously explained).
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B")
        .onEach { delay(1000) }
        .onEach { println("Produced $it") }
    val stateFlow: StateFlow<String> = flow.stateIn(
        scope = this,
        started = SharingStarted.Lazily,
        initialValue = "Empty"
    )
    println(stateFlow.value)
    delay(2000)
    stateFlow.collect { println("Received $it") }
}
// Empty
// (2 sec)
// Received Empty
// (1 sec)
// Produced A
// Received A
// (1 sec)
// Produced B
// Received B
We typically use stateIn when we want to observe a value from one source of changes. On the way, these changes can be processed, and in the end they can be observed by our views.
class LocationsViewModel(
    locationService: LocationService
) : ViewModel() {
    private val location = locationService.observeLocations()
        .map { it.toLocationsDisplay() }
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.Lazily,
            initialValue = LocationsDisplay.Loading,
        )
    // ...
}
In this chapter, we've learned about SharedFlow and StateFlow, both of which are especially important for Android developers as they are commonly used as a part of the MVVM pattern. Remember them and consider using them, especially if you use view models in Android development.