
Testing flow

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.

This chapter is a continuation of Testing Kotlin Coroutines, which introduces runTest, backgroundScope, TestScope etc.

You should already have all the knowledge you need to properly test functions that return a flow. The rules for testing suspending functions are quite simple, but to ignite your imagination let's see a couple of examples of the most typical use cases, and the most important problems we need to deal with when testing flows.

Transformation functions

Most of the functions that return Flow call other functions that return Flow. This is the most common and simplest case, so we'll start by learning how to test such functions. Consider the following class:

    class ObserveAppointmentsService(
        private val appointmentRepository: AppointmentRepository
    ) {
        fun observeAppointments(): Flow<List<Appointment>> =
            appointmentRepository
                .observeAppointments()
                .filterIsInstance<AppointmentsUpdate>()
                .map { it.appointments }
                .distinctUntilChanged()
                .retry {
                    it is ApiException && it.code in 500..599
                }
    }

The observeAppointments method decorates observeAppointments from AppointmentRepository with a couple of operations, including element filtering, mapping, elimination of repeating elements, and retrying in the case of certain kinds of exceptions. If you asked me to explain what this function does but with a separate sentence for each functionality, you would have a list of the unit tests this function should have:

  • should keep only appointments from updates,
  • should eliminate elements that are identical to the previous element,
  • should retry if there is an API exception with the code 5XX.

To implement these tests, we need to fake or mock AppointmentRepository. For these tests, we could make a fake object whose observeAppointments function returns a constant Flow that will be used as a source. The simplest way to test a function like observeAppointments is by defining its source flow using flowOf, which creates a finite Flow in which time does not play a role. If time does not play a role in the function being tested, we can just transform its result into a list using the toList function, then compare it with the expected result in the assertions.

    class FakeAppointmentRepository(
        private val flow: Flow<AppointmentsEvent>
    ) : AppointmentRepository {
        override fun observeAppointments() = flow
    }

    class ObserveAppointmentsServiceTest {
        val aDate1 = Instant.parse("2020-08-30T18:43:00Z")
        val anAppointment1 = Appointment("APP1", aDate1)
        val aDate2 = Instant.parse("2020-08-31T18:43:00Z")
        val anAppointment2 = Appointment("APP2", aDate2)

        @Test
        fun `should keep only appointments from...`() = runTest {
            // given
            val repo = FakeAppointmentRepository(
                flowOf(
                    AppointmentsConfirmed,
                    AppointmentsUpdate(listOf(anAppointment1)),
                    AppointmentsUpdate(listOf(anAppointment2)),
                    AppointmentsConfirmed,
                )
            )
            val service = ObserveAppointmentsService(repo)

            // when
            val result = service.observeAppointments().toList()

            // then
            assertEquals(
                listOf(
                    listOf(anAppointment1),
                    listOf(anAppointment2),
                ),
                result
            )
        }

        // ...
    }

The second test could be implemented the same way, but I would like to introduce one more element that will make our test a bit more complicated and help us test what we haven't yet tested. The problem with tests like the one above is that they treat a flow just like a list; such an approach simplifies these tests, but they don't verify if elements are actually immediately transferred without any delays. Imagine that a developer added a delay to your flow transformation, as in the following code snippet. Such a change would not be detected by the above test.

    class ObserveAppointmentsService(
        private val appointmentRepository: AppointmentRepository
    ) {
        fun observeAppointments(): Flow<List<Appointment>> =
            appointmentRepository
                .observeAppointments()
                .onEach { delay(1000) } // Will not influence
                // the above test
                .filterIsInstance<AppointmentsUpdate>()
                .map { it.appointments }
                .distinctUntilChanged()
                .retry {
                    it is ApiException && it.code in 500..599
                }
    }

Let's consider an even more extreme example where someone uses a list transformation instead of a flow transformation. This would be absolute nonsense, yet tests like the previous one would still pass.

    class ObserveAppointmentsService(
        private val appointmentRepository: AppointmentRepository,
    ) {
        // Don't do that!
        fun observeAppointments(): Flow<List<Appointment>> =
            flow {
                val list = appointmentRepository
                    .observeAppointments()
                    .filterIsInstance<AppointmentsUpdate>()
                    .map { it.appointments }
                    .distinctUntilChanged()
                    .retry {
                        it is ApiException && it.code in 500..599
                    }
                    .toList()
                // emitAll expects a Flow, so the list is converted back
                emitAll(list.asFlow())
            }
    }

I like to have a test that verifies time dependencies, and for that, we need to use runTest and some delay in our source flow. The result flow needs to store information about when its elements were emitted, and we can verify the result in an assertion.

    class ObserveAppointmentsServiceTest {
        // ...

        @Test
        fun `should eliminate elements that are...`() = runTest {
            // given
            val repo = FakeAppointmentRepository(flow {
                delay(1000)
                emit(AppointmentsUpdate(listOf(anAppointment1)))
                emit(AppointmentsUpdate(listOf(anAppointment1)))
                delay(1000)
                emit(AppointmentsUpdate(listOf(anAppointment2)))
                delay(1000)
                emit(AppointmentsUpdate(listOf(anAppointment2)))
                emit(AppointmentsUpdate(listOf(anAppointment1)))
            })
            val service = ObserveAppointmentsService(repo)

            // when
            val result = service.observeAppointments()
                .map { currentTime to it }
                .toList()

            // then
            assertEquals(
                listOf(
                    1000L to listOf(anAppointment1),
                    2000L to listOf(anAppointment2),
                    3000L to listOf(anAppointment1),
                ),
                result
            )
        }

        // ...
    }

The test name is shortened due to limited page size; it would not be shortened in a real project.
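To see why the timestamped assertion catches what the list-only assertion misses, here is a minimal sketch that is separate from the appointments example. The names source, withAccidentalDelay and timestamps are hypothetical helpers introduced only for this illustration; the only library calls used are runTest and currentTime from kotlinx-coroutines-test.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest

// Hypothetical source: emits "A" after 1 s and "B" after another 1 s of virtual time.
fun source(): Flow<String> = flow {
    delay(1000)
    emit("A")
    delay(1000)
    emit("B")
}

// Hypothetical transformation to which someone accidentally added a delay.
fun Flow<String>.withAccidentalDelay(): Flow<String> =
    onEach { delay(1000) }

// Collects the flow in virtual time and pairs each element
// with the virtual time at which it reached the collector.
@OptIn(ExperimentalCoroutinesApi::class)
fun timestamps(flow: Flow<String>): List<Pair<Long, String>> {
    var result = emptyList<Pair<Long, String>>()
    runTest {
        result = flow.map { currentTime to it }.toList()
    }
    return result
}
```

Both variants produce the same elements, so an assertion on the plain list cannot tell them apart. The timestamps differ: without the extra delay the elements arrive at 1000 and 2000, and with it they arrive at 2000 and 4000, because the source cannot continue until the delayed element has been processed.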

Finally, consider the third functionality: "should retry when there is an API exception with the code 5XX". If we returned a flow that should not retry, we wouldn't test the retry behavior. If we returned a flow that should retry, the function being tested would retry infinitely and would produce an infinite flow. The easiest way to test an infinite flow is by limiting the number of its elements using take.

    class ObserveAppointmentsServiceTest {
        // ...

        @Test
        fun `should retry when API exception...`() = runTest {
            // given
            val repo = FakeAppointmentRepository(flow {
                emit(AppointmentsUpdate(listOf(anAppointment1)))
                throw ApiException(502, "Some message")
            })
            val service = ObserveAppointmentsService(repo)

            // when
            val result = service.observeAppointments()
                .take(3)
                .toList()

            // then
            assertEquals(
                listOf(
                    listOf(anAppointment1),
                    listOf(anAppointment1),
                    listOf(anAppointment1),
                ),
                result
            )
        }
    }

We should also test "should not retry upon a non-API exception" and "should not retry upon an API exception with a non-5XX code", but I will skip these test cases in this book.
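For readers who want a starting point for those two skipped cases, here is a rough sketch. It does not use the chapter's service; it applies the same retry predicate to a small standalone flow. ApiException is redefined here as a hypothetical stand-in, and retryOnServerError, Outcome and collectFailing are names invented for this illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.runTest

// Hypothetical stand-in for the ApiException used in this chapter.
class ApiException(val code: Int, message: String) : Exception(message)

// The same retry rule that observeAppointments uses.
fun <T> Flow<T>.retryOnServerError(): Flow<T> =
    retry { it is ApiException && it.code in 500..599 }

// How many times the source was collected, and what terminated the flow.
class Outcome(val attempts: Int, val failure: Throwable?)

// Collects a source that emits one element and then throws the given exception.
fun collectFailing(exception: Exception): Outcome {
    var attempts = 0
    var failure: Throwable? = null
    val source = flow<String> {
        attempts++
        emit("A")
        throw exception
    }
    runTest {
        failure = try {
            source.retryOnServerError().toList()
            null
        } catch (e: Exception) {
            e
        }
    }
    return Outcome(attempts, failure)
}
```

Calling collectFailing(IllegalStateException("boom")) or collectFailing(ApiException(404, "Not found")) should report a single attempt and the original exception type as the failure, which is what "should not retry" means in practice. In a real test class, each case would be its own @Test function with assertions instead of a returned Outcome.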

Another option is to make a flow that first throws an exception that should cause a retry, and then it throws one that should not. This way, we can test not only a sample exception that should cause a retry, but also one that should not.

    class ObserveAppointmentsServiceTest {
        // ...

        @Test
        fun `should retry when API exception...`() = runTest {
            // given
            var retried = false
            val someException = object : Exception() {}
            val repo = FakeAppointmentRepository(flow {
                emit(AppointmentsUpdate(listOf(anAppointment1)))
                if (!retried) {
                    retried = true
                    throw ApiException(502, "Some message")
                } else {
                    throw someException
                }
            })
            val service = ObserveAppointmentsService(repo)

            // when
            val result = service.observeAppointments()
                .catch<Any> { emit(it) }
                .toList()

            // then
            assertTrue(retried)
            assertEquals(
                listOf(
                    listOf(anAppointment1),
                    listOf(anAppointment1),
                    someException,
                ),
                result
            )
        }
    }

Testing infinite flows

Testing classes that use StateFlow or SharedFlow is a bit more complicated. First, they need a scope; if we define our test using runTest, this scope should be backgroundScope, not this, so our test does not await this scope's completion. Second, these flows are infinite, so they don't complete unless their scope is cancelled. There are a couple of ways to test infinite flows, which I will present by means of an example.

Consider the following service that can be used to observe messages from a specific user. This class also uses SharedFlow so no more than one connection to the source of messages is made, even if there are multiple observers. This means that observeMessages returns a flow that will never complete unless scope is cancelled.

    class MessagesService(
        messagesSource: Flow<Message>,
        scope: CoroutineScope
    ) {
        private val source = messagesSource
            .shareIn(
                scope = scope,
                started = SharingStarted.WhileSubscribed()
            )

        fun observeMessages(fromUserId: String) = source
            .filter { it.fromUserId == fromUserId }
    }

To better understand the problem, consider the following failing test:

    class MessagesServiceTest {
        // Failing test!
        @Test
        fun `should emit messages from user`() = runTest {
            // given
            val source = flowOf(
                Message(fromUserId = "0", text = "A"),
                Message(fromUserId = "1", text = "B"),
                Message(fromUserId = "0", text = "C"),
            )
            val service = MessagesService(
                messagesSource = source,
                scope = backgroundScope,
            )

            // when
            val result = service.observeMessages("0")
                .toList() // Here we'll wait forever!

            // then
            assertEquals(
                listOf(
                    Message(fromUserId = "0", text = "A"),
                    Message(fromUserId = "0", text = "C"),
                ),
                result
            )
        }
    }

The test above is suspended forever by toList. The simplest (and, sadly, most popular) solution to this problem uses take with a specific number of expected elements. The test below passes, but it loses a lot of information. Consider a message that should not have been emitted by observeMessages yet was, albeit in the next position. This situation would not be recognized by a unit test. A bigger problem is when someone makes a change in the code that makes it run forever. Finding the reason behind this is much harder than it would be if our test were implemented as in the following examples.

    class MessagesServiceTest {
        @Test
        fun `should emit messages from user`() = runTest {
            // given
            val source = flowOf(
                Message(fromUserId = "0", text = "A"),
                Message(fromUserId = "1", text = "B"),
                Message(fromUserId = "0", text = "C"),
            )
            val service = MessagesService(
                messagesSource = source,
                scope = backgroundScope,
            )

            // when
            val result = service.observeMessages("0")
                .take(2)
                .toList()

            // then
            assertEquals(
                listOf(
                    Message(fromUserId = "0", text = "A"),
                    Message(fromUserId = "0", text = "C"),
                ),
                result
            )
        }
    }
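The information lost by take can be shown with a deliberately simplified sketch. The flow named leaky is a hypothetical stand-in for a buggy observeMessages result that lets an unwanted "B" through after the expected elements; it is finite only so that both variants can be collected and compared. The helper collect is also invented for this illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.runTest

// Hypothetical buggy result: "B" should have been filtered out,
// but it is emitted after the two expected elements.
val leaky: Flow<String> = flowOf("A", "C", "B")

// Collects a finite flow inside runTest and returns its elements.
fun collect(flow: Flow<String>): List<String> {
    var result = emptyList<String>()
    runTest {
        result = flow.toList()
    }
    return result
}
```

Here collect(leaky.take(2)) returns only "A" and "C", so an assertion against the expected list passes, while collect(leaky) also contains the unwanted "B". A take-based test stops listening exactly when the interesting part might begin.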

The next approach is to start our flow in backgroundScope and store all the elements it emits in a collection. This approach not only better shows us "what is" and "what should be" in failing cases; it also offers us much more flexibility with testing time. In the example below, I've added some delays to verify when messages are sent.

    class MessagesServiceTest {
        @Test
        fun `should emit messages from user`() = runTest {
            // given
            val source = flow {
                emit(Message(fromUserId = "0", text = "A"))
                delay(1000)
                emit(Message(fromUserId = "1", text = "B"))
                emit(Message(fromUserId = "0", text = "C"))
            }
            val service = MessagesService(
                messagesSource = source,
                scope = backgroundScope,
            )

            // when
            val emittedMessages = mutableListOf<Message>()
            service.observeMessages("0")
                .onEach { emittedMessages.add(it) }
                .launchIn(backgroundScope)
            delay(1)

            // then
            assertEquals(
                listOf(
                    Message(fromUserId = "0", text = "A"),
                ),
                emittedMessages
            )

            // when
            delay(1000)

            // then
            assertEquals(
                listOf(
                    Message(fromUserId = "0", text = "A"),
                    Message(fromUserId = "0", text = "C"),
                ),
                emittedMessages
            )
        }
    }

A good alternative is a variant of toList that observes the flow only for a given duration. It offers less flexibility, but I like using it as it is simple and readable. This is how I implement and use such a function:

    suspend fun <T> Flow<T>.toListDuring(
        duration: Duration
    ): List<T> = coroutineScope {
        val result = mutableListOf<T>()
        val job = launch {
            this@toListDuring.collect(result::add)
        }
        delay(duration)
        job.cancel()
        return@coroutineScope result
    }

    class MessagesServiceTest {
        @Test
        fun `should emit messages from user`() = runTest {
            // given
            val source = flow {
                emit(Message(fromUserId = "0", text = "A"))
                emit(Message(fromUserId = "1", text = "B"))
                emit(Message(fromUserId = "0", text = "C"))
            }
            val service = MessagesService(
                messagesSource = source,
                scope = backgroundScope,
            )

            // when
            val emittedMessages = service.observeMessages("0")
                .toListDuring(1.milliseconds)

            // then
            assertEquals(
                listOf(
                    Message(fromUserId = "0", text = "A"),
                    Message(fromUserId = "0", text = "C"),
                ),
                emittedMessages
            )
        }
    }

It is also worth mentioning that there are libraries, like Turbine, that simplify testing flows: they collect a flow into an object that we can then use to wait for elements.

    class MessagesServiceTest {
        @Test
        fun `should emit messages from user`() = runTest {
            turbineScope {
                // given
                val source = flow {
                    emit(Message(fromUserId = "0", text = "A"))
                    emit(Message(fromUserId = "1", text = "B"))
                    emit(Message(fromUserId = "0", text = "C"))
                }
                val service = MessagesService(
                    messagesSource = source,
                    scope = backgroundScope,
                )

                // when
                val messagesTurbine = service
                    .observeMessages("0")
                    .testIn(backgroundScope)

                // then
                assertEquals(
                    Message(fromUserId = "0", text = "A"),
                    messagesTurbine.awaitItem()
                )
                assertEquals(
                    Message(fromUserId = "0", text = "C"),
                    messagesTurbine.awaitItem()
                )
                messagesTurbine.expectNoEvents()
            }
        }
    }

Turbine seems quite popular for testing flows, but I'm not a big fan of using third-party libraries when they're not really needed.

Determining how many connections were opened

One of the most important functionalities of MessagesService is that it should start only one connection to the source, no matter how many active observers we have.

    // Starts at most one connection to the source
    class MessagesService(
        messagesSource: Flow<Message>,
        scope: CoroutineScope
    ) {
        private val source = messagesSource
            .shareIn(
                scope = scope,
                started = SharingStarted.WhileSubscribed()
            )

        fun observeMessages(fromUserId: String) = source
            .filter { it.fromUserId == fromUserId }
    }

    // Can start multiple connections to the source
    class MessagesService(
        messagesSource: Flow<Message>,
    ) {
        fun observeMessages(fromUserId: String) = messagesSource
            .filter { it.fromUserId == fromUserId }
    }

The simplest way to test this behavior is by making a flow that counts how many subscribers it has. This can be done by incrementing a counter in onStart and decrementing it in onCompletion.

    private val infiniteFlow =
        flow<Nothing> {
            while (true) {
                delay(100)
            }
        }

    class MessagesServiceTest {
        // ...

        @Test
        fun `should start at most one connection`() = runTest {
            // given
            var connectionsCounter = 0
            val source = infiniteFlow
                .onStart { connectionsCounter++ }
                .onCompletion { connectionsCounter-- }
            val service = MessagesService(
                messagesSource = source,
                scope = backgroundScope,
            )

            // when
            service.observeMessages("0")
                .launchIn(backgroundScope)
            service.observeMessages("1")
                .launchIn(backgroundScope)
            service.observeMessages("0")
                .launchIn(backgroundScope)
            service.observeMessages("2")
                .launchIn(backgroundScope)
            delay(1000)

            // then
            assertEquals(1, connectionsCounter)
        }
    }
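To confirm that the counter really distinguishes the two implementations, the sketch below runs the same counting source with and without shareIn. It bypasses MessagesService and works on the flow directly; activeConnections and neverEmitting are hypothetical names introduced for this illustration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.test.runTest

// A source that never emits and never completes on its own.
private val neverEmitting = flow<Nothing> {
    while (true) {
        delay(100)
    }
}

// Starts four observers and reports how many upstream
// connections are active after one second of virtual time.
fun activeConnections(shared: Boolean): Int {
    var connections = 0
    var observed = 0
    runTest {
        val counted = neverEmitting
            .onStart { connections++ }
            .onCompletion { connections-- }
        val source: Flow<Nothing> =
            if (shared) {
                counted.shareIn(
                    scope = backgroundScope,
                    started = SharingStarted.WhileSubscribed()
                )
            } else {
                counted
            }
        repeat(4) { source.launchIn(backgroundScope) }
        delay(1000)
        // Read the counter before backgroundScope is cancelled.
        observed = connections
    }
    return observed
}
```

With shared = true the four observers share one upstream collection, so the result is 1. With shared = false each observer collects the cold flow separately, so the result is 4, and a test asserting a single connection would fail as intended.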

Testing view models

The flow builder lets us describe how elements should be emitted from the source, which is a simple and powerful method. However, there is also another option: using SharedFlow as a source and emitting elements in the test. I find this option especially useful for testing view models.

    class ChatViewModel(
        private val messagesService: MessagesService,
    ) : ViewModel() {
        private val _lastMessage =
            MutableStateFlow<String?>(null)
        val lastMessage: StateFlow<String?> = _lastMessage

        private val _messages =
            MutableStateFlow(emptyList<String>())
        val messages: StateFlow<List<String>> = _messages

        fun start(fromUserId: String) {
            messagesService.observeMessages(fromUserId)
                .onEach {
                    val text = it.text
                    _lastMessage.value = text
                    _messages.value = _messages.value + text
                }
                .launchIn(viewModelScope)
        }
    }

    class ChatViewModelTest {
        @Test
        fun `should expose messages from user`() = runTest {
            // given
            val source = MutableSharedFlow<Message>()

            // when
            val viewModel = ChatViewModel(
                messagesService = FakeMessagesService(source)
            )
            viewModel.start("0")

            // then
            assertEquals(null, viewModel.lastMessage.value)
            assertEquals(emptyList(), viewModel.messages.value)

            // when
            source.emit(Message(fromUserId = "0", text = "ABC"))

            // then
            assertEquals("ABC", viewModel.lastMessage.value)
            assertEquals(listOf("ABC"), viewModel.messages.value)

            // when
            source.emit(Message(fromUserId = "0", text = "DEF"))
            source.emit(Message(fromUserId = "1", text = "GHI"))

            // then
            assertEquals("DEF", viewModel.lastMessage.value)
            assertEquals(
                listOf("ABC", "DEF"),
                viewModel.messages.value
            )
        }
    }

This way, we can properly test our functions' behavior without depending on virtual time at all, and our unit test is simpler to read.
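The same technique can be sketched without any Android dependencies. The example below is an illustration under assumptions, not the chapter's view model: the observer is launched in backgroundScope, and runTest is given an UnconfinedTestDispatcher so that the observer subscribes before the first emit. That dispatcher choice is mine; how viewModelScope is dispatched in the test above depends on setup that is not shown. Message is redefined as a minimal stand-in, and lastMessageSnapshots is a hypothetical helper.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest

// Minimal stand-in for the Message class used in this chapter.
data class Message(val fromUserId: String, val text: String)

// Emits into a MutableSharedFlow source step by step and records
// the observed state after each step.
@OptIn(ExperimentalCoroutinesApi::class)
fun lastMessageSnapshots(): List<String?> {
    val snapshots = mutableListOf<String?>()
    runTest(UnconfinedTestDispatcher()) {
        val source = MutableSharedFlow<Message>()
        val lastMessage = MutableStateFlow<String?>(null)

        // Plays the role of the view model's observer for user "0".
        source
            .filter { it.fromUserId == "0" }
            .onEach { lastMessage.value = it.text }
            .launchIn(backgroundScope)
        snapshots += lastMessage.value

        // emit suspends until the subscriber has taken the element.
        source.emit(Message(fromUserId = "0", text = "ABC"))
        snapshots += lastMessage.value

        // A message from another user should not change the state.
        source.emit(Message(fromUserId = "1", text = "GHI"))
        snapshots += lastMessage.value
    }
    return snapshots
}
```

Because a MutableSharedFlow without a buffer suspends emit until its subscribers have received the element, each assertion can follow its emit directly, with no delay calls in between.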

Summary

I hope this chapter has given you an overview of how we can test classes that use Flow. The logic, in general, is very similar to testing suspending functions, but Flow has its own specificities, and in this chapter, I've shown different examples of how to deal with them.