Flattening flow: flatMapConcat, flatMapMerge and flatMapLatest
This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.
Another well-known function for collections is flatMap. In the case of collections, it is similar to map, but the transformation function needs to return a collection, and the resulting collections are then flattened into one. For example, if you have a list of departments, each of which has a list of employees, you can use flatMap to make a list of all employees in all departments.
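The departments example can be sketched with the standard library's Iterable.flatMap. The Department and Employee types are hypothetical and exist only for illustration.

```kotlin
// Hypothetical types, for illustration only
data class Employee(val name: String)
data class Department(val name: String, val employees: List<Employee>)

// flatMap maps each department to its list of employees
// and flattens the resulting lists into a single list
fun allEmployees(departments: List<Department>): List<Employee> =
    departments.flatMap { it.employees }

fun main() {
    val departments = listOf(
        Department("Engineering", listOf(Employee("Alice"), Employee("Bob"))),
        Department("Sales", listOf(Employee("Carol"))),
    )
    println(allEmployees(departments).map { it.name }) // [Alice, Bob, Carol]
}
```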
How should flatMap look on a flow? It seems intuitive that its transformation function should return a flow, which is then flattened. The problem is that flow elements can be spread in time. So, should the flow produced from the second element wait for the one produced from the first, or should they be processed concurrently? Since there is no clear answer, there is no flatMap function for Flow; instead, there are flatMapConcat, flatMapMerge, and flatMapLatest.
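"Flattening" can be made concrete. Mapping each element to a flow gives a Flow<Flow<T>>, and that must be flattened explicitly. This sketch uses flattenConcat from kotlinx.coroutines. As we read the library documentation, flatMapConcat is described as a shortcut for map(transform).flattenConcat(). The helper names nested and flattened are our own.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Using map with a flow-returning lambda gives a flow of flows
fun nested(): Flow<Flow<String>> = flowOf("A", "B")
    .map { letter -> flowOf(1, 2).map { "${it}_$letter" } }

// The nested flows have to be flattened explicitly;
// flattenConcat collects them one after another
@OptIn(ExperimentalCoroutinesApi::class)
fun flattened(): Flow<String> = nested().flattenConcat()

fun main() = runBlocking {
    println(flattened().toList()) // [1_A, 2_A, 1_B, 2_B]
}
```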
The flatMapConcat function is the sequential option, which does not introduce any asynchrony: the second flow can start only after the first one is done. In the following example, we make a flow from the characters "A", "B", and "C". The flow produced from each of them emits strings combining the numbers 1, 2, and 3 with that character (such as "1_A"), with a 1-second delay before each element.
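The following sketch reconstructs that example; it is not the book's original listing. The chapter later refers to the helper as flowFrom, but its body here is our assumption. The names concatenated and main are ours as well.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Emits "1_<elem>", "2_<elem>", "3_<elem>", waiting 1 second before each one
fun flowFrom(elem: String): Flow<String> = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_$elem" }

// Each inner flow is fully collected before the next one starts
@OptIn(ExperimentalCoroutinesApi::class)
fun concatenated(): Flow<String> = flowOf("A", "B", "C")
    .flatMapConcat { flowFrom(it) }

fun main() = runBlocking {
    concatenated().collect { println(it) }
}
// One element per second, about 9 seconds in total:
// 1_A, 2_A, 3_A, 1_B, 2_B, 3_B, 1_C, 2_C, 3_C
```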
The second function, flatMapMerge, introduces asynchrony. Each inner flow is collected in a separate coroutine, so the flows can be processed concurrently. The following example is the same as the previous one, but it uses flatMapMerge instead of flatMapConcat.
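Here is a reconstruction sketch of the same example with flatMapMerge; the book's original listing is not reproduced. The body of flowFrom is assumed, as are the names merged and main. The elements of one "round" usually come out in A, B, C order, but that order is not guaranteed.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Emits "1_<elem>", "2_<elem>", "3_<elem>", waiting 1 second before each one
fun flowFrom(elem: String): Flow<String> = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_$elem" }

// Each inner flow is collected in its own coroutine, so they run concurrently
@OptIn(ExperimentalCoroutinesApi::class)
fun merged(): Flow<String> = flowOf("A", "B", "C")
    .flatMapMerge { flowFrom(it) }

fun main() = runBlocking {
    merged().collect { println(it) }
}
// About 3 seconds in total; each second, one element from every inner flow:
// (1 sec) 1_A, 1_B, 1_C  (1 sec) 2_A, 2_B, 2_C  (1 sec) 3_A, 3_B, 3_C
```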
The number of flows that can be processed concurrently can be set using the concurrency parameter. Its default value is 16, but on the JVM this default can be changed with the system property whose name is held in the DEFAULT_CONCURRENCY_PROPERTY_NAME constant ("kotlinx.coroutines.flow.defaultConcurrency"). Beware of this default limitation: if you use flatMapMerge on a flow with many elements, only 16 of their flows will be processed at the same time.
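To see the limit in action, you can count how many inner flows are active at once. This is a minimal sketch, and the helper maxActiveInnerFlows is hypothetical. The 100 ms delay stands in for real work such as a network request.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: collects `elements` inner flows with flatMapMerge and
// returns the highest number of inner flows that were active at the same time
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun maxActiveInnerFlows(elements: Int, concurrency: Int): Int {
    val active = AtomicInteger(0)
    val maxActive = AtomicInteger(0)
    (1..elements).asFlow()
        .flatMapMerge(concurrency = concurrency) { i ->
            flow {
                val now = active.incrementAndGet()
                maxActive.updateAndGet { maxOf(it, now) }
                try {
                    delay(100) // simulated work, e.g. a network request
                    emit(i)
                } finally {
                    active.decrementAndGet()
                }
            }
        }
        .collect() // consume all elements, ignoring their values
    return maxActive.get()
}

fun main() = runBlocking {
    println(maxActiveInnerFlows(elements = 10, concurrency = 2)) // at most 2
    println(maxActiveInnerFlows(elements = 10, concurrency = 5)) // at most 5
}
```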
The typical use of flatMapMerge is when we need to request data for each element in a flow. For instance, say we have a list of categories, and we need to request offers for each of them. You already know that you can do this with the async function. There are two advantages of using a flow with flatMapMerge instead:
- we can set the concurrency parameter to decide how many categories are fetched at the same time (to avoid sending hundreds of requests at once);
- we can return a Flow and emit the next elements as they arrive (so, on the caller's side, they can be handled immediately).
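The two approaches can be compared in a sketch. Everything here is hypothetical and only for illustration: Category, Offer, the OffersApi interface with its requestOffers function, and the default concurrency of 20.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical domain types and API, for illustration only
data class Category(val name: String)
data class Offer(val category: String, val id: Int)

interface OffersApi {
    suspend fun requestOffers(category: Category): List<Offer>
}

// With async: all requests start at once, and the caller gets the offers
// only after every request has finished
suspend fun getOffersAsync(api: OffersApi, categories: List<Category>): List<Offer> =
    coroutineScope {
        categories
            .map { async { api.requestOffers(it) } }
            .flatMap { it.await() }
    }

// With flatMapMerge: at most `concurrency` requests run at once, and offers
// are emitted as soon as the response for their category arrives
@OptIn(ExperimentalCoroutinesApi::class)
fun getOffers(
    api: OffersApi,
    categories: List<Category>,
    concurrency: Int = 20,
): Flow<Offer> = categories.asFlow()
    .flatMapMerge(concurrency) { category ->
        flow { emitAll(api.requestOffers(category).asFlow()) }
    }

fun main() = runBlocking {
    val api = object : OffersApi {
        override suspend fun requestOffers(category: Category): List<Offer> {
            delay(100) // simulated network latency
            return listOf(Offer(category.name, 1), Offer(category.name, 2))
        }
    }
    val categories = listOf(Category("books"), Category("games"))
    getOffers(api, categories, concurrency = 2).collect { println(it) }
}
```

The flow version hands each offer to the collector as soon as its category's response arrives. The async version returns only once the whole list is complete.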
Notice that in the above example, I needed to wrap api.requestOffers(it) with the flow builder. You cannot use flowOf instead, because then the request would be made while the inner flow is being created, not while it is being collected. flatMapMerge introduces asynchrony only for collecting the inner flows, not for creating them: the transformation is called for one element at a time, so with flowOf the requests would be sent one after another.
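The difference can be measured by counting requests that are in progress at the same time. This is a minimal sketch with a hypothetical FakeApi. Plain Int counters are enough because everything runs on runBlocking's single thread.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical API that tracks how many requests are in progress at once
class FakeApi {
    private var active = 0
    var maxActive = 0
        private set

    suspend fun requestOffers(category: String): List<String> {
        active++
        maxActive = maxOf(maxActive, active)
        try {
            delay(100) // simulated network latency
            return listOf("offer for $category")
        } finally {
            active--
        }
    }
}

val categories = listOf("books", "games", "music", "films")

// The request runs while each inner flow is created, one element at a time
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun withFlowOf(api: FakeApi): List<String> = categories.asFlow()
    .flatMapMerge(concurrency = 4) { flowOf(api.requestOffers(it)) }
    .toList()
    .flatten()

// The request runs while each inner flow is collected, so requests overlap
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun withFlowBuilder(api: FakeApi): List<String> = categories.asFlow()
    .flatMapMerge(concurrency = 4) { flow { emit(api.requestOffers(it)) } }
    .toList()
    .flatten()

fun main() = runBlocking {
    val sequential = FakeApi().also { withFlowOf(it) }
    val concurrent = FakeApi().also { withFlowBuilder(it) }
    println(sequential.maxActive) // 1
    println(concurrent.maxActive) // more than 1
}
```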
flatMapMerge has a special behavior when the concurrency parameter is set to 1: in this case, it behaves like flatMapConcat. This is inconsistent with its behavior for other values. flatMapConcat operates on one coroutine only (both for producing and for collecting its flows), while flatMapMerge operates on one coroutine for producing and on up to concurrency coroutines for collecting. So, with concurrency set to 1, it should operate on 2 coroutines, not 1.
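This sketch shows the observable effect: with concurrency set to 1, flatMapMerge yields the same sequential order as flatMapConcat. The helpers shortFlowFrom and mergedWithConcurrency are hypothetical. They use 100 ms delays to keep the run short.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits "1_<elem>" and "2_<elem>", 100 ms apart
fun shortFlowFrom(elem: String): Flow<String> = flowOf(1, 2)
    .onEach { delay(100) }
    .map { "${it}_$elem" }

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun mergedWithConcurrency(concurrency: Int): List<String> =
    flowOf("A", "B")
        .flatMapMerge(concurrency) { shortFlowFrom(it) }
        .toList()

fun main() = runBlocking {
    println(mergedWithConcurrency(1)) // [1_A, 2_A, 1_B, 2_B], as with flatMapConcat
    println(mergedWithConcurrency(2)) // elements of A and B interleaved
}
```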
The last function is flatMapLatest. It forgets about the previous flow once a new one appears: whenever a new value arrives, the processing of the previous flow is cancelled. So, if there is no delay between "A", "B", and "C", you will only see "1_C", "2_C", and "3_C".
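Here is a reconstruction sketch of the case without delays; it is not the book's listing. The flowFrom body is assumed, and the name latest is ours. "A" and "B" arrive before their flows emit anything, so both flows are cancelled.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Emits "1_<elem>", "2_<elem>", "3_<elem>", waiting 1 second before each one
fun flowFrom(elem: String): Flow<String> = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_$elem" }

// A new upstream element cancels the inner flow of the previous one
@OptIn(ExperimentalCoroutinesApi::class)
fun latest(): Flow<String> = flowOf("A", "B", "C")
    .flatMapLatest { flowFrom(it) }

fun main() = runBlocking {
    latest().collect { println(it) }
}
// (1 sec) 1_C  (1 sec) 2_C  (1 sec) 3_C
```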
It gets more interesting when the elements from the initial flow are delayed. What happens in the example below is that "A" appears after 1.2 seconds and starts its flow, which is created using flowFrom. This flow produces the element "1_A" 1 second later, but 200 ms after that, "B" appears and the previous flow is cancelled. The "B" flow manages to produce "1_B" before "C" appears and starts its own flow. This last flow finally produces "1_C", "2_C", and "3_C", with a 1-second delay between them. So the collected elements are "1_A", "1_B", "1_C", "2_C", and "3_C".
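Here is a reconstruction sketch of the delayed case; it is not the book's listing. The flowFrom body is assumed, and the name latestWithDelays is ours. An onEach delay of 1.2 seconds spaces out the upstream letters.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Emits "1_<elem>", "2_<elem>", "3_<elem>", waiting 1 second before each one
fun flowFrom(elem: String): Flow<String> = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_$elem" }

@OptIn(ExperimentalCoroutinesApi::class)
fun latestWithDelays(): Flow<String> = flowOf("A", "B", "C")
    .onEach { delay(1200) } // each letter appears 1.2 sec after the previous one
    .flatMapLatest { flowFrom(it) }

fun main() = runBlocking {
    latestWithDelays().collect { println(it) }
}
// (2.2 sec) 1_A  (1.2 sec) 1_B  (1.2 sec) 1_C  (1 sec) 2_C  (1 sec) 3_C
```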