Combining flows: merge, zip, and combine
Let's look at how to combine two flows into one. There are a few ways to do this. The simplest is to merge the elements from two flows into one. Elements are not modified, no matter which flow they come from. To do this, we use the top-level merge function.
import kotlinx.coroutines.flow.*

suspend fun main() {
    val ints: Flow<Int> = flowOf(1, 2, 3)
    val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
    val together: Flow<Number> = merge(ints, doubles)
    print(together.toList())
    // [1, 0.1, 0.2, 0.3, 2, 3]
    // or [1, 2, 3, 0.1, 0.2, 0.3]
    // or [0.1, 1, 2, 3, 0.2, 0.3]
    // or any other combination
}
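Because the interleaving is not fixed, it can help to check what does stay stable. The sketch below is only an illustration (the helper name mergedElements is made up for it, and runBlocking is used so that the helper is a regular function). It assumes, as far as I know is the case for merge, that each source keeps its own relative order in the result.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects the merged flow into a list.
fun mergedElements(): List<Number> = runBlocking {
    val ints: Flow<Int> = flowOf(1, 2, 3)
    val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
    val together: Flow<Number> = merge(ints, doubles)
    together.toList()
}

fun main() {
    val result = mergedElements()
    // Nothing is lost or modified, whatever the interleaving.
    println(result.size) // 6
    // Elements of a single source keep their relative order.
    println(result.filterIsInstance<Int>()) // [1, 2, 3]
    println(result.filterIsInstance<Double>()) // [0.1, 0.2, 0.3]
}
```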
It is important to know that when we use merge, the elements from one flow do not wait for the other flow. For instance, in the example below, elements from the first flow are delayed, but this does not hold back the elements from the second flow.
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

suspend fun main() {
    val ints: Flow<Int> = flowOf(1, 2, 3)
        .onEach { delay(1000) }
    val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
    val together: Flow<Number> = merge(ints, doubles)
    together.collect { println(it) }
}
// 0.1
// 0.2
// 0.3
// (1 sec)
// 1
// (1 sec)
// 2
// (1 sec)
// 3
We use merge when we have multiple sources of events that should lead to the same actions.
fun listenForMessages() {
    merge(userSentMessages, messagesNotifications)
        .onEach { displayMessage(it) }
        .launchIn(scope)
}
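A runnable variant of this pattern might look as follows. It is only a sketch: the Message class, the contents of both flows, and the displayedMessages helper are hypothetical stand-ins, and the scope of runBlocking takes the place of scope.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical message type; not part of the original example.
data class Message(val from: String, val text: String)

// Sends both stand-in sources through one handler and returns what was "displayed".
fun displayedMessages(): List<String> = runBlocking {
    val displayed = mutableListOf<String>()
    val userSentMessages = flowOf(Message("me", "Hi!"))
    val messagesNotifications = flowOf(Message("Ann", "Hello"), Message("Bob", "Hey"))

    merge(userSentMessages, messagesNotifications)
        .onEach { displayed += "${it.from}: ${it.text}" } // same action for both sources
        .launchIn(this) // the runBlocking scope stands in for `scope`
        .join() // wait until both sources complete

    displayed
}

fun main() {
    // The order of the lines depends on how the sources interleave.
    displayedMessages().forEach(::println)
}
```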
The next function is zip, which makes pairs from both flows. We also need to specify a function that decides how the paired elements are transformed into the single element that will be emitted in the new flow. Each element can only be part of one pair, so it needs to wait for its partner. Elements left without a partner are lost, so when one of the zipped flows completes, the resulting flow completes as well, and the other flow is cancelled.
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

suspend fun main() {
    val flow1 = flowOf("A", "B", "C")
        .onEach { delay(400) }
    val flow2 = flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
    flow1.zip(flow2) { f1, f2 -> "${f1}_${f2}" }
        .collect { println(it) }
}
// (1 sec)
// A_1
// (1 sec)
// B_2
// (1 sec)
// C_3
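Note that 4 never appears in this output, because it has no partner. The same can be seen without delays by collecting the zipped flow into a list; the sketch below uses a made-up helper name, zipped, purely for illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: zips three letters with four numbers.
fun zipped(): List<String> = runBlocking {
    val letters = flowOf("A", "B", "C")
    val numbers = flowOf(1, 2, 3, 4)
    letters.zip(numbers) { letter, number -> "${letter}_${number}" }
        .toList()
}

fun main() {
    // The unpaired 4 is dropped once the shorter flow completes.
    println(zipped()) // [A_1, B_2, C_3]
}
```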
The zip function reminds me of the polonaise, a traditional Polish dance. One feature of this dance is that a line of pairs is separated down the middle, then these pairs re-form when they meet again.
A still from the movie Pan Tadeusz, directed by Andrzej Wajda, presenting the polonaise dance.
The last important function when combining two flows is combine. Just like zip, it also forms pairs from elements, and it has to wait for the slower flow to produce the first pair. However, the similarities to the polonaise dance end here. When we use combine, every new element replaces its predecessor. If the first pair has already been formed, a new element produces a new pair together with the most recent element from the other flow.
Notice that zip needs pairs, so it closes as soon as either flow closes. combine does not have this limitation, so it keeps emitting until both flows are closed.
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

suspend fun main() {
    val flow1 = flowOf("A", "B", "C")
        .onEach { delay(400) }
    val flow2 = flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
    flow1.combine(flow2) { f1, f2 -> "${f1}_${f2}" }
        .collect { println(it) }
}
// (1 sec)
// B_1
// (0.2 sec)
// C_1
// (0.8 sec)
// C_2
// (1 sec)
// C_3
// (1 sec)
// C_4
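The difference in completion between zip and combine can be checked on the same inputs. The following is a sketch under a few assumptions: the helper names are invented, and the delays are scaled down tenfold only to keep the run short. It prints only the results that do not depend on exact timing.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Same elements as above; delays are scaled down (an assumption for brevity).
fun letters(): Flow<String> = flowOf("A", "B", "C").onEach { delay(40) }
fun numbers(): Flow<Int> = flowOf(1, 2, 3, 4).onEach { delay(100) }

// zip stops as soon as the shorter flow completes.
fun zipAll(): List<String> = runBlocking {
    letters().zip(numbers()) { l, n -> "${l}_${n}" }.toList()
}

// combine keeps going until both flows complete,
// so its last value uses the last element of each flow.
fun combineLast(): String = runBlocking {
    letters().combine(numbers()) { l, n -> "${l}_${n}" }.last()
}

fun main() {
    println(zipAll()) // [A_1, B_2, C_3]
    println(combineLast()) // C_4
}
```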
combine is typically used when we need to actively observe two sources of changes. If you want elements to be emitted whenever a change occurs, you can add an initial value to each combined flow (so that the initial pair is available right away).
userUpdateFlow.onStart { emit(currentUser) }
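To see why the initial value matters, consider a case where one source emits nothing while we observe. In the sketch below, all names and values are hypothetical: userUpdateFlow is an empty flow standing in for "no user update arrived", and notificationsFlow emits two counts. Without an initial value, combine never gets its first pair. With onStart, every notification produces a result.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

val currentUser = "guest" // hypothetical initial value

// Stand-in sources: no user update arrives while we observe,
// and two notification counts arrive one after another.
fun userUpdateFlow(): Flow<String> = emptyFlow()
fun notificationsFlow(): Flow<Int> = flowOf(1, 2).onEach { delay(20) }

fun withoutInitialValue(): List<String> = runBlocking {
    userUpdateFlow()
        .combine(notificationsFlow()) { user, count -> "$user: $count" }
        .toList()
}

fun withInitialValue(): List<String> = runBlocking {
    userUpdateFlow()
        .onStart { emit(currentUser) } // provides the user side of the first pair
        .combine(notificationsFlow()) { user, count -> "$user: $count" }
        .toList()
}

fun main() {
    println(withoutInitialValue()) // []
    println(withInitialValue()) // [guest: 1, guest: 2]
}
```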
A typical use case might be when a view needs to be updated whenever either of two observable elements changes. For example, when a notification badge depends on both the current state of a user and some notifications, we might observe them both and combine their changes to update a view.
userStateFlow
    .combine(notificationsFlow) { userState, notifications ->
        updateNotificationBadge(userState, notifications)
    }
    .collect()
Marcin Moskala is a highly experienced developer and Kotlin instructor as the founder of Kt. Academy, an official JetBrains partner specializing in Kotlin training, Google Developers Expert, known for his significant contributions to the Kotlin community. Moskala is the author of several widely recognized books, including "Effective Kotlin," "Kotlin Coroutines," "Functional Kotlin," "Advanced Kotlin," "Kotlin Essentials," and "Android Development with Kotlin."
Beyond his literary achievements, Moskala is the author of the largest Medium publication dedicated to Kotlin. As a respected speaker, he has been invited to share his insights at numerous programming conferences, including events such as Droidcon and the prestigious Kotlin Conf, the premier conference dedicated to the Kotlin programming language.
Nicola Corti is a Google Developer Expert for Kotlin. He has been working with the language since before version 1.0 and he is the maintainer of several open-source libraries and tools.
He's currently working as Android Infrastructure Engineer at Spotify in Stockholm, Sweden.
Furthermore, he is an active member of the developer community.
His involvement goes from speaking at international conferences about Mobile development to leading communities across Europe (GDG Pisa, KUG Hamburg, GDG Sthlm Android).
In his free time, he also loves baking, photography, and running.