Solution: UserRefresher
This is the solution using a Channel
:
class UserRefresher(
private val scope: CoroutineScope,
private val refreshData: suspend (Int) -> Unit,
) {
private val queue = Channel<Int>(Channel.UNLIMITED)
init {
scope.launch {
for (userId in queue) {
refreshData(userId)
}
}
}
suspend fun refresh(userId: Int) {
queue.send(userId)
}
}
This is the solution using Mutex
:
class UserRefresher(
private val scope: CoroutineScope,
private val refreshData: suspend (Int) -> Unit,
) {
private val mutex = Mutex()
fun refresh(userId: Int) {
scope.launch {
mutex.withLock {
refreshData(userId)
}
}
}
}
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.time.measureTime
class UserRefresher(
private val scope: CoroutineScope,
private val refreshData: suspend (Int) -> Unit,
) {
private val queue = Channel<Int>(Channel.UNLIMITED)
init {
scope.launch {
for (userId in queue) {
refreshData(userId)
}
}
}
suspend fun refresh(userId: Int) {
queue.send(userId)
}
}
class UserRefresherTest {
@Test
fun `should finish all refreshes`(): Unit = runTest {
val refreshed = ConcurrentHashMap.newKeySet<Int>()
val finished = AtomicInteger(0)
val userRefresher = UserRefresher(
scope = backgroundScope,
refreshData = { userId ->
refreshed += userId
finished.incrementAndGet()
}
)
coroutineScope {
repeat(1000) {
launch { userRefresher.refresh(it) }
}
}
await { finished.get() >= 1000 }
assertEquals(1000, refreshed.size)
}
@Test
fun `should not start more than one refresh job`(): Unit = runTest {
val finished = AtomicInteger(0)
val userRefresher = UserRefresher(
scope = backgroundScope,
refreshData = { userId ->
delay(1000)
finished.incrementAndGet()
}
)
coroutineScope {
repeat(1000) {
launch { userRefresher.refresh(it) }
}
}
assert(currentTime <= 1000)
await { finished.get() >= 1000 }
assertEquals(1000 * 1000, currentTime)
}
@Test
fun `should not start more than one refresh job (on real time)`(): Unit = runBlocking(Dispatchers.Default) {
val finished = AtomicInteger(0)
val backgroundScope = CoroutineScope(Job())
val userRefresher = UserRefresher(
scope = backgroundScope,
refreshData = { userId ->
delay(10)
finished.incrementAndGet()
}
)
val sendTime = measureTime {
coroutineScope {
repeat(100) {
launch { userRefresher.refresh(it) }
}
}
}
val executionTime = measureTime {
await { finished.get() >= 100 }
}
assertEquals(0, sendTime.inWholeSeconds)
assert(1 >= executionTime.inWholeSeconds)
backgroundScope.cancel()
}
}
suspend fun await(condition: () -> Boolean) {
while (!condition()) { delay(1) }
}
Marcin Moskala is a highly experienced developer and Kotlin instructor as the founder of Kt. Academy, an official JetBrains partner specializing in Kotlin training, Google Developers Expert, known for his significant contributions to the Kotlin community. Moskala is the author of several widely recognized books, including "Effective Kotlin," "Kotlin Coroutines," "Functional Kotlin," "Advanced Kotlin," "Kotlin Essentials," and "Android Development with Kotlin."
Beyond his literary achievements, Moskala is the author of the largest Medium publication dedicated to Kotlin. As a respected speaker, he has been invited to share his insights at numerous programming conferences, including events such as Droidcon and the prestigious Kotlin Conf, the premier conference dedicated to the Kotlin programming language.