article banner

Select in Kotlin Coroutines

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.

Coroutines provide the select function, which lets us await the result of the first coroutine that completes. It also offers the possibility of sending to the first channel that has space in the buffer or receiving from the first channel that has an available element. This lets us make races between coroutines or join results from multiple sources. Let's see it in practice.

The select function is still experimental, even though this feature has been available since early Kotlin Coroutines releases. It is highly unlikely that select will be removed, but its API might still change. This feature will probably never be stabilized because there is little demand for it - it is actually used rather rarely, so I decided to keep this chapter as short as possible.

Selecting deferred values

Let's say that we want to request data from multiple sources, but we're only interested in the fastest response. The easiest way to achieve this is to start these requests in async processes; then, use the select function as an expression, and await different values inside it. Inside select, we can call onAwait on Deferred value, which specifies a possible select expression result. Inside its lambda expression, you can transform the value. In the example below, we just return an async result, so the select expression will complete once the first async task is completed, then it will return its result.

import kotlinx.coroutines.* import kotlinx.coroutines.selects.select suspend fun requestData1(): String { delay(100_000) return "Data1" } suspend fun requestData2(): String { delay(1000) return "Data2" } suspend fun askMultipleForData(): String = coroutineScope { select<String> { async { requestData1() }.onAwait { it } async { requestData2() }.onAwait { it } }.also { coroutineContext.cancelChildren() } } suspend fun main(): Unit = coroutineScope { println(askMultipleForData()) } // (1 sec) // Data2

Notice that after the select expression, we call cancelChildren on coroutineContext to cancel all the coroutines that were started inside the select expression. This is important because without it, coroutineScope would not complete until all the coroutines started inside its scope are completed.

The solution above is a bit complex, which is why many developers define a helper function raceOf or use an external library (like Splitties by Louis CAD) that includes such a function. The raceOf function is a simple helper function that takes a lambda expression with multiple async tasks and returns the result of the first one that completes.

// Implementation using raceOf from Splitties library suspend fun askMultipleForData(): String = raceOf({ requestData1() }, { requestData2() }) suspend fun main(): Unit = coroutineScope { println(askMultipleForData()) } // (1 sec) // Data2

Selecting from channels

The select function can also be used with channels. These are the main functions that can be used inside it:

  • onReceive - selected when this channel has a value. It receives this value (like the receive method) and uses it as an argument for its lambda expression. When onReceive is selected, select returns the result of its lambda expression.
  • onReceiveCatching - selected when this channel has a value or is closed. It receives ChannelResult, which either represents a value or signals that this channel is closed, and it uses this value as an argument for its lambda expression. When onReceiveCatching is selected, select returns the result of its lambda expression.
  • onSend - selected when this channel has space in the buffer. It sends a value to this channel (like the send method) and invokes its lambda expression with a reference to the channel. When onSend is selected, select returns Unit.

The select expression can be used with onReceive or onReceiveCatching to receive from multiple channels.

import kotlinx.coroutines.channels.* import kotlinx.coroutines.* import kotlinx.coroutines.selects.select suspend fun CoroutineScope.produceString(s: String, time: Long) = produce { while (true) { delay(time) send(s) } } fun main() = runBlocking { val fooChannel = produceString("foo", 210L) val barChannel = produceString("BAR", 500L) repeat(7) { select { fooChannel.onReceive { println("From fooChannel: $it") } barChannel.onReceive { println("From barChannel: $it") } } } coroutineContext.cancelChildren() } // From fooChannel: foo // From fooChannel: foo // From barChannel: BAR // From fooChannel: foo // From fooChannel: foo // From barChannel: BAR // From fooChannel: foo

The select function can be used with onSend to send to the first channel that has space in the buffer.

import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.selects.select import kotlinx.coroutines.* fun main(): Unit = runBlocking { val c1 = Channel<Char>(capacity = 2) val c2 = Channel<Char>(capacity = 2) // Send values launch { for (c in 'A'..'H') { delay(400) select<Unit> { c1.onSend(c) { println("Sent $c to 1") } c2.onSend(c) { println("Sent $c to 2") } } } } // Receive values launch { while (true) { delay(1000) val c = select<String> { c1.onReceive { "$it from 1" } c2.onReceive { "$it from 2" } } println("Received $c") } } } // Sent A to 1 // Sent B to 1 // Received A from 1 // Sent C to 1 // Sent D to 2 // Received B from 1 // Sent E to 1 // Sent F to 2 // Received C from 1 // Sent G to 1 // Received E from 1 // Sent H to 1 // Received G from 1 // Received H from 1 // Received D from 2 // Received F from 2

Summary

select is a useful function that lets us await the result of the first coroutine that completes, or send to or receive from the first from multiple channels. It is mainly used to implement different patterns for operating on channels, but it can also be used to implement coroutine races.