
Select in Kotlin Coroutines

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.

Coroutines provide the select function, which lets us await the result of the first coroutine that completes. It also lets us send to the first channel that has space in its buffer, or receive from the first channel that has an available element. With it we can implement races between coroutines or merge results from multiple sources. Let's see it in practice.

The select function is still experimental, even though it has been available since early Kotlin Coroutines releases. It is highly unlikely that select will be removed, but its API might still change. This feature will probably never be stabilized because there is little demand for it. It is used rather rarely, so I decided to keep this chapter as short as possible.

Selecting deferred values

Let's say that we want to request data from multiple sources, but we're only interested in the fastest response. The easiest way to achieve this is to start these requests in async coroutines, then use the select function as an expression and await the different values inside it. Inside select, we can call onAwait on a Deferred value, which registers it as a possible result of the select expression. Inside its lambda expression, you can transform the value. In the example below, we just return the async result unchanged, so the select expression completes as soon as the first async task completes, and it returns that task's result.

    import kotlinx.coroutines.*
    import kotlinx.coroutines.selects.select

    suspend fun requestData1(): String {
        delay(100_000)
        return "Data1"
    }

    suspend fun requestData2(): String {
        delay(1000)
        return "Data2"
    }

    val scope = CoroutineScope(SupervisorJob())

    suspend fun askMultipleForData(): String {
        val defData1 = scope.async { requestData1() }
        val defData2 = scope.async { requestData2() }
        return select {
            defData1.onAwait { it }
            defData2.onAwait { it }
        }
    }

    suspend fun main(): Unit = coroutineScope {
        println(askMultipleForData())
    }
    // (1 sec)
    // Data2
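
The lambda passed to onAwait does not have to return the value unchanged. The sketch below labels each result with the source it came from, so the caller can tell which request won. The names raceScope, slowSource, fastSource and firstLabelled are hypothetical and exist only for this illustration; as in the example above, the async tasks are started on an outside scope.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Hypothetical data sources standing in for real requests
suspend fun slowSource(): String {
    delay(2_000)
    return "Data1"
}

suspend fun fastSource(): String {
    delay(100)
    return "Data2"
}

// Outside scope, as in the example above (the name is an assumption)
val raceScope = CoroutineScope(SupervisorJob())

// Returns the first available result, transformed inside the onAwait lambda
suspend fun firstLabelled(): String {
    val slow = raceScope.async { slowSource() }
    val fast = raceScope.async { fastSource() }
    return select<String> {
        slow.onAwait { "slow: $it" }
        fast.onAwait { "fast: $it" }
    }
}

fun main() = runBlocking {
    println(firstLabelled())
}
// (0.1 sec)
// fast: Data2
```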

Notice that async in the example above needs to be started on an outside scope. This means that if you cancel the coroutine that called askMultipleForData, these async tasks will not be cancelled. This is a problem, but I don't know any better implementation. If we had used coroutineScope instead, it would have awaited all its children before returning; so, for the implementation below, we still receive Data2 as the result, but after 100 seconds instead of after one.

    // ...

    suspend fun askMultipleForData(): String = coroutineScope {
        select<String> {
            async { requestData1() }.onAwait { it }
            async { requestData2() }.onAwait { it }
        }
        // coroutineScope still waits here for the slower child to finish
    }

    suspend fun main(): Unit = coroutineScope {
        println(askMultipleForData())
    }
    // (100 sec)
    // Data2

A coroutine race can be implemented well using async and select, but it requires cancelling the remaining coroutines explicitly. We can cancel them in an also block, which is called once select has produced a value.

    suspend fun askMultipleForData(): String = coroutineScope {
        select<String> {
            async { requestData1() }.onAwait { it }
            async { requestData2() }.onAwait { it }
        }.also { coroutineContext.cancelChildren() }
    }

    suspend fun main(): Unit = coroutineScope {
        println(askMultipleForData())
    }
    // (1 sec)
    // Data2

The solution above is a bit complex, which is why many developers define a helper function or use an external library (like Splitties by Louis CAD) that includes the following raceOf function. Such a function can be defined in a couple of lines, as I will show in the Recipes chapter.

    // Implementation using raceOf from Splitties library
    suspend fun askMultipleForData(): String =
        raceOf({ requestData1() }, { requestData2() })

    suspend fun main(): Unit = coroutineScope {
        println(askMultipleForData())
    }
    // (1 sec)
    // Data2
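
To give an idea of what such a helper might look like, here is a minimal sketch built from the async, select and cancelChildren pattern shown earlier. The name raceOfSketch and its signature are assumptions made for this illustration; this is not necessarily how Splitties implements raceOf.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Starts every racer as a child coroutine, returns the first result,
// and cancels the racers that are still running.
// Name and signature are hypothetical.
suspend fun <T> raceOfSketch(
    racer: suspend CoroutineScope.() -> T,
    vararg racers: suspend CoroutineScope.() -> T
): T = coroutineScope {
    select<T> {
        (listOf(racer) + racers).forEach { block ->
            async { block() }.onAwait { it }
        }
    }.also { coroutineContext.cancelChildren() }
}

fun main() = runBlocking {
    val winner = raceOfSketch(
        { delay(2_000); "Data1" },
        { delay(100); "Data2" }
    )
    println(winner)
}
// (0.1 sec)
// Data2
```

Requiring one racer before the vararg parameter means a race with no participants cannot be written, which matters because a select with no clauses would never complete.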

Selecting from channels

The select function can also be used with channels. These are the main functions that can be used inside it:

  • onReceive - selected when this channel has a value. It receives this value (like the receive method) and uses it as an argument for its lambda expression. When onReceive is selected, select returns the result of its lambda expression.
  • onReceiveCatching - selected when this channel has a value or is closed. It receives ChannelResult, which either represents a value or signals that this channel is closed, and it uses this value as an argument for its lambda expression. When onReceiveCatching is selected, select returns the result of its lambda expression.
  • onSend - selected when this channel has space in the buffer. It sends a value to this channel (like the send method) and invokes its lambda expression with a reference to the channel. When onSend is selected, select returns the result of its lambda expression (typically Unit).

Let's see some practical applications. The select expression can be used with onReceive or onReceiveCatching to receive from multiple channels.

    import kotlinx.coroutines.channels.*
    import kotlinx.coroutines.*
    import kotlinx.coroutines.selects.select

    suspend fun CoroutineScope.produceString(
        s: String,
        time: Long
    ) = produce {
        while (true) {
            delay(time)
            send(s)
        }
    }

    fun main() = runBlocking {
        val fooChannel = produceString("foo", 210L)
        val barChannel = produceString("BAR", 500L)

        repeat(7) {
            select {
                fooChannel.onReceive {
                    println("From fooChannel: $it")
                }
                barChannel.onReceive {
                    println("From barChannel: $it")
                }
            }
        }

        coroutineContext.cancelChildren()
    }
    // From fooChannel: foo
    // From fooChannel: foo
    // From barChannel: BAR
    // From fooChannel: foo
    // From fooChannel: foo
    // From barChannel: BAR
    // From fooChannel: foo
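
When the producers are finite, onReceive is not enough: it throws once its channel is closed. The sketch below uses onReceiveCatching to keep receiving from two channels until both are closed. The names produceFinite and collectUntilClosed, as well as the delays, are hypothetical and chosen only for this illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.select

// Hypothetical finite producer: sends `count` copies of `s`,
// then completes, which closes the channel
fun CoroutineScope.produceFinite(s: String, time: Long, count: Int) = produce {
    repeat(count) {
        delay(time)
        send(s)
    }
}

// Receives from both channels until each of them has been closed
suspend fun collectUntilClosed(): List<String> = coroutineScope {
    val foo = produceFinite("foo", 100L, 2)
    val bar = produceFinite("BAR", 350L, 1)
    val received = mutableListOf<String>()
    var fooOpen = true
    var barOpen = true

    while (fooOpen || barOpen) {
        select<Unit> {
            // Register a clause only for channels that are still open
            if (fooOpen) foo.onReceiveCatching { result ->
                val value = result.getOrNull()
                if (value != null) {
                    received += value
                } else {
                    fooOpen = false // channel closed
                }
            }
            if (barOpen) bar.onReceiveCatching { result ->
                val value = result.getOrNull()
                if (value != null) {
                    received += value
                } else {
                    barOpen = false // channel closed
                }
            }
        }
    }
    received
}

fun main() = runBlocking {
    println(collectUntilClosed())
}
// [foo, foo, BAR]
```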

The select function can be used with onSend to send to the first channel that has space in the buffer.

    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.selects.select
    import kotlinx.coroutines.*

    fun main(): Unit = runBlocking {
        val c1 = Channel<Char>(capacity = 2)
        val c2 = Channel<Char>(capacity = 2)

        // Send values
        launch {
            for (c in 'A'..'H') {
                delay(400)
                select<Unit> {
                    c1.onSend(c) { println("Sent $c to 1") }
                    c2.onSend(c) { println("Sent $c to 2") }
                }
            }
        }

        // Receive values
        launch {
            while (true) {
                delay(1000)
                val c = select<String> {
                    c1.onReceive { "$it from 1" }
                    c2.onReceive { "$it from 2" }
                }
                println("Received $c")
            }
        }
    }
    // Sent A to 1
    // Sent B to 1
    // Received A from 1
    // Sent C to 1
    // Sent D to 2
    // Received B from 1
    // Sent E to 1
    // Sent F to 2
    // Received C from 1
    // Sent G to 1
    // Received E from 1
    // Sent H to 1
    // Received G from 1
    // Received H from 1
    // Received D from 2
    // Received F from 2

Summary

select is a useful function that lets us await the result of the first coroutine that completes, or send to or receive from the first available channel among many. It is mainly used to implement different patterns for operating on channels, but it can also be used to implement async coroutine races.