Solution: Chunking a flow

This is my solution to this exercise:

fun <T> Flow<T>.chunked( duration: Duration ): Flow<List<T>> = channelFlow { var chunk = LinkedList<T>() val mutex = Mutex() val sendIfNotEmpty = suspend { if (chunk.isNotEmpty()) { mutex.withLock { send(chunk) chunk = LinkedList<T>() } } } val timerJob = launch { try { while (true) { delay(duration) sendIfNotEmpty() } } finally { sendIfNotEmpty() } } try { collect { chunk.add(it) } } finally { timerJob.cancel() } }

I will be happy to see other solutions! This is not an easy problem, but it certainly has many solutions.

Example solution in playground

import junit.framework.TestCase.assertEquals import kotlinx.coroutines.delay import kotlinx.coroutines.flow.* import kotlinx.coroutines.launch import kotlinx.coroutines.sync.* import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.currentTime import kotlinx.coroutines.test.runTest import org.junit.Test import java.util.* import kotlin.test.assertIs import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds fun <T> Flow<T>.chunked( duration: Duration ): Flow<List<T>> = channelFlow { var chunk = LinkedList<T>() val mutex = Mutex() val sendIfNotEmpty = suspend { if (chunk.isNotEmpty()) { mutex.withLock { send(chunk) chunk = LinkedList<T>() } } } val timerJob = launch { try { while (true) { delay(duration) sendIfNotEmpty() } } finally { sendIfNotEmpty() } } try { collect { chunk.add(it) } } finally { timerJob.cancel() } } class ChunkedTest { @Test fun `should emit values and complete when original flow completes`() = runTest { val actual = (1..5).asFlow() .onEach { delay(it.toLong()) } .chunked(100.milliseconds) .withVirtualTime(this) .toList() val expected = listOf( ValueAndTime(listOf(1, 2, 3, 4, 5), 15), ) assertEquals(expected, actual) } @Test fun `should not emit empty chunks`() = runTest { val actual = flow { emit(1) delay(105) emit(2) delay(100) } .chunked(10.milliseconds) .withVirtualTime(this) .toList() val expected = listOf( ValueAndTime(listOf(1), 10), ValueAndTime(listOf(2), 110), ) assertEquals(expected, actual) } @Test fun `should stop observing when observer is cancelled`() = runTest { val source = MutableSharedFlow<Int>() val job = launch { source.chunked(100.milliseconds).collect() } delay(1) assertEquals(1, source.subscriptionCount.value) job.cancel() delay(1) assertEquals(0, source.subscriptionCount.value) } @Test fun `should pass exception from source flow`() = runTest { class TestException : RuntimeException() val source = flow { emit(1) throw TestException() } val result = try { source.chunked(100.milliseconds).toList() null } catch (e: Throwable) { e } assertIs<TestException>(result) } @Test fun `should chunk example flow`() = runTest { val actual = (1..30).asFlow() .onEach { delay(it.toLong()) } .chunked(100.milliseconds) .withVirtualTime(this) .toList() val expected = listOf( ValueAndTime(listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13), 100), ValueAndTime(listOf(14, 15, 16, 17, 18, 19), 200), ValueAndTime(listOf(20, 21, 22, 23), 300), ValueAndTime(listOf(24, 25, 26, 27), 400), ValueAndTime(listOf(28, 29, 30), 465), ) assertEquals(expected, actual) } } fun <T> Flow<T>.withVirtualTime(testScope: TestScope): Flow<ValueAndTime<T>> = map { ValueAndTime(it, testScope.currentTime) } data class ValueAndTime<T>(val value: T, val timeMillis: Long)