
Eliminating coroutine races

It is not uncommon to start two coroutines at (more or less) the same time while needing one of them to finish before the other can run. Let's look at a few real-life cases and learn how to deal with them.

The problem

When you call two functions, each launching a coroutine, you cannot be sure which coroutine will run first. That is why the code below sometimes prints "ab" and sometimes "ba".

    import kotlinx.coroutines.*

    fun main() {
        repeat(100) { GlobalScope.launch { Thread.sleep(100) } } // To make processor busy
        test() // sometimes "ab" and sometimes "ba"
        runBlocking {
            scope.coroutineContext.job.children.forEach { it.join() }
        }
    }

    fun test() {
        a()
        b()
    }

    val scope = CoroutineScope(Dispatchers.Default)

    fun a() = scope.launch {
        print("a")
    }

    fun b() = scope.launch {
        print("b")
    }

Regular processes

The simplest way to make one process wait until another one is finished is to store the Job of the first process (the result of launch or async) and call its join function, which suspends until that process is finished (I describe it in detail in the book Kotlin Coroutines: Deep Dive). The code below uses this trick, so it always prints "ab".

    import kotlinx.coroutines.*

    fun main() {
        repeat(100) { GlobalScope.launch { Thread.sleep(100) } } // To make processor busy
        test() // ab
        runBlocking {
            scope.coroutineContext.job.children.forEach { it.join() }
        }
    }

    fun test() {
        a()
        b()
    }

    val scope = CoroutineScope(Dispatchers.Default)
    var job: Job? = null // must be var, because a() assigns it

    fun a() {
        job = scope.launch {
            print("a")
        }
    }

    fun b() = scope.launch {
        job?.join() // suspends until the coroutine started in a() is finished
        print("b")
    }

Listener and handler

Consider a situation where you need one coroutine to set up a listener for events, and another to send an event. This is typical in Android when we follow the MVI pattern. Suppose that BaseViewModel starts listening for events in its constructor, and we need to send the first event in the constructor of one of its subclasses.

    open class BaseViewModel : ViewModel() {
        private val eventFlow = MutableSharedFlow<Event>()

        init {
            eventFlow
                // ...
                .onEach(::handleIntent)
                // ...
                .launchIn(viewModelScope)
        }

        fun sendEvent(event: Event) {
            viewModelScope.launch {
                eventFlow.emit(event)
            }
        }

        fun handleIntent(event: Event) {
            // ...
        }
    }

    class SomeViewModel : BaseViewModel() {
        init {
            sendEvent(Event()) // Will it be handled or not?
        }
    }

When we initialize SomeViewModel, we first call its parent constructor, which starts a coroutine that should listen on eventFlow, but then, straight after that, we start another coroutine to send an event to this flow. So the race begins. Will the event be handled or not? Sometimes it will be, but sometimes the event will be sent before the listener is set up, and it will be lost, because MutableSharedFlow has no replay by default.
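To see the loss in isolation, here is a minimal sketch without Android. The function name emitBeforeSubscribing and the String events are made up for this illustration. It emits once before any collector exists and once after a collector has subscribed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits one event BEFORE any collector exists, then subscribes and emits another.
// Returns the events that the listener actually received.
suspend fun emitBeforeSubscribing(): List<String> = coroutineScope {
    val events = MutableSharedFlow<String>() // replay = 0 by default
    val received = mutableListOf<String>()

    events.emit("first") // no subscribers yet: emit returns immediately, the value is dropped

    val listener = launch { events.collect { received += it } }
    events.subscriptionCount.first { it > 0 } // wait until the listener is really subscribed
    events.emit("second") // delivered, because a subscriber exists now

    listener.cancel() // a SharedFlow collector never completes on its own
    received.toList()
}

fun main() = runBlocking {
    println(emitBeforeSubscribing()) // [second]
}
```

In the view model, the sender and the listener run concurrently, so whether the first event plays the role of "first" or "second" here depends on timing.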

Here we cannot use our Job solution, because the coroutine listening for events never completes on its own: it stays active until it is explicitly cancelled. Calling join on its Job would therefore suspend forever.

    open class BaseViewModel : ViewModel() {
        private val eventFlow = MutableSharedFlow<Event>()
        private val eventFlowJob: Job

        init {
            eventFlowJob = eventFlow
                // ...
                .onEach(::handleIntent)
                // ...
                .launchIn(viewModelScope)
        }

        fun sendEvent(event: Event) {
            viewModelScope.launch {
                eventFlowJob.join() // WAITING FOREVER!
                eventFlow.emit(event)
            }
        }

        fun handleIntent(event: Event) {
            // ...
        }
    }

    class SomeViewModel : BaseViewModel() {
        init {
            sendEvent(Event()) // Never sent, because join never returns
        }
    }
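The "waiting forever" part can be checked with a small sketch outside Android. The helper listenerCompletesWithin is a hypothetical name. It starts a listener on a shared flow and reports whether join returned within a given timeout.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns true if the listener job finished within the timeout, false otherwise.
suspend fun listenerCompletesWithin(timeoutMillis: Long): Boolean = coroutineScope {
    val events = MutableSharedFlow<String>()
    val listenerJob = events.onEach { /* handle event */ }.launchIn(this)

    // join() returns only when the job completes, which a SharedFlow collector never does on its own
    val completed = withTimeoutOrNull(timeoutMillis) { listenerJob.join() } != null

    listenerJob.cancel() // explicit cancellation is what ends the listener
    completed
}

fun main() = runBlocking {
    println(listenerCompletesWithin(200)) // false
}
```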

The simplest working solution is to give our MutableSharedFlow some replay, so that each new observer receives past events. Beware, however, that this changes the behavior of our flow, which might be problematic if we plan to add other observers: every one of them will also receive all past events.

    open class BaseViewModel : ViewModel() {
        private val eventFlow = MutableSharedFlow<Event>(replay = Int.MAX_VALUE)

        init {
            eventFlow
                // ...
                .onEach(::handleIntent)
                // ...
                .launchIn(viewModelScope)
        }

        fun sendEvent(event: Event) {
            viewModelScope.launch {
                eventFlow.emit(event)
            }
        }

        fun handleIntent(event: Event) {
            // ...
        }
    }

    class SomeViewModel : BaseViewModel() {
        init {
            sendEvent(Event()) // Handled, because the event is replayed to the listener
        }
    }
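The side effect of replay is easy to demonstrate in a standalone sketch. The function replayDemo and the String events are assumptions made for illustration. Two events are emitted before anyone listens, and then two observers are attached one after another.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits two events before anyone listens, then attaches two observers in turn.
suspend fun replayDemo(): Pair<List<String>, List<String>> {
    val events = MutableSharedFlow<String>(replay = Int.MAX_VALUE)

    events.emit("A") // kept in the replay cache even though nobody listens yet
    events.emit("B")

    val firstObserver = events.take(2).toList()  // receives both cached events
    val secondObserver = events.take(2).toList() // a later observer receives them again
    return firstObserver to secondObserver
}

fun main() = runBlocking {
    println(replayDemo()) // ([A, B], [A, B])
}
```

The second observer handling "A" and "B" again is exactly the behavior change to watch out for when more than one observer is planned.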

There is another solution, which should be known to those who use SharedFlow intensively. We use CompletableDeferred (more intuitive) or Job (lighter) to wait for the listener to be set up. For both objects we can await completion using join, and we can complete them using the complete function. The best place to complete them is the onSubscription operator, whose action is called once the subscription is registered, so events emitted afterwards are guaranteed to reach this listener. The onStart operator is less suitable here: its action runs before the subscription is registered, so an event sent right after it can still be lost.

    // Variant with CompletableDeferred
    open class BaseViewModel : ViewModel() {
        private val eventFlow = MutableSharedFlow<Event>()
        private val eventFlowListenerStarted = CompletableDeferred<Unit>()

        init {
            eventFlow
                .onSubscription {
                    eventFlowListenerStarted.complete(Unit)
                }
                // ...
                .onEach(::handleIntent)
                // ...
                .launchIn(viewModelScope)
        }

        fun sendEvent(event: Event) {
            viewModelScope.launch {
                eventFlowListenerStarted.join() // suspends until the listener is subscribed
                eventFlow.emit(event)
            }
        }

        fun handleIntent(event: Event) {
            // ...
        }
    }

    // Variant with Job
    open class BaseViewModel : ViewModel() {
        private val eventFlow = MutableSharedFlow<Event>()
        private val eventFlowListenerStarted = Job()

        init {
            eventFlow
                .onSubscription {
                    eventFlowListenerStarted.complete()
                }
                // ...
                .onEach(::handleIntent)
                // ...
                .launchIn(viewModelScope)
        }

        fun sendEvent(event: Event) {
            viewModelScope.launch {
                eventFlowListenerStarted.join() // suspends until the listener is subscribed
                eventFlow.emit(event)
            }
        }

        fun handleIntent(event: Event) {
            // ...
        }
    }
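For readers who want to run this pattern without Android, below is a minimal sketch. The EventBus class, its handled channel, and the firstHandledEvent function are hypothetical stand-ins for the view model and its handler. The coroutine scope is passed in instead of using viewModelScope.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for BaseViewModel, with an explicit scope.
class EventBus(private val scope: CoroutineScope) {
    private val eventFlow = MutableSharedFlow<String>()
    private val listenerStarted = CompletableDeferred<Unit>()
    val handled = Channel<String>(Channel.UNLIMITED) // collects handled events for inspection

    init {
        eventFlow
            .onSubscription { listenerStarted.complete(Unit) } // subscription is registered here
            .onEach { handled.send(it) }
            .launchIn(scope)
    }

    fun sendEvent(event: String): Job = scope.launch {
        listenerStarted.join() // suspends until the listener is subscribed
        eventFlow.emit(event)
    }
}

// Sends an event right after construction and returns what the listener handled.
suspend fun firstHandledEvent(): String? {
    val scope = CoroutineScope(Dispatchers.Default)
    val bus = EventBus(scope)
    bus.sendEvent("first") // races with the listener, as in SomeViewModel
    val result = withTimeoutOrNull(5_000) { bus.handled.receive() }
    scope.cancel()
    return result
}

fun main() = runBlocking {
    println(firstHandledEvent()) // first
}
```

The timeout is only a safety net for the demo. With the join in place, the event is not emitted until the listener has subscribed.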

Technically speaking, there is one more solution, but I find it overly complicated. We can use subscriptionCount from MutableSharedFlow and wait until it is greater than zero.

    open class BaseViewModel : ViewModel() {
        private val eventFlow = MutableSharedFlow<Event>()

        init {
            eventFlow
                // ...
                .onEach(::handleIntent)
                // ...
                .launchIn(viewModelScope)
        }

        fun sendEvent(event: Event) {
            viewModelScope.launch {
                waitForListener()
                eventFlow.emit(event)
            }
        }

        fun handleIntent(event: Event) {
            // ...
        }

        // Suspends until at least one observer is subscribed to eventFlow
        suspend fun waitForListener() {
            val subscriptionCount = eventFlow.subscriptionCount
            if (subscriptionCount.value == 0) {
                subscriptionCount
                    .filter { it > 0 }
                    .first()
            }
        }
    }
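A condensed, runnable sketch of the same idea is shown below. The function name sendAfterWaitingForListener and the handled channel are assumptions for this illustration. Since subscriptionCount is a StateFlow, first with a predicate returns at once when a subscriber already exists, so the explicit value check is not strictly needed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Starts a listener and a sender concurrently and returns the event the listener handled.
suspend fun sendAfterWaitingForListener(): String? {
    val scope = CoroutineScope(Dispatchers.Default)
    val eventFlow = MutableSharedFlow<String>()
    val handled = Channel<String>(Channel.UNLIMITED) // collects handled events for inspection

    eventFlow.onEach { handled.send(it) }.launchIn(scope) // listener

    scope.launch { // sender, racing with the listener
        eventFlow.subscriptionCount.first { it > 0 } // suspends until a subscriber exists
        eventFlow.emit("first")
    }

    val result = withTimeoutOrNull(5_000) { handled.receive() }
    scope.cancel()
    return result
}

fun main() = runBlocking {
    println(sendAfterWaitingForListener()) // first
}
```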

I would like to thank Damian Koźlak and Krzysztof Dąbrowski from Codequest for inspiration for the article.