Kotlin Coroutines Recipes
In this chapter, we will explore a collection of practical Kotlin Coroutine recipes that can help streamline your development process. These recipes have been tested and refined through use in multiple projects, so you can trust that they'll be a valuable addition to your toolkit.
These recipes, along with their accompanying unit tests, can be found in the following GitHub repository:
To make things even more convenient, the recipes have been published to Maven, allowing you to easily access them in your projects.
Recipe 1: Asynchronous map
An asynchronous map is something we have already discussed as a pattern, but I noticed it’s so repetitive that it’s worth extracting it into a function.
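A minimal sketch of such an extraction is shown here, assuming the kotlinx.coroutines library is on the classpath. The parameter name transformation is only illustrative.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope

// Starts one coroutine per element and waits for all of them.
// The results keep the order of the original collection.
suspend fun <T, R> Iterable<T>.mapAsync(
    transformation: suspend (T) -> R
): List<R> = coroutineScope {
    this@mapAsync
        .map { async { transformation(it) } }
        .awaitAll()
}
```

Because everything runs inside coroutineScope, a failure in any transformation cancels the remaining ones and is rethrown to the caller.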
Thanks to the mapAsync function, we're able to abstract away coroutineScope. This makes implementing asynchronous mapping more straightforward and concise. To implement rate limiting and control the number of concurrent requests, we can utilize a semaphore.
By introducing an optional concurrencyLimit parameter to the mapAsync function, we can easily manage the number of concurrent requests, thus ensuring that our application remains responsive and efficient.
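The semaphore-based variant could be sketched as follows. The default of Int.MAX_VALUE (meaning "effectively unlimited") is an assumption of this sketch.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Like a plain asynchronous map, but at most `concurrencyLimit`
// transformations run at the same time.
suspend fun <T, R> Iterable<T>.mapAsync(
    concurrencyLimit: Int = Int.MAX_VALUE, // assumed default: no practical limit
    transformation: suspend (T) -> R
): List<R> = coroutineScope {
    val semaphore = Semaphore(concurrencyLimit)
    this@mapAsync
        .map {
            async {
                // Suspends here until one of the permits is free.
                semaphore.withPermit { transformation(it) }
            }
        }
        .awaitAll()
}
```

All coroutines are still started immediately; the semaphore only limits how many of them are inside transformation at any given moment.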
Recipe 2: Suspending Lazy Initialization
In Kotlin Coroutines, you may have noticed that suspending functions can sometimes be called inside non-suspend lambda expressions, such as the one passed to map. This works because suspending functions can be called in non-suspend lambda expressions if these expressions are inlined, and map is an inline function. While this limitation is reasonable, it might prevent us from using certain functions we've grown accustomed to.
For me, the most important example is the lazy delegate, which cannot be used with suspending functions.
To make it possible, we need to implement our own suspendLazy. However, if a value calculation can be suspended, we would need a suspend getter, and for that we would need to have a suspend property. This is not supported in Kotlin, so instead we will make a function that generates a getter function.
First of all, we will use Mutex to prevent more than one coroutine from calculating the same value at the same time [1]. Note that Mutex cannot be substituted with a dispatcher that is limited to a single thread because we don’t want more than one process calculating a value, even if the previous one is suspended. Next, we will set a variable for the calculated value. We will use NOT_SET as a flag that the value is not initialized yet. Now, our process which produces the value and secures it with a mutex should check whether the value has been calculated yet: if it hasn’t, calculate it using the initializer function and then return the value.
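The steps above can be sketched as follows. This is a first version only, written under the assumption that every call goes through the mutex, which keeps the sketch simple at the cost of some locking overhead. The variable name holder is illustrative.

```kotlin
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Marker meaning "not calculated yet"; a dedicated object keeps null a legal value.
private val NOT_SET = Any()

@Suppress("UNCHECKED_CAST")
fun <T> suspendLazy(
    initializer: suspend () -> T
): suspend () -> T {
    val mutex = Mutex()
    var holder: Any? = NOT_SET

    // The returned function plays the role of a suspending getter.
    return {
        mutex.withLock {
            // Only the first caller calculates the value; later callers reuse it.
            if (holder === NOT_SET) holder = initializer()
            holder as T
        }
    }
}
```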
Did you notice that this implementation has a memory leak? Once initializer has been used, we don’t need to keep its reference, so we can free this lambda expression (and all the values it has captured) by setting it to null [2]. If we do this, we can change our condition and initialize the lazy value if the initializer is still not null. This is the implementation of suspending lazy I use:
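A minimal sketch along these lines, reconstructed from the description above rather than copied from the repository, might look as follows. The names pending and holder are illustrative, and this version takes the lock on every call for simplicity.

```kotlin
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

@Suppress("UNCHECKED_CAST")
fun <T> suspendLazy(
    initializer: suspend () -> T
): suspend () -> T {
    // Nullable copy of the initializer so it can be released after its first use.
    var pending: (suspend () -> T)? = initializer
    val mutex = Mutex()
    var holder: Any? = null

    return {
        mutex.withLock {
            val current = pending
            if (current != null) {
                holder = current()
                // Free the lambda and everything it captured.
                pending = null
            }
            holder as T
        }
    }
}
```

If the initializer throws, pending stays set, so the next call tries to calculate the value again.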
Recipe 3: Reusing connections
I showed you how SharedFlow can reuse a single flow so its values are emitted to multiple flows. This is a very important optimization, especially when this initial flow requires a persistent HTTP connection (like WebSocket or RSocket) or needs to observe a database. Let’s focus for a moment on the persistent HTTP connection. Its maintenance is a serious cost, so we don’t want to needlessly maintain two connections to receive the same data. This is why we will learn later in this book about transforming a flow into a shared flow in order to reuse a single connection.
This pattern is useful for connections that are not parameterized, but what about those started with specific parameters? For instance, when you implement a messenger application and want to observe particular discussion threads. For such cases, I find the following ConnectionPool class very useful. When getConnection is called for the first time, it creates a state flow that makes a connection based on the flow specified in its builder. Notice that ConnectionPool uses state flows with WhileSubscribed, so they only keep connections active for as long as they are needed.
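A sketch of such a class is given below. It is an assumption-laden reconstruction: the constructor parameters scope and builder are illustrative names, and the sketch uses shareIn, because stateIn would additionally require an initial value for every connection.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.shareIn

class ConnectionPool<K, V>(
    private val scope: CoroutineScope,
    // Describes how to open a connection for a given key.
    private val builder: (K) -> Flow<V>,
) {
    private val connections = mutableMapOf<K, Flow<V>>()

    // Returns the shared flow for this key, creating it on first use.
    fun getConnection(key: K): Flow<V> = synchronized(this) {
        connections.getOrPut(key) {
            builder(key).shareIn(
                scope,
                // The upstream flow is collected only while there are subscribers.
                started = SharingStarted.WhileSubscribed(),
            )
        }
    }
}
```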
Notice that the getConnection method uses a regular synchronized block. This is because it is a regular function, as all functions that return Flow should be. This synchronization secures access to the connections variable. The getConnection function should execute very quickly because it only defines a flow. A connection will be created when at least a single flow needs it. Notice that, thanks to the fact we’re using WhileSubscribed, a connection will only be maintained when there is at least a single coroutine using it.
WhileSubscribed can be parameterized. These parameter values could be injected into ConnectionPool using a constructor, as in the example below.
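One possible shape of such a constructor is sketched here. The parameter names replay, stopTimeout and replayExpiration and their defaults are assumptions of this sketch; they are simply forwarded to shareIn and to SharingStarted.WhileSubscribed.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.shareIn
import kotlin.time.Duration

class ConnectionPool<K, V>(
    private val scope: CoroutineScope,
    // How many recent values a new subscriber receives immediately.
    private val replay: Int = 0,
    // How long to keep the connection after the last subscriber leaves.
    private val stopTimeout: Duration = Duration.ZERO,
    // How long the replay cache is kept after the connection stops.
    private val replayExpiration: Duration = Duration.INFINITE,
    private val builder: (K) -> Flow<V>,
) {
    private val connections = mutableMapOf<K, Flow<V>>()

    fun getConnection(key: K): Flow<V> = synchronized(this) {
        connections.getOrPut(key) {
            builder(key).shareIn(
                scope,
                started = SharingStarted.WhileSubscribed(
                    stopTimeoutMillis = stopTimeout.inWholeMilliseconds,
                    replayExpirationMillis = replayExpiration.inWholeMilliseconds,
                ),
                replay = replay,
            )
        }
    }
}
```

A non-zero stopTimeout is useful when subscribers come and go quickly, for example during a screen rotation, because it avoids closing and reopening the connection.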
Recipe 4: Coroutine races
As I mentioned in the Select chapter, to start a couple of suspending processes and await the result of the one that finishes first, we can use the raceOf function from the Splitties library. However, I am not a fan of depending on a library just to use a function that can be implemented in a couple of lines. So, this is my implementation of raceOf:
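A sketch of how such a function can be built on select from kotlinx.coroutines is shown below. The signature (one mandatory racer plus a vararg) is an assumption made so that calling it with no racers does not compile.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.selects.select

suspend fun <T> raceOf(
    racer: suspend CoroutineScope.() -> T,
    vararg racers: suspend CoroutineScope.() -> T
): T = coroutineScope {
    select {
        (listOf(racer) + racers).forEach { block ->
            // Start every racer and register interest in its result.
            async { block() }.onAwait { result ->
                // The first result wins; the remaining racers are cancelled.
                coroutineContext.cancelChildren()
                result
            }
        }
    }
}
```

It could be used like this: raceOf({ delay(200); "slow" }, { delay(10); "fast" }) returns "fast", and the slower coroutine is cancelled before raceOf returns.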
Recipe 5: Retrying a suspending process
Since we live in the real world, we need to face the fact that unexpected errors might occur. When you request some data from a service, it might be temporarily unavailable, your network connection might be broken, or anything else might happen. One of the ways we handle such situations is by implementing an automatic retry, which tries again if a process fails.
We’ve already learned that we can retry a flow using the retry or retryWhen methods. There is no such function for retrying regular suspending processes, but the simplest solution could just be a loop that retries the process until it succeeds.
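Such a loop could be sketched as below. The function name retry is illustrative, and rethrowing CancellationException is an addition of this sketch so that the loop does not swallow cancellation.

```kotlin
import kotlinx.coroutines.CancellationException

// Repeats the operation until it completes without throwing.
suspend fun <T> retry(operation: suspend () -> T): T {
    while (true) {
        try {
            return operation()
        } catch (e: CancellationException) {
            // Cancellation must always propagate.
            throw e
        } catch (e: Throwable) {
            println("Operation failed, retrying: $e")
        }
    }
}
```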
The problem is that there is no such thing as a standard retry. When we implement such a mechanism, we often want to include:
- the conditions under which the process should be retried, often based on the number of retries and the exception type,
- increasing delay between retries,
- exception and information logging.
I know two good approaches to implementing retry. The first involves defining a universal function like retryWhen that can be easily parameterized at the call site. The following snippet presents my implementation of such a function and includes two important features:
- it never retries cancellation exceptions so as not to harm the cancellation mechanism,
- it adds previous exceptions as suppressed exceptions, so they are displayed when the final exception is thrown out of the function.
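A sketch with both properties is given below. The predicate signature (the exception plus the number of retries so far) is an assumption modeled on the description above.

```kotlin
import kotlinx.coroutines.CancellationException

suspend fun <T> retryWhen(
    // Decides whether to try again after the given failure.
    predicate: suspend (cause: Throwable, retries: Int) -> Boolean,
    operation: suspend () -> T
): T {
    var retries = 0
    var previous: Throwable? = null
    while (true) {
        try {
            return operation()
        } catch (e: Throwable) {
            // Keep the history of failures visible in the final stack trace.
            // The identity check avoids self-suppression, which would throw.
            val last = previous
            if (last != null && last !== e) e.addSuppressed(last)
            previous = e
            // Cancellation is never retried; other failures ask the predicate.
            if (e is CancellationException || !predicate(e, retries++)) {
                throw e
            }
        }
    }
}
```

A call site can then express its own policy, for example retrying up to three times with a growing delay: retryWhen(predicate = { _, retries -> delay(100L * (retries + 1)); retries < 3 }, operation = { fetchData() }), where fetchData stands for any suspending call.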
The second approach is to implement an application-specific retry function that is predefined for how we want to retry in this application. Here is an example of how such a function might look:
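The sketch below illustrates the idea under invented assumptions: ApiException is a hypothetical exception type, and the policy (retry server errors and I/O failures at most three times, with a linearly growing delay) is only an example of what an application might choose.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import java.io.IOException

// Hypothetical exception type standing in for the application's API errors.
class ApiException(val code: Int, message: String) : Exception(message)

suspend fun <T> retry(operation: suspend () -> T): T {
    var retries = 0
    while (true) {
        try {
            return operation()
        } catch (e: CancellationException) {
            throw e // never retry cancellation
        } catch (e: Exception) {
            // Assumed policy: only server errors and I/O failures are retried.
            val retryable =
                (e is ApiException && e.code in 500..599) || e is IOException
            if (!retryable || retries >= 3) throw e
            retries++
            println("Retry $retries after: $e") // replace with the application's logger
            delay(100L * retries) // assumed policy: linearly growing delay
        }
    }
}
```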
There is a popular retrying algorithm called exponential backoff, where after each failed retry, there is a growing backoff delay. It is based on the idea that the longer we wait, the less likely it is that the error will occur again. You can find its implementation in my recipes repository. Here is how it can be used:
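The repository function itself is not reproduced here. To illustrate the algorithm, the following is a self-contained sketch of a hypothetical retryBackoff function; its name, parameters and defaults are assumptions and may differ from the published recipe.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds

// Hypothetical sketch: retries with a delay that grows by `backoffFactor`
// after every failure, capped at `maxDelay`.
suspend fun <T> retryBackoff(
    minDelay: Duration = 100.milliseconds,
    maxDelay: Duration = 10.seconds,
    maxAttempts: Int = 5,
    backoffFactor: Double = 2.0,
    operation: suspend () -> T,
): T {
    var currentDelay = minDelay
    var attempt = 1
    while (true) {
        try {
            return operation()
        } catch (e: CancellationException) {
            throw e // never retry cancellation
        } catch (e: Exception) {
            if (attempt >= maxAttempts) throw e
            delay(currentDelay)
            currentDelay = (currentDelay * backoffFactor).coerceAtMost(maxDelay)
            attempt++
        }
    }
}
```

With the defaults above, the waits between attempts would be 100 ms, 200 ms, 400 ms and 800 ms before the fifth and last attempt. A call could look like retryBackoff(minDelay = 1.seconds, maxAttempts = 10) { api.getWeather() }, where api.getWeather is a placeholder for any suspending call.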
In this section, I've presented a couple of recipes I use in my projects. I hope they will help you not only with the presented problems but also with implementing your own unique recipes.
[1] Mutex is described in the chapter The problem with shared states.
[2] This problem is described in detail in Effective Kotlin, Item 50: Eliminate obsolete object references.