Solution: Flow utils

val infiniteFlow = flow { while (true) { emit(Unit) } } val neverFlow = flow<Nothing> { suspendCancellableCoroutine { } } fun everyFlow(timeMillis: Long) = flow { while (true) { delay(timeMillis) emit(Unit) } } fun <T> flowOf(lambda: suspend () -> T) = flow { emit(lambda()) } fun <T> flowOfFlatten(lambda: suspend () -> Flow<T>) = flow { val flow = lambda() flow.collect { emit(it) } }

Example solution in playground

import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.currentTime import kotlinx.coroutines.test.runTest import org.junit.Test import kotlin.test.assertEquals val infiniteFlow = flow { while (true) { emit(Unit) } } val neverFlow = flow<Nothing> { suspendCancellableCoroutine { } } fun everyFlow(timeMillis: Long) = flow { while (true) { delay(timeMillis) emit(Unit) } } fun <T> flowOf(lambda: suspend () -> T) = flow { emit(lambda()) } fun <T> flowOfFlatten(lambda: suspend () -> Flow<T>) = flow { val flow = lambda() flow.collect { emit(it) } } class FlowUtilsTest { @Test fun `should create infinite flow`() = runTest { val flow = infiniteFlow val result = flow.take(10).onEach { delay(1000) }.toList() assertEquals(listOf(Unit, Unit, Unit, Unit, Unit, Unit, Unit, Unit, Unit, Unit), result) assertEquals(10_000, currentTime) } @Test fun `should create never flow`() = runTest { var emitted = false var completed = false neverFlow.onEach { emitted = true }.onCompletion { completed = true }.launchIn(backgroundScope) delay(Long.MAX_VALUE - 1) assertEquals(false, emitted) assertEquals(false, completed) assertEquals(Long.MAX_VALUE - 1, currentTime) } @Test fun `should create every flow`() = runTest { val flow = everyFlow(1000) val result = flow.take(10).toList() assertEquals(listOf(Unit, Unit, Unit, Unit, Unit, Unit, Unit, Unit, Unit, Unit), result) assertEquals(10_000, currentTime) } @Test fun `should create flow of`() = runTest { assertEquals(listOf(1), flowOf { 1 }.toList()) assertEquals(listOf("A"), flowOf { "A" }.toList()) assertEquals(0, currentTime) assertEquals(listOf(1), flowOf { delay(1000); 1 }.toList()) assertEquals(1000, currentTime) } @Test fun `should create flow of flatten`() = runTest { assertEquals(listOf(1), flowOfFlatten { flowOf(1) }.toList()) assertEquals(listOf("A"), flowOfFlatten { flowOf("A") }.toList()) assertEquals(0, currentTime) val flow: List<ValueAndTime<Int>> = flowOfFlatten { delay(1000) flow { repeat(10) { delay(1000) emit(it) } } }.withVirtualTime(this) .toList() assertEquals( listOf( ValueAndTime(0, 2000), ValueAndTime(1, 3000), ValueAndTime(2, 4000), ValueAndTime(3, 5000), ValueAndTime(4, 6000), ValueAndTime(5, 7000), ValueAndTime(6, 8000), ValueAndTime(7, 9000), ValueAndTime(8, 10000), ValueAndTime(9, 11000), ), flow ) } } fun <T> Flow<T>.withVirtualTime(testScope: TestScope): Flow<ValueAndTime<T>> = map { ValueAndTime(it, testScope.currentTime) } data class ValueAndTime<T>(val value: T, val timeMillis: Long)