Kotlin Coroutines use cases for Data/Adapters Layer

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.

I will start by presenting typical Kotlin Coroutines use cases from the Data/Adapters Layer, which is where we implement repositories, providers, adapters, data sources, etc. This layer is relatively easy to implement nowadays because many popular JVM libraries support Kotlin Coroutines either out of the box or with some additional dependencies.

As an example, we might use Retrofit, which is a popular library for network requests. It has out-of-the-box support for suspending functions. It is enough to add the suspend modifier to make its request definition functions suspending instead of blocking.

// Retrofit (API declarations must be interfaces)
interface GithubApi {
    @GET("orgs/{organization}/repos?per_page=100")
    suspend fun getOrganizationRepos(
        @Path("organization") organization: String
    ): List<Repo>
}

Another good example is Room, which is a popular library for communication with SQLite databases on Android. It supports both the suspend modifier, which makes its functions suspending, and Flow for observing table states.

// Room
@Dao
interface LocationDao {
    @Insert
    suspend fun insertLocation(location: Location)

    @Query("DELETE FROM location_table")
    suspend fun deleteLocations()

    @Query("SELECT * FROM location_table ORDER BY time")
    fun observeLocations(): Flow<List<Location>>
}

Callback functions

If you need to use a library that does not support Kotlin Coroutines but instead forces you to use callback functions, turn them into suspending functions using the suspendCancellableCoroutine function [0]. When a callback is called, the coroutine should be resumed using the resume method on Continuation. If this callback function is cancellable, it should be cancelled inside the invokeOnCancellation lambda expression [1].

suspend fun requestNews(): News {
    return suspendCancellableCoroutine<News> { cont ->
        val call = requestNewsApi { news ->
            cont.resume(news)
        }
        cont.invokeOnCancellation {
            call.cancel()
        }
    }
}

Callback functions that allow us to set separate functions for success and failure can be implemented in a few different ways. We could wrap the callback functions and return Result, resuming our coroutine with either Result.success or Result.failure.

suspend fun requestNews(): Result<News> {
    // The continuation type must match the value we resume with
    return suspendCancellableCoroutine<Result<News>> { cont ->
        val call = requestNewsApi(
            onSuccess = { news ->
                cont.resume(Result.success(news))
            },
            onError = { e ->
                cont.resume(Result.failure(e))
            }
        )
        cont.invokeOnCancellation {
            call.cancel()
        }
    }
}

Another option is to return a nullable value and resume our coroutine with either the result data or null.

suspend fun requestNews(): News? {
    // Nullable type argument, because we resume with null on failure
    return suspendCancellableCoroutine<News?> { cont ->
        val call = requestNewsApi(
            onSuccess = { news -> cont.resume(news) },
            onError = { e -> cont.resume(null) }
        )
        cont.invokeOnCancellation {
            call.cancel()
        }
    }
}

The last popular option is to resume with a result in the case of callback function success or resume with an exception in case of failure. In the latter case, the exception is thrown from the suspension point [2].

suspend fun requestNews(): News {
    return suspendCancellableCoroutine<News> { cont ->
        val call = requestNewsApi(
            onSuccess = { news -> cont.resume(news) },
            onError = { e -> cont.resumeWithException(e) }
        )
        cont.invokeOnCancellation {
            call.cancel()
        }
    }
}
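
To see the exception-based variant run end to end, here is a minimal sketch. The callback API in it (Call, requestNewsApi with a shouldFail flag, and String as the news type) is hypothetical and exists only for illustration; a real library will look different.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical callback-based API, defined here only for illustration.
class Call(private val worker: Thread) {
    fun cancel() = worker.interrupt()
}

fun requestNewsApi(
    shouldFail: Boolean,
    onSuccess: (String) -> Unit,
    onError: (Throwable) -> Unit,
): Call {
    val worker = thread {
        try {
            Thread.sleep(100) // pretend to do network work
            if (shouldFail) onError(IllegalStateException("No connection"))
            else onSuccess("Breaking news")
        } catch (e: InterruptedException) {
            // Cancelled: no callback is invoked.
        }
    }
    return Call(worker)
}

// Resume with a value on success, or with an exception on failure.
suspend fun requestNews(shouldFail: Boolean): String =
    suspendCancellableCoroutine { cont ->
        val call = requestNewsApi(
            shouldFail = shouldFail,
            onSuccess = { news -> cont.resume(news) },
            onError = { e -> cont.resumeWithException(e) },
        )
        cont.invokeOnCancellation { call.cancel() }
    }

fun main() = runBlocking {
    println(requestNews(shouldFail = false)) // prints "Breaking news"
    // The exception surfaces at the suspension point, so the caller can catch it.
    val result = runCatching { requestNews(shouldFail = true) }
    println(result.exceptionOrNull()?.message) // prints "No connection"
}
```

From the caller's perspective, the wrapped function behaves like any other suspending function: it returns a value or throws.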

Blocking functions

Another common situation is when a library you use requires the use of blocking functions. You should never call blocking functions in regular suspending functions. In Kotlin Coroutines, we use threads with great precision, and blocking them is a big problem. If we block the thread from Dispatchers.Main on Android, our whole application freezes. If we block the thread from Dispatchers.Default, we can forget about efficient processor use. This is why we should never make a blocking call without first specifying the dispatcher [3].

When we need to make a blocking call, we should specify the dispatcher using withContext. In most cases, when we implement repositories in applications, it is enough to use Dispatchers.IO [4].

class DiscSaveRepository(
    private val discReader: DiscReader
) : SaveRepository {

    override suspend fun loadSave(name: String): SaveData =
        withContext(Dispatchers.IO) {
            discReader.read("save/$name")
        }
}
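
The cost of blocking the caller's thread can be observed directly. The sketch below is only an illustration: blockingRead and ticksWhile are hypothetical helpers, and runBlocking stands in for any single-threaded dispatcher such as Dispatchers.Main. It counts how often another coroutine on the caller's thread gets to run while a blocking call is in progress.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical blocking function standing in for a library call.
fun blockingRead(): String {
    Thread.sleep(300)
    return "data"
}

// Counts how many times a ticker coroutine, started on the caller's
// dispatcher, manages to run while `block` is executing.
suspend fun ticksWhile(block: suspend () -> Unit): Int = coroutineScope {
    val ticks = AtomicInteger()
    val ticker = launch {
        while (true) {
            delay(50)
            ticks.incrementAndGet()
        }
    }
    block()
    val seen = ticks.get()
    ticker.cancel()
    seen
}

fun main() = runBlocking {
    // Blocking call made directly: the only thread is occupied, the ticker never runs.
    val blocked = ticksWhile { blockingRead() }
    // Blocking call moved to Dispatchers.IO: the caller's thread stays free.
    val wrapped = ticksWhile { withContext(Dispatchers.IO) { blockingRead() } }
    println("ticks while blocking the caller thread: $blocked")
    println("ticks while blocking on Dispatchers.IO: $wrapped")
}
```

The first count is zero because the ticker shares its only thread with the blocking call. The second count depends on timing, but it should be greater than zero.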

However, it's important to understand that Dispatchers.IO is limited by default to 64 threads (or to the number of CPU cores, if that is larger), which might not be enough on the backend or on Android. If every request needs to make a blocking call, and you have thousands of requests per second, the queue for these 64 threads might start growing quickly. In such a situation, you might consider using limitedParallelism on Dispatchers.IO to make a new dispatcher with an independent limit that is greater than 64 threads [5].

class LibraryGoogleAccountVerifier : GoogleAccountVerifier {
    private val dispatcher = Dispatchers.IO
        .limitedParallelism(100)

    private val verifier =
        GoogleIdTokenVerifier.Builder(..., ...)
            .setAudience(...)
            .build()

    override suspend fun getUserData(
        googleToken: String
    ): GoogleUserData? = withContext(dispatcher) {
        verifier.verify(googleToken)
            ?.payload
            ?.let {
                GoogleUserData(
                    email = it.email,
                    name = it.getString("given_name"),
                    surname = it.getString("family_name"),
                    imageUrl = it.getString("picture"),
                )
            }
    }
}

A dispatcher with a limit that is independent of Dispatchers.IO should be used whenever we suspect that our function might be called by so many coroutines that they might use a significant number of threads. In such cases, we do not want to block threads from Dispatchers.IO because we do not know which processes will wait until our process is finished.

When we implement a library, we often do not know how our functions will be used, and we should generally operate on dispatchers with independent pools of threads. What should be the limit of such dispatchers? You need to decide for yourself. If you make this a small number, coroutines might wait for one another. If you make it too big, you might use a lot of memory and CPU time on all these active threads.

class CertificateGenerator {
    private val dispatcher = Dispatchers.IO
        .limitedParallelism(5)

    // exec returns a handle to the started process
    suspend fun generate(data: CertificateData): Process =
        withContext(dispatcher) {
            Runtime.getRuntime()
                .exec("generateCertificate " + data.toArgs())
        }
}
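
To get a feeling for what such a limit does, the following sketch measures the peak number of blocking tasks that run at the same time on a dispatcher. The helper maxConcurrency is hypothetical, and Thread.sleep stands in for a real blocking call. Note that in older versions of kotlinx.coroutines, limitedParallelism may require opting in to an experimental API.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Starts `tasks` blocking jobs on `dispatcher` and reports
// the highest number that were running at the same moment.
suspend fun maxConcurrency(dispatcher: CoroutineDispatcher, tasks: Int): Int {
    val running = AtomicInteger()
    val peak = AtomicInteger()
    coroutineScope {
        repeat(tasks) {
            launch(dispatcher) {
                val now = running.incrementAndGet()
                peak.updateAndGet { current -> maxOf(current, now) }
                Thread.sleep(100) // stands in for a blocking call
                running.decrementAndGet()
            }
        }
    }
    return peak.get()
}

fun main() = runBlocking {
    val limited = Dispatchers.IO.limitedParallelism(5)
    // With 20 tasks and a limit of 5, the peak never exceeds 5;
    // the remaining tasks wait for a free slot.
    println("peak with limit 5: ${maxConcurrency(limited, 20)}")
}
```

A small limit keeps thread usage predictable at the price of queuing. Measuring with a realistic workload is one way to choose the number.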

We should also ensure that all CPU-intensive processes run on Dispatchers.Default, and all processes that modify the main view run on Dispatchers.Main.immediate. For that, withContext might also be useful.

suspend fun calculateModel() =
    withContext(Dispatchers.Default) {
        model.fit(
            dataset = newTrain,
            epochs = 10,
            batchSize = 100,
            verbose = false
        )
    }

suspend fun setUserName(name: String) =
    withContext(Dispatchers.Main.immediate) {
        userNameView.text = name
    }

Observing with Flow

Suspending functions are perfect for representing the process of producing/fetching a single value; however, when we expect more than one value, we should use Flow instead. We've seen one example already: in the Room library, we use suspend functions to perform a single database operation, and we use the Flow type to observe changes in a table.

// Room
@Dao
interface LocationDao {
    @Insert(onConflict = OnConflictStrategy.IGNORE)
    suspend fun insertLocation(location: Location)

    @Query("DELETE FROM location_table")
    suspend fun deleteLocations()

    @Query("SELECT * FROM location_table ORDER BY time")
    fun observeLocations(): Flow<List<Location>>
}

We have a similar situation when we consider network calls. When we fetch a single value from an API, it is best to use a suspend function; however, when we set up a WebSocket and listen for messages, we should use Flow instead. To create such a flow (if the library we use does not support returning one), we should use callbackFlow (or channelFlow). Remember to end your builder with awaitClose [6].

fun listenMessages(): Flow<List<Message>> = callbackFlow {
    socket.on("NewMessage") { args ->
        trySendBlocking(args.toMessage())
    }
    awaitClose()
}

A popular use for Flow is observing UI events, like button clicks or text changes.

fun EditText.listenTextChange(): Flow<String> = callbackFlow {
    val watcher = doAfterTextChanged {
        trySendBlocking(it.toString())
    }
    awaitClose { removeTextChangedListener(watcher) }
}

Flow can also be used for other callback functions, and it should be used when these callbacks might produce multiple values.

// T is declared as a type parameter so that the snippet is well-formed
fun <T> flowFrom(api: CallbackBasedApi<T>): Flow<T> = callbackFlow {
    val callback = object : Callback<T> {
        override fun onNextValue(value: T) {
            trySendBlocking(value)
        }

        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("API Error", cause))
        }

        override fun onCompleted() = channel.close()
    }
    api.register(callback)
    awaitClose { api.unregister(callback) }
}
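
The role of awaitClose is easiest to see in a runnable sketch. TemperatureSensor below is a hypothetical listener-based source invented for this example. The flow registers a listener, and the awaitClose block removes it once the collector stops, here because take(3) has received enough values.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.concurrent.thread

// Hypothetical listener-based source, defined only for illustration.
class TemperatureSensor {
    private val listeners = CopyOnWriteArrayList<(Int) -> Unit>()
    val listenerCount get() = listeners.size
    fun addListener(listener: (Int) -> Unit) { listeners += listener }
    fun removeListener(listener: (Int) -> Unit) { listeners -= listener }
    fun publish(value: Int) = listeners.forEach { it(value) }
}

fun TemperatureSensor.readings(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { value -> trySendBlocking(value) }
    addListener(listener)
    // Suspends until the flow is closed or cancelled, then cleans up.
    awaitClose { removeListener(listener) }
}

fun main() = runBlocking {
    val sensor = TemperatureSensor()
    val producer = thread {
        // Wait until the flow has registered its listener, then publish.
        while (sensor.listenerCount == 0) Thread.sleep(10)
        (1..5).forEach { sensor.publish(it) }
    }
    val firstThree = sensor.readings().take(3).toList()
    producer.join()
    println(firstThree) // prints [1, 2, 3]
    println(sensor.listenerCount) // prints 0, the listener was removed
}
```

Without awaitClose, the builder would finish right after registering the listener, and the flow would complete before any value arrives.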

If you need to use a specific dispatcher in a flow builder, use flowOn on the produced flow [7].

fun fibonacciFlow(): Flow<BigDecimal> = flow {
    var a = BigDecimal.ZERO
    var b = BigDecimal.ONE
    emit(a)
    emit(b)
    while (true) {
        val temp = a
        a = b
        b += temp
        emit(b)
    }
}.flowOn(Dispatchers.Default)

fun filesContentFlow(path: String): Flow<String> =
    channelFlow {
        File(path).takeIf { it.exists() }
            ?.listFiles()
            ?.forEach {
                send(it.readText())
            }
    }.flowOn(Dispatchers.IO)
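
A quick way to check what flowOn changes is to compare thread names. In this small sketch (threadNames is a hypothetical helper), the builder emits the name of the thread it runs on, and the collector prints it next to its own thread name.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// The builder runs upstream of flowOn, so it executes on Dispatchers.Default.
fun threadNames(): Flow<String> = flow {
    emit(Thread.currentThread().name)
}.flowOn(Dispatchers.Default)

fun main() = runBlocking {
    threadNames().collect { builderThread ->
        // The collector still runs in the caller's context.
        println("builder ran on: $builderThread")
        println("collector runs on: ${Thread.currentThread().name}")
    }
}
```

The two names differ: flowOn affects only the operations above it, while collection stays in the context of the coroutine that called collect.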

[0] For details, see the "How does suspension work?" chapter, section "Resuming with a value".
[1] For details, see the "Cancellation" chapter, section "invokeOnCompletion".
[2] For details, see the "How does suspension work?" chapter, section "Resume with an exception".
[3] For details, see the "Dispatchers" chapter.
[4] For details, see the "Dispatchers" chapter, section "IO dispatcher".
[5] For details, see the "Dispatchers" chapter, section "IO dispatcher with a custom pool of threads".
[6] For details, see the "Flow building" chapter, section "callbackFlow".
[7] For details, see the "Flow lifecycle functions" chapter, section "flowOn".