article banner

Exercise: UserRefresher

You want to make sure that refreshing is done one at a time: if multiple refreshes are requested, they should be executed sequentially, one after another. You have the following code:

/**
 * Starting code for the exercise: launches [refreshData] for a user in [scope],
 * first waiting for the previously stored refresh job (if any) to complete.
 *
 * NOTE: `join()` and the assignment to `refreshJob` are two separate steps and
 * nothing here makes them atomic, so concurrent callers of [refresh] can each
 * launch their own job.
 */
class UserRefresher(
    private val scope: CoroutineScope,
    private val refreshData: suspend (Int) -> Unit,
) {
    // Most recently launched refresh job; null until the first refresh is started.
    private var refreshJob: Job? = null

    suspend fun refresh(userId: Int) {
        // Wait for the last stored job, if there is one.
        refreshJob?.join()
        // Start the new refresh in `scope` and remember it; this does not wait for it to finish.
        refreshJob = scope.launch { refreshData(userId) }
    }
}

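The problem is that this implementation is not correct: if `refresh` is called concurrently, two refresh coroutines might end up running at the same time, because several callers can get past `join()` before any of them stores its new job in `refreshJob`.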

There are two ways to solve this problem:

  • Using Channel and a single coroutine to handle data refresh.
  • Using Mutex to synchronize refreshes.

Solve this problem using both techniques.

This problem can either be solved in the playground below, or you can clone the kotlin-exercises project and solve it locally. In the project, you can find the code template for this exercise in effective/safe/UserRefresher.kt. There you will find the starting code and unit tests.

Once you are done with the exercise, you can check your solution here.

Playground

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.time.measureTime

/**
 * Starting code for the exercise: launches [refreshData] for a user in [scope],
 * first waiting for the previously stored refresh job (if any) to complete.
 *
 * NOTE: `join()` and the assignment to `refreshJob` are two separate steps and
 * nothing here makes them atomic, so concurrent callers of [refresh] can each
 * launch their own job.
 */
class UserRefresher(
    private val scope: CoroutineScope,
    private val refreshData: suspend (Int) -> Unit,
) {
    // Most recently launched refresh job; null until the first refresh is started.
    private var refreshJob: Job? = null

    suspend fun refresh(userId: Int) {
        // Wait for the last stored job, if there is one.
        refreshJob?.join()
        // Start the new refresh in `scope` and remember it; this does not wait for it to finish.
        refreshJob = scope.launch { refreshData(userId) }
    }
}

/** Unit tests that a solution to the exercise is expected to pass. */
class UserRefresherTest {
    /** Requests 1000 refreshes from concurrent coroutines and checks that every user id was refreshed. */
    @Test
    fun `should finish all refreshes`(): Unit = runTest {
        // Set of user ids that refreshData has been called with.
        val refreshed = ConcurrentHashMap.newKeySet<Int>()
        // Number of refreshData calls that have completed.
        val finished = AtomicInteger(0)
        val userRefresher = UserRefresher(
            scope = backgroundScope,
            refreshData = { userId ->
                refreshed += userId
                finished.incrementAndGet()
            }
        )

        // coroutineScope returns only after every launched refresh(...) call has returned.
        coroutineScope {
            repeat(1000) {
                launch {
                    userRefresher.refresh(it)
                }
            }
        }
        // Poll until all 1000 refreshes have completed.
        await { finished.get() >= 1000 }
        assertEquals(1000, refreshed.size)
    }

    /**
     * Checks, using the test scheduler's virtual time, that the refreshes run one after another:
     * each refresh delays 1000 ms, so 1000 of them are expected to end at exactly 1000 * 1000 ms.
     */
    @Test
    fun `should not start more than one refresh job`(): Unit = runTest {
        val finished = AtomicInteger(0)
        val userRefresher = UserRefresher(
            scope = backgroundScope,
            refreshData = { userId ->
                delay(1000)
                finished.incrementAndGet()
            }
        )

        coroutineScope {
            repeat(1000) {
                launch {
                    userRefresher.refresh(it)
                }
            }
        }
        // Requesting all refreshes must not take more virtual time than a single refresh.
        assert(currentTime <= 1000)
        await { finished.get() >= 1000 }
        // Any overlap between refreshes would make the total virtual time smaller than this.
        assertEquals(1000 * 1000, currentTime)
    }

    /**
     * Same scenario on Dispatchers.Default with real delays: 100 refreshes of 10 ms each.
     * Asserts that requesting the refreshes takes under a second and that waiting for
     * them to finish takes less than two seconds.
     */
    @Test
    fun `should not start more than one refresh job (on real time)`(): Unit = runBlocking(Dispatchers.Default) {
        val finished = AtomicInteger(0)
        // Standalone scope for the refresh jobs; cancelled at the end of the test.
        val backgroundScope = CoroutineScope(Job())
        val userRefresher = UserRefresher(
            scope = backgroundScope,
            refreshData = { userId ->
                delay(10)
                finished.incrementAndGet()
            }
        )

        // Time taken for all 100 refresh(...) calls to return.
        val sendTime = measureTime {
            coroutineScope {
                repeat(100) {
                    launch {
                        userRefresher.refresh(it)
                    }
                }
            }
        }
        // Additional time until all 100 refreshes have completed.
        val executionTime = measureTime {
            await { finished.get() >= 100 }
        }
        assertEquals(0, sendTime.inWholeSeconds)
        assert(1 >= executionTime.inWholeSeconds)
        backgroundScope.cancel()
    }
}

/** Suspends until [condition] returns true, re-checking it after each `delay(1)`. */
suspend fun await(condition: () -> Boolean) {
    while (!condition()) {
        delay(1)
    }
}