
Flow building

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.

Each flow needs to start somewhere. There are many ways to do this, depending on what we need. In this chapter, we will focus on the most important options.

Flow from raw values

The simplest way to create a flow is by using the flowOf function, where we just define what values this flow should have (similar to the listOf function for a list).

import kotlinx.coroutines.flow.*

suspend fun main() {
    flowOf(1, 2, 3, 4, 5)
        .collect { print(it) } // 12345
}

At times, we might also need a flow with no values. For this, we have the emptyFlow() function (similar to the emptyList function for a list).

import kotlinx.coroutines.flow.*

suspend fun main() {
    emptyFlow<Int>()
        .collect { print(it) } // (nothing)
}

Converters

We can also convert every Iterable, Iterator or Sequence into a Flow using the asFlow function.

import kotlinx.coroutines.flow.*

suspend fun main() {
    listOf(1, 2, 3, 4, 5)
        // or setOf(1, 2, 3, 4, 5)
        // or sequenceOf(1, 2, 3, 4, 5)
        .asFlow()
        .collect { print(it) } // 12345
}

These functions produce a flow of elements that are available immediately. They are useful to start a flow of elements that we can then process using the flow processing functions.
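As a minimal sketch of this idea, the example below converts a list into a flow and then processes it with the filter, map, and toList functions. The evenSquares helper is a hypothetical name introduced only for this illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not from the chapter): keeps the even
// numbers, squares them, and gathers the results into a list.
suspend fun evenSquares(numbers: List<Int>): List<Int> =
    numbers.asFlow()             // elements are available immediately
        .filter { it % 2 == 0 }  // flow processing starts here
        .map { it * it }
        .toList()

fun main(): Unit = runBlocking {
    println(evenSquares(listOf(1, 2, 3, 4, 5))) // [4, 16]
}
```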

Converting a function to a flow

Flow is frequently used to represent a single value delayed in time (like a Single in RxJava). So, it makes sense to convert a suspending function into a flow. The result of this function will be the only value in this flow. For that, there is the asFlow extension function, which works on function types (both suspend () -> T and () -> T). Here it is used to convert a suspending lambda expression into Flow.

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

suspend fun main() {
    val function = suspend {
        // this is suspending lambda expression
        delay(1000)
        "UserName"
    }

    function.asFlow()
        .collect { println(it) }
}
// (1 sec)
// UserName

To convert a regular function, we need to reference it first. We do this using :: in Kotlin.

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

suspend fun getUserName(): String {
    delay(1000)
    return "UserName"
}

suspend fun main() {
    ::getUserName
        .asFlow()
        .collect { println(it) }
}
// (1 sec)
// UserName

Flow and Reactive Streams

If you use Reactive Streams in your application (like Reactor, RxJava 2.x or RxJava 3.x), you don't need to make big changes in your code. Objects like Flux or Flowable implement the Publisher interface, which can be converted to Flow with the asFlow function from the kotlinx-coroutines-reactive library. Observable does not implement Publisher, so it is converted with the asFlow function from kotlinx-coroutines-rx3 (or kotlinx-coroutines-rx2).

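import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Observable
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.asFlow // Publisher.asFlow
import kotlinx.coroutines.rx3.asFlow // ObservableSource.asFlow
import reactor.core.publisher.Flux

suspend fun main(): Unit = coroutineScope {
    Flux.range(1, 5).asFlow()
        .collect { print(it) } // 12345
    Flowable.range(1, 5).asFlow()
        .collect { print(it) } // 12345
    Observable.range(1, 5).asFlow()
        .collect { print(it) } // 12345
}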

To convert the other way around, you need more specific libraries. With kotlinx-coroutines-reactor, you can convert Flow to Flux. With kotlinx-coroutines-rx3 (or kotlinx-coroutines-rx2), you can convert Flow to Flowable or Observable.

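import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.rx3.asFlowable
import kotlinx.coroutines.rx3.asObservable

suspend fun main(): Unit = coroutineScope {
    val flow = flowOf(1, 2, 3, 4, 5)

    flow.asFlux()
        .doOnNext { print(it) } // 12345
        .subscribe()

    flow.asFlowable()
        .subscribe { print(it) } // 12345

    flow.asObservable()
        .subscribe { print(it) } // 12345
}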

Flow builders

The most popular way to make a flow is using the flow builder, which we've already used in previous chapters. It behaves similarly to the sequence builder for building a sequence, or the produce builder for building a channel. We start the builder with the flow function call, and inside its lambda expression we emit the next values using the emit function. We can also use emitAll to emit all the values from a Channel or a Flow (emitAll(flow) is shorthand for flow.collect { emit(it) }).

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

fun makeFlow(): Flow<Int> = flow {
    repeat(3) { num ->
        delay(1000)
        emit(num)
    }
}

suspend fun main() {
    makeFlow()
        .collect { println(it) }
}
// (1 sec)
// 0
// (1 sec)
// 1
// (1 sec)
// 2
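To make the emitAll shorthand concrete, here is a small sketch that builds the same flow twice: once with an explicit collect and emit pair, and once with emitAll. The names source, forwardedManually, and forwardedWithEmitAll are hypothetical and exist only for this comparison.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical source flow used by both variants.
val source: Flow<Int> = flowOf(1, 2, 3)

// Forwards every value with an explicit collect/emit pair.
fun forwardedManually(): Flow<Int> = flow {
    emit(0)
    source.collect { emit(it) }
}

// The same flow written with the emitAll shorthand.
fun forwardedWithEmitAll(): Flow<Int> = flow {
    emit(0)
    emitAll(source)
}

fun main(): Unit = runBlocking {
    println(forwardedManually().toList())    // [0, 1, 2, 3]
    println(forwardedWithEmitAll().toList()) // [0, 1, 2, 3]
}
```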

This builder has already been used in previous chapters, and it will be used many times in the upcoming ones, so we will see many use cases for it. For now, I will just revisit one from the Sequence builder chapter. Here, the flow builder is used to produce a stream of users that need to be requested page by page from our network API.

fun allUsersFlow(
    api: UserApi
): Flow<User> = flow {
    var page = 0
    do {
        val users = api.takePage(page++) // suspending
        // emitAll expects a Flow or a channel, so the list is converted first
        emitAll(users.asFlow())
    } while (users.isNotEmpty())
}

Understanding flow builder

The flow builder is the most basic way to create a flow. All other options are based on it.

public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}

When we understand how this builder works, we will understand how flow works. The flow builder is very simple under the hood: it just creates an object implementing the Flow interface, which just calls the block function inside the collect method [1].

fun <T> flow(
    block: suspend FlowCollector<T>.() -> Unit
): Flow<T> = object : Flow<T> { // Flow is an interface, so no constructor call
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.block()
    }
}

interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}

fun interface FlowCollector<in T> {
    suspend fun emit(value: T)
}

Knowing this, let's analyze how the following code works:

import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    flow { // 1
        emit("A")
        emit("B")
        emit("C")
    }.collect { value -> // 2
        println(value)
    }
}
// A
// B
// C

When we call a flow builder, we just create an object. However, calling collect means calling the block function on the collector interface. The block function in this example is the lambda expression defined at 1. Its receiver is the collector, which is defined at 2 with a lambda expression. When we define a function interface (like FlowCollector) using a lambda expression, the body of this lambda expression will be used as the implementation of the only function expected by this interface, that is emit in this case. So, the body of the emit function is println(value). Thus, when we call collect, we start executing the lambda expression defined at 1, and when it calls emit, it calls the lambda expression defined at 2. This is how flow works. Everything else is built on top of that.
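To check this reasoning without the library, we can sketch the same mechanism with our own types. SimpleFlow, SimpleCollector, and simpleFlow are hypothetical stand-ins for Flow, FlowCollector, and flow, written only for this illustration. The printed order suggests that creating the flow does nothing by itself and that each emit call simply runs the collector's lambda.

```kotlin
// Simplified stand-ins for the library types (hypothetical names).
fun interface SimpleCollector<in T> {
    suspend fun emit(value: T)
}

interface SimpleFlow<out T> {
    suspend fun collect(collector: SimpleCollector<T>)
}

fun <T> simpleFlow(
    block: suspend SimpleCollector<T>.() -> Unit
): SimpleFlow<T> = object : SimpleFlow<T> {
    override suspend fun collect(collector: SimpleCollector<T>) {
        // Runs the builder body with the collector as its receiver.
        collector.block()
    }
}

suspend fun main() {
    val letters = simpleFlow<String> {
        println("Builder started")
        emit("A")
        emit("B")
    }
    println("Flow created") // the builder body has not run yet
    letters.collect { value -> println("Collected $value") }
}
// Flow created
// Builder started
// Collected A
// Collected B
```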

channelFlow

Flow is a cold data stream, so it produces values on demand when they are needed. If you think of the allUsersFlow presented above, the next page of users will be requested only when the receiver asks for it. This is desired in some situations. For example, imagine that we are looking for a specific user. If this user is on the first page, we don't need to request any more pages. To see this in practice, in the example below we produce the next elements using the flow builder. Notice that the next page is requested lazily when it is needed.

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

data class User(val name: String)

interface UserApi {
    suspend fun takePage(pageNumber: Int): List<User>
}

class FakeUserApi : UserApi {
    private val users = List(20) { User("User$it") }
    private val pageSize: Int = 3

    override suspend fun takePage(
        pageNumber: Int
    ): List<User> {
        delay(1000) // suspending
        return users
            .drop(pageSize * pageNumber)
            .take(pageSize)
    }
}

fun allUsersFlow(api: UserApi): Flow<User> = flow {
    var page = 0
    do {
        println("Fetching page $page")
        val users = api.takePage(page++) // suspending
        emitAll(users.asFlow())
    } while (users.isNotEmpty())
}

suspend fun main() {
    val api = FakeUserApi()
    val users = allUsersFlow(api)
    val user = users
        .first {
            println("Checking $it")
            delay(1000) // suspending
            it.name == "User3"
        }
    println(user)
}
// Fetching page 0
// (1 sec)
// Checking User(name=User0)
// (1 sec)
// Checking User(name=User1)
// (1 sec)
// Checking User(name=User2)
// (1 sec)
// Fetching page 1
// (1 sec)
// Checking User(name=User3)
// (1 sec)
// User(name=User3)

On the other hand, we might have cases in which we want to fetch pages in advance while we are still processing the elements. Doing this in the presented case could lead to more network calls, but it might also produce a faster result. To achieve this, we would need independent production and consumption. Such independence is typical of hot data streams, like channels. So, we need a hybrid of Channel and Flow. Yes, this is supported: we just need to use the channelFlow function. It is like Flow because it implements the Flow interface: the builder is a regular function, and it is started with a terminal operation (like collect). It is also like a Channel because once it is started, it produces the values in a separate coroutine without waiting for the receiver. Therefore, fetching the next pages and checking users happen concurrently.

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

data class User(val name: String)

interface UserApi {
    suspend fun takePage(pageNumber: Int): List<User>?
}

class FakeUserApi : UserApi {
    private val users = List(20) { User("User$it") }
    private val pageSize: Int = 3

    override suspend fun takePage(
        pageNumber: Int
    ): List<User>? {
        delay(1000)
        return users
            .drop(pageSize * pageNumber)
            .take(pageSize)
    }
}

fun allUsersFlow(api: UserApi): Flow<User> = channelFlow {
    var page = 0
    do {
        println("Fetching page $page")
        val users = api.takePage(page++) // suspending
        users?.forEach { send(it) }
    } while (!users.isNullOrEmpty())
}

suspend fun main() {
    val api = FakeUserApi()
    val users = allUsersFlow(api)
    val user = users
        .first {
            println("Checking $it")
            delay(1000)
            it.name == "User3"
        }
    println(user)
}
// Fetching page 0
// (1 sec)
// Checking User(name=User0)
// Fetching page 1
// (1 sec)
// Checking User(name=User1)
// Fetching page 2
// (1 sec)
// Checking User(name=User2)
// Fetching page 3
// (1 sec)
// Checking User(name=User3)
// Fetching page 4
// (1 sec)
// User(name=User3)

Inside channelFlow we operate on ProducerScope<T>. ProducerScope is the same type as used by the produce builder. It implements CoroutineScope, so we can use it to start new coroutines with builders. To produce elements, we use send instead of emit. We can also access the channel or control it directly with SendChannel functions.

interface ProducerScope<in E> :
    CoroutineScope, SendChannel<E> {

    val channel: SendChannel<E>
}

A typical use case for channelFlow is when we need to independently compute values. To support this, channelFlow creates a coroutine scope, so we can directly start coroutine builders like launch. The code below would not work for flow because it does not create the scope needed by coroutine builders.

fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> =
    channelFlow {
        launch {
            collect { send(it) }
        }
        other.collect { send(it) }
    }

fun <T> contextualFlow(): Flow<T> = channelFlow {
    launch(Dispatchers.IO) {
        send(computeIoValue())
    }
    launch(Dispatchers.Default) {
        send(computeCpuValue())
    }
}

Just like all the other coroutines, channelFlow doesn't finish until all its children are in a terminal state.
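A minimal sketch of this behavior is shown below. The childrenFlow function is a hypothetical example: its builder body ends right after starting two children, yet the flow completes only once both of them have sent their values.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical example: the body returns immediately,
// but the flow stays active until both children finish.
fun childrenFlow(): Flow<String> = channelFlow {
    launch {
        delay(200)
        send("slow child")
    }
    launch {
        delay(100)
        send("fast child")
    }
    // The body ends here; collection keeps waiting for the children.
}

fun main(): Unit = runBlocking {
    println(childrenFlow().toList()) // [fast child, slow child]
}
```

If the flow finished together with its body, the resulting list would be empty, so the two collected values indicate that the builder waited for its children.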

callbackFlow

Let's say that you need a flow of events you listen for, like user clicks or other kinds of actions. The listening process should be independent from the process of handling these events, so channelFlow would be a good candidate. However, there is a better one: callbackFlow.

For a very long time, there was no difference between channelFlow and callbackFlow. In version 1.3.4, small changes were introduced to make it less error-prone when using callbacks. However, the biggest difference is in how people understand these functions: callbackFlow is for wrapping callbacks.

Inside callbackFlow, we also operate on ProducerScope<T>. Here are a few functions that might be useful for wrapping callbacks:

  • awaitClose { ... } - a function that suspends until the channel is closed. Once it is closed, it invokes its argument. awaitClose is very important for callbackFlow. Take a look at the example below. Without awaitClose, the coroutine will end immediately after registering a callback. This is natural for a coroutine: its body has ended and it has no children to wait for, so it ends. We use awaitClose (even with an empty body) to prevent this, and we listen for elements until the channel is closed in some other way.
  • trySendBlocking(value) - similar to send, but it is blocking instead of suspending, so it can be used in non-suspending functions.
  • close() - ends this channel.
  • cancel(throwable) - ends this channel and sends an exception to the flow.

Here is a typical example of how callbackFlow is used:

fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
    val callback = object : Callback {
        override fun onNextValue(value: T) {
            trySendBlocking(value)
        }
        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("API Error", cause))
        }
        override fun onCompleted() = channel.close()
    }
    api.register(callback)
    awaitClose { api.unregister(callback) }
}
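The example above relies on an API that is not defined here, so below is a runnable sketch of the same pattern. FakeSensor and Listener are hypothetical types invented for this illustration, and trySend is used instead of trySendBlocking because the fake callback is invoked from a coroutine. The delay is only a simple way to let the collector register its listener before values are published.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical callback-based API, invented for this sketch.
fun interface Listener {
    fun onValue(value: Int)
}

class FakeSensor {
    private val listeners = mutableListOf<Listener>()
    val listenerCount: Int get() = listeners.size

    fun register(listener: Listener) { listeners += listener }
    fun unregister(listener: Listener) { listeners -= listener }
    fun publish(value: Int) =
        listeners.toList().forEach { it.onValue(value) }
}

// Wraps the callback API in a flow.
fun FakeSensor.values(): Flow<Int> = callbackFlow {
    val listener = Listener { value -> trySend(value) }
    register(listener)
    // Suspends until the channel is closed or the collector stops,
    // then removes the listener.
    awaitClose { unregister(listener) }
}

fun main(): Unit = runBlocking {
    val sensor = FakeSensor()
    launch {
        delay(100) // give the collector time to register its listener
        (1..3).forEach(sensor::publish)
    }
    val received = sensor.values().take(3).toList()
    println(received)             // [1, 2, 3]
    println(sensor.listenerCount) // 0
}
```

Once take(3) has received its third value, it cancels the upstream flow, which resumes awaitClose and unregisters the listener.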

Summary

In this chapter, we've reviewed different ways in which flows can be created. There are many functions for starting a flow: simple ones like flowOf or emptyFlow, the asFlow conversions, and the flow builders. The simplest flow builder is just the flow function, where you can use the emit function to produce the next values. There are also the channelFlow and callbackFlow builders, which create a flow that has some of the characteristics of Channel. Each of these functions has its own use cases, and it's useful to know them in order to leverage the full potential of Flow.

[1] The code below is simplified. In real code, there would be an additional mechanism for releasing a continuation interceptor.