import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.test.*
import org.junit.Test
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals
fun producingUnits(num: Int): Flow<Unit> =
flow { repeat(num) { emit(Unit) } }
// or
// List(num) {}.asFlow()
// or
// (1..num).map { }.asFlow()
fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> =
onEach { delay(timeMillis) }
// or
// flow { collect { delay(timeMillis); emit(it) } }
fun <T, R> Flow<T>.mapIndexed(
transformation: suspend (index: Int, T) -> R
): Flow<R> =
withIndex().map { transformation(it.index, it.value) }
// or
// flow {
// var index = 0
// collect {
// emit(transformation(index, it))
// index++
// }
// }
// or
// flow {
// collectIndexed { index, value ->
// emit(transformation(index, value))
// }
// }
fun Flow<*>.toNextNumbers(): Flow<Int> =
mapIndexed { index, _ -> index + 1 }
// or
// withIndex().map { it.index + 1 }
// or
// scan(0) { acc, _ -> acc + 1 }.drop(1)
// or
// flow {
// var value = 1
// collect {
// emit(value++)
// }
// }
// or
// flow {
// collectIndexed { index, _ ->
// emit(index + 1)
// }
// }
fun <T> Flow<T>.withHistory(): Flow<List<T>> =
scan(emptyList()) { acc, value -> acc + value }
// or
// flow {
// var history = listOf<T>()
// emit(history)
// collect {
// history += it
// emit(history)
// }
// }
fun makeLightSwitch(
switch1: Flow<Boolean>,
switch2: Flow<Boolean>
): Flow<Boolean> =
switch1.onStart { emit(false) }
.combine(switch2.onStart { emit(false) }) { e1, e2 ->
e1 xor e2
}
fun makeLightSwitchToggle(
switch1: Flow<*>,
switch2: Flow<*>
): Flow<Boolean> =
merge(switch1, switch2)
.mapIndexed { index, _ -> index % 2 == 0 }
fun polonaisePairing(
track1: Flow<Person>,
track2: Flow<Person>
): Flow<Pair<Person, Person>> =
track1.zip(track2) { p1, p2 -> Pair(p1, p2) }
data class Person(val name: String)
@Suppress("FunctionName")
class FlowTests {
@Test()
fun producingUnitsTests() = runTest {
assertEquals(listOf(), producingUnits(0).toList())
assertEquals(listOf(Unit), producingUnits(1).toList())
assertEquals(listOf(Unit, Unit), producingUnits(2).toList())
assertEquals(listOf(Unit, Unit, Unit), producingUnits(3).toList())
for (i in 1..100 step 7) {
assertEquals(List(i) { Unit }, producingUnits(i).toList())
}
}
@Test()
fun flowDelayEachTests() = runTest {
val emittedNum = AtomicInteger()
producingUnits(100)
.delayEach(1000)
.onEach { emittedNum.incrementAndGet() }
.launchIn(this)
assertEquals(0, emittedNum.get())
delay(1_500)
assertEquals(1, emittedNum.get())
delay(2_000)
assertEquals(3, emittedNum.get())
delay(12_000)
assertEquals(15, emittedNum.get())
}
@Test()
fun mapIndexedTests() = runTest {
assertEquals(
listOf("0 A", "1 B", "2 C", "3 D"),
('A'..'D').asFlow()
.mapIndexed { index, letter -> "$index $letter" }
.toList()
)
val actual: List<ValueAndTime<Pair<Int, Any>>> = flow<Any> {
delay(10)
emit(10)
delay(100)
emit("A")
delay(1000)
emit('C')
}.mapIndexed { index, any -> Pair(index, any) }
.withVirtualTime(this)
.toList()
val expected: List<ValueAndTime<Pair<Int, Any>>> = listOf(
ValueAndTime(Pair(0, 10), 10),
ValueAndTime(Pair(1, "A"), 110),
ValueAndTime(Pair(2, 'C'), 1110),
)
assertEquals(expected, actual)
}
@Test()
fun toNextNumbersTests() = runTest {
assertEquals(listOf(), producingUnits(0).toNextNumbers().toList())
assertEquals(listOf(1), producingUnits(1).toNextNumbers().toList())
assertEquals(listOf(1, 2), producingUnits(2).toNextNumbers().toList())
assertEquals(listOf(1, 2, 3), producingUnits(3).toNextNumbers().toList())
for (i in 1..100 step 7) {
val list = List(i) { it + 1 }
assertEquals(list, list.map {}.asFlow().toNextNumbers().toList())
}
}
@Test()
fun withHistoryTests() = runTest {
assertEquals(listOf(listOf()), producingUnits(0).withHistory().toList())
assertEquals(listOf(listOf(), listOf(Unit)), producingUnits(1).withHistory().toList())
assertEquals(listOf(listOf(), listOf(Unit), listOf(Unit, Unit)), producingUnits(2).withHistory().toList())
assertEquals(
listOf(listOf(), listOf(1), listOf(1, 2)),
flowOf(1, 2).withHistory().toList()
)
assertEquals(
listOf(listOf(), listOf(true), listOf(true, false)),
flowOf(true, false).withHistory().toList()
)
val flow = flow {
delay(10)
emit("A")
delay(100)
emit(10)
delay(1000)
emit("C")
}
assertEquals(
listOf(
ValueAndTime(listOf(), 0),
ValueAndTime(listOf("A"), 10),
ValueAndTime(listOf("A", 10), 110),
ValueAndTime(listOf("A", 10, "C"), 1110),
),
flow.withHistory()
.withVirtualTime(this)
.toList()
)
}
@Test()
fun makeLightSwitchTests() = runTest {
val switchOne = flow<Boolean> {
emit(true)
delay(1000)
emit(false)
delay(10)
emit(true)
delay(500) // 1500
emit(false)
}
val switchTwo = flow<Boolean> {
emit(false)
delay(200)
emit(true)
delay(1000) // 1200
emit(false)
}
var lightOn = false
launch {
makeLightSwitch(switchOne, switchTwo).collect {
lightOn = it
}
}
delay(50)
assertEquals(true, lightOn)
delay(200) // 250
assertEquals(false, lightOn)
delay(800) // 1050
assertEquals(false, lightOn)
delay(200) // 1250
assertEquals(true, lightOn)
delay(300) // 1550
assertEquals(false, lightOn)
}
@Test()
fun makeLightSwitchExampleTests() = runTest {
val switch1 = flow {
delay(7_000)
emit(true)
delay(6_000)
emit(false)
delay(11_000)
emit(true)
delay(2_000)
emit(true)
}
val switch2 = flow {
delay(16_000)
emit(false)
delay(2_000)
emit(true)
delay(2_000)
emit(false)
delay(9_000)
emit(true)
delay(2_000)
emit(false)
delay(2_000)
emit(true)
}
val result = makeLightSwitch(switch1, switch2)
.fold(mapOf<Long, Boolean>()) { acc, e -> acc + (currentTime to e) }
val expected = mapOf(
0L to false,
7_000L to true,
13_000L to false,
16_000L to false,
18_000L to true,
20_000L to false,
24_000L to true,
26_000L to true,
29_000L to false,
31_000L to true,
33_000L to false,
)
assertEquals(expected, result)
}
@Test()
fun makeLightSwitchToggleTests() = runTest {
val switchOne = flow<Unit> {
emit(Unit)
delay(1000)
emit(Unit)
delay(10)
emit(Unit)
delay(500) // 1500
emit(Unit)
}
val switchTwo = flow<Unit> {
emit(Unit)
delay(200)
emit(Unit)
delay(1000) // 1200
emit(Unit)
}
var lightOn = false
launch {
makeLightSwitchToggle(switchOne, switchTwo).collect {
lightOn = it
}
}
delay(50)
assertEquals(false, lightOn)
delay(200) // 250
assertEquals(true, lightOn)
delay(800) // 1050
assertEquals(true, lightOn)
delay(200) // 1250
assertEquals(false, lightOn)
delay(300) // 1550
assertEquals(true, lightOn)
}
@Test()
fun makeLightSwitchToggleExampleTests() = runTest {
val switch1 = flow {
delay(7_000)
emit(Unit)
delay(6_000)
emit(Unit)
delay(27_000)
emit(Unit)
delay(16_000)
emit(Unit)
delay(1_000)
emit(Unit)
}
val switch2 = flow {
delay(16_000)
emit(Unit)
delay(20_000)
emit(Unit)
delay(17_000)
emit(Unit)
}
val result = makeLightSwitchToggle(switch1, switch2)
.fold(mapOf<Long, Boolean>()) { acc, e -> acc + (currentTime to e) }
val expected = mapOf(
7_000L to true,
13_000L to false,
16_000L to true,
36_000L to false,
40_000L to true,
53_000L to false,
56_000L to true,
57_000L to false,
)
assertEquals(expected, result)
}
@Test()
fun polonaisePairingTests() = runTest {
val track1 = flow<Person> {
emit(Person("A"))
emit(Person("B"))
delay(1000)
emit(Person("C"))
emit(Person("D"))
}
val track2 = flow<Person> {
emit(Person("1"))
delay(600)
emit(Person("2"))
delay(1000)
emit(Person("3"))
}
val res = polonaisePairing(track1, track2).toList()
val expected = listOf("A" to "1", "B" to "2", "C" to "3").map { Person(it.first) to Person(it.second) }
assertEquals(expected, res)
var lastPair: Pair<Person, Person>? = null
launch {
polonaisePairing(track1, track2).collect { lastPair = it }
}
runCurrent()
assertEquals(Person("A") to Person("1"), lastPair)
delay(200) // 200
assertEquals(Person("A") to Person("1"), lastPair)
delay(500) // 700
assertEquals(Person("B") to Person("2"), lastPair)
delay(500) // 1200
assertEquals(Person("B") to Person("2"), lastPair)
delay(500) // 1700
assertEquals(Person("C") to Person("3"), lastPair)
}
}
fun <T> Flow<T>.withVirtualTime(testScope: TestScope): Flow<ValueAndTime<T>> =
map { ValueAndTime(it, testScope.currentTime) }
data class ValueAndTime<T>(val value: T, val timeMillis: Long)