article banner

Exercise: MessageService

Implement a MessageService class that has three functions:

  • threadsSearch - takes a flow of queries and returns a flow of threads. Each query should be used to observe threads from the repository. If a new query is emitted, the previous one should be cancelled.
  • subscribeThreads - takes a flow of threads and returns a flow of thread updates. Each thread should be subscribed to in the repository. Should observe all threads concurrently.
  • sendMessages - takes a flow of messages lists and returns a flow of responses that are the result of sending those messages. Each message should be synchronously sent to the repository, and its result should be the response.
class MessageService( private val messageRepository: MessageRepository ) { fun threadsSearch( query: Flow<String> ): Flow<MessageThread> = TODO() fun subscribeThreads( threads: Flow<MessageThread> ): Flow<MessageThreadUpdate> = TODO() fun sendMessages( messages: Flow<List<Message>> ): Flow<MessageSendingResponse> = TODO() } interface MessageRepository { fun searchThreads( query: String ): Flow<MessageThread> fun subscribeThread( threadId: String ): Flow<MessageThreadUpdate> fun sendMessages( messages: List<Message> ): Flow<MessageSendingResponse> }

This problem can either be solved in the below playground or you can clone kotlin-exercises project and solve it locally. In the project, you can find code template for this exercise in coroutines/flow/MessageService.kt. You can find there starting code and unit tests.

Once you are done with the exercise, you can check your solution here.

Playground

import kotlinx.coroutines.delay import kotlinx.coroutines.flow.* import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.currentTime import kotlinx.coroutines.test.runTest import org.junit.Test import kotlin.test.assertEquals class MessageService( private val messageRepository: MessageRepository ) { fun threadsSearch( query: Flow<String> ): Flow<MessageThread> = TODO() fun subscribeThreads( threads: Flow<MessageThread> ): Flow<MessageThreadUpdate> = TODO() fun sendMessages( messages: Flow<List<Message>> ): Flow<MessageSendingResponse> = TODO() } interface MessageRepository { fun searchThreads( query: String ): Flow<MessageThread> fun subscribeThread( threadId: String ): Flow<MessageThreadUpdate> fun sendMessages( messages: List<Message> ): Flow<MessageSendingResponse> } data class MessageThread(val id: String, val name: String) data class MessageThreadUpdate(val threadId: String, val messages: List<Message>) data class Message(val senderId: String, val text: String, val threadId: String) data class MessageSendingResponse(val messageId: String, val success: Boolean) class MessageServiceTests { @Test fun `should search for threads based on the last query`() = runTest { val repo = object : OpenMessageRepository() { override fun searchThreads(query: String): Flow<MessageThread> = flow { delay(1000) emit(MessageThread("Resp$query", "Name$query")) } } val service = MessageService(repo) val query = flow { emit("A") delay(500) emit("B") delay(1500) emit("C") } val result = service.threadsSearch(query) .withVirtualTime(this) .toList() assertEquals( listOf( ValueAndTime(MessageThread("RespB", "NameB"), 1500), ValueAndTime(MessageThread("RespC", "NameC"), 3000), ), result ) } @Test fun `should search for all threads`() = runTest { val repo = object : OpenMessageRepository() { override fun searchThreads(query: String): Flow<MessageThread> = flow { delay(1000) emit(MessageThread("Resp$query", "Name$query")) delay(1000) emit(MessageThread("2Resp$query", "2Name$query")) } } val service = MessageService(repo) val query = flow { emit("A") delay(2500) emit("B") delay(1500) emit("C") delay(500) emit("D") } val result = service.threadsSearch(query).toList() assertEquals( listOf( MessageThread("RespA", "NameA"), MessageThread("2RespA", "2NameA"), MessageThread("RespB", "NameB"), MessageThread("RespD", "NameD"), MessageThread("2RespD", "2NameD"), ), result ) } @Test fun `should subscribe to threads`() = runTest { val repo = object : OpenMessageRepository() { override fun subscribeThread(threadId: String): Flow<MessageThreadUpdate> = flow { emit(MessageThreadUpdate(threadId, listOf(Message("A", "B", threadId)))) delay(1000) emit(MessageThreadUpdate(threadId, listOf(Message("C", "D", threadId)))) } } val service = MessageService(repo) val threads = flow { emit(MessageThread("T1", "Name1")) delay(500) emit(MessageThread("T2", "Name2")) delay(1500) emit(MessageThread("T3", "Name3")) } val result = service.subscribeThreads(threads) .withVirtualTime(this) .toList() assertEquals( listOf( ValueAndTime(MessageThreadUpdate("T1", listOf(Message("A", "B", "T1"))), 0), ValueAndTime(MessageThreadUpdate("T2", listOf(Message("A", "B", "T2"))), 500), ValueAndTime(MessageThreadUpdate("T1", listOf(Message("C", "D", "T1"))), 1000), ValueAndTime(MessageThreadUpdate("T2", listOf(Message("C", "D", "T2"))), 1500), ValueAndTime(MessageThreadUpdate("T3", listOf(Message("A", "B", "T3"))), 2000), ValueAndTime(MessageThreadUpdate("T3", listOf(Message("C", "D", "T3"))), 3000), ), result ) } @Test fun `should subscribe to unlimited number of threads`() = runTest { val repo = object : OpenMessageRepository() { override fun subscribeThread(threadId: String): Flow<MessageThreadUpdate> = flow { emit(MessageThreadUpdate(threadId, listOf(Message("A", "B", threadId)))) delay(1000) emit(MessageThreadUpdate(threadId, listOf(Message("C", "D", threadId)))) } } val service = MessageService(repo) val threads = flow { repeat(1000) { emit(MessageThread("T$it", "Name$it")) delay(1) } } val result = service.subscribeThreads(threads).toList() assertEquals(2000, result.size) assertEquals(999 + 1000, currentTime) } @Test fun `should send messages synchroniously`() = runTest { val repo = object : OpenMessageRepository() { override fun sendMessages(messages: List<Message>): Flow<MessageSendingResponse> = flow { messages.forEach { delay(1000) emit(MessageSendingResponse(it.threadId, true)) } } } val service = MessageService(repo) val messages = channelFlow { send(listOf(Message("A", "B", "T1"))) delay(500) send(listOf(Message("C", "D", "T2"))) delay(1500) send(listOf(Message("E", "F", "T3"))) } val result = service.sendMessages(messages) .withVirtualTime(this) .toList() assertEquals( listOf( ValueAndTime(MessageSendingResponse("T1", true), 1000), ValueAndTime(MessageSendingResponse("T2", true), 2000), ValueAndTime(MessageSendingResponse("T3", true), 3000), ), result ) } } open class OpenMessageRepository : MessageRepository { override fun searchThreads(query: String): Flow<MessageThread> { TODO() } override fun subscribeThread(threadId: String): Flow<MessageThreadUpdate> { TODO() } override fun sendMessages(messages: List<Message>): Flow<MessageSendingResponse> { TODO() } } fun <T> Flow<T>.withVirtualTime(testScope: TestScope): Flow<ValueAndTime<T>> = map { ValueAndTime(it, testScope.currentTime) } data class ValueAndTime<T>(val value: T, val timeMillis: Long)