Introduction to the Flow
A flow represents a stream of values that are computed asynchronously. The Flow interface itself only allows the flowing elements to be collected, which means handling each element as it reaches the end of the flow (collect for Flow is like forEach for collections).
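As a point of reference, the sketch below repeats the shape of the Flow and FlowCollector declarations from kotlinx.coroutines in a comment (simplified, without annotations) and then collects a small flow. The functions numbers and collectAll are made up for this illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Simplified shape of the library declarations (annotations and docs omitted):
//
// interface Flow<out T> {
//     suspend fun collect(collector: FlowCollector<T>)
// }
//
// fun interface FlowCollector<in T> {
//     suspend fun emit(value: T)
// }

// Hypothetical flow of three numbers.
fun numbers(): Flow<Int> = flowOf(1, 2, 3)

// Handles each element as it reaches the end of the flow,
// much like forEach does for a collection.
suspend fun collectAll(): List<Int> {
    val seen = mutableListOf<Int>()
    numbers().collect { value -> seen += value }
    return seen
}

fun main() = runBlocking {
    println(collectAll()) // [1, 2, 3]
}
```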
In fact, collect is the only member function in Flow. All others are defined as extensions. This is similar to Iterable or Sequence, both of which only have iterator as a member function.
Comparing flow to other ways of representing values
The concept of Flow should be well known to those using RxJava or Reactor, but others might need a better explanation. Imagine that you need a function to return more than a single value. When all these values are provided at the same time, we use a collection like List or Set.
The essence here is that List and Set represent a fully calculated collection. Since the process of calculating these values takes time, we need to wait for all the values before we can have them returned.
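A minimal sketch of this behavior, assuming a made-up getList function in which Thread.sleep stands in for an expensive calculation: nothing can be processed until the whole list is ready.

```kotlin
// Hypothetical producer: each element takes a while to calculate.
fun getList(): List<String> = List(3) {
    Thread.sleep(100) // stands in for an expensive calculation
    "User$it"
}

fun main() {
    // getList returns only after all three elements have been calculated,
    // so the message below is printed after the whole wait.
    val list = getList()
    println("Function returned")
    list.forEach { println(it) }
}
```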
If the elements are calculated one by one, we prefer to have the next elements as soon as they appear. One way of doing this is by using Sequence, which we've already learned about in the Sequence builder chapter.
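The same made-up producer can be sketched as a sequence. Here each element is calculated only when the consumer asks for it, so the first value is available after a single step instead of after all of them.

```kotlin
// Hypothetical producer: elements are calculated lazily, one at a time.
fun getSequence(): Sequence<String> = sequence {
    repeat(3) {
        Thread.sleep(100) // stands in for an expensive calculation
        yield("User$it")
    }
}

fun main() {
    val users = getSequence() // nothing is calculated yet
    println("Function returned")
    // Each element is printed as soon as it has been calculated.
    users.forEach { println(it) }
}
```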
Sequences are perfect for representing a flow of values calculated on demand when calculating them might be CPU-intensive (like calculating complex results) or blocking (like reading files). However, it is essential to know that sequence terminal operations (like forEach) are not suspending, so any suspension inside a sequence builder means blocking the thread that waits for the value. This is why, in the scope of a sequence builder, you cannot use any suspending function except for those called on the SequenceScope receiver (yield and yieldAll).
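To illustrate the restriction, here is a small sketch in which the only suspending calls are yield and yieldAll. The call to delay is left commented out because uncommenting it causes a compilation error; the function name and values are made up.

```kotlin
// Inside a sequence builder, only the SequenceScope functions may suspend.
fun getSequence(): Sequence<String> = sequence {
    repeat(3) {
        // kotlinx.coroutines.delay(1000) // would not compile inside a sequence builder
        yield("User$it")               // allowed: called on the SequenceScope receiver
    }
    yieldAll(listOf("UserA", "UserB")) // allowed for the same reason
}

fun main() {
    println(getSequence().toList()) // [User0, User1, User2, UserA, UserB]
}
```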
This mechanism was introduced so sequences are not misused. Even if a suspending call inside a sequence builder could compile, it wouldn't be correct anyway because the terminal operation (like forEach) would block the thread instead of suspending the coroutine, which could lead to unexpected thread-blocking. Consider that someone might want to use a sequence to fetch, in a paginated manner, a list of all the users from an HTTP endpoint until an empty page is received. The problem is that any consumption of such a sequence would be blocking, because the iterator function in Sequence is not suspending.
This is not what sequences should be used for. Sequences are perfect for sources of data whose size might be big (or infinite) and whose elements might be heavy, so we want to calculate or read them on demand, lazily.
I hope you already have some idea that thread blocking can be very dangerous and lead to unexpected situations. To make this crystal clear, consider code that iterates over a Sequence inside a coroutine. Its forEach is a blocking operation, which is why a coroutine started on the same thread with launch will wait: one coroutine's execution blocks another's.
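The situation can be sketched as follows. Instead of printing, this version records events in a list so the order is easy to inspect; runSequenceDemo and the timings are assumptions made for the illustration. Because runBlocking uses a single thread and forEach never suspends, the launched coroutine gets its turn only after the whole sequence has been consumed.

```kotlin
import kotlinx.coroutines.*

fun getSequence(): Sequence<String> = sequence {
    repeat(3) {
        Thread.sleep(100) // blocks the thread while "calculating"
        yield("User$it")
    }
}

// Returns the events in the order in which they happened.
fun runSequenceDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    launch {
        repeat(3) {
            delay(10)
            events += "Processing on coroutine"
        }
    }
    // Blocking iteration: the coroutine launched above cannot run meanwhile.
    getSequence().forEach { events += it }
    events
}

fun main() {
    runSequenceDemo().forEach { println(it) }
    // User0
    // User1
    // User2
    // Processing on coroutine
    // Processing on coroutine
    // Processing on coroutine
}
```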
This is a case where we should use Flow instead of Sequence. Such an approach fully supports coroutines in its operations: the body of its builder and its operations can suspend, and it supports structured concurrency and proper exception handling. We will explain all this in the next chapters, but for now it is enough to note how it helps in this case: since a flow suspends the coroutine instead of blocking the thread, another coroutine on the same thread can keep running.
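Here is a sketch of the same experiment with Flow, again recording events in a list; the function names and timings are assumptions. Since delay suspends the collecting coroutine instead of blocking the thread, the launched coroutine does its work while the flow is waiting, so its messages are recorded before the first user arrives.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun getFlow(): Flow<String> = flow {
    repeat(3) {
        delay(100) // suspends the coroutine; the thread stays free
        emit("User$it")
    }
}

// Returns the events in the order in which they happened.
fun runFlowDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    launch {
        repeat(3) {
            delay(10)
            events += "Processing on coroutine"
        }
    }
    // Suspending collection: the coroutine launched above runs in the meantime.
    getFlow().collect { events += it }
    events
}

fun main() {
    runFlowDemo().forEach { println(it) }
}
```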
Flow should be used for streams of data that need to use coroutines. For example, it can be used to produce a stream of users that are fetched from an API page by page. Notice that the caller of this function can handle the next pages as they come and decide how many pages will be fetched. For instance, if we call allUsersFlow(api).first(), we will fetch only the first page; if we call allUsersFlow(api).toList(), we will fetch all of them; if we call allUsersFlow(api).firstOrNull { it.id == id }, we will fetch pages until we find the one we're looking for.
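One possible shape of such a function is sketched below. UserApi, takePage, User, and FakeUserApi are hypothetical names introduced only for this illustration; the fake API counts requested pages so that we can observe how many were actually fetched.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

data class User(val id: Int, val name: String)

// Hypothetical paginated API; an empty page means there is no more data.
interface UserApi {
    suspend fun takePage(pageNumber: Int): List<User>
}

fun allUsersFlow(api: UserApi): Flow<User> = flow {
    var page = 0
    do {
        val users = api.takePage(page++) // suspending call
        emitAll(users.asFlow())
    } while (users.isNotEmpty())
}

// Fake implementation: three pages with two users each, then empty pages.
class FakeUserApi : UserApi {
    var requestedPages = 0
        private set

    override suspend fun takePage(pageNumber: Int): List<User> {
        requestedPages++
        delay(10) // stands in for a network call
        return if (pageNumber < 3) {
            List(2) { i -> User(pageNumber * 2 + i, "User${pageNumber * 2 + i}") }
        } else {
            emptyList()
        }
    }
}

fun main() = runBlocking {
    val api = FakeUserApi()
    println(allUsersFlow(api).first()) // User(id=0, name=User0)
    println(api.requestedPages)        // 1
}
```

With this fake, toList() requests four pages (three with data plus the final empty one), while searching for the user with id 3 requests two.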
The characteristics of Flow
Flow’s terminal operations (like collect) suspend a coroutine instead of blocking a thread. They also support other coroutine functionalities, such as respecting the coroutine context and handling exceptions. Flow processing can be cancelled, and structured concurrency is supported out of the box. The flow builder is not suspending and does not require any scope. It is the terminal operation that is suspending and builds a relation to its parent coroutine (similar to the coroutineScope function).
For example, a CoroutineName context set where collect is called is passed to the lambda expression in the flow builder, and cancelling the launch coroutine that collects the flow also leads to proper flow processing cancellation.
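Both properties can be observed in a sketch like the one below. The names usersFlow and collectForAWhile, as well as the timings, are assumptions made for the illustration. The flow would emit ten values, but the collecting coroutine is cancelled after about 250 ms, so only the first few are received, each carrying the name taken from the collector's context.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun usersFlow(): Flow<String> = flow {
    repeat(10) {
        delay(100)
        // The builder body runs in the context of the coroutine that collects.
        val name = currentCoroutineContext()[CoroutineName]?.name
        emit("User$it in $name")
    }
}

suspend fun collectForAWhile(): List<String> {
    val received = mutableListOf<String>()
    val users = usersFlow() // creating a flow needs neither a scope nor suspension
    withContext(CoroutineName("Name")) {
        val job = launch {
            users.collect { received += it }
        }
        launch {
            delay(250)
            job.cancel() // cancelling the collector also stops the flow builder
        }
    }
    return received
}

fun main() = runBlocking {
    collectForAWhile().forEach { println(it) }
}
```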
Flow nomenclature
Every flow consists of a few elements:
- Flow needs to start somewhere. It often starts with a flow builder, conversion from a different object, or from some helper function. The most important option will be explained in the next chapter, Flow building.
- The last operation on the flow is called the terminal operation, which is very important as it is often the only one that is suspending or requires a scope. The typical terminal operation is collect, either with or without a lambda expression. However, there are also other terminal operations. Some of them will be explained in the Flow processing chapter.
- Between the start operation and the terminal operation, we might have intermediate operations, each of which modifies the flow in some way. We will learn about different intermediate operations in the Flow lifecycle and Flow processing chapters.
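The three kinds of elements can be seen together in a short pipeline. This is only a sketch: processMessages and the logged strings are made up, and the log list is used instead of printing so that the order of calls is easy to inspect.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

suspend fun processMessages(): List<String> {
    val log = mutableListOf<String>()
    flow { emit("Message 1") }               // flow builder (start)
        .onEach { log += "onEach: $it" }     // intermediate operation
        .onStart { log += "Before" }         // intermediate operation
        .onCompletion { log += "After" }     // intermediate operation
        .catch { emit("Error") }             // intermediate operation
        .collect { log += "Collected $it" }  // terminal operation
    return log
}

fun main() = runBlocking {
    println(processMessages())
    // [Before, onEach: Message 1, Collected Message 1, After]
}
```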
Real-life use cases
Practice shows that we need a flow more often than a channel. If you request a stream of data, you typically want to request it on demand. If you need to observe something, such as changes in your database or events from UI widgets or sensors, you likely want these events to be received by each observer. You also need to stop listening when no one is observing. This is why, for all these cases, using a flow is preferred over using a channel (although in some cases we will use a hybrid of these two).
The most typical usages of flow include:
- receiving or sending messages that are communicated through technologies such as Server-Sent Events, WebSockets, RSocket, notifications, etc.;
- observing user actions, such as text changes or clicks;
- receiving updates from sensors or other information about a device, such as its location or orientation;
- observing changes in databases.
For instance, with the Room library we can observe changes in an SQL database by declaring a DAO query function whose return type is a Flow.
Let's see some examples of how we might use a flow to handle a stream of responses from an API. I will start with the one I recently worked on. Consider a trading workstation, like Bloomberg or Scanz, that always shows you the current state of the market. Since the market changes constantly, those programs update many times per second. This is a perfect use case for a flow, both on the backend and on the client.
A more everyday example might be a chat, or a client providing realtime suggestions for a search. For example, when we search for the best flight on SkyScanner, some offers arrive quickly, but then more arrive over time; therefore, you see better and better results. This is also a great case for a flow.
In addition to these situations, a flow is also a useful tool for different concurrent processing cases. For example, imagine that you have a list of sellers, for each of which you need to fetch their offers. We've already learned that we can do this using async inside collection processing, starting one coroutine per seller and then awaiting all the results.
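A minimal sketch of this approach is shown below. Seller, Offer, OffersApi, and requestOffers are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.*

data class Seller(val id: Int)
data class Offer(val sellerId: Int, val price: Int)

// Hypothetical API used to fetch the offers of a single seller.
interface OffersApi {
    suspend fun requestOffers(sellerId: Int): List<Offer>
}

suspend fun getOffers(
    api: OffersApi,
    sellers: List<Seller>
): List<Offer> = coroutineScope {
    sellers
        .map { seller ->
            // One coroutine per seller: all requests start at once.
            async { api.requestOffers(seller.id) }
        }
        .flatMap { it.await() } // results are combined in the order of sellers
}
```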
This approach is correct in many cases, but it has one downside: if the list of sellers is big, sending so many requests at once would be good neither for us nor for the server we are requesting from. Sure, this can be limited in the repository with a rate limiter, but we also might want to control it on the caller's side, for which we might use Flow. In this case, to limit the number of concurrent calls to 20, we can use flatMapMerge (one of the flow processing functions we will explain in the Flow processing chapter) with the concurrency parameter set to 20.
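Such a limit could be sketched as follows. Seller, Offer, OffersApi, and requestOffers are again hypothetical, and the concurrency parameter of getOffers is added here only to make the limit easy to change. Depending on the kotlinx.coroutines version, flatMapMerge is marked as a preview or experimental API, so the sketch opts in to both annotations.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class Seller(val id: Int)
data class Offer(val sellerId: Int, val price: Int)

// Hypothetical API used to fetch the offers of a single seller.
interface OffersApi {
    suspend fun requestOffers(sellerId: Int): List<Offer>
}

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
suspend fun getOffers(
    api: OffersApi,
    sellers: List<Seller>,
    concurrency: Int = 20
): List<Offer> = sellers
    .asFlow()
    // At most `concurrency` inner flows are collected at the same time,
    // so at most that many requests are in progress.
    .flatMapMerge(concurrency = concurrency) { seller ->
        flow { emit(api.requestOffers(seller.id)) }
    }
    .toList()  // the order of results is not guaranteed to match the sellers
    .flatten()
```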
Operating on Flow instead of on a collection gives us much more control over concurrency behavior, contexts, exceptions, and much more. We will discover these functionalities in the next chapters. This is where (in my experience) Flow is most useful. I hope you will see this clearly once we have covered all its different functionalities.
Finally, because they prefer a reactive style of programming, some teams like to use flow instead of suspending functions. Such a style became popular on Android, where RxJava was popular, but now Flow is often treated as a better alternative. In such teams, Flow is often used even when only a single value is returned from functions. I prefer just suspending functions in such cases, but both approaches are valid.
As you can see, there are quite a few use cases for flows. In some projects, they will be used commonly, while in others they will be used only from time to time, but I hope you can see that they are useful and worth learning about.
Summary
In this chapter, we've introduced the concept of Flow. It represents a stream of asynchronously computed values that supports coroutines (unlike sequences). There are quite a few use cases where Flow is useful. We will explore them in the next chapters as we learn more about Flow capabilities.