Combining flows: merge, zip, and combine

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub.

Let's talk about combining two flows into one. There are a few ways to do this. The simplest is to merge the elements from two flows into one. Elements are passed through unmodified, regardless of which flow they come from. To do this, we use the top-level merge function.

    import kotlinx.coroutines.flow.*

    suspend fun main() {
        val ints: Flow<Int> = flowOf(1, 2, 3)
        val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)

        val together: Flow<Number> = merge(ints, doubles)
        print(together.toList())
        // [1, 2, 3, 0.1, 0.2, 0.3]
        // or [1, 0.1, 0.2, 0.3, 2, 3]
        // or [0.1, 1, 2, 3, 0.2, 0.3]
        // or any other interleaving
    }
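Because the interleaving is not specified, it helps to check only what merge does guarantee: every element arrives exactly once, and elements from the same source keep their relative order. The following is a minimal sketch of such a check; the helper name mergedNumbers is hypothetical, and runBlocking is used only to keep the example self-contained.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects the merged flow into a list.
// The interleaving between the two sources is not specified.
fun mergedNumbers(): List<Number> = runBlocking {
    val ints: Flow<Int> = flowOf(1, 2, 3)
    val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
    val together: Flow<Number> = merge(ints, doubles)
    together.toList()
}

fun main() {
    val result = mergedNumbers()
    // Elements from each source keep their relative order...
    println(result.filterIsInstance<Int>())    // [1, 2, 3]
    println(result.filterIsInstance<Double>()) // [0.1, 0.2, 0.3]
    // ...and nothing is lost or duplicated.
    println(result.size)                       // 6
}
```

Asserting on the per-source order rather than on the whole list keeps tests stable regardless of how the sources happen to interleave.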

It is important to know that when we use merge, the elements from one flow do not wait for the other flow. For instance, in the example below, elements from the first flow are delayed, but this does not hold back the elements from the second flow.

    import kotlinx.coroutines.delay
    import kotlinx.coroutines.flow.*

    suspend fun main() {
        val ints: Flow<Int> = flowOf(1, 2, 3)
            .onEach { delay(1000) }
        val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)

        val together: Flow<Number> = merge(ints, doubles)
        together.collect { println(it) }
    }
    // 0.1
    // 0.2
    // 0.3
    // (1 sec)
    // 1
    // (1 sec)
    // 2
    // (1 sec)
    // 3

We use merge when we have multiple sources of events that should lead to the same actions.

    fun listenForMessages() {
        merge(userSentMessages, messagesNotifications)
            .onEach { displayMessage(it) }
            .launchIn(scope)
    }
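The snippet above relies on names defined elsewhere. A self-contained sketch of the same idea might look as follows. The two flows, their contents, and their delays are made up for illustration, a plain list stands in for displayMessage, and runBlocking stands in for a real scope.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical event sources standing in for the ones used in the text.
val userSentMessages: Flow<String> = flow {
    delay(50)
    emit("user: hi")
    delay(200)
    emit("user: bye")
}

val messagesNotifications: Flow<String> = flow {
    delay(150)
    emit("notification: ping")
}

// Handles events from both sources with the same action.
fun displayedMessages(): List<String> = runBlocking {
    val displayed = mutableListOf<String>()
    merge(userSentMessages, messagesNotifications)
        .onEach { displayed.add(it) } // stands in for displayMessage(it)
        .launchIn(this)               // collect in the runBlocking scope
        .join()                       // wait until both sources complete
    displayed
}

fun main() {
    // With the delays above, the events arrive at about 50, 150 and 250 ms.
    println(displayedMessages())
    // [user: hi, notification: ping, user: bye]
}
```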

The next function is zip, which makes pairs from both flows. We also need to specify a function that decides how elements are paired (transformed into one value that will be emitted in the new flow). Each element can only be part of one pair, so it needs to wait for its pair. Elements left without a pair are lost: as soon as either of the zipped flows completes, the resulting flow completes as well, and the other flow is cancelled.

    import kotlinx.coroutines.delay
    import kotlinx.coroutines.flow.*

    suspend fun main() {
        val flow1 = flowOf("A", "B", "C")
            .onEach { delay(400) }
        val flow2 = flowOf(1, 2, 3, 4)
            .onEach { delay(1000) }

        flow1.zip(flow2) { f1, f2 -> "${f1}_${f2}" }
            .collect { println(it) }
    }
    // (1 sec)
    // A_1
    // (1 sec)
    // B_2
    // (1 sec)
    // C_3
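The pairing rule does not depend on timing. As a minimal sketch, the same two sources without any delays still produce three pairs, and the unpaired 4 is dropped. The helper name zippedPairs is hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: zips three letters with four numbers and collects the result.
fun zippedPairs(): List<String> = runBlocking {
    val letters = flowOf("A", "B", "C")
    val numbers = flowOf(1, 2, 3, 4)
    letters.zip(numbers) { letter, number -> "${letter}_$number" }
        .toList()
}

fun main() {
    // The shorter flow decides how many pairs are formed; 4 never gets a partner.
    println(zippedPairs()) // [A_1, B_2, C_3]
}
```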

The zip function reminds me of the polonaise - a traditional Polish dance. One feature of this dance is that a line of pairs is separated down the middle, then these pairs reform when they meet again.

A still from the movie Pan Tadeusz, directed by Andrzej Wajda, presenting the polonaise dance.

The last important function when combining two flows is combine. Just like zip, it forms pairs from elements, and the first pair has to wait for the slower flow. However, the similarities to the polonaise dance end here. When we use combine, every new element replaces its predecessor from the same flow. Once the first pair has been formed, each new element produces a new pair together with the latest element from the other flow.

Notice that zip needs pairs, so it closes as soon as either flow closes. combine does not have such a limitation, so it will emit until both flows are closed.

    import kotlinx.coroutines.delay
    import kotlinx.coroutines.flow.*

    suspend fun main() {
        val flow1 = flowOf("A", "B", "C")
            .onEach { delay(400) }
        val flow2 = flowOf(1, 2, 3, 4)
            .onEach { delay(1000) }

        flow1.combine(flow2) { f1, f2 -> "${f1}_${f2}" }
            .collect { println(it) }
    }
    // (1 sec)
    // B_1
    // (0.2 sec)
    // C_1
    // (0.8 sec)
    // C_2
    // (1 sec)
    // C_3
    // (1 sec)
    // C_4

combine is typically used when we need to actively observe two sources of changes. If you want to have elements emitted whenever a change occurs, you can add initial values to each combined flow (to have the initial pair).

userUpdateFlow.onStart { emit(currentUser) }

A typical use case might be when a view needs to react to changes in either of two observable elements. For example, when a notification badge depends on both the current state of a user and some notifications, we might observe them both and combine their changes to update a view.
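The badge scenario can be sketched as follows. Everything here is hypothetical: the two source flows, their delays, the initial values "Guest" and 0, and the badge format. The sketch compares combine with and without initial values added through onStart.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical sources: a user-name update after 100 ms
// and a notification count after 250 ms.
fun userUpdates(): Flow<String> = flow {
    delay(100)
    emit("Alice")
}

fun notificationCounts(): Flow<Int> = flow {
    delay(250)
    emit(3)
}

// Hypothetical badge text built from the latest value of each source.
fun badge(user: String, count: Int): String = "$user ($count)"

// Without initial values, nothing is emitted until both sources have produced a value.
fun badgesWithoutInitialValues(): List<String> = runBlocking {
    userUpdates()
        .combine(notificationCounts()) { user, count -> badge(user, count) }
        .toList()
}

// With initial values, the first pair exists immediately, so every change is observed.
fun badgesWithInitialValues(): List<String> = runBlocking {
    userUpdates().onStart { emit("Guest") }
        .combine(notificationCounts().onStart { emit(0) }) { user, count ->
            badge(user, count)
        }
        .toList()
}

fun main() {
    println(badgesWithoutInitialValues()) // [Alice (3)]
    println(badgesWithInitialValues())    // [Guest (0), Alice (0), Alice (3)]
}
```

In the first variant the user update at 100 ms never reaches the view on its own, because combine is still waiting for the first notification count. Seeding both flows lets the view render at once and then follow each change.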