Solution: MessageService

class MessageService( private val messageRepository: MessageRepository ) { fun threadsSearch( query: Flow<String> ): Flow<MessageThread> = query .flatMapLatest { messageRepository.searchThreads(it) } fun subscribeThreads( threads: Flow<MessageThread> ): Flow<MessageThreadUpdate> = threads .flatMapMerge(concurrency = Int.MAX_VALUE) { messageRepository.subscribeThread(it.id) } fun sendMessages( messages: Flow<List<Message>> ): Flow<MessageSendingResponse> = messages .flatMapConcat { messageRepository.sendMessages(it) } }

Example solution in playground

import kotlinx.coroutines.delay import kotlinx.coroutines.flow.* import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.currentTime import kotlinx.coroutines.test.runTest import org.junit.Test import kotlin.test.assertEquals class MessageService( private val messageRepository: MessageRepository ) { fun threadsSearch( query: Flow<String> ): Flow<MessageThread> = query .flatMapLatest { messageRepository.searchThreads(it) } fun subscribeThreads( threads: Flow<MessageThread> ): Flow<MessageThreadUpdate> = threads .flatMapMerge(concurrency = Int.MAX_VALUE) { messageRepository.subscribeThread(it.id) } fun sendMessages( messages: Flow<List<Message>> ): Flow<MessageSendingResponse> = messages .flatMapConcat { messageRepository.sendMessages(it) } } interface MessageRepository { fun searchThreads( query: String ): Flow<MessageThread> fun subscribeThread( threadId: String ): Flow<MessageThreadUpdate> fun sendMessages( messages: List<Message> ): Flow<MessageSendingResponse> } data class MessageThread(val id: String, val name: String) data class MessageThreadUpdate(val threadId: String, val messages: List<Message>) data class Message(val senderId: String, val text: String, val threadId: String) data class MessageSendingResponse(val messageId: String, val success: Boolean) class MessageServiceTests { @Test fun `should search for threads based on the last query`() = runTest { val repo = object : OpenMessageRepository() { override fun searchThreads(query: String): Flow<MessageThread> = flow { delay(1000) emit(MessageThread("Resp$query", "Name$query")) } } val service = MessageService(repo) val query = flow { emit("A") delay(500) emit("B") delay(1500) emit("C") } val result = service.threadsSearch(query) .withVirtualTime(this) .toList() assertEquals( listOf( ValueAndTime(MessageThread("RespB", "NameB"), 1500), ValueAndTime(MessageThread("RespC", "NameC"), 3000), ), result ) } @Test fun `should search for all threads`() = runTest { val repo = object : OpenMessageRepository() { override fun searchThreads(query: String): Flow<MessageThread> = flow { delay(1000) emit(MessageThread("Resp$query", "Name$query")) delay(1000) emit(MessageThread("2Resp$query", "2Name$query")) } } val service = MessageService(repo) val query = flow { emit("A") delay(2500) emit("B") delay(1500) emit("C") delay(500) emit("D") } val result = service.threadsSearch(query).toList() assertEquals( listOf( MessageThread("RespA", "NameA"), MessageThread("2RespA", "2NameA"), MessageThread("RespB", "NameB"), MessageThread("RespD", "NameD"), MessageThread("2RespD", "2NameD"), ), result ) } @Test fun `should subscribe to threads`() = runTest { val repo = object : OpenMessageRepository() { override fun subscribeThread(threadId: String): Flow<MessageThreadUpdate> = flow { emit(MessageThreadUpdate(threadId, listOf(Message("A", "B", threadId)))) delay(1000) emit(MessageThreadUpdate(threadId, listOf(Message("C", "D", threadId)))) } } val service = MessageService(repo) val threads = flow { emit(MessageThread("T1", "Name1")) delay(500) emit(MessageThread("T2", "Name2")) delay(1500) emit(MessageThread("T3", "Name3")) } val result = service.subscribeThreads(threads) .withVirtualTime(this) .toList() assertEquals( listOf( ValueAndTime(MessageThreadUpdate("T1", listOf(Message("A", "B", "T1"))), 0), ValueAndTime(MessageThreadUpdate("T2", listOf(Message("A", "B", "T2"))), 500), ValueAndTime(MessageThreadUpdate("T1", listOf(Message("C", "D", "T1"))), 1000), ValueAndTime(MessageThreadUpdate("T2", listOf(Message("C", "D", "T2"))), 1500), ValueAndTime(MessageThreadUpdate("T3", listOf(Message("A", "B", "T3"))), 2000), ValueAndTime(MessageThreadUpdate("T3", listOf(Message("C", "D", "T3"))), 3000), ), result ) } @Test fun `should subscribe to unlimited number of threads`() = runTest { val repo = object : OpenMessageRepository() { override fun subscribeThread(threadId: String): Flow<MessageThreadUpdate> = flow { emit(MessageThreadUpdate(threadId, listOf(Message("A", "B", threadId)))) delay(1000) emit(MessageThreadUpdate(threadId, listOf(Message("C", "D", threadId)))) } } val service = MessageService(repo) val threads = flow { repeat(1000) { emit(MessageThread("T$it", "Name$it")) delay(1) } } val result = service.subscribeThreads(threads).toList() assertEquals(2000, result.size) assertEquals(999 + 1000, currentTime) } @Test fun `should send messages synchroniously`() = runTest { val repo = object : OpenMessageRepository() { override fun sendMessages(messages: List<Message>): Flow<MessageSendingResponse> = flow { messages.forEach { delay(1000) emit(MessageSendingResponse(it.threadId, true)) } } } val service = MessageService(repo) val messages = channelFlow { send(listOf(Message("A", "B", "T1"))) delay(500) send(listOf(Message("C", "D", "T2"))) delay(1500) send(listOf(Message("E", "F", "T3"))) } val result = service.sendMessages(messages) .withVirtualTime(this) .toList() assertEquals( listOf( ValueAndTime(MessageSendingResponse("T1", true), 1000), ValueAndTime(MessageSendingResponse("T2", true), 2000), ValueAndTime(MessageSendingResponse("T3", true), 3000), ), result ) } } open class OpenMessageRepository : MessageRepository { override fun searchThreads(query: String): Flow<MessageThread> { TODO() } override fun subscribeThread(threadId: String): Flow<MessageThreadUpdate> { TODO() } override fun sendMessages(messages: List<Message>): Flow<MessageSendingResponse> { TODO() } } fun <T> Flow<T>.withVirtualTime(testScope: TestScope): Flow<ValueAndTime<T>> = map { ValueAndTime(it, testScope.currentTime) } data class ValueAndTime<T>(val value: T, val timeMillis: Long)