article banner (priority)

Kotlin Coroutines Recipes

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.

In this chapter, we will explore a collection of practical Kotlin Coroutine recipes that can help streamline your development process. These recipes have been tested and refined through use in multiple projects, so you can trust that they'll be a valuable addition to your toolkit.

These recipes, along with their accompanying unit tests, can be found in the following GitHub repository:

To make things even more convenient, the recipes have been published to Maven, allowing you to easily access them in your projects.

Recipe 1: Asynchronous map

An asynchronous map is something we have already discussed as a pattern, but I noticed it’s so repetitive that it’s worth extracting it into a function.

suspend fun <T, R> List<T>.mapAsync( transformation: suspend (T) -> R ): List<R> = coroutineScope { this@mapAsync.map { async { transformation(it) } } .awaitAll() } // Practical example use suspend fun getBestStudent( semester: String, repo: StudentsRepository ): Student = repo.getStudentIds(semester) .mapAsync { repo.getStudent(it) } .maxBy { it.result } // Practical example use suspend fun getCourses(user: User): List<UserCourse> = courseRepository.getAllCourses() .mapAsync { composeUserCourse(user, it) } .filterNot { courseShouldBeHidden(user, it) } .sortedBy { it.state.ordinal }

Thanks to the mapAsync function, we're able to abstract away map, awaitAll, and coroutineScope. This makes implementing asynchronous mapping more straightforward and concise. To implement rate limiting and control the number of concurrent requests, we can utilize a semaphore.

suspend fun <T, R> List<T>.mapAsync( concurrencyLimit: Int = Int.MAX_VALUE, transformation: suspend (T) -> R ): List<R> = coroutineScope { val semaphore = Semaphore(concurrencyLimit) this@mapAsync.map { async { semaphore.withPermit { transformation(it) } } }.awaitAll() }

By introducing an optional concurrencyLimit parameter to the mapAsync function, we can easily manage the number of concurrent requests, thus ensuring that our application remains responsive and efficient.

Recipe 2: Suspending Lazy Initialization

In Kotlin Coroutines, you may have noticed that sometimes suspending functions can be used in non-suspend lambda expressions, such as a map. This works because suspending functions can be called on non-suspend lambda expressions if these expressions are inlined, and map is an inline function. While this limitation is reasonable, it might restrict us from using certain functions we've grown accustomed to.

// Works because map is an inline function, // so we can call suspend await in its lambda, // even though this lambda itself is not suspending. suspend fun getOffers( categories: List<Category> ): List<Offer> = coroutineScope { categories .map { async { api.requestOffers(it) } } .map { it.await() } // Prefer awaitAll .flatten() }

For me, the most important example is the lazy delegate, which cannot be used with suspending functions.

suspend fun makeConnection(): Connection = TODO() val connection by lazy { makeConnection() } // COMPILER ERROR

To make it possible, we need to implement our own suspendLazy. However, if a value calculation can be suspended, we would need a suspend getter, and for that we would need to have a suspend property. This is not supported in Kotlin, so instead we will make a function that generates a getter function.

fun <T> suspendLazy( initializer: suspend () -> T ): suspend () -> T { TODO() }

First of all, we will use Mutex to prevent more than one coroutine from calculating the same value at the same time1. Note that Mutex cannot be substituted with a dispatcher that is limited to a single thread because we don’t want more than one process calculating a value, even if the previous one is suspended. Next, we will set a variable for the calculated value. We will use NOT_SET as a flag that the value is not initialized yet. Now, our process which produces the value and secures it with a mutex should check whether the value has been calculated yet: if it hasn’t, calculate it using the initializer function and then return the value.

private val NOT_SET = Any() fun <T> suspendLazy( initializer: suspend () -> T ): suspend () -> T { val mutex = Mutex() var holder: Any? = NOT_SET return { if (holder !== NOT_SET) holder as T else mutex.withLock { if (holder === NOT_SET) holder = initializer() holder as T } } }

Did you notice that this implementation has a memory leak? Once initializer has been used, we don’t need to keep its reference, so we can free this lambda expression (and all the values it has captured) by setting initializer to null2. If we do this, we can change our condition and initialize the lazy value if the initializer is still not null. This is the implementation of suspending lazy I use:

import kotlinx.coroutines.* import kotlinx.coroutines.sync.* fun <T> suspendLazy( initializer: suspend () -> T ): suspend () -> T { var initializer: (suspend () -> T)? = initializer val mutex = Mutex() var holder: Any? = Any() return { if (initializer == null) holder as T else mutex.withLock { initializer?.let { holder = it() initializer = null } holder as T } } } // Example suspend fun makeConnection(): String { println("Creating connection") delay(1000) return "Connection" } val getConnection = suspendLazy { makeConnection() } suspend fun main() { println(getConnection()) println(getConnection()) println(getConnection()) } // Creating connection // (1 sec) // (1 sec) // Connection // Connection // Connection
// Practical example use val userData: suspend () -> UserData = suspendLazy { service.fetchUserData() } suspend fun getUserData(): UserData = userData()

Recipe 3: Reusing connections

I showed you how SharedFlow can reuse a single flow so its values are emitted to multiple flows. This is a very important optimization, especially when this initial flow requires a persistent HTTP connection (like WebSocket or RSocket) or needs to observe a database. Let’s focus for a moment on the persistent HTTP connection. Its maintenance is a serious cost, so we don’t want to needlessly maintain two connections to receive the same data. This is why we will learn later in this book about transforming a flow into a shared flow in order to reuse a single connection.

class LocationService( locationDao: LocationDao, scope: CoroutineScope ) { private val locations = locationDao.observeLocations() .shareIn( scope = scope, started = SharingStarted.WhileSubscribed(), ) fun observeLocations(): Flow<List<Location>> = locations }

This pattern is useful for connections that are not parameterized, but what about those started with specific parameters? For instance, when you implement a messenger application and want to observe particular discussion threads. For such cases, I find the following ConnectionPool class very useful. When getConnection is called for the first time, it creates a state flow that makes a connection based on the flow specified in its builder. Notice that ConnectionPool uses state flows with WhileSubscribed, so they only keep connections active for as long as they are needed.

class ConnectionPool<K, V>( private val scope: CoroutineScope, private val builder: (K) -> Flow<V>, ) { private val connections = mutableMapOf<K, Flow<V>>() fun getConnection(key: K): Flow<V> = synchronized(this) { connections.getOrPut(key) { builder(key).shareIn( scope, started = SharingStarted.WhileSubscribed(), ) } } } // Practical example use private val scope = CoroutineScope(SupervisorJob()) private val messageConnections = ConnectionPool(scope) { threadId: String -> api.observeMessageThread(threadId) } fun observeMessageThread(threadId: String) = messageConnections.getConnection(threadId)

Notice that the getConnection method uses a regular synchronization block. This is because it is a regular function, as all functions that return Flow should be. This synchronization secures access to the connections variable. The getConnection function should execute very quickly because it only defines a flow. A connection will be created when at least a single flow needs it. Notice that, thanks to the fact we’re using WhileSubscribed, a connection will only be maintained when there is at least a single coroutine using it.

WhileSubscribed can be parameterized. These parameter values could be injected into ConnectionPool using a constructor, as in the example below.

class ConnectionPool<K, V>( private val scope: CoroutineScope, private val replay: Int = 0, private val stopTimeout: Duration, private val replayExpiration: Duration, private val builder: (K) -> Flow<V>, ) { private val connections = mutableMapOf<K, Flow<V>>() fun getConnection(key: K): Flow<V> = synchronized(this) { connections.getOrPut(key) { builder(key).shareIn( scope, started = SharingStarted.WhileSubscribed( stopTimeoutMillis = stopTimeout.inWholeMilliseconds, replayExpirationMillis = replayExpiration.inWholeMilliseconds, ), replay = replay, ) } } }

Recipe 4: Coroutine races

As I mentioned in the Select chapter, to start a couple of suspending processes and await the result of the one that finishes first, we can use the raceOf function from the Splitties library. However, I am not a fan of depending on a library just to use a function that can be implemented in a couple of lines. So, this is my implementation of raceOf:

suspend fun <T> raceOf( racer: suspend CoroutineScope.() -> T, vararg racers: suspend CoroutineScope.() -> T ): T = coroutineScope { select { (listOf(racer) + racers).forEach { racer -> async { racer() }.onAwait { coroutineContext.job.cancelChildren() it } } } } // Example use suspend fun a(): String { delay(1000) return "A" } suspend fun b(): String { delay(2000) return "B" } suspend fun c(): String { delay(3000) return "C" } suspend fun main(): Unit = coroutineScope { println(raceOf({ c() })) // (3 sec) // C println(raceOf({ b() }, { a() })) // (1 sec) // A println(raceOf({ b() }, { c() })) // (2 sec) // B println(raceOf({ b() }, { a() }, { c() })) // (1 sec) // A }
// Practical example use suspend fun fetchUserData(): UserData = raceOf( { service1.fetchUserData() }, { service2.fetchUserData() } )

Recipe 5: Retrying a suspending process

Since we live in the real world, we need to face the fact that unexpected errors might occur. When you request some data from a service, it might be temporarily unavailable, your network connection might be broken, or anything else might happen. One of the ways we handle such situations is by implementing an automatic reply to try again if a process fails.

We’ve already learned that we retry a flow using the retry or retryWhen methods.

fun makeConnection(config: ConnectionConfig) = api .startConnection(config) .retryWhen { e, attempt -> val times = 2.0.pow(attempt.toDouble()).toInt() delay(maxOf(10_000L, 100L * times)) log.error(e) { "Error for $config" } e is ApiException && e.code !in 400..499 }

There is no such function for retrying regular suspending processes, but the simplest solution could just be a loop that retries the process until it succeeds.

inline fun <T> retry(operation: () -> T): T { while (true) { try { return operation() } catch (e: Throwable) { // no-op } } } // Usage suspend fun requestData(): String { if (Random.nextInt(0, 10) == 0) { return "ABC" } else { error("Error") } } suspend fun main(): Unit = coroutineScope { println(retry { requestData() }) } // (1 sec) // ABC
// Practical example use suspend fun checkConnection(): Boolean = retryWhen( predicate = { _, retries -> retries < 3 }, operation = { api.connected() } )

The problem is that there is no such thing as a standard reply. When we implement such a mechanism, we often want to include:

  • the conditions under which the process should be retried, often based on the number of retries and the exception type,
  • increasing delay between retries,
  • exception and information logging.

I know two good approaches to implementing retry. The first involves defining a universal function like retryWhen that can be easily parametrized on the use side. The following snippet presents my implementation of such a function and includes two important features:

  • it never retries cancellation exceptions so as not to harm the cancellation mechanism,
  • it adds previous exceptions as suppressed exceptions, so they are displayed when the final exception is thrown out of the function.
inline fun <T> retryWhen( predicate: (Throwable, retries: Int) -> Boolean, operation: () -> T ): T { var retries = 0 var fromDownstream: Throwable? = null while (true) { try { return operation() } catch (e: Throwable) { if (fromDownstream != null) { e.addSuppressed(fromDownstream) } fromDownstream = e if (e is CancellationException || !predicate(e, retries++) ) { throw e } } } } // Usage suspend fun requestWithRetry() = retryWhen( predicate = { e, retries -> val times = 2.0.pow(attempt.toDouble()).toInt() delay(maxOf(10_000L, 100L * times)) log.error(e) { "Retried" } retries < 10 && e is IllegalStateException } ) { requestData() }

The second approach is to implement an application-specific retry function that is predefined for how we want to retry in this application. Here is an example of how such a function might look:

inline suspend fun <T> retry( operation: () -> T ): T { var retries = 0 while (true) { try { return operation() } catch (e: Exception) { val times = 2.0.pow(attempt.toDouble()).toInt() delay(maxOf(10_000L, 100L * times)) if (e is CancellationException || retries >= 10){ throw e } retries++ log.error(e) { "Retrying" } } } } // Usage suspend fun requestWithRetry() = retry { requestData() }

There is a popular algorithm for retrying that is called the exponential backoff, where after each failed retry, there is a growing backoff delay. It is based on the idea that the longer we wait, the less likely it is that the error will occur again. You can find its implementation in my recipes' repository. Here is how it can be used:

suspend fun fetchUser(): User = retryBackoff( minDelay = 1.seconds, maxDelay = 10.seconds, // optional maxAttempts = 10, // optional backoffFactor = 1.5, // optional jitterFactor = 0.5, // optional beforeRetry = { cause, _, -> // optional println("Retrying after $cause") }, retriesExhausted = { cause -> // optional println("Retries exhausted after $cause") }, ) { api.fetchUser() } fun observeUserUpdates(): Flow<User> = api .observeUserUpdates() .retryBackoff( minDelay = 1.seconds, maxDelay = 1.minutes, // optional maxAttempts = 30, // optional backoffFactor = 2.0, // optional jitterFactor = 0.1, // optional beforeRetry = { cause, _, _ -> // optional println("Retrying after $cause") }, retriesExhausted = { cause -> // optional println("Retries exhausted after $cause") }, )

Summary

In this section, I've presented a couple of recipes I use in my projects. I hope they will help you not only with the presented problems but also with implementing your own unique recipes.

1:

Details about Mutex can be found in the The problem with shared states chapter.

2:

This problem is described in detail in Effective Kotlin, Item 50: Eliminate obsolete object references.