Flow building
This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.
Each flow needs to start somewhere. There are many ways to do this, depending on what we need. In this chapter, we will focus on the most important options.
Flow from raw values
The simplest way to create a flow is by using the flowOf
function, where we just define what values this flow should have(similar to the listOf
function for a list).
At times, we might also need a flow with no values. For this, we have the emptyFlow()
function (similar to the emptyList
function for a list).
Converters
We can also convert every Iterable
, Iterator
or Sequence
into a Flow
using the asFlow
function.
These functions produce a flow of elements that are available immediately. They are useful to start a flow of elements that we can then process using the flow processing functions.
Converting a function to a flow
Flow is frequently used to represent a single value delayed in time (like a Single
in RxJava). So, it makes sense to convert a suspending function into a flow. The result of this function will be the only value in this flow. For that, there is the asFlow
extension function, which works on function types (both suspend () -> T
and () -> T
). Here it is used to convert a suspending lambda expression into Flow
.
To convert a regular function, we need to reference it first. We do this using ::
in Kotlin.
Flow and Reactive Streams
If you use Reactive Streams in your application (like Reactor, RxJava 2.x. or RxJava 3.x), you don't need to make big changes in your code. All objects like Flux
, Flowable
or Observable
implement the Publisher
interface, which can be converted to Flow
with the asFlow
function from the kotlinx-coroutines-reactive
library.
To convert the other way around, you need more specific libraries. With kotlinx-coroutines-reactor
, you can convert Flow
to Flux
. With kotlinx-coroutines-rx3
(or kotlinx-coroutines-rx2
), you can convert Flow
to Flowable
or Observable
.
Flow builders
The most popular way to make a flow is using the flow
builder, which we've already used in previous chapters. It behaves similarly to sequence
builder for building a sequence, or produce
builder for building a channel. We start the builder with the flow
function call, and inside its lambda expression we emit the next values using the emit
function. We can also use emitAll
to emit all the values from Channel
or Flow
(emitAll(flow)
is shorthand for flow.collect { emit(it) }
).
This builder has already been used in previous chapters, and it will be used many times in the upcoming ones, so we will see many use cases for it. For now, I will just revisit one from the Sequence builder chapter. Here, the flow
builder is used to produce a stream of users that need to be requested page by page from our network API.
Understanding flow builder
The flow builder is the most basic way to create a flow. All other options are based on it.
When we understand how this builder works, we will understand how flow works. flow
builder is very simple under the hood: it just creates an object implementing the Flow
interface, which just calls the block
function inside the collect
method1.
Knowing this, let's analyze how the following code works:
When we call a flow
builder, we just create an object. However, calling collect
means calling the block
function on the collector
interface. The block
function in this example is the lambda expression defined at 1. Its receiver is the collector
, which is defined at 2 with a lambda expression. When we define a function interface (like FlowCollector
) using a lambda expression, the body of this lambda expression will be used as the implementation of the only function expected by this interface, that is emit
in this case. So, the body of the emit
function is println(value)
. Thus, when we call collect
, we start executing the lambda expression defined at 1, and when it calls emit
, it calls the lambda expression defined at 2. This is how flow works. Everything else is built on top of that.
channelFlow
Flow
is a cold data stream, so it produces values on demand when they are needed. If you think of the allUsersFlow
presented above, the next page of users will be requested when the receiver asks for it. This is desired in some situations. For example, imagine that we are looking for a specific user. If it is in the first page, we don't need to request any more pages. To see this in practice, in the example below we produce the next elements using the flow
builder. Notice that the next page is requested lazily when it is needed.
On the other hand, we might have cases in which we want to fetch pages in advance when we are still processing the elements. Doing this in the presented case could lead to more network calls, but it might also produce a faster result. To achieve this, we would need independent production and consumption. Such independence is typical of hot data streams, like channels. So, we need a hybrid of Channel and Flow. Yes, this is supported: we just need to use the channelFlow
function, which is like Flow because it implements the Flow
interface. This builder is a regular function, and it is started with a terminal operation (like collect
). It is also like a Channel because once it is started, it produces the values in a separate coroutine without waiting for the receiver. Therefore, fetching the next pages and checking users happens concurrently.
Inside channelFlow
we operate on ProducerScope<T>
. ProducerScope
is the same type as used by the produce
builder. It implements CoroutineScope
, so we can use it to start new coroutines with builders. To produce elements, we use send
instead of emit
. We can also access the channel or control it directly with SendChannel
functions.
A typical use case for channelFlow
is when we need to independently compute values. To support this, channelFlow
creates a coroutine scope, so we can directly start coroutine builders like launch
. The code below would not work for flow
because it does not create the scope needed by coroutine builders.
Just like all the other coroutines, channelFlow
doesn't finish until all its children are in a terminal state.
callbackFlow
Let's say that you need a flow of events you listen for, like user clicks or other kinds of actions. The listening process should be independent from the process of handling these events, so channelFlow
would be a good candidate. However, there is a better one: callbackFlow
.
For a very long time, there was no difference between channelFlow
and callbackFlow
. In version 1.3.4, small changes were introduced to make it less error-prone when using callbacks. However, the biggest difference is in how people understand these functions: callbackFlow
is for wrapping callbacks.
Inside callbackFlow
, we also operate on ProducerScope<T>
. Here are a few functions that might be useful for wrapping callbacks:
awaitClose { ... }
- a function that suspends until the channel is closed. Once it is closed, it invokes its argument.awaitClose
is very important forcallbackFlow
. Take a look at the example below. WithoutawaitClose
, the coroutine will end immediately after registering a callback. This is natural for a coroutine: its body has ended and it has no children to wait for, so it ends. We useawaitClose
(even with an empty body) to prevent this, and we listen for elements until the channel is closed in some other way.trySendBlocking(value)
- similar tosend
, but it is blocking instead of suspending, so it can be used on non-suspending functions.close()
- ends this channel.cancel(throwable)
- ends this channel and sends an exception to the flow.
Here is a typical example of how callbackFlow
is used:
Summary
In this chapter, we've reviewed different ways in which flows can be created. There are many functions for starting a flow, from simple ones like flowOf
or emptyFlow
, conversion asFlow
, up to flow builders. The simplest flow builder is just a flow
function, where you can use the emit
function to produce the next values. There are also the channelFlow
and callbackFlow
builders, which create a flow that has some of the characteristics of Channel. Each of these functions has its own use cases, and it's useful to know them in order to leverage the full potential of Flow.
The code below is simplified. In real code, there would be an additional mechanism for releasing a continuation interceptor.