Kotlin Coroutines use cases for Domain Layer

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.

Now, let's discuss common Kotlin Coroutines use cases in the domain layer. This is where the business logic is implemented, so it is where we define use cases, services, facades, etc. In this layer, we should avoid operating on coroutine scope objects; instead, we should expose suspending functions. The layer below this (the presentation layer) is responsible for starting coroutines on scope objects; in the domain layer, we should use coroutine scope functions to start coroutines.

In practice, in the domain layer, we mainly have suspend functions that call other suspending functions.

class NetworkUserRepository(
    private val api: UserApi,
) : UserRepository {
    override suspend fun getUser(): User =
        api.getUser().toDomainUser()
}

class NetworkNewsService(
    private val newsRepo: NewsRepository,
    private val settings: SettingsRepository,
) {
    suspend fun getNews(): List<News> = newsRepo
        .getNews()
        .map { it.toDomainNews() }

    suspend fun getNewsSummary(): List<News> {
        val type = settings.getNewsSummaryType()
        return newsRepo.getNewsSummary(type)
    }
}

Concurrent calls

When we want two processes to happen in parallel, we should wrap our function body with coroutineScope and use the async builder inside it to start each process that should run asynchronously.

suspend fun produceCurrentUser(): User = coroutineScope {
    val profile = async { repo.getProfile() }
    val friends = async { repo.getFriends() }
    User(profile.await(), friends.await())
}

It is essential to understand that this modification should only make these two processes run in parallel. All the other mechanisms, like cancellation, exception handling, or context passing, should stay the same. So, if you consider the produceCurrentUserSeq and produceCurrentUserPar functions below, the only important difference is that the first one is sequential, while the second one starts two parallel processes [8].

suspend fun produceCurrentUserSeq(): User {
    val profile = repo.getProfile()
    val friends = repo.getFriends()
    return User(profile, friends)
}

suspend fun produceCurrentUserPar(): User = coroutineScope {
    val profile = async { repo.getProfile() }
    val friends = async { repo.getFriends() }
    User(profile.await(), friends.await())
}
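To make the difference between the two variants tangible, here is a minimal runnable sketch. The FakeRepo class, the Profile and Friend types, and the 500 ms delays are assumptions introduced only for illustration; they stand in for whatever repository the real code uses.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-ins for the types used in the examples above.
data class Profile(val name: String)
data class Friend(val name: String)
data class User(val profile: Profile, val friends: List<Friend>)

// Fake repository (an assumption): each call suspends for about 500 ms.
class FakeRepo {
    suspend fun getProfile(): Profile {
        delay(500)
        return Profile("Alice")
    }

    suspend fun getFriends(): List<Friend> {
        delay(500)
        return listOf(Friend("Bob"))
    }
}

val repo = FakeRepo()

// Sequential: the second call starts only after the first one finishes.
suspend fun produceCurrentUserSeq(): User {
    val profile = repo.getProfile()
    val friends = repo.getFriends()
    return User(profile, friends)
}

// Concurrent: both calls are started before either result is awaited.
suspend fun produceCurrentUserPar(): User = coroutineScope {
    val profile = async { repo.getProfile() }
    val friends = async { repo.getFriends() }
    User(profile.await(), friends.await())
}

fun main() = runBlocking {
    val seqTime = measureTimeMillis { produceCurrentUserSeq() }
    val parTime = measureTimeMillis { produceCurrentUserPar() }
    // With this fake repository, the sequential variant should take roughly
    // twice as long as the concurrent one.
    println("Sequential: $seqTime ms, concurrent: $parTime ms")
}
```

Both functions return the same User; only the elapsed time differs.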

When we want to start two asynchronous processes and then await their completion, we can do that by creating a new coroutine for each of them using the async function; however, the same result can also be achieved if we start only one process using async and run the second one on the same coroutine. The following implementation of produceCurrentUserPar will have practically the same behavior as the previous one. Which option should be preferred? I think that most developers will prefer the first one, arguing that using async for each process we want to execute in parallel makes our code more readable. On the other hand, some developers will prefer the second option, arguing that it is more efficient because it uses fewer coroutines and creates fewer objects. It is up to you to choose which option you prefer.

suspend fun produceCurrentUserPar(): User = coroutineScope {
    val profile = async { repo.getProfile() }
    val friends = repo.getFriends()
    User(profile.await(), friends)
}

suspend fun getArticlesForUser(
    userToken: String?,
): List<ArticleJson> = coroutineScope {
    val articles = async { articleRepository.getArticles() }
    val user = userService.getUser(userToken)
    articles.await()
        .filter { canSeeOnList(user, it) }
        .map { toArticleJson(it) }
}

We can use async together with collection processing functions to start an asynchronous process for each list element. In such cases, it is good practice to await results using the awaitAll function.

suspend fun getOffers(
    categories: List<Category>
): List<Offer> = coroutineScope {
    categories
        .map { async { api.requestOffers(it) } }
        .awaitAll()
        .flatten()
}
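One property of awaitAll worth seeing in action is that it returns results in the order of the deferred values, not in the order in which they complete. The sketch below assumes a hypothetical FakeOfferApi whose later categories respond faster, and passes the API as a parameter to stay self-contained.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical types standing in for the ones used above.
data class Category(val id: Int)
data class Offer(val title: String)

// Fake API (an assumption): categories with a higher id respond sooner.
class FakeOfferApi {
    suspend fun requestOffers(category: Category): List<Offer> {
        delay(300L - category.id * 100L)
        return listOf(Offer("offer-${category.id}"))
    }
}

suspend fun getOffers(
    api: FakeOfferApi,
    categories: List<Category>,
): List<Offer> = coroutineScope {
    categories
        .map { async { api.requestOffers(it) } } // one coroutine per category
        .awaitAll()                              // results keep the input order
        .flatten()
}

fun main() = runBlocking {
    val categories = listOf(Category(0), Category(1), Category(2))
    val offers = getOffers(FakeOfferApi(), categories)
    println(offers.map { it.title }) // prints [offer-0, offer-1, offer-2]
}
```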

If you want to limit the number of concurrent calls, you can use a rate limiter. For example, the resilience4j library offers rate limiters for suspending functions. You can also transform your list into a Flow and then use flatMapMerge with the concurrency parameter, which specifies how many concurrent calls you will send [10].

fun getOffers(
    categories: List<Category>
): Flow<List<Offer>> = categories
    .asFlow()
    .flatMapMerge(concurrency = 20) {
        suspend { api.requestOffers(it) }.asFlow()
        // or flow { emit(api.requestOffers(it)) }
    }
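The following sketch shows one way to consume such a flow and to check that the concurrency limit holds. CountingOfferApi and its in-flight counters are assumptions added purely to observe the behavior; the API and the limit are passed as parameters so the example is self-contained. Depending on the kotlinx.coroutines version, flatMapMerge may produce an opt-in warning.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical types standing in for the ones used above.
data class Category(val id: Int)
data class Offer(val title: String)

// Fake API (an assumption) that records how many calls run at the same time.
class CountingOfferApi {
    private val inFlight = AtomicInteger(0)
    val maxInFlight = AtomicInteger(0)

    suspend fun requestOffers(category: Category): List<Offer> {
        val now = inFlight.incrementAndGet()
        maxInFlight.updateAndGet { maxOf(it, now) }
        try {
            delay(100)
            return listOf(Offer("offer-${category.id}"))
        } finally {
            inFlight.decrementAndGet()
        }
    }
}

fun getOffers(
    api: CountingOfferApi,
    categories: List<Category>,
    concurrency: Int,
): Flow<List<Offer>> = categories
    .asFlow()
    .flatMapMerge(concurrency = concurrency) { category ->
        flow { emit(api.requestOffers(category)) }
    }

fun main() = runBlocking {
    val api = CountingOfferApi()
    val categories = (1..5).map { Category(it) }
    // toList() collects the whole flow; the order of elements is not guaranteed.
    val offers = getOffers(api, categories, concurrency = 2).toList().flatten()
    println("Received ${offers.size} offers")
    println("Most calls in flight at once: ${api.maxInFlight.get()}")
}
```

Because flatMapMerge emits results as they arrive, code that needs the original order should sort or key the results afterwards.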

When you use coroutineScope, remember that an exception in any child coroutine will break the coroutine created by coroutineScope, cancel all its other children, and then throw an exception. This is the behavior we typically expect, but in some cases it doesn't suit us very well. When we want to start a number of concurrent processes that are considered independent, we should instead use supervisorScope, which ignores exceptions in its children [11].

suspend fun notifyAnalytics(actions: List<UserAction>) =
    supervisorScope {
        actions.forEach { action ->
            launch {
                notifyAnalytics(action)
            }
        }
    }
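As a rough illustration of this independence, the sketch below sends three actions, one of which fails. The AnalyticsNotifier class, its send function, and the CoroutineExceptionHandler that records failures are assumptions made for this demo; the handler is there only so the failure is captured instead of being reported as an uncaught exception.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical type standing in for the one used above.
data class UserAction(val name: String)

class AnalyticsNotifier {
    val delivered = CopyOnWriteArrayList<String>()
    val failures = CopyOnWriteArrayList<String>()

    // Records exceptions thrown by individual children (an assumption of this sketch).
    private val handler = CoroutineExceptionHandler { _, exception ->
        failures.add(exception.message.orEmpty())
    }

    // Fake delivery: the action named "broken" always fails.
    private suspend fun send(action: UserAction) {
        delay(50)
        check(action.name != "broken") { "Analytics rejected ${action.name}" }
        delivered.add(action.name)
    }

    suspend fun notifyAnalytics(actions: List<UserAction>): Unit = supervisorScope {
        actions.forEach { action ->
            // A failure in one child does not cancel its siblings.
            launch(handler) { send(action) }
        }
    }
}

fun main() = runBlocking {
    val notifier = AnalyticsNotifier()
    notifier.notifyAnalytics(
        listOf(UserAction("click"), UserAction("broken"), UserAction("scroll"))
    )
    println("Delivered: ${notifier.delivered.sorted()}") // prints Delivered: [click, scroll]
    println("Failures: ${notifier.failures}") // prints Failures: [Analytics rejected broken]
}
```

With coroutineScope in place of supervisorScope, the failing child would cancel the others and the exception would be rethrown to the caller.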

When we want to limit the execution time of a process, we can use withTimeout or withTimeoutOrNull, both of which cancel their process if it takes longer than the time specified by the argument [12].

suspend fun getUserOrNull(): User? =
    withTimeoutOrNull(5000) {
        fetchUser()
    }
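The difference between the two functions is easiest to see side by side. In the sketch below, fetchUser takes a simulated response time and the timeout is a parameter; both are assumptions added so the behavior can be observed quickly.

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical type standing in for the one used above.
data class User(val name: String)

// Fake fetch (an assumption): responds after the given time.
suspend fun fetchUser(responseTimeMillis: Long): User {
    delay(responseTimeMillis)
    return User("Alice")
}

// Returns null instead of throwing when the time limit is exceeded.
suspend fun getUserOrNull(responseTimeMillis: Long, timeoutMillis: Long): User? =
    withTimeoutOrNull(timeoutMillis) {
        fetchUser(responseTimeMillis)
    }

fun main() = runBlocking {
    println(getUserOrNull(responseTimeMillis = 100, timeoutMillis = 500)) // prints User(name=Alice)
    println(getUserOrNull(responseTimeMillis = 1000, timeoutMillis = 500)) // prints null

    // withTimeout signals the timeout with an exception instead of null.
    try {
        withTimeout(500) { fetchUser(1000) }
    } catch (e: TimeoutCancellationException) {
        println("Timed out") // prints Timed out
    }
}
```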

Flow transformations

Before we finish discussing this layer, we should also review the typical ways in which we process Flow. In most cases, we just use basic flow processing functions such as map, filter, or onEach, and occasionally less common functions like scan or flatMapMerge [13].

class UserStateProvider(
    private val userRepository: UserRepository
) {
    fun userStateFlow(): Flow<User> = userRepository
        .observeUserChanges()
        .filter { it.isSignificantChange }
        .scan(userRepository.currentUser()) { user, update ->
            user.with(update)
        }
        .map { it.toDomainUser() }
}
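Since scan is less familiar than map or filter, here is a small sketch of what it does: it emits the initial value first and then every intermediate accumulated state. The User and UserUpdate types and the with extension below are simplified assumptions, not the classes from the example above.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Simplified, hypothetical model of a user and an update to it.
data class User(val name: String, val city: String)
data class UserUpdate(val city: String)

// Applies an update by producing a modified copy of the user.
fun User.with(update: UserUpdate): User = copy(city = update.city)

// Folds updates into the current state, emitting each intermediate state.
fun userStateFlow(initial: User, updates: Flow<UserUpdate>): Flow<User> =
    updates.scan(initial) { user, update -> user.with(update) }

fun main() = runBlocking {
    val states = userStateFlow(
        initial = User("Alice", "Warsaw"),
        updates = flowOf(UserUpdate("Berlin"), UserUpdate("Paris")),
    ).toList()
    println(states.map { it.city }) // prints [Warsaw, Berlin, Paris]
}
```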

When you want to merge two flows together, you might use functions like merge, zip, or combine [14].

class ArticlesProvider(
    private val ktAcademy: KtAcademyRepository,
    private val kotlinBlog: KtAcademyRepository,
) {
    fun observeArticles(): Flow<Article> = merge(
        ktAcademy.observeArticles().map { it.toArticle() },
        kotlinBlog.observeArticles().map { it.toArticle() },
    )
}

class NotificationStatusProvider(
    private val userStateProvider: UserStateProvider,
    private val notificationsProvider: NotificationsProvider,
    private val statusFactory: NotificationStatusFactory,
) {
    fun notificationStatusFlow(): Flow<NotificationStatus> =
        notificationsProvider.observeNotifications()
            .filter { it.status == Notification.UNSEEN }
            .combine(userStateProvider.userStateFlow()) {
                notifications, user ->
                statusFactory.produce(notifications, user)
            }
}
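To compare the three functions on the same input, the sketch below runs them over two small flows. The numbers and letters flows and their delays are assumptions chosen only to make the flows emit at different times.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Emits 1, 2, 3 with 100 ms between elements.
fun numbers(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

// Emits "a", "b" with 250 ms between elements.
fun letters(): Flow<String> = flow {
    for (letter in listOf("a", "b")) {
        delay(250)
        emit(letter)
    }
}

// merge: all elements from both flows, in arrival order.
suspend fun merged(): List<String> =
    merge(numbers().map { it.toString() }, letters()).toList()

// zip: pairs the n-th element of one flow with the n-th element of the other,
// and finishes when either flow finishes.
suspend fun zipped(): List<String> =
    numbers().zip(letters()) { number, letter -> "$number$letter" }.toList()

// combine: once both flows have emitted, emits again whenever either flow
// produces a new value, always using the latest value of the other one.
suspend fun combined(): List<String> =
    numbers().combine(letters()) { number, letter -> "$number$letter" }.toList()

fun main() = runBlocking {
    println("merge:   ${merged()}")
    println("zip:     ${zipped()}") // prints zip:     [1a, 2b]
    println("combine: ${combined()}")
}
```

The exact order produced by merge and the intermediate values produced by combine depend on timing, which is why only the zip result is annotated.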

If you want a single flow to be observed by multiple coroutines, transform it into a SharedFlow. A common way to do this is by using shareIn with a scope. To keep this flow active only when needed, use the WhileSubscribed option for the started parameter [15].

class LocationService(
    locationDao: LocationDao,
    scope: CoroutineScope
) {
    private val locations = locationDao.observeLocations()
        .shareIn(
            scope = scope,
            started = SharingStarted.WhileSubscribed(),
        )

    fun observeLocations(): Flow<List<Location>> = locations
}
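The sketch below illustrates the effect of sharing: two observers collect from the service, yet the underlying source is expected to be started only once. FakeLocationDao, its subscription counter, and the way the sharing scope is built and cancelled are assumptions made for this demo.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical type standing in for the one used above.
data class Location(val name: String)

// Fake DAO (an assumption): counts how many times its flow is collected.
class FakeLocationDao {
    val subscriptions = AtomicInteger(0)

    fun observeLocations(): Flow<List<Location>> = flow {
        subscriptions.incrementAndGet()
        var tick = 0
        while (true) {
            delay(100)
            emit(listOf(Location("point-${tick++}")))
        }
    }
}

class LocationService(locationDao: FakeLocationDao, scope: CoroutineScope) {
    private val locations = locationDao.observeLocations()
        .shareIn(
            scope = scope,
            started = SharingStarted.WhileSubscribed(),
        )

    fun observeLocations(): Flow<List<Location>> = locations
}

fun main() = runBlocking {
    // A separate scope for sharing, so it can be cancelled independently.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val dao = FakeLocationDao()
    val service = LocationService(dao, sharingScope)

    // Two observers subscribe before the first value is emitted.
    val first = async { service.observeLocations().first() }
    val second = async { service.observeLocations().first() }
    println("First observer: ${first.await()}")
    println("Second observer: ${second.await()}")
    println("Upstream collections: ${dao.subscriptions.get()}")

    sharingScope.cancel()
}
```

In real code, the scope passed to shareIn would typically be one whose lifecycle is managed elsewhere, rather than one created and cancelled inside a single function.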

[8] For details, see the Coroutine Scope Functions chapter.

[10] For details, see the Flow processing chapter, section flatMapConcat, flatMapMerge, and flatMapLatest.

[11] For details, see the Coroutine Scope Functions chapter, section supervisorScope.

[12] For details, see the Coroutine Scope Functions chapter, section withTimeout.

[13] For details, see the Flow processing chapter.

[14] For details, see the Flow processing chapter, section merge, zip, and combine.

[15] For details, see the SharedFlow and StateFlow chapter.