Your task is to implement chunked extension function for Flow<T> that splits the flow into chunks of T items and emits them every duration time interval. It should emit the last chunk immediately when the flow is completed.
fun <T> Flow<T>.chunked(
duration: Duration
): Flow<List<T>> = TODO()
This problem can either be solved in the below playground or you can clone kotlin-exercises project and solve it locally. In the project, you can find code template for this exercise in effective/safe/Chunked.kt. You can find there starting code and unit tests.
Things to consider about your solution:
Is it thread-safe? Does it use a coroutine-friendly mechanism to assure thread-safety?
Which collection should you use under the hood? What are the pros and cons of different collections?
Do you stop collecting the source flow when this flow is cancelled or completed? Do you complete this flow when the source flow is completed? Do you pass exceptions from the source flow?
Once you are done with the exercise, you can check your solution here.
Playground
import junit.framework.TestCase.assertEquals
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.*
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import org.junit.Test
import java.util.*
import kotlin.test.assertIs
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
fun <T> Flow<T>.chunked(
duration: Duration
): Flow<List<T>> = TODO()
class ChunkedTest {
@Test
fun `should emit values and complete when original flow completes`() = runTest {
val actual = (1..5).asFlow()
.onEach { delay(it.toLong()) }
.chunked(100.milliseconds)
.withVirtualTime(this)
.toList()
val expected = listOf(
ValueAndTime(listOf(1, 2, 3, 4, 5), 15),
)
assertEquals(expected, actual)
}
@Test
fun `should not emit empty chunks`() = runTest {
val actual = flow {
emit(1)
delay(105)
emit(2)
delay(100)
}
.chunked(10.milliseconds)
.withVirtualTime(this)
.toList()
val expected = listOf(
ValueAndTime(listOf(1), 10),
ValueAndTime(listOf(2), 110),
)
assertEquals(expected, actual)
}
@Test
fun `should stop observing when observer is cancelled`() = runTest {
val source = MutableSharedFlow<Int>()
val job = launch {
source.chunked(100.milliseconds).collect()
}
delay(1)
assertEquals(1, source.subscriptionCount.value)
job.cancel()
delay(1)
assertEquals(0, source.subscriptionCount.value)
}
@Test
fun `should pass exception from source flow`() = runTest {
class TestException : RuntimeException()
val source = flow {
emit(1)
throw TestException()
}
val result = try {
source.chunked(100.milliseconds).toList()
null
} catch (e: Throwable) {
e
}
assertIs<TestException>(result)
}
@Test
fun `should chunk example flow`() = runTest {
val actual = (1..30).asFlow()
.onEach { delay(it.toLong()) }
.chunked(100.milliseconds)
.withVirtualTime(this)
.toList()
val expected = listOf(
ValueAndTime(listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13), 100),
ValueAndTime(listOf(14, 15, 16, 17, 18, 19), 200),
ValueAndTime(listOf(20, 21, 22, 23), 300),
ValueAndTime(listOf(24, 25, 26, 27), 400),
ValueAndTime(listOf(28, 29, 30), 465),
)
assertEquals(expected, actual)
}
}
fun <T> Flow<T>.withVirtualTime(testScope: TestScope): Flow<ValueAndTime<T>> =
map { ValueAndTime(it, testScope.currentTime) }
data class ValueAndTime<T>(val value: T, val timeMillis: Long)
Marcin Moskala is a highly experienced developer and Kotlin instructor as the founder of Kt. Academy, an official JetBrains partner specializing in Kotlin training, Google Developers Expert, known for his significant contributions to the Kotlin community. Moskala is the author of several widely recognized books, including "Effective Kotlin," "Kotlin Coroutines," "Functional Kotlin," "Advanced Kotlin," "Kotlin Essentials," and "Android Development with Kotlin."
Beyond his literary achievements, Moskala is the author of the largest Medium publication dedicated to Kotlin. As a respected speaker, he has been invited to share his insights at numerous programming conferences, including events such as Droidcon and the prestigious Kotlin Conf, the premier conference dedicated to the Kotlin programming language.