article banner (priority)

Collecting values on flow: fold and scan

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub.

If you use collection processing functions, you might recognize fold. It is used to combine all the values in this collection into one by applying an operation that combines two values into one for each element (starting from the initial value).

For example, if the initial value is 0 and the operation is addition, then the result is the sum of all the numbers: we first take the initial value 0; then, we add the first element 1 to it; to the result 1, we add the second number 2; to the result 3, we add the third number 3; to the result 6, we add the last number 4. The result of this operation, 10, is what will be returned from fold.

fun main() { val list = listOf(1, 2, 3, 4) val res = list.fold(0) { acc, i -> acc + i } println(res) // 10 val res2 = list.fold(1) { acc, i -> acc * i } println(res2) // 24 }

fold is a terminal operation. It can also be used for Flow, but it will suspend until this flow is completed (just like collect).

import kotlinx.coroutines.delay import kotlinx.coroutines.flow.* suspend fun main() { val list = flowOf(1, 2, 3, 4) .onEach { delay(1000) } val res = list.fold(0) { acc, i -> acc + i } println(res) } // (4 sec) // 10

There is an alternative to fold called scan. It is an intermediate operation that produces all intermediate accumulator values.

fun main() { val list = listOf(1, 2, 3, 4) val res = list.scan(0) { acc, i -> acc + i } println(res) // [0, 1, 3, 6, 10] }

scan is useful withFlow because it produces a new value immediately after receiving one from the previous step.

import kotlinx.coroutines.delay import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.* suspend fun main() { flowOf(1, 2, 3, 4) .onEach { delay(1000) } .scan(0) { acc, v -> acc + v } .collect { println(it) } } // 0 // (1 sec) // 1 // (1 sec) // 3 // (1 sec) // 6 // (1 sec) // 10

We can implement scan easily using the flow builder and collect. We first emit the initial value, then with each new element we emit the result of the next value accumulation.

fun <T, R> Flow<T>.scan( initial: R, operation: suspend (accumulator: R, value: T) -> R ): Flow<R> = flow { var accumulator: R = initial emit(accumulator) collect { value -> accumulator = operation(accumulator, value) emit(accumulator) } }

The typical use case for scan is when we have a flow of updates or changes, and we need an object that is the result of these changes.

val userStateFlow: Flow<User> = userChangesFlow .scan(user) { user, change -> user.withChange(change) } val messagesListFlow: Flow<List<Message>> = messagesFlow .scan(messages) { acc, message -> acc + message }