
Introduction to the Flow

A flow represents a lazy process that emits values. The Flow interface itself only allows the flowing elements to be collected, which means handling each element as it reaches the end of the flow (collect for Flow is like forEach for collections).

interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}

As you can see, collect is the only member function in Flow. All others are defined as extensions. This is similar to Iterable or Sequence, both of which only have iterator as a member function.

interface Iterable<out T> {
    operator fun iterator(): Iterator<T>
}

interface Sequence<out T> {
    operator fun iterator(): Iterator<T>
}
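To make the comparison between collect and forEach concrete, here is a minimal sketch that handles the same three values first as a list and then as a flow. The helper names handleList and handleFlow are hypothetical and exist only for this illustration; flowOf is a standard builder from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: handles each list element with forEach
fun handleList(): List<String> {
    val handled = mutableListOf<String>()
    listOf(1, 2, 3).forEach { handled += "list: $it" }
    return handled
}

// Hypothetical helper: handles each flow element with collect.
// collect is suspending, so this function must be suspending too.
suspend fun handleFlow(): List<String> {
    val handled = mutableListOf<String>()
    flowOf(1, 2, 3).collect { handled += "flow: $it" }
    return handled
}

fun main() = runBlocking {
    println(handleList()) // [list: 1, list: 2, list: 3]
    println(handleFlow()) // [flow: 1, flow: 2, flow: 3]
}
```

The only visible difference at the call site is that collect needs a coroutine to run in, which is why handleFlow carries the suspend modifier.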

Comparing flow to other ways of representing values

The concept of Flow should be well known to those using RxJava or Reactor, but others might need a better explanation. Imagine that you need a function to return more than a single value. When all these values are provided at the same time, we use a collection like List or Set.

fun allUsers(): List<User> = api.getAllUsers().map { it.toUser() }

The essence here is that List and Set represent a fully calculated collection. Since the process of calculating these values takes time, we need to wait for all the values before we can have them returned.

fun getList(): List<String> = List(3) {
    Thread.sleep(1000)
    "User$it"
}

fun main() {
    val list = getList()
    println("Function started")
    list.forEach { println(it) }
}
// (3 sec)
// Function started
// User0
// User1
// User2

If the elements are calculated one by one, we prefer to have the next elements as soon as they appear. One way of doing this is by using Sequence, which we've already learned about in the Sequence builder chapter.

fun getSequence(): Sequence<String> = sequence {
    repeat(3) {
        Thread.sleep(1000)
        yield("User$it")
    }
}

fun main() {
    val list = getSequence()
    println("Function started")
    list.forEach { println(it) }
}
// Function started
// (1 sec)
// User0
// (1 sec)
// User1
// (1 sec)
// User2
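Because a sequence calculates elements one by one, a consumer that needs only the first element pays only for the first element. The sketch below illustrates this with a counter; the names produced and countedUsers are hypothetical and are not part of the original example.

```kotlin
// Counts how many elements the builder has actually produced
var produced = 0

// Hypothetical variant of getSequence that records each computation
fun countedUsers(): Sequence<String> = sequence {
    repeat(3) {
        produced++
        yield("User$it")
    }
}

fun main() {
    println(countedUsers().first())  // User0
    println(produced)                // 1
    println(countedUsers().toList()) // [User0, User1, User2]
    println(produced)                // 4
}
```

The first call runs the builder only up to its first yield, while toList has to run it to the end.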

Sequences are perfect for representing a flow of values calculated on demand when calculating them might be CPU-intensive (like calculating complex results) or blocking (like reading files). However, it is essential to know that sequence terminal operations (like forEach) are not suspending, so any suspension inside a sequence builder means blocking the thread that waits for the value. This is why, in the scope of a sequence builder, you cannot use any suspending function except for those called on the SequenceScope receiver (yield and yieldAll).

fun getSequence(): Sequence<String> = sequence {
    repeat(3) {
        delay(1000) // Compilation error
        yield("User$it")
    }
}

This mechanism was introduced so that sequences are not misused. Even if the above example could compile, it wouldn't be correct because the terminal operation (like forEach) would block the thread instead of suspending the coroutine, which could lead to unexpected thread-blocking. Consider that someone might want to use a sequence to fetch, in a paginated manner, a list of all the users from an HTTP endpoint until an empty page is received. The problem is that any consumption of such a sequence would be blocking, because the iterator function in Sequence is not suspending.

// Don't do that, we should use Flow instead of Sequence
fun allUsersSequence(
    api: UserApi
): Sequence<User> = sequence {
    var page = 0
    do {
        val users = api.takePage(page++) // suspending,
        // so compilation error
        yieldAll(users)
    } while (!users.isNullOrEmpty())
}

This is not what sequences should be used for. Sequences are perfect for sources of data whose size might be big (or infinite) and whose elements might be heavy, so we want to calculate or read them on demand, lazily.

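import java.io.File
import java.math.BigInteger

// Infinite sequence: each element is calculated only when requested
val fibonacci: Sequence<BigInteger> = sequence {
    var first = 0.toBigInteger()
    var second = 1.toBigInteger()
    while (true) {
        yield(first)
        val temp = first
        first += second
        second = temp
    }
}

// Reads the file line by line, without loading it all into memory
fun countCharactersInFile(path: String): Int =
    File(path).useLines { lines ->
        lines.sumOf { it.length }
    }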

I hope you already have some idea that thread blocking can be very dangerous and lead to unexpected situations. To make this crystal clear, take a look at the example below. We use Sequence, so its forEach is a blocking operation. This is why a coroutine started on the same thread with launch will wait, so one coroutine’s execution blocks another’s.

import kotlinx.coroutines.*

fun getSequence(): Sequence<String> = sequence {
    repeat(3) {
        Thread.sleep(1000)
        // the same result as if there were delay(1000) here
        yield("User$it")
    }
}

suspend fun main() {
    withContext(newSingleThreadContext("main")) {
        launch {
            repeat(3) {
                delay(100)
                println("Processing on coroutine")
            }
        }

        val list = getSequence()
        list.forEach { println(it) }
    }
}
// (1 sec)
// User0
// (1 sec)
// User1
// (1 sec)
// User2
// Processing on coroutine
// (0.1 sec)
// Processing on coroutine
// (0.1 sec)
// Processing on coroutine

This is a case where we should use Flow instead of Sequence. Such an approach fully supports coroutines in its operations. Its builder and operations are suspending functions, and it supports structured concurrency and proper exception handling. We will explain all this in the next chapters, but for now let's see how it helps with this case.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun getFlow(): Flow<String> = flow {
    repeat(3) {
        delay(1000)
        emit("User$it")
    }
}

suspend fun main() {
    withContext(newSingleThreadContext("main")) {
        launch {
            repeat(3) {
                delay(100)
                println("Processing on coroutine")
            }
        }

        val list = getFlow()
        list.collect { println(it) }
    }
}
// (0.1 sec)
// Processing on coroutine
// (0.1 sec)
// Processing on coroutine
// (0.1 sec)
// Processing on coroutine
// (1 - 3 * 0.1 = 0.7 sec)
// User0
// (1 sec)
// User1
// (1 sec)
// User2

Flow should be used for streams of data that need to use coroutines. For example, it can be used to produce a stream of users that are fetched from an API page by page. Notice that the caller of this function can handle the next pages as they come and decide how many pages will be fetched. For instance, if we call allUsersFlow(api).first(), we will fetch only the first page; if we call allUsersFlow(api).toList(), we will fetch all of them; if we call allUsersFlow(api).find { it.id == id }, we will fetch pages until we find the one we're looking for.

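fun allUsersFlow(
    api: UserApi
): Flow<User> = flow {
    var page = 0
    do {
        val users = api.takePage(page++) // suspending
        // emitAll accepts a Flow or a channel, so the page
        // (assumed here to be a nullable List<User>)
        // is emitted element by element
        users?.forEach { emit(it) }
    } while (!users.isNullOrEmpty())
}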
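The claim that the caller decides how many pages are fetched can be checked with a small self-contained sketch. FakeUserApi, its requests counter, and this simplified User class are hypothetical stand-ins for the real API, introduced only for this illustration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

data class User(val id: Int)

// Hypothetical in-memory API that counts how many pages were requested
class FakeUserApi(private val pages: List<List<User>>) {
    var requests = 0
        private set

    suspend fun takePage(page: Int): List<User> {
        delay(10) // simulates network latency
        requests++
        return pages.getOrElse(page) { emptyList() }
    }
}

fun allUsersFlow(api: FakeUserApi): Flow<User> = flow {
    var page = 0
    do {
        val users = api.takePage(page++)
        users.forEach { emit(it) }
    } while (users.isNotEmpty())
}

fun main() = runBlocking {
    val pages = listOf(listOf(User(1), User(2)), listOf(User(3)))

    val firstOnly = FakeUserApi(pages)
    println(allUsersFlow(firstOnly).first()) // User(id=1)
    println(firstOnly.requests)              // 1

    val everything = FakeUserApi(pages)
    println(allUsersFlow(everything).toList().size) // 3
    println(everything.requests) // 3 (two pages with users, then the empty one)
}
```

With first(), collection stops as soon as one element arrives, so the second page is never requested.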

The characteristics of Flow

Flow’s terminal operations (like collect) suspend a coroutine instead of blocking a thread. They also support other coroutine functionalities, such as respecting the coroutine context and handling exceptions. Flow processing can be cancelled, and structured concurrency is supported out of the box. The flow builder is not suspending and does not require any scope. It is the terminal operation that is suspending and builds a relation to its parent coroutine (similar to the coroutineScope function).

The example below shows how the CoroutineName context is passed from collect to the lambda expression in the flow builder. It also shows that cancelling launch leads to proper cancellation of flow processing.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Notice, that this function is not suspending
// and does not need CoroutineScope
fun usersFlow(): Flow<String> = flow {
    repeat(3) {
        delay(1000)
        val ctx = currentCoroutineContext()
        val name = ctx[CoroutineName]?.name
        emit("User$it in $name")
    }
}

suspend fun main() {
    val users = usersFlow()

    withContext(CoroutineName("Name")) {
        val job = launch {
            // collect is suspending
            users.collect { println(it) }
        }

        launch {
            delay(2100)
            println("I got enough")
            job.cancel()
        }
    }
}
// (1 sec)
// User0 in Name
// (1 sec)
// User1 in Name
// (0.1 sec)
// I got enough
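Since the flow builder is not suspending and all the work happens in the terminal operation, creating a flow does not run any of its code. The following sketch illustrates this with a counter; the names builderRuns and numbers are hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Counts how many times the builder lambda has been started
var builderRuns = 0

// Not suspending: it only describes what should happen on collection
fun numbers(): Flow<Int> = flow {
    builderRuns++
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    val source = numbers()
    println(builderRuns)     // 0
    println(source.toList()) // [1, 2]
    println(source.toList()) // [1, 2]
    println(builderRuns)     // 2
}
```

Each terminal operation starts the builder lambda from the beginning, so two toList calls mean two runs.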

Flow nomenclature

Every flow consists of a few elements:

  • Flow needs to start somewhere. It often starts with a flow builder, a conversion from a different object, or some helper function. The most important options will be explained in the next chapter, Flow building.
  • The last operation on the flow is called the terminal operation, which is very important as it is often the only one that is suspending or requires a scope. The typical terminal operation is collect, either with or without a lambda expression. However, there are also other terminal operations. Some of them will be explained in the Flow processing chapter.
  • Between the start operation and the terminal operation, we might have intermediate operations, each of which modifies the flow in some way. We will learn about different intermediate operations in the Flow lifecycle and Flow processing chapters.
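The three kinds of elements listed above can be seen together in a short pipeline. This is only an illustrative sketch; the function name evenItems is hypothetical, while flowOf, filter, map, and toList are standard kotlinx.coroutines functions.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Start and intermediate operations are not suspending
fun evenItems(): Flow<String> =
    flowOf(1, 2, 3, 4)          // start: a flow created from given values
        .filter { it % 2 == 0 } // intermediate: keeps only even numbers
        .map { "Item $it" }     // intermediate: transforms each element

fun main() = runBlocking {
    // toList is the terminal operation, and the only suspending call here
    val result = evenItems().toList()
    println(result) // [Item 2, Item 4]
}
```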

Real-life use cases

Practice shows that we need a flow more often than a channel. If you request a stream of data, you typically want to request it on demand. If you need to observe something, such as changes in your database or events from UI widgets or sensors, you likely want these events to be received by each observer. You also need to stop listening when no one is observing. This is why, for all these cases, using a flow is preferred over using a channel (although in some cases we will use a hybrid of these two).

The most typical usages of flow include:

  • receiving or sending messages that are communicated through Server-Sent Events, WebSockets, RSocket, notifications, etc.;
  • observing user actions, such as text changes or clicks;
  • receiving updates from sensors or other information about a device, such as its location or orientation;
  • observing changes in databases.

Here is how we can observe changes in an SQL database using the Room library:

@Dao
interface MyDao {
    @Query("SELECT * FROM somedata_table")
    fun getData(): Flow<List<SomeData>>
}
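For sources that expose callbacks rather than flows (sensors, UI widgets), one possible approach is the callbackFlow builder from kotlinx.coroutines. The sketch below assumes a hypothetical LocationSource class with addListener and removeListener; it is not a real device API. It shows that the listener is registered when collection starts and removed when no one is observing anymore.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical callback-based source, used only for illustration
class LocationSource {
    private val listeners = mutableListOf<(String) -> Unit>()
    val listenerCount: Int get() = listeners.size

    fun addListener(listener: (String) -> Unit) { listeners += listener }
    fun removeListener(listener: (String) -> Unit) { listeners -= listener }
    fun publish(value: String) { listeners.toList().forEach { it(value) } }
}

fun LocationSource.updates(): Flow<String> = callbackFlow {
    val listener: (String) -> Unit = { trySend(it) }
    addListener(listener)
    // Suspends until collection ends, then unregisters the listener
    awaitClose { removeListener(listener) }
}

// Hypothetical helper: observes the first two updates and then stops
suspend fun firstTwoUpdates(source: LocationSource): List<String> =
    coroutineScope {
        val received = async { source.updates().take(2).toList() }
        delay(50) // gives the collector time to register its listener
        listOf("A", "B", "C").forEach { source.publish(it) }
        received.await()
    }

fun main() = runBlocking {
    val source = LocationSource()
    println(firstTwoUpdates(source)) // [A, B]
    println(source.listenerCount)    // 0
}
```

The fixed delay is a simplification that keeps the sketch short; real code would not rely on timing to know that the listener is registered.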

Let's see some examples of how we might use a flow to handle a stream of responses from an API. I will start with the one I recently worked on. Consider a trading workstation, like Bloomberg or Scanz, that always shows you the current state of the market. Since the market changes constantly, those programs update many times per second. This is a perfect use case for a flow, both on the backend and on the client.

A more everyday example might be a chat, or a client providing realtime suggestions for a search. For example, when we search for the best flight on SkyScanner, some offers arrive quickly, but then more arrive over time; therefore, you see better and better results. This is also a great case for a flow.

On SkyScanner we can see better and better flight search results as airlines respond to your offer request.

In addition to these situations, a flow is also a useful tool for different concurrent processing cases. For example, imagine that you have a list of sellers, for each of which you need to fetch their offers. We've already learned that we can do this using async inside collection processing:

suspend fun getOffers(
    sellers: List<Seller>
): List<Offer> = coroutineScope {
    sellers
        .map { seller ->
            async { api.requestOffers(seller.id) }
        }
        .flatMap { it.await() }
}

The above approach is correct in many cases, but it has one downside: if the list of sellers is big, sending so many requests at once would be good neither for us nor for the server we are requesting from. Sure, this can be limited in the repository with a rate limiter, but we might also want to control it on the use side, for which we might use Flow. In this case, to limit the number of concurrent calls to 20, we can use flatMapMerge (one of the flow processing functions we will explain in the Flow processing chapter) with the concurrency parameter set to 20.

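suspend fun getOffers(
    sellers: List<Seller>
): List<Offer> = sellers
    .asFlow()
    .flatMapMerge(concurrency = 20) { seller ->
        suspend { api.requestOffers(seller.id) }.asFlow()
    }
    .toList()
    // each request returns a list of offers (as in the async
    // version above), so the collected lists are flattened
    .flatten()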
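To see the effect of the concurrency parameter, the sketch below replaces the API call with a delay and records how many inner flows are active at the same time. The function maxConcurrency and its counters are hypothetical and serve only this illustration. Both opt-in markers are listed because the annotation on flatMapMerge has differed between kotlinx.coroutines versions.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: returns the highest number of simulated
// requests that were running at the same time
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun maxConcurrency(ids: List<Int>, limit: Int): Int {
    val active = AtomicInteger(0)
    val max = AtomicInteger(0)
    ids.asFlow()
        .flatMapMerge(concurrency = limit) { id ->
            flow {
                val now = active.incrementAndGet()
                max.updateAndGet { maxOf(it, now) }
                delay(100) // stands in for a network request
                active.decrementAndGet()
                emit(id)
            }
        }
        .collect()
    return max.get()
}

fun main() = runBlocking {
    println(maxConcurrency((1..10).toList(), limit = 3)) // 3
}
```

Although ten elements are processed, no more than three simulated requests are in progress at any moment.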

Operating on Flow instead of on a collection gives us much more control over concurrency behavior, contexts, exceptions, and much more. We will discover these functionalities in the next chapters. This is where (in my experience) Flow is most useful. I hope you will see this clearly once we have covered all its different functionalities.

Finally, some teams prefer a reactive style of programming and use Flow instead of suspending functions. This style became common on Android, where RxJava used to dominate and Flow is now often treated as a better alternative. In such teams, Flow is often used even when a function returns only a single value. I prefer plain suspending functions in such cases, but both approaches are valid.
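The two styles can be compared side by side in a minimal sketch. The function fetchUserName is a hypothetical stand-in for any suspending call that returns one value.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical suspending call standing in for a network request
suspend fun fetchUserName(): String {
    delay(10)
    return "Alice"
}

// Reactive style: the single value is wrapped in a flow
fun userNameFlow(): Flow<String> = flow { emit(fetchUserName()) }

fun main() = runBlocking {
    println(fetchUserName())         // Alice
    println(userNameFlow().single()) // Alice
}
```

The flow variant lets callers apply flow operators, at the cost of an extra terminal operation when only the value is needed.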

As you can see, there are quite a few use cases for flows. In some projects, they will be used commonly, while in others they will be used only from time to time, but I hope you can see that they are useful and worth learning about.

Summary

In this chapter, we've introduced the concept of Flow. It represents a stream of asynchronously computed values that supports coroutines (unlike sequences). There are quite a few use cases where Flow is useful. We will explore them in the next chapters as we learn more about Flow capabilities.