/**
 * Sends notifications concurrently, one coroutine per notification.
 *
 * All coroutines are started on [scope], which combines a [SupervisorJob], the
 * injected dispatcher and an exception handler that forwards every uncaught
 * exception to the [ExceptionCollector] passed to the constructor.
 *
 * @param client used to deliver each notification.
 * @param exceptionCollector receives exceptions that escape a sending coroutine.
 * @param dispatcher dispatcher on which the sending coroutines run.
 */
class NotificationSender(
    private val client: NotificationClient,
    private val exceptionCollector: ExceptionCollector,
    dispatcher: CoroutineDispatcher,
) {
    // Routes uncaught coroutine failures to the collector instead of letting them propagate.
    private val exceptionHandler = CoroutineExceptionHandler { _, failure ->
        exceptionCollector.collectException(failure)
    }

    /** Scope that owns every sending coroutine started by this sender. */
    val scope: CoroutineScope = CoroutineScope(SupervisorJob() + dispatcher + exceptionHandler)

    /**
     * Starts a separate coroutine on [scope] for each element of [notifications]
     * and returns without waiting for the sends to finish.
     */
    fun sendNotifications(notifications: List<Notification>) {
        for (item in notifications) {
            scope.launch { client.send(item) }
        }
    }

    /** Cancels the coroutines currently running in [scope]; the scope's own job is not cancelled. */
    fun cancel() {
        scope.coroutineContext.cancelChildren()
    }
}
Example solution in playground
import kotlinx.coroutines.*
import kotlinx.coroutines.test.StandardTestDispatcher
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
/**
 * Fire-and-forget notification sender: every notification is delivered in its
 * own coroutine launched on [scope].
 *
 * The scope's context is a [SupervisorJob] plus the injected dispatcher plus a
 * [CoroutineExceptionHandler] that hands uncaught exceptions to the collector.
 *
 * @param client performs the actual delivery of a notification.
 * @param exceptionCollector sink for exceptions thrown by sending coroutines.
 * @param dispatcher dispatcher used by the sending coroutines.
 */
class NotificationSender(
    private val client: NotificationClient,
    private val exceptionCollector: ExceptionCollector,
    dispatcher: CoroutineDispatcher,
) {
    // Uncaught exceptions from launched coroutines end up in the collector.
    private val exceptionHandler = CoroutineExceptionHandler { _, error ->
        exceptionCollector.collectException(error)
    }

    /** Scope in which all sending coroutines are launched. */
    val scope: CoroutineScope = CoroutineScope(SupervisorJob() + dispatcher + exceptionHandler)

    /** Launches one coroutine per entry of [notifications]; does not wait for completion. */
    fun sendNotifications(notifications: List<Notification>) {
        for (pending in notifications) {
            scope.launch {
                client.send(pending)
            }
        }
    }

    /** Cancels all child coroutines of [scope] without cancelling the scope's own job. */
    fun cancel() {
        scope.coroutineContext.cancelChildren()
    }
}
/**
 * A notification to be sent.
 *
 * @property id identifier of the notification.
 */
data class Notification(
    val id: String,
)
/** Delivers notifications; the delivery operation is suspending. */
interface NotificationClient {
    /**
     * Sends a single [notification].
     *
     * Implementations may signal a failed delivery by throwing.
     */
    suspend fun send(notification: Notification)
}
/** Sink for exceptions reported by other components. */
interface ExceptionCollector {
    /** Records the given [throwable]. */
    fun collectException(throwable: Throwable)
}
/**
 * Tests for [NotificationSender].
 *
 * Each test injects a [StandardTestDispatcher] and drives its scheduler
 * manually, so delays in the fake client run on virtual time.
 */
class NotificationSenderTest {
    /** All sends run in parallel, so total virtual time equals a single delay. */
    @Test
    fun `should send notifications concurrently`() {
        // given
        val fakeNotificationsClient = FakeNotificationClient(delayTime = 200)
        val fakeExceptionCollector = FakeExceptionCollector()
        val testDispatcher = StandardTestDispatcher()
        val sender = NotificationSender(fakeNotificationsClient, fakeExceptionCollector, testDispatcher)
        val notifications = List(20) { Notification("ID$it") }

        // when
        sender.sendNotifications(notifications)
        testDispatcher.scheduler.advanceUntilIdle()

        // then
        assertEquals(notifications, fakeNotificationsClient.sent)
        assertEquals(200, testDispatcher.scheduler.currentTime, "Notifications should be sent concurrently")
    }

    /** `cancel()` cancels the in-flight sends but leaves the scope usable. */
    @Test
    fun `should cancel all coroutines when cancel is called`() {
        // given
        val fakeNotificationsClient = FakeNotificationClient(delayTime = 1000)
        val fakeExceptionCollector = FakeExceptionCollector()
        val testDispatcher = StandardTestDispatcher()
        val sender = NotificationSender(fakeNotificationsClient, fakeExceptionCollector, testDispatcher)
        val notifications = List(20) { Notification("ID$it") }

        // when
        sender.sendNotifications(notifications)
        testDispatcher.scheduler.advanceTimeBy(500)
        sender.cancel()

        // then
        // assertTrue is used instead of assert(): Kotlin's assert() is only
        // evaluated when JVM assertions are enabled (-ea), so it can pass silently.
        assertTrue(
            sender.scope.coroutineContext.job.children.all { it.isCancelled },
            "Every launched coroutine should be cancelled",
        )
        // and scope should still be active
        assertTrue(sender.scope.isActive, "Scope should stay active after cancelling its children")
        // and nothing is delivered once the cancelled coroutines are resumed
        testDispatcher.scheduler.advanceUntilIdle()
        assertTrue(
            fakeNotificationsClient.sent.isEmpty(),
            "No notification should be sent after cancellation",
        )
    }

    /** A failing send must not prevent the remaining sends from completing. */
    @Test
    fun `should not cancel other sending processes when one of them fails`() {
        // given
        val fakeNotificationsClient = FakeNotificationClient(delayTime = 100, failEvery = 10)
        val fakeExceptionCollector = FakeExceptionCollector()
        val testDispatcher = StandardTestDispatcher()
        val sender = NotificationSender(fakeNotificationsClient, fakeExceptionCollector, testDispatcher)
        val notifications = List(100) { Notification("ID$it") }

        // when
        sender.sendNotifications(notifications)
        testDispatcher.scheduler.advanceUntilIdle()

        // then: every 10th of the 100 sends fails, the other 90 succeed
        assertEquals(90, fakeNotificationsClient.sent.size)
    }

    /** Every exception thrown by a send is reported to the collector. */
    @Test
    fun `should collect exceptions from all coroutines`() {
        // given
        val fakeNotificationsClient = FakeNotificationClient(delayTime = 100, failEvery = 10)
        val fakeExceptionCollector = FakeExceptionCollector()
        val testDispatcher = StandardTestDispatcher()
        val sender = NotificationSender(fakeNotificationsClient, fakeExceptionCollector, testDispatcher)
        val notifications = List(100) { Notification("ID$it") }

        // when
        sender.sendNotifications(notifications)
        testDispatcher.scheduler.advanceUntilIdle()

        // then: one collected exception per failed send
        assertEquals(10, fakeExceptionCollector.collected.size)
    }
}
/**
 * Test double for [NotificationClient] that records what it was asked to send.
 *
 * @property delayTime delay applied before each send when greater than zero.
 * @property failEvery every call whose running count is a multiple of this value
 * throws [FakeFailure] instead of recording the notification.
 */
class FakeNotificationClient(
    val delayTime: Long = 0L,
    val failEvery: Int = Int.MAX_VALUE,
) : NotificationClient {
    /** Notifications recorded as sent, in completion order. */
    var sent = emptyList<Notification>()

    /** Number of `send` calls that got past the delay, including failing ones. */
    var counter = 0

    /** Name of the thread each `send` call ran on after its delay. */
    var usedThreads = emptyList<String>()

    override suspend fun send(notification: Notification) {
        if (delayTime > 0) {
            delay(delayTime)
        }
        usedThreads = usedThreads + Thread.currentThread().name
        counter += 1
        val shouldFail = counter % failEvery == 0
        if (shouldFail) throw FakeFailure(notification)
        sent = sent + notification
    }
}
/**
 * Throwable used to signal a planned failure for a given notification.
 *
 * @property notification the notification this failure was created for.
 */
class FakeFailure(
    val notification: Notification,
) : Throwable("Planned fail for notification ${notification.id}")
/** Test double for [ExceptionCollector] that keeps every reported throwable in [collected]. */
class FakeExceptionCollector : ExceptionCollector {
    /** Throwables reported so far, in the order they were collected. */
    var collected = emptyList<Throwable>()

    override fun collectException(throwable: Throwable) {
        // The update runs under this instance's monitor so concurrent reports are not lost.
        synchronized(this) {
            collected = collected + throwable
        }
    }
}
Marcin Moskala is a highly experienced developer and Kotlin instructor. He is the founder of Kt. Academy, an official JetBrains partner specializing in Kotlin training, and a Google Developers Expert known for his significant contributions to the Kotlin community. Moskala is the author of several widely recognized books, including "Effective Kotlin," "Kotlin Coroutines," "Functional Kotlin," "Advanced Kotlin," "Kotlin Essentials," and "Android Development with Kotlin."
Beyond his literary achievements, Moskala is the author of the largest Medium publication dedicated to Kotlin. As a respected speaker, he has been invited to share his insights at numerous programming conferences, including Droidcon and KotlinConf, the premier conference dedicated to the Kotlin programming language.