Kotlin Coroutines Recipes
This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.
In this chapter, we will explore a collection of practical Kotlin Coroutine recipes that can help streamline your development process. These recipes have been tested and refined through use in multiple projects, so you can trust that they'll be a valuable addition to your toolkit.
These recipes, along with their accompanying unit tests, can be found in the following GitHub repository:
To make things even more convenient, the recipes have been published to Maven, allowing you to easily access them in your projects.
Recipe 1: Asynchronous map
An asynchronous map is something we have already discussed as a pattern, but I noticed it’s so repetitive that it’s worth extracting it into a function.
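Here is a minimal sketch of such a function (the version in the recipes repository may differ in details):

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope

// Starts an async coroutine for each element and awaits all the results,
// so the transformations run concurrently.
suspend fun <T, R> List<T>.mapAsync(
    transformation: suspend (T) -> R
): List<R> = coroutineScope {
    this@mapAsync.map { async { transformation(it) } }
        .awaitAll()
}
```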
Thanks to the `mapAsync` function, we're able to abstract away `map`, `awaitAll`, and `coroutineScope`. This makes implementing asynchronous mapping more straightforward and concise. To implement rate limiting and control the number of concurrent requests, we can utilize a semaphore.
By introducing an optional `concurrencyLimit` parameter to the `mapAsync` function, we can easily manage the number of concurrent requests, thus ensuring that our application remains responsive and efficient.
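A sketch of the semaphore-based variant (the `Int.MAX_VALUE` default, meaning effectively no limit, is an assumption):

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// At most concurrencyLimit transformations run at the same time;
// the remaining coroutines suspend while waiting for a permit.
suspend fun <T, R> List<T>.mapAsync(
    concurrencyLimit: Int = Int.MAX_VALUE,
    transformation: suspend (T) -> R
): List<R> = coroutineScope {
    val semaphore = Semaphore(concurrencyLimit)
    this@mapAsync.map {
        async { semaphore.withPermit { transformation(it) } }
    }.awaitAll()
}
```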
Recipe 2: Suspending Lazy Initialization
In Kotlin Coroutines, you may have noticed that suspending functions can sometimes be used in non-suspending lambda expressions, such as the lambda passed to `map`. This works because suspending functions can be called in non-suspending lambda expressions if these expressions are inlined, and `map` is an inline function. While this limitation is reasonable, it might restrict us from using certain functions we've grown accustomed to.
For me, the most important example is the `lazy` delegate, which cannot be used with suspending functions.
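A minimal illustration (here `fetchUserData` is just a placeholder suspending function):

```kotlin
suspend fun fetchUserData(): String = TODO()

suspend fun example() {
    // Works: map is inline, so its lambda may call suspending functions
    // when example() itself is suspending
    val results = listOf(1, 2, 3).map { fetchUserData() }

    // Does not compile: lazy's lambda is neither suspending nor inlined
    // val user by lazy { fetchUserData() }
}
```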
To make this possible, we need to implement our own `suspendLazy`. However, if a value calculation can suspend, we would need a `suspend` getter, and for that we would need a `suspend` property. This is not supported in Kotlin, so instead we will make a function that generates a getter function.
First of all, we will use `Mutex` to prevent more than one coroutine from calculating the same value at the same time¹. Note that `Mutex` cannot be substituted with a dispatcher limited to a single thread because we don't want more than one process calculating a value, even if the previous one is suspended. Next, we will set a variable for the calculated value, using `NOT_SET` as a flag that the value is not initialized yet. Then, our process, which produces the value and secures it with a mutex, should check whether the value has already been calculated: if it hasn't, it calculates the value using the initializer function and then returns it.
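Putting this together, a first sketch might look like this (`NOT_SET` and the unchecked cast are details of this particular approach):

```kotlin
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

private val NOT_SET = Any()

// Returns a suspending getter that calculates the value on the first call
// and returns the cached value on every later call.
@Suppress("UNCHECKED_CAST")
fun <T> suspendLazy(
    initializer: suspend () -> T
): suspend () -> T {
    val mutex = Mutex()
    var holder: Any? = NOT_SET

    return {
        mutex.withLock {
            if (holder === NOT_SET) holder = initializer()
            holder as T
        }
    }
}
```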
Did you notice that this implementation has a memory leak? Once `initializer` has been used, we don't need to keep its reference, so we can free this lambda expression (and all the values it has captured) by setting `initializer` to `null`². If we do this, we can change our condition and initialize the lazy value only if `initializer` is still not `null`. This is the implementation of suspending lazy I use:
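Here is a sketch of that final version (the exact code in the recipes repository may differ in small details):

```kotlin
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

private val NOT_SET = Any()

@Suppress("UNCHECKED_CAST")
fun <T> suspendLazy(
    initializer: suspend () -> T
): suspend () -> T {
    // Once the value has been produced, this reference is cleared so that
    // the lambda (and everything it captured) can be garbage collected.
    var initializerRef: (suspend () -> T)? = initializer
    val mutex = Mutex()
    var holder: Any? = NOT_SET

    return {
        if (initializerRef == null) holder as T
        else mutex.withLock {
            initializerRef?.let {
                holder = it()
                initializerRef = null
            }
            holder as T
        }
    }
}
```

The result is used like a suspending getter, e.g. `val getUserData = suspendLazy { fetchUserData() }` and then `getUserData()` wherever the value is needed (where `fetchUserData` is a hypothetical suspending call).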
Recipe 3: Reusing connections
I showed you how `SharedFlow` can reuse a single flow so its values are emitted to multiple receivers. This is a very important optimization, especially when this initial flow requires a persistent HTTP connection (like WebSocket or RSocket) or needs to observe a database. Let's focus for a moment on the persistent HTTP connection. Its maintenance is a serious cost, so we don't want to needlessly maintain two connections to receive the same data. This is why we will learn later in this book about transforming a flow into a shared flow in order to reuse a single connection.
This pattern is useful for connections that are not parameterized, but what about those started with specific parameters? For instance, when you implement a messenger application, you might want to observe particular discussion threads. For such cases, I find the following `ConnectionPool` class very useful. When `getConnection` is called for the first time, it creates a state flow that makes a connection based on the flow specified in its builder. Notice that `ConnectionPool` uses state flows with `WhileSubscribed`, so they only keep connections active for as long as they are needed.
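Here is a sketch of such a class; it is shown with `shareIn` and `WhileSubscribed` (a state-flow variant would additionally need an initial value), and the exact code in the recipes repository may differ in details:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.shareIn

class ConnectionPool<K, V>(
    private val scope: CoroutineScope,
    private val builder: (K) -> Flow<V>,
) {
    private val connections = mutableMapOf<K, Flow<V>>()

    // Returns the same shared flow for the same key, creating it lazily.
    // The connection is started only when the flow has subscribers.
    fun getConnection(key: K): Flow<V> = synchronized(this) {
        connections.getOrPut(key) {
            builder(key).shareIn(
                scope,
                started = SharingStarted.WhileSubscribed(),
            )
        }
    }
}
```

A messenger application might then build a single pool, e.g. `val threads = ConnectionPool(scope) { threadId: String -> api.observeMessageThread(threadId) }`, and expose `threads.getConnection(threadId)` to observers (where `api.observeMessageThread` is a hypothetical flow-returning call).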
Notice that the `getConnection` method uses a regular synchronized block. This is because it is a regular function, as all functions that return `Flow` should be. This synchronization secures access to the `connections` variable. The `getConnection` function should execute very quickly because it only defines a flow; a connection will be created only when at least one flow needs it. Notice that, thanks to the fact that we're using `WhileSubscribed`, a connection will only be maintained while there is at least one coroutine using it.
`WhileSubscribed` can be parameterized. These parameter values could be injected into `ConnectionPool` using a constructor, as in the example below.
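A sketch of such a parameterized version (the parameter names `replay`, `stopTimeout`, and `replayExpiration` mirror the parameters of `shareIn` and `WhileSubscribed`; they are my assumption about the original example):

```kotlin
import kotlin.time.Duration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.shareIn

class ConnectionPool<K, V>(
    private val scope: CoroutineScope,
    private val replay: Int = 0,
    private val stopTimeout: Duration = Duration.ZERO,
    private val replayExpiration: Duration = Duration.INFINITE,
    private val builder: (K) -> Flow<V>,
) {
    private val connections = mutableMapOf<K, Flow<V>>()

    fun getConnection(key: K): Flow<V> = synchronized(this) {
        connections.getOrPut(key) {
            builder(key).shareIn(
                scope,
                started = SharingStarted.WhileSubscribed(
                    stopTimeoutMillis = stopTimeout.inWholeMilliseconds,
                    replayExpirationMillis =
                        replayExpiration.inWholeMilliseconds,
                ),
                replay = replay,
            )
        }
    }
}
```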
Recipe 4: Coroutine races
As I mentioned in the Select chapter, to start a couple of suspending processes and await the result of the one that finishes first, we can use the `raceOf` function from the Splitties library. However, I am not a fan of depending on a library just to use a function that can be implemented in a couple of lines. So, this is my implementation of `raceOf`:
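A sketch of such a function: it starts all racers with `async`, selects the first one to complete, and cancels the rest (the version in the recipes repository may differ in details):

```kotlin
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.job
import kotlinx.coroutines.selects.select

suspend fun <T> raceOf(
    racer: suspend CoroutineScope.() -> T,
    vararg racers: suspend CoroutineScope.() -> T,
): T = coroutineScope {
    select {
        (listOf(racer) + racers).forEach { r ->
            async { r() }.onAwait {
                // The first deferred to complete wins;
                // cancel all the remaining racers.
                coroutineContext.job.cancelChildren()
                it
            }
        }
    }
}
```

It can then be used, for example, as `raceOf({ fetchFromServerA() }, { fetchFromServerB() })`, where the fetch functions are hypothetical suspending calls.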
Recipe 5: Retrying a suspending process
Since we live in the real world, we need to face the fact that unexpected errors might occur. When you request some data from a service, it might be temporarily unavailable, your network connection might be broken, or anything else might happen. One of the ways we handle such situations is by implementing an automatic retry, which tries the process again if it fails.
We've already learned that we can retry a flow using the `retry` or `retryWhen` methods.
There is no such function for retrying regular suspending processes, but the simplest solution could just be a loop that retries the process until it succeeds.
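For example (a sketch; note that this naive version swallows every exception, including `CancellationException`, which the variants below fix):

```kotlin
// Keeps calling the operation until it succeeds.
// Because retry is inline, the lambda may call suspending functions
// when retry itself is called from a suspending context.
inline fun <T> retry(operation: () -> T): T {
    while (true) {
        try {
            return operation()
        } catch (e: Throwable) {
            // ignore the failure and try again
        }
    }
}
```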
The problem is that there is no such thing as a standard retry. When we implement such a mechanism, we often want to include:
- the conditions under which the process should be retried, often based on the number of retries and the exception type,
- increasing delay between retries,
- exception and information logging.
I know two good approaches to implementing `retry`. The first involves defining a universal function like `retryWhen` that can be easily parameterized at the call site. The following snippet presents my implementation of such a function, which includes two important features:
- it never retries cancellation exceptions so as not to harm the cancellation mechanism,
- it adds previous exceptions as suppressed exceptions, so they are displayed when the final exception is thrown out of the function.
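A sketch of such a function (the name `retryWhen` mirrors the flow operator; the exact code in the recipes repository may differ):

```kotlin
import kotlinx.coroutines.CancellationException

// Retries the operation for as long as the predicate allows it.
// Cancellation exceptions are always rethrown, and each new exception
// gets the previous one attached as a suppressed exception.
inline fun <T> retryWhen(
    predicate: (cause: Throwable, retries: Int) -> Boolean,
    operation: () -> T
): T {
    var retries = 0
    var fromDownstream: Throwable? = null
    while (true) {
        try {
            return operation()
        } catch (e: Throwable) {
            if (fromDownstream != null) {
                e.addSuppressed(fromDownstream)
            }
            fromDownstream = e
            if (e is CancellationException || !predicate(e, retries++)) {
                throw e
            }
        }
    }
}
```

A usage sketch, with `ApiException` and `requestData` as hypothetical placeholders:

```kotlin
class ApiException(message: String) : Exception(message)
suspend fun requestData(): String = TODO()

suspend fun requestWithRetry(): String = retryWhen(
    predicate = { e, retries -> e is ApiException && retries < 3 }
) {
    requestData()
}
```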
The second approach is to implement an application-specific `retry` function that is predefined for how we want to retry in this application. Here is an example of how such a function might look:
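For instance, something along these lines (a sketch; the delay progression and the logging are illustrative choices):

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay

suspend fun <T> retry(operation: suspend () -> T): T {
    var retries = 0
    while (true) {
        try {
            return operation()
        } catch (e: CancellationException) {
            throw e // never swallow cancellation
        } catch (e: Throwable) {
            // grow the delay with each failure, capped at 10 seconds
            val delayMillis = minOf(1_000L * (retries + 1), 10_000L)
            println("Retry ${retries++} after $e, waiting $delayMillis ms")
            delay(delayMillis)
        }
    }
}
```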
There is a popular retry algorithm called exponential backoff, in which the backoff delay grows after each failed retry. It is based on the idea that the longer we wait, the less likely it is that the error will occur again. You can find its implementation in my recipes repository. Here is how it can be used:
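For illustration only, a call might look roughly like the snippet below; the `retryBackoff` parameter names are assumptions, so check the recipes repository for the actual signature (`api.fetchUser` and `User` are hypothetical as well):

```kotlin
import kotlin.time.Duration.Companion.seconds

suspend fun fetchUser(): User = retryBackoff(
    minDelay = 1.seconds,   // first backoff delay (assumed parameter name)
    maxDelay = 10.seconds,  // upper bound for the delay
    maxAttempts = 10,       // give up after this many attempts
    backoffFactor = 2.0,    // multiply the delay after each failure
) {
    api.fetchUser()
}
```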
Summary
In this chapter, I've presented a couple of recipes I use in my projects. I hope they will help you not only with the problems presented here but also with implementing your own unique recipes.
1. Details about `Mutex` can be found in The problem with shared states chapter.
2. This problem is described in detail in Effective Kotlin, Item 50: Eliminate obsolete object references.