Common Kotlin Coroutines use-cases
All good things must come to an end, and the same is sadly true of this book. Of course, there is still much more to say about Kotlin Coroutines, but I believe we’ve covered the essentials well. Now it is your turn to start using this knowledge in practice and deepen your understanding.
As a short summary, I would like to present the most important use cases and reflections on good style. Firstly, if we introduce coroutines into our code, it is best to use them from the bottom to the top of your application. We should avoid mixing suspending functions with blocking functions other concurrency styles, like callbacks or explicit thread starting. The easiest way to achieve this is by using libraries that have built-in support for suspending functions.
// Retrofit
class GithubApi {
@GET("orgs/{organization}/repos?per_page=100")
suspend fun getOrganizationRepos(
@Path("organization") organization: String
): List<Repo>
}
// Room
@Dao
interface LocationDao {
@Insert(onConflict = OnConflictStrategy.IGNORE)
suspend fun insertLocation(location: Location)
@Query("DELETE FROM location_table")
suspend fun deleteLocations()
@Query("SELECT * FROM location_table ORDER BY time")
fun observeLocations(): Flow<List<Location>>
}
If you cannot avoid calling a blocking function, wrap it with withContext
and set a proper dispatcher0 (either Dispatchers.IO
or a custom dispatcher that is built using Dispatchers.IO
and limitedParallelism
).
class DiscUserRepository(
private val discReader: DiscReader
) : UserRepository {
private val dispatcher = Dispatchers.IO
.limitedParallelism(5)
override suspend fun getUser(): UserData =
withContext(dispatcher) {
discReader.read<UserData>("userName")
}
}
If you need to use a callback function, wrap it with suspendCancellableCoroutine
. If possible, handle exceptions2 and cancellation3.
suspend fun requestNews(): News {
return suspendCancellableCoroutine<News> { cont ->
val call = requestNews(
onSuccess = { news -> cont.resume(news) },
onError = { e -> cont.resumeWithException(e) }
)
cont.invokeOnCancellation {
call.cancel()
}
}
}
If a process involves CPU-intensive operations, use Dispatchers.Default
; if it involves UI modifications that need to happen on the main thread, use Dispatchers.Main
1.
suspend fun calculateModel() =
withContext(Dispatchers.Default) {
model.fit(
dataset = newTrain,
epochs = 10,
batchSize = 100,
verbose = false
)
}
suspend fun setUserName(name: String) =
withContext(Dispatchers.Main.immediate) {
userNameView.text = name
}
Coroutines need to start somewhere. On backend frameworks, like Spring or Ktor, there is built-in support for suspending functions, so it is mainly the framework’s responsibility to start coroutines.
@Controller
class UserController(
private val tokenService: TokenService,
private val userService: UserService,
) {
@GetMapping("/me")
suspend fun findUser(
@PathVariable userId: String,
@RequestHeader("Authorization") authorization: String
): UserJson {
val userId = tokenService.readUserId(authorization)
val user = userService.findUserById(userId)
return user.toJson()
}
}
However, Android and some other backend libraries do not offer such convenience. The solution is to make our own scope and use it to start coroutines. On Android, we can use viewModelScope
or lifecycleScope
4.
class UserProfileViewModel(
private val loadProfileUseCase: LoadProfileUseCase,
private val updateProfileUseCase: UpdateProfileUseCase,
) {
private val userProfile =
MutableSharedFlow<UserProfileData>()
val userName: Flow<String> = userProfile
.map { it.name }
val userSurname: Flow<String> = userProfile
.map { it.surname }
// ...
fun onCreate() {
viewModelScope.launch {
val userProfileData =
loadProfileUseCase.execute()
userProfile.value = userProfileData
// ...
}
}
fun onNameChanged(newName: String) {
viewModelScope.launch {
val newProfile = userProfile.copy(name = newName)
userProfile.value = newProfile
updateProfileUseCase.execute(newProfile)
}
}
}
class UserProfileViewModel {
private val _userChanges =
MutableSharedFlow<UserChange>()
val userChanges: SharedFlow<UserChange> = _userChanges
fun onCreate() {
viewModelScope.launch {
userChanges.collect(::applyUserChange)
}
}
fun onNameChanged(newName: String) {
// ...
_userChanges.emit(NameChange(newName))
}
fun onPublicKeyChanged(newPublicKey: String) {
// ...
_userChanges.emit(PublicKeyChange(newPublicKey))
}
}
In other cases, we need to create a custom scope. This is described in detail in the Constructing a coroutine scope chapter, but here are a few examples:
// On any platform
val analyticsScope = CoroutineScope(SupervisorJob())
// Android example with cancellation and exception handler
abstract class BaseViewModel : ViewModel() {
private val _failure: MutableLiveData<Throwable> =
MutableLiveData()
val failure: LiveData<Throwable> = _failure
private val exceptionHandler =
CoroutineExceptionHandler { _, throwable ->
_failure.value = throwable
}
private val context =
Dispatchers.Main + SupervisorJob() + exceptionHandler
protected val scope = CoroutineScope(context)
override fun onCleared() {
context.cancelChildren()
}
}
// Spring example with custom exception handler
@Configuration
public class CoroutineScopeConfiguration {
@Bean(name = "coroutineDispatcher")
fun coroutineDispatcher(): CoroutineDispatcher =
Dispatchers.IO.limitedParallelism(5)
@Bean(name = "coroutineExceptionHandler")
fun exceptionHandler(): CoroutineExceptionHandler =
CoroutineExceptionHandler { _, throwable ->
FirebaseCrashlytics.getInstance()
.recordException(throwable)
}
@Bean
fun coroutineScope(
coroutineDispatcher: CoroutineDispatcher,
exceptionHandler: CoroutineExceptionHandler,
) = CoroutineScope(
SupervisorJob() +
coroutineDispatcher +
coroutineExceptionHandler
)
}
Such scopes might be useful when we need to start new processes. For example, when some task scheduler regularly starts a process that sends notifications. In this case, we need a regular function that starts coroutines. If we need this function to be blocked until all coroutines are finished, use runBlocking
, as presented in the example below.
class NotificationsSender(
private val client: NotificationsClient,
private val exceptionCollector: ExceptionCollector,
) {
private val handler = CoroutineExceptionHandler { _, e ->
exceptionCollector.collectException(e)
}
private val job = SupervisorJob()
private val scope = CoroutineScope(job + handler)
fun sendNotifications(notifications: List<Notification>) {
val jobs = notifications.map { notification ->
scope.launch {
client.send(notification)
}
}
runBlocking { jobs.joinAll() }
}
fun cancel() {
job.cancelChildren()
}
}
Between starting coroutines and suspending functions from repositories, we mainly have suspending functions calling suspending functions.
class NetworkUserRepository(
private val api: UserApi,
) : UserRepository {
override suspend fun getUser(): User =
api.getUser().toDomainUser()
}
class NetworkNewsService(
private val newsRepo: NewsRepository,
private val settings: SettingsRepository,
) {
suspend fun getNews(): List<News> = newsRepo
.getNews()
.map { it.toDomainNews() }
suspend fun getNewsSummary(): List<News> {
val type = settings.getNewsSummaryType()
return newsRepo.getNewsSummary(type)
}
}
When you need to introduce concurrency into suspending functions, wrap their body with coroutineScope
and use async
builder5.
suspend fun getArticlesForUser(
userToken: String?,
): List<ArticleJson> = coroutineScope {
val articles = async { articleRepository.getArticles() }
val user = userService.getUser(userToken)
articles.await()
.filter { canSeeOnList(user, it) }
.map { toArticleJson(it) }
}
This can be scaled to a bigger number of async processes, but we can also limit it to a certain concurrency by using Flow and flatMapMerge
6.
suspend fun getOffers(
categories: List<Category>
): List<Offer> = coroutineScope {
categories
.map { async { api.requestOffers(it) } }
.flatMap { it.await() }
}
// A better solution
suspend fun getOffers(
categories: List<Category>
): Flow<Offer> = categories
.asFlow()
.flatMapMerge(concurrency = 20) {
suspend { api.requestOffers(it) }.asFlow()
// or flow { emit(api.requestOffers(it)) }
}
If you need to ignore exceptions, use supervisorScope
instead of coroutineScope
.
suspend fun notifyAnalytics(actions: List<UserAction>) =
supervisorScope {
actions.forEach { action ->
launch {
notifyAnalytics(action)
}
}
}
If you use async
, you also need to catch exceptions that are thrown from await
7.
class ArticlesRepositoryComposite(
private val articleRepositories: List<ArticleRepository>,
) : ArticleRepository {
override suspend fun fetchArticles(): List<Article> =
supervisorScope {
articleRepositories
.map { async { it.fetchArticles() } }
.mapNotNull {
try {
it.await()
} catch (e: Throwable) {
e.printStackTrace()
null
}
}
.flatten()
.sortedByDescending { it.publishedAt }
}
}
To set a timeout for a coroutine, use withTimeout
or withTimeoutOrNull
8.
suspend fun getUserOrNull(): User? =
withTimeoutOrNull(5000) {
fetchUser()
}
For coroutine testing use cases, see the Testing Kotlin Coroutines chapter. For use cases that involve safely accessing a shared state, see The problem with shared states chapter.
Nicola Corti is a Google Developer Expert for Kotlin. He has been working with the language since before version 1.0 and he is the maintainer of several open-source libraries and tools.
He's currently working as Android Infrastructure Engineer at Spotify in Stockholm, Sweden.
Furthermore, he is an active member of the developer community.
His involvement goes from speaking at international conferences about Mobile development to leading communities across Europe (GDG Pisa, KUG Hamburg, GDG Sthlm Android).
In his free time, he also loves baking, photography, and running.