Operating on Kotlin Flow is great. It is simple, supports many useful functions with names consistent with collection processing, and provides abstractions for nearly everything we need. One ugly part, though, is flatMapMerge: we often need it, and it comes with a couple of unpleasant surprises.
Introduction: flatMap
Let's start by saying that there is no flatMap function on Flow (not anymore). That is because Flow is cold by nature and operates on a single coroutine by default, so the most consistent implementation of flatMap would be synchronous. The general expectation, however, is that flatMap on a stream of data is asynchronous. So we have flatMapConcat, which is synchronous, and flatMapMerge, which is asynchronous.
This sounds like a good idea, but the implementation of flatMapMerge turns out to be surprising in a couple of ways. Here are its biggest problems:
Its transformation is synchronous.
It has an unintuitive default limit.
It has inconsistent behavior for concurrency 1.
Let's discuss them one by one, but first, let's see its implementation.
flatMapMerge implementation
The implementation of flatMapMerge is simple: it just uses map and then flattenMerge. For concurrency > 1, flattenMerge uses ChannelFlowMerge.
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> =
    map(transform).flattenMerge(concurrency)

public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}

// Simplified
internal class ChannelFlowMerge<T>(
    private val flow: Flow<Flow<T>>,
    private val concurrency: Int,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
    override suspend fun collectTo(scope: ProducerScope<T>) {
        val semaphore = Semaphore(concurrency)
        val collector = SendingCollector(scope)
        flow.collect { inner ->
            semaphore.acquire()
            scope.launch {
                try {
                    inner.collect(collector)
                } finally {
                    semaphore.release()
                }
            }
        }
    }
    // ...
}
This implementation shows all the problems, but let's see them now in their natural habitat: in real-life use cases.
flatMapMerge starts asynchronous flows, but its transformation is synchronous
Let's say that you need to fetch a student for each id in a flow. This is a common incorrect implementation:
fun studentsFlow(studentIds: Flow<Int>): Flow<Student> = studentIds
    .flatMapMerge { flowOf(fetchStudent(it)) }

private suspend fun fetchStudent(id: Int): Student = ...
The problem is that even though flatMapMerge starts a coroutine for each flow created by its transformation, the transformation itself is synchronous. This is visible in the implementation of flatMapMerge: the first step is map, which is a synchronous operation. Since fetchStudent is called inside the transformation, before the flow is even created, the above studentsFlow fetches students one after another.
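The sequential behavior described above can be observed with a small runnable sketch. The Student class, the body of fetchStudent, and its 200 ms delay are assumptions made only for illustration; they are not taken from a real service.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical model class, introduced only for this sketch
data class Student(val id: Int)

// Hypothetical stand-in for a network call; the 200 ms delay is an assumption
suspend fun fetchStudent(id: Int): Student {
    delay(200)
    return Student(id)
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun studentsFlow(studentIds: Flow<Int>): Flow<Student> = studentIds
    // fetchStudent runs inside the transformation, so it is part of the
    // synchronous map step and is called for one id at a time
    .flatMapMerge { flowOf(fetchStudent(it)) }

fun main() = runBlocking {
    val millis = measureTimeMillis {
        println(studentsFlow(flowOf(1, 2, 3)).toList())
    }
    // Three fetches of 200 ms run one after another,
    // so this should report at least about 600 ms rather than about 200 ms
    println("Took $millis ms")
}
```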
There are at least a couple of popular ways to deal with this issue: we can use the flow builder, a suspend lambda with asFlow, or a custom flowOf function that accepts a suspending lambda. Nevertheless, each of those solutions adds some extra complexity.
These have become common patterns that newcomers to Kotlin Coroutines learn when they start using the library. People have gotten used to them, but that does not justify the unnecessary complexity they introduce.
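The three workarounds mentioned above could look roughly like the sketch below. Student, fetchStudent, and the function names are assumptions for illustration. The custom helper is called flowFrom here (a hypothetical name) instead of flowOf, to avoid any clash with the library's own flowOf overloads.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical model and fetch function, introduced only for this sketch
data class Student(val id: Int)

suspend fun fetchStudent(id: Int): Student {
    delay(200)
    return Student(id)
}

// 1. The flow builder: the fetch happens when the inner flow is collected
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun viaFlowBuilder(ids: Flow<Int>): Flow<Student> =
    ids.flatMapMerge { id -> flow { emit(fetchStudent(id)) } }

// 2. A suspend lambda converted with asFlow
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun viaAsFlow(ids: Flow<Int>): Flow<Student> =
    ids.flatMapMerge { id ->
        val fetch: suspend () -> Student = { fetchStudent(id) }
        fetch.asFlow()
    }

// 3. A custom helper that accepts a suspending lambda (hypothetical name)
fun <T> flowFrom(producer: suspend () -> T): Flow<T> = flow { emit(producer()) }

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun viaCustomHelper(ids: Flow<Int>): Flow<Student> =
    ids.flatMapMerge { id -> flowFrom { fetchStudent(id) } }

fun main() = runBlocking {
    val variants = listOf(::viaFlowBuilder, ::viaAsFlow, ::viaCustomHelper)
    for (variant in variants) {
        val millis = measureTimeMillis { variant(flowOf(1, 2, 3)).toList() }
        // The fetches now overlap, so each variant should take
        // much closer to 200 ms than to 600 ms
        println("${variant.name} took $millis ms")
    }
}
```

In all three variants the transformation only builds a flow and returns immediately; the suspending call is deferred until flatMapMerge collects that flow in its own coroutine.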
flatMapMerge has unintuitive default limit
When you use flatMapMerge, you can set a concurrency limit, which is a great feature. The problem is that there is a default limit, which is 16. Why 16? Why not 64 or 128? flatMapMerge is a universal function, and I believe that the only reasonable default limit would be Int.MAX_VALUE. 16 is a low number, much lower than the limit of most network clients, and it can easily surprise developers who just wanted to start a number of coroutines concurrently.
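One way to see the default limit in action is to count how many inner flows are active at the same time. The sketch below is only an illustration: the Tracker class, the function names, the 40 tasks, and the 100 ms delay are all assumptions.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper that records how many tasks were running at once
class Tracker {
    private val active = AtomicInteger(0)
    private val peakValue = AtomicInteger(0)
    val peak: Int get() = peakValue.get()

    // A one-element flow that simulates 100 ms of work
    fun task(id: Int): Flow<Int> = flow {
        val now = active.incrementAndGet()
        peakValue.updateAndGet { maxOf(it, now) }
        try {
            delay(100)
        } finally {
            active.decrementAndGet()
        }
        emit(id)
    }
}

// Uses flatMapMerge without specifying concurrency, so the default applies
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun peakWithDefaultLimit(tasks: Int): Int {
    val tracker = Tracker()
    (1..tasks).asFlow().flatMapMerge { tracker.task(it) }.collect()
    return tracker.peak
}

// Uses flatMapMerge with an explicit concurrency limit
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun peakWithLimit(tasks: Int, limit: Int): Int {
    val tracker = Tracker()
    (1..tasks).asFlow().flatMapMerge(concurrency = limit) { tracker.task(it) }.collect()
    return tracker.peak
}

fun main() = runBlocking {
    // With the default limit, no more than 16 tasks should overlap
    println("default limit: peak = ${peakWithDefaultLimit(40)}")
    // With Int.MAX_VALUE, all 40 tasks can run at the same time
    println("Int.MAX_VALUE: peak = ${peakWithLimit(40, Int.MAX_VALUE)}")
}
```

The first line shows that anyone who wants real fan-out has to remember to pass the limit explicitly.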
flatMapMerge has inconsistent behavior for concurrency 1
Time for the last problem with flatMapMerge, which I hadn't realized until recently (many thanks to the great group from Commerce Media Tech who helped me realize this). flatMapMerge guarantees that for concurrency == 1 it behaves just like flatMapConcat. But this is inconsistent with how flatMapMerge should behave for concurrency == 1.
You see, flatMapMerge uses one coroutine to transform each element into a flow, and then it starts a new coroutine for each created flow. That means that for concurrency == 1 it should use two coroutines: one for the transformation and one for the flow. But it doesn't. It uses one coroutine for both, and the only reason for that is that flatMapConcat behaves like that. Let me show you an example. Take a flow that emits "A", "B", and "C", waiting one second before each element, and transform every letter into a flow that emits three elements ("A_0", "A_1", "A_2" for "A"), again waiting one second before each. With concurrency == 1, collecting it takes 12 seconds, because only one coroutine is used. The same coroutine waits for "A", then waits for "A_0", "A_1", "A_2", and then waits for "B", "B_0", "B_1", "B_2", and so on.
If we use flatMapMerge with concurrency == 3, the same example takes 6 seconds, because it uses 4 coroutines: one to produce "A", "B", "C", and one for each of the created flows. The last flow is created after 3 seconds; its coroutine starts then and needs 3 seconds to finish, so the whole process takes 6 seconds.
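The 12-second and 6-second cases can be reproduced with a sketch like the one below. It is a reconstruction based on the timings described above, not the original listing; the function names and the step parameter (one second by default) are assumptions.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Emits "A", "B", "C", waiting one step before each element
fun letters(step: Long): Flow<String> = flow {
    for (letter in listOf("A", "B", "C")) {
        delay(step)
        emit(letter)
    }
}

// Emits "<letter>_0", "<letter>_1", "<letter>_2", waiting one step before each
fun numbered(letter: String, step: Long): Flow<String> = flow {
    repeat(3) { i ->
        delay(step)
        emit("${letter}_$i")
    }
}

// Collects the merged flow and returns the received elements with the elapsed time
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun collectTimed(concurrency: Int, step: Long = 1000): Pair<List<String>, Long> {
    val received = mutableListOf<String>()
    val millis = measureTimeMillis {
        letters(step)
            .flatMapMerge(concurrency) { numbered(it, step) }
            .collect { received += it }
    }
    return received to millis
}

fun main() = runBlocking {
    for (concurrency in listOf(1, 3)) {
        val (elements, millis) = collectTimed(concurrency)
        // Expect roughly 12 seconds for concurrency 1 and roughly 6 seconds for concurrency 3
        println("concurrency=$concurrency took $millis ms: $elements")
    }
}
```

For concurrency == 1 the elements always arrive grouped by letter. For concurrency == 3 elements of different letters interleave, and the order of elements that become ready at the same moment is not guaranteed.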
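If there were no special behavior for concurrency == 1, the whole process would take 10 seconds, because it would use two coroutines: one for all transformations and one for the flows. "A" would be ready after 1 second, and its flow would be collected from second 1 to second 4, while "B" and "C" are produced in the meantime. The flow for "B" would then run from second 4 to 7, and the flow for "C" from second 7 to 10.
However, since flatMapMerge was designed to behave like flatMapConcat for concurrency == 1, it uses only one coroutine, and the whole process takes 12 seconds: each letter takes 1 second to produce and 3 more seconds to process, and nothing overlaps.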
If you want the steps above and below a certain point of a flow to run in separate coroutines, you can use the buffer function (beware: buffer also has an unintuitive default limit, which is 64). With it, we can simulate how flatMapMerge should behave for concurrency == 1.
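A minimal sketch of that simulation, assuming the same letters-and-numbers example as before (the function names and the step parameter are assumptions): placing buffer before flatMapMerge lets the letters be produced in a separate coroutine while the inner flows are still collected one at a time.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Emits "A", "B", "C", waiting one step before each element
fun letters(step: Long): Flow<String> = flow {
    for (letter in listOf("A", "B", "C")) {
        delay(step)
        emit(letter)
    }
}

// Emits "<letter>_0", "<letter>_1", "<letter>_2", waiting one step before each
fun numbered(letter: String, step: Long): Flow<String> = flow {
    repeat(3) { i ->
        delay(step)
        emit("${letter}_$i")
    }
}

// Collects with concurrency 1, optionally producing the letters in a separate coroutine
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun collectTimed(withBuffer: Boolean, step: Long = 1000): Pair<List<String>, Long> {
    // buffer() makes the upstream run in its own coroutine and pass elements through a channel
    val source = if (withBuffer) letters(step).buffer() else letters(step)
    val received = mutableListOf<String>()
    val millis = measureTimeMillis {
        source
            .flatMapMerge(concurrency = 1) { numbered(it, step) }
            .collect { received += it }
    }
    return received to millis
}

fun main() = runBlocking {
    val (_, plain) = collectTimed(withBuffer = false)
    val (_, buffered) = collectTimed(withBuffer = true)
    // Expect roughly 12 seconds without buffer and roughly 10 seconds with it
    println("without buffer: $plain ms, with buffer: $buffered ms")
}
```

The order of elements stays the same in both cases; only the waiting for the next letter overlaps with processing the previous one.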
If you are not happy with the default behavior of flatMapMerge, you can create your own concurrentMap function. Such a function would be optimized for mapping elements with a suspending transformation, and could have no default limit. It would be a safer and more predictable alternative to flatMapMerge.
fun <T, Y> Flow<T>.concurrentMap(
    maxConcurrency: Int = Int.MAX_VALUE,
    transform: suspend (T) -> Y
): Flow<Y> = this
    .flatMapMerge(concurrency = maxConcurrency) {
        flow { emit(transform(it)) }
    }

// usage
fun studentsFlow(studentIds: Flow<Int>): Flow<Student> = studentIds
    .concurrentMap { fetchStudent(it) }
Summary
Be careful with flatMapMerge. It is a great function, but it has a couple of unpleasant surprises. Even though it starts coroutines for each flow created by its transformation, the transformation itself is synchronous. It has an unintuitive default limit of 16, and it has inconsistent behavior for concurrency == 1.
Marcin Moskala is a highly experienced developer and Kotlin instructor. He is the founder of Kt. Academy, an official JetBrains partner specializing in Kotlin training, and a Google Developers Expert known for his significant contributions to the Kotlin community. Moskala is the author of several widely recognized books, including "Effective Kotlin," "Kotlin Coroutines," "Functional Kotlin," "Advanced Kotlin," "Kotlin Essentials," and "Android Development with Kotlin."
Beyond his literary achievements, Moskala is the author of the largest Medium publication dedicated to Kotlin. As a respected speaker, he has been invited to share his insights at numerous programming conferences, including Droidcon and the prestigious KotlinConf, the premier conference dedicated to the Kotlin programming language.
A passionate developer with more than a decade of experience, Renato has focused on Kotlin/JVM for the past few years, especially on being a transformation agent for teams wishing to adopt Kotlin and use its amazing features, such as Coroutines, to handle complex use cases at scale.
Ties is a software engineer with a passion for concepts, software engineering fundamentals, and helping others. He combines these passions by doing public speaking, volunteer work for organisations like Devoxx4kids and codingcoach, and working as a Software Quality Expert at Alliander.