Flow under the hood: how does it really work

Kotlin Coroutines Flow is a much simpler concept than most developers think. It is just a definition of which operations to execute, similar to a suspending lambda expression (with some extra elements).

In this article, I would like to give you a deep understanding of how Flow works. We will start with very simple concepts and use them to build our own Flow. Then we will also define map and filter for Flow. This understanding should help you both with using Flow in real-life projects and with implementing your own Flow processing functions. Have fun :)

Understanding Flow

We will start our story with a simple lambda expression. We define it once, and then we can call it whenever we want.

import kotlin.*

fun main() {
    val f: () -> Unit = {
        print("A")
        print("B")
        print("C")
    }
    f() // ABC
    f() // ABC
}

Now we will add a parameter emit of type (String) -> Unit. Because this parameter is a function, we can call it inside our lambda expression. In turn, whenever we call our lambda expression, we need to pass a lambda expression for the emit parameter. In our example, calling f starts the lambda expression at 1, and each time that lambda expression calls emit, it runs the lambda expression we passed when calling f.

import kotlin.*

fun main() {
    val f: ((String) -> Unit) -> Unit = { emit -> // 1
        emit("A")
        emit("B")
        emit("C")
    }
    f { print(it) } // ABC
    f { print(it) } // ABC
}

Now we need to make a small modification. We will define a functional interface FlowCollector with an abstract method named emit, and use this interface instead of the function type. The trick is that functional interfaces can be defined with lambda expressions, so the f call can stay the same.

import kotlin.*

fun interface FlowCollector {
    fun emit(value: String)
}

fun main() {
    val f: (FlowCollector) -> Unit = {
        it.emit("A")
        it.emit("B")
        it.emit("C")
    }
    f { print(it) } // ABC
    f { print(it) } // ABC
}

Calling emit on it is not convenient. Instead, we will make FlowCollector a receiver. Thanks to that, inside our lambda expression there is a receiver (the this keyword) of type FlowCollector. That means we can call this.emit or just emit. The f invocation can still stay the same.

import kotlin.*

fun interface FlowCollector {
    fun emit(value: String)
}

fun main() {
    val f: FlowCollector.() -> Unit = {
        emit("A")
        emit("B")
        emit("C")
    }
    f { print(it) } // ABC
    f { print(it) } // ABC
}

Instead of passing around a lambda expression, we would prefer to have an object implementing some interface. We will call this interface Flow, and wrap our definition with an object expression.

import kotlin.*

fun interface FlowCollector {
    fun emit(value: String)
}

interface Flow {
    fun collect(collector: FlowCollector)
}

fun main() {
    val builder: FlowCollector.() -> Unit = {
        emit("A")
        emit("B")
        emit("C")
    }
    val flow: Flow = object : Flow {
        override fun collect(collector: FlowCollector) {
            collector.builder()
        }
    }
    flow.collect { print(it) } // ABC
    flow.collect { print(it) } // ABC
}

To make this object creation more convenient, let's extract a builder function.

import kotlin.*

fun interface FlowCollector {
    fun emit(value: String)
}

interface Flow {
    fun collect(collector: FlowCollector)
}

fun flow(builder: FlowCollector.() -> Unit) = object : Flow {
    override fun collect(collector: FlowCollector) {
        collector.builder()
    }
}

fun main() {
    val f: Flow = flow {
        emit("A")
        emit("B")
        emit("C")
    }
    f.collect { print(it) } // ABC
    f.collect { print(it) } // ABC
}

The above flow only supports elements of type String. Let's make it generic. We will also use the suspend modifier so that our functions support features related to coroutines.

import kotlin.*

fun interface FlowCollector<T> {
    suspend fun emit(value: T)
}

interface Flow<T> {
    suspend fun collect(collector: FlowCollector<T>)
}

fun <T> flow(builder: suspend FlowCollector<T>.() -> Unit) = object : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.builder()
    }
}

suspend fun main() {
    val f: Flow<String> = flow {
        emit("A")
        emit("B")
        emit("C")
    }
    f.collect { print(it) } // ABC
    f.collect { print(it) } // ABC
}

That is it! This is nearly exactly how the Flow and FlowCollector interfaces and the flow builder are implemented. When you call collect, you invoke your builder lambda expression. When this expression calls emit, it calls the lambda expression defined next to collect.

The presented builder is the most basic way to create a flow. Most of the other flow-creating functions use it under the hood.
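To see this call order for yourself, here is a minimal sketch that records each step in a list. It repeats the simplified Flow definitions from above so that it runs on its own; the trace function and its log messages are made up for this illustration and are not part of any library.

```kotlin
fun interface FlowCollector<T> {
    suspend fun emit(value: T)
}

interface Flow<T> {
    suspend fun collect(collector: FlowCollector<T>)
}

fun <T> flow(builder: suspend FlowCollector<T>.() -> Unit) = object : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.builder()
    }
}

// Hypothetical helper: records the order in which builder and collector code runs.
suspend fun trace(): List<String> {
    val log = mutableListOf<String>()
    val f: Flow<String> = flow {
        log += "builder started"
        emit("A")
        log += "back in builder after A"
        emit("B")
        log += "builder finished"
    }
    // Defining the flow has not executed anything yet.
    log += "before collect"
    f.collect { log += "collected $it" }
    log += "after collect"
    return log
}

suspend fun main() {
    trace().forEach(::println)
    // before collect
    // builder started
    // collected A
    // back in builder after A
    // collected B
    // builder finished
    // after collect
}
```

Notice that the builder body does not run when the flow is defined, only when collect is called, and that each emit call returns to the builder only after the collector lambda has finished.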

public fun <T> Iterator<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

public fun <T> Sequence<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}

Beware! channelFlow and SharedFlow work differently: they involve a separate coroutine, so the production of their elements can proceed independently of their consumption.

How Flow processing works

Flow can be thought of as a slightly more complicated suspending lambda expression with a receiver. However, its power lies in all the functions defined for its creation, processing, and observing. Most of them are actually very simple under the hood. Think of the map function, which transforms each element. It creates a new flow, so we start with the flow builder. When this new flow is collected, we need to start the flow it wraps, so inside the builder we call the collect method. Whenever we receive an element, we transform it and emit it to the collector of the flow we created.

fun <T, R> Flow<T>.map(transformation: suspend (T) -> R): Flow<R> = flow {
    collect {
        emit(transformation(it))
    }
}

The filter function is similar, but it emits conditionally: only when the predicate returns true.

fun <T> Flow<T>.filter(predicate: suspend (T) -> Boolean): Flow<T> = flow {
    collect {
        if (predicate(it)) {
            emit(it)
        }
    }
}
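To get a feeling for how these two functions cooperate, here is a small sketch that chains them on the simplified Flow from the first part of this article (the definitions are repeated so the snippet runs on its own). The pipelineLog function and its log messages are made up for this illustration.

```kotlin
fun interface FlowCollector<T> {
    suspend fun emit(value: T)
}

interface Flow<T> {
    suspend fun collect(collector: FlowCollector<T>)
}

fun <T> flow(builder: suspend FlowCollector<T>.() -> Unit) = object : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.builder()
    }
}

fun <T, R> Flow<T>.map(transformation: suspend (T) -> R): Flow<R> = flow {
    collect { emit(transformation(it)) }
}

fun <T> Flow<T>.filter(predicate: suspend (T) -> Boolean): Flow<T> = flow {
    collect { if (predicate(it)) emit(it) }
}

// Hypothetical helper: records which step handles which element, in order.
suspend fun pipelineLog(): List<String> {
    val log = mutableListOf<String>()
    val numbers: Flow<Int> = flow {
        for (i in 1..4) emit(i)
    }
    numbers
        .filter { log += "filter $it"; it % 2 == 0 } // keeps 2 and 4
        .map { log += "map $it"; it * 10 }           // turns them into 20 and 40
        .collect { log += "collect $it" }
    return log
}

suspend fun main() {
    println(pipelineLog())
    // [filter 1, filter 2, map 2, collect 20, filter 3, filter 4, map 4, collect 40]
}
```

Each element travels through the whole chain before the next one is emitted. That is a direct consequence of the definitions: every emit is just a call to the lambda expression of the next step.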

Most Flow processing functions are defined in a similar way.

fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = flow {
    collect {
        action(it)
        emit(it)
    }
}

// simplified implementation
fun <T> Flow<T>.onStart(action: suspend () -> Unit): Flow<T> = flow {
    action()
    collect { emit(it) }
}
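The same pattern can be used to write your own functions. Below is a sketch of one processing function and one terminal operation, built on the simplified Flow from the first part of this article (repeated here so the snippet runs on its own). The names skipRepeats and toList, and the way they are implemented, are assumptions made for this example; they are not the library implementations.

```kotlin
fun interface FlowCollector<T> {
    suspend fun emit(value: T)
}

interface Flow<T> {
    suspend fun collect(collector: FlowCollector<T>)
}

fun <T> flow(builder: suspend FlowCollector<T>.() -> Unit) = object : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.builder()
    }
}

// Hypothetical operator: drops an element that equals the one emitted just before it.
fun <T> Flow<T>.skipRepeats(): Flow<T> = flow {
    // This state lives inside the builder, so every collect call gets a fresh copy.
    var hasPrevious = false
    var previous: T? = null
    collect {
        if (!hasPrevious || it != previous) {
            hasPrevious = true
            previous = it
            emit(it)
        }
    }
}

// Hypothetical terminal operation: collects all elements into a list.
suspend fun <T> Flow<T>.toList(): List<T> {
    val result = mutableListOf<T>()
    collect { result += it }
    return result
}

suspend fun main() {
    val f = flow<String> {
        emit("A")
        emit("A")
        emit("B")
        emit("A")
    }
    println(f.skipRepeats().toList()) // [A, B, A]
    println(f.skipRepeats().toList()) // [A, B, A]
}
```

The second call prints the same result as the first because the variables are declared inside the builder, so each collection starts with its own state.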

I must say, it is easier to explain this live, as I do in my workshops. However, I hope this gives you a sense of how those functions work.

Conclusion

Flow can be thought of as a slightly more complicated suspending lambda expression with a receiver, and its processing functions just decorate it with new operations. There is no magic here: the way Flow and most of its methods are defined is simple and straightforward.

I hope this article helped you understand how Flow works. You can find more in my book Kotlin Coroutines or in my workshop dedicated to that topic.