article banner (priority)

Common Kotlin Coroutines use-cases

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub.

All good things must come to an end, and the same is sadly true of this book. Of course, there is still much more to say about Kotlin Coroutines, but I believe we’ve covered the essentials well. Now it is your turn to start using this knowledge in practice and deepen your understanding.

As a short summary, I would like to present the most important use cases and reflections on good style. Firstly, if we introduce coroutines into our code, it is best to use them from the bottom to the top of your application. We should avoid mixing suspending functions with blocking functions other concurrency styles, like callbacks or explicit thread starting. The easiest way to achieve this is by using libraries that have built-in support for suspending functions.

// Retrofit class GithubApi { @GET("orgs/{organization}/repos?per_page=100") suspend fun getOrganizationRepos( @Path("organization") organization: String ): List<Repo> } // Room @Dao interface LocationDao { @Insert(onConflict = OnConflictStrategy.IGNORE) suspend fun insertLocation(location: Location) @Query("DELETE FROM location_table") suspend fun deleteLocations() @Query("SELECT * FROM location_table ORDER BY time") fun observeLocations(): Flow<List<Location>> }

If you cannot avoid calling a blocking function, wrap it with withContext and set a proper dispatcher0 (either Dispatchers.IO or a custom dispatcher that is built using Dispatchers.IO and limitedParallelism).

class DiscUserRepository( private val discReader: DiscReader ) : UserRepository { private val dispatcher = Dispatchers.IO .limitedParallelism(5) override suspend fun getUser(): UserData = withContext(dispatcher) { discReader.read<UserData>("userName") } }

If you need to use a callback function, wrap it with suspendCancellableCoroutine. If possible, handle exceptions2 and cancellation3.

suspend fun requestNews(): News { return suspendCancellableCoroutine<News> { cont -> val call = requestNews( onSuccess = { news -> cont.resume(news) }, onError = { e -> cont.resumeWithException(e) } ) cont.invokeOnCancellation { call.cancel() } } }

If a process involves CPU-intensive operations, use Dispatchers.Default; if it involves UI modifications that need to happen on the main thread, use Dispatchers.Main1.

suspend fun calculateModel() = withContext(Dispatchers.Default) { model.fit( dataset = newTrain, epochs = 10, batchSize = 100, verbose = false ) } suspend fun setUserName(name: String) = withContext(Dispatchers.Main.immediate) { userNameView.text = name }

Coroutines need to start somewhere. On backend frameworks, like Spring or Ktor, there is built-in support for suspending functions, so it is mainly the framework’s responsibility to start coroutines.

@Controller class UserController( private val tokenService: TokenService, private val userService: UserService, ) { @GetMapping("/me") suspend fun findUser( @PathVariable userId: String, @RequestHeader("Authorization") authorization: String ): UserJson { val userId = tokenService.readUserId(authorization) val user = userService.findUserById(userId) return user.toJson() } }

However, Android and some other backend libraries do not offer such convenience. The solution is to make our own scope and use it to start coroutines. On Android, we can use viewModelScope or lifecycleScope4.

class UserProfileViewModel( private val loadProfileUseCase: LoadProfileUseCase, private val updateProfileUseCase: UpdateProfileUseCase, ) { private val userProfile = MutableSharedFlow<UserProfileData>() val userName: Flow<String> = userProfile .map { it.name } val userSurname: Flow<String> = userProfile .map { it.surname } // ... fun onCreate() { viewModelScope.launch { val userProfileData = loadProfileUseCase.execute() userProfile.value = userProfileData // ... } } fun onNameChanged(newName: String) { viewModelScope.launch { val newProfile = userProfile.copy(name = newName) userProfile.value = newProfile updateProfileUseCase.execute(newProfile) } } }
class UserProfileViewModel { private val _userChanges = MutableSharedFlow<UserChange>() val userChanges: SharedFlow<UserChange> = _userChanges fun onCreate() { viewModelScope.launch { userChanges.collect(::applyUserChange) } } fun onNameChanged(newName: String) { // ... _userChanges.emit(NameChange(newName)) } fun onPublicKeyChanged(newPublicKey: String) { // ... _userChanges.emit(PublicKeyChange(newPublicKey)) } }

In other cases, we need to create a custom scope. This is described in detail in the Constructing a coroutine scope chapter, but here are a few examples:

// On any platform val analyticsScope = CoroutineScope(SupervisorJob()) // Android example with cancellation and exception handler abstract class BaseViewModel : ViewModel() { private val _failure: MutableLiveData<Throwable> = MutableLiveData() val failure: LiveData<Throwable> = _failure private val exceptionHandler = CoroutineExceptionHandler { _, throwable -> _failure.value = throwable } private val context = Dispatchers.Main + SupervisorJob() + exceptionHandler protected val scope = CoroutineScope(context) override fun onCleared() { context.cancelChildren() } } // Spring example with custom exception handler @Configuration public class CoroutineScopeConfiguration { @Bean(name = "coroutineDispatcher") fun coroutineDispatcher(): CoroutineDispatcher = Dispatchers.IO.limitedParallelism(5) @Bean(name = "coroutineExceptionHandler") fun exceptionHandler(): CoroutineExceptionHandler = CoroutineExceptionHandler { _, throwable -> FirebaseCrashlytics.getInstance() .recordException(throwable) } @Bean fun coroutineScope( coroutineDispatcher: CoroutineDispatcher, exceptionHandler: CoroutineExceptionHandler, ) = CoroutineScope( SupervisorJob() + coroutineDispatcher + coroutineExceptionHandler ) }

Such scopes might be useful when we need to start new processes. For example, when some task scheduler regularly starts a process that sends notifications. In this case, we need a regular function that starts coroutines. If we need this function to be blocked until all coroutines are finished, use runBlocking, as presented in the example below.

class NotificationsSender( private val client: NotificationsClient, private val exceptionCollector: ExceptionCollector, ) { private val handler = CoroutineExceptionHandler { _, e -> exceptionCollector.collectException(e) } private val job = SupervisorJob() private val scope = CoroutineScope(job + handler) fun sendNotifications(notifications: List<Notification>) { val jobs = notifications.map { notification -> scope.launch { client.send(notification) } } runBlocking { jobs.joinAll() } } fun cancel() { job.cancelChildren() } }

Between starting coroutines and suspending functions from repositories, we mainly have suspending functions calling suspending functions.

class NetworkUserRepository( private val api: UserApi, ) : UserRepository { override suspend fun getUser(): User = api.getUser().toDomainUser() } class NetworkNewsService( private val newsRepo: NewsRepository, private val settings: SettingsRepository, ) { suspend fun getNews(): List<News> = newsRepo .getNews() .map { it.toDomainNews() } suspend fun getNewsSummary(): List<News> { val type = settings.getNewsSummaryType() return newsRepo.getNewsSummary(type) } }

When you need to introduce concurrency into suspending functions, wrap their body with coroutineScope and use async builder5.

suspend fun getArticlesForUser( userToken: String?, ): List<ArticleJson> = coroutineScope { val articles = async { articleRepository.getArticles() } val user = userService.getUser(userToken) articles.await() .filter { canSeeOnList(user, it) } .map { toArticleJson(it) } }

This can be scaled to a bigger number of async processes, but we can also limit it to a certain concurrency by using Flow and flatMapMerge6.

suspend fun getOffers( categories: List<Category> ): List<Offer> = coroutineScope { categories .map { async { api.requestOffers(it) } } .flatMap { it.await() } } // A better solution suspend fun getOffers( categories: List<Category> ): Flow<Offer> = categories .asFlow() .flatMapMerge(concurrency = 20) { suspend { api.requestOffers(it) }.asFlow() // or flow { emit(api.requestOffers(it)) } }

If you need to ignore exceptions, use supervisorScope instead of coroutineScope.

suspend fun notifyAnalytics(actions: List<UserAction>) = supervisorScope { actions.forEach { action -> launch { notifyAnalytics(action) } } }

If you use async, you also need to catch exceptions that are thrown from await7.

class ArticlesRepositoryComposite( private val articleRepositories: List<ArticleRepository>, ) : ArticleRepository { override suspend fun fetchArticles(): List<Article> = supervisorScope { articleRepositories .map { async { it.fetchArticles() } } .mapNotNull { try { it.await() } catch (e: Throwable) { e.printStackTrace() null } } .flatten() .sortedByDescending { it.publishedAt } } }

To set a timeout for a coroutine, use withTimeout or withTimeoutOrNull8.

suspend fun getUserOrNull(): User? = withTimeoutOrNull(5000) { fetchUser() }

For coroutine testing use cases, see the Testing Kotlin Coroutines chapter. For use cases that involve safely accessing a shared state, see The problem with shared states chapter.

0:

For details, see the Dispatchers chapter.

1:

For details, see the Dispatchers chapter.

2:

For details, see the How does suspension work? chapter.

3:

For details, see the Cancellation chapter.

4:

For details, see the Constructing a coroutine scope chapter.

5:

For details, see the Coroutine builders chapter.

6:

For details, see the Flow processing chapter.

7:

For details, see the Coroutine scope functions chapter.

8:

For details, see the Coroutine scope functions chapter.