Channel in Kotlin Coroutines

This is a chapter from the book Kotlin Coroutines. You can find Early Access on LeanPub.

The Channel API was added as an inter-coroutine communication primitive. Many imagine it as a pipe, but I prefer a different metaphor. Are you familiar with public bookcases for exchanging books? One person needs to bring a book for another person to find it. This is very similar to how Channel from kotlinx.coroutines works.

Channel supports any number of senders and receivers, and every value that is sent to the channel is received once.

Channel is an interface that extends two other interfaces:

  • SendChannel, used to send elements (adding elements) and to close the channel,
  • ReceiveChannel, used to receive elements (taking them).

// Simplified declarations; the real interfaces have more members
interface SendChannel<in E> {
    suspend fun send(element: E)
    fun close(): Boolean
}

interface ReceiveChannel<out E> {
    suspend fun receive(): E
}

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

Thanks to this distinction, we can expose just ReceiveChannel or SendChannel, to restrict the channel entry points.
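As a minimal sketch of this idea, a class can keep the full Channel private and expose only its receiving side. The class NotificationSource and the function collectNotifications are hypothetical names used only for this illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical class, for illustration only
class NotificationSource {
    // The full channel stays private to this class
    private val _notifications = Channel<String>()

    // Outside code sees only the receiving side
    val notifications: ReceiveChannel<String> get() = _notifications

    suspend fun publish(message: String) {
        _notifications.send(message)
    }

    fun close() {
        _notifications.close()
    }
}

fun collectNotifications(): List<String> = runBlocking {
    val source = NotificationSource()
    launch {
        source.publish("A")
        source.publish("B")
        source.close()
    }
    val received = mutableListOf<String>()
    for (message in source.notifications) received += message
    received
}

fun main() {
    println(collectNotifications()) // [A, B]
}
```

Code that only holds the notifications property has no send or close in its static type, so the entry points for writing stay inside the class.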

You might notice that both send and receive are suspending functions. This is an essential feature:

  • When we try to receive and there are no elements in the channel, the coroutine is suspended until an element is available. Like in our metaphorical bookshelf, when someone goes to the shelf to find a book, and the bookshelf is empty, that person needs to suspend until someone puts a book there.
  • On the other hand, send will be suspended when the channel reaches its capacity. We will soon see that most channels have a limited capacity. Like in our metaphorical bookshelf, when someone tries to put a book on a shelf, but the bookshelf is full, that person needs to suspend until someone takes a book and makes space.
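Both kinds of suspension can be observed with a small sketch. The helpers receiveOrGiveUp and sendOrGiveUp are hypothetical names. They wrap the call in withTimeoutOrNull, so a call that is still suspended after 100 ms is abandoned instead of waiting forever.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Returns null if receive was still suspended when the timeout expired
suspend fun receiveOrGiveUp(channel: Channel<Int>): Int? =
    withTimeoutOrNull(100) { channel.receive() }

// Returns false if send was still suspended when the timeout expired
suspend fun sendOrGiveUp(channel: Channel<Int>, value: Int): Boolean =
    withTimeoutOrNull(100) {
        channel.send(value)
        true
    } ?: false

fun main() = runBlocking {
    val channel = Channel<Int>() // default capacity, nobody else uses it
    println(receiveOrGiveUp(channel)) // null, as there was nothing to take
    println(sendOrGiveUp(channel, 1)) // false, as nobody was there to receive
}
```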

A channel might have any number of senders and receivers, although the most common situation is one coroutine on each side.

To see the simplest example of a channel, we need to have a producer (sender) and a consumer (receiver) on separate coroutines. The producer will be sending elements, and the consumer will be receiving them. This is how it can be implemented:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>()
    launch {
        repeat(5) { index ->
            delay(1000)
            println("Producing next one")
            channel.send(index * 2)
        }
    }

    launch {
        repeat(5) {
            val received = channel.receive()
            println(received)
        }
    }
}
// (1 sec)
// Producing next one
// 0
// (1 sec)
// Producing next one
// 2
// (1 sec)
// Producing next one
// 4
// (1 sec)
// Producing next one
// 6
// (1 sec)
// Producing next one
// 8

Such an implementation is far from perfect. First, the receiver needs to know how many elements will be sent. This is rarely the case, so we would prefer to keep listening for as long as the sender is willing to send. To receive elements from the channel until it is closed, we can use a for-loop or the consumeEach function (the first option is generally preferable, as consumeEach is not safe to use from multiple coroutines).

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>()
    launch {
        repeat(5) { index ->
            println("Producing next one")
            delay(1000)
            channel.send(index * 2)
        }
        channel.close()
    }

    launch {
        for (element in channel) {
            println(element)
        }
        // or
        // channel.consumeEach { element ->
        //     println(element)
        // }
    }
}

A common problem with this way of sending elements is that it is easy to forget to close the channel, especially in the case of exceptions. If one coroutine stops producing because of an exception, the other will wait for elements forever. It is much more convenient to use the produce function, which is a coroutine builder that returns ReceiveChannel.

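import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce

// This function produces a channel with
// consecutive integers from 0 to `max` (inclusive)
fun CoroutineScope.produceNumbers(
    max: Int
): ReceiveChannel<Int> = produce {
    var x = 0
    while (x <= max) send(x++)
}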

The produce function closes the channel whenever the builder coroutine ends in any way (finished, stopped, cancelled). Thanks to that, we will never forget to call close. The produce builder is a very popular way to create a channel, for good reasons: it gives a lot of safety and convenience.

import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.*

suspend fun main(): Unit = coroutineScope {
    val channel = produce {
        repeat(5) { index ->
            println("Producing next one")
            delay(1000)
            send(index * 2)
        }
    }

    for (element in channel) {
        println(element)
    }
}
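The closing behaviour can be checked directly. The sketch below assumes the receiveCatching function from kotlinx.coroutines, which returns a result object instead of throwing when the channel is closed. The function name closesAfterBuilderFinishes is hypothetical.

```kotlin
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Returns true if the channel reports "closed" once its only element is taken
suspend fun closesAfterBuilderFinishes(): Boolean = coroutineScope {
    val channel = produce<Int> { send(1) } // no explicit close() call
    channel.receive()                      // takes the only element
    // The builder has nothing more to do, so the next receive
    // attempt ends with a "closed" result instead of a value
    channel.receiveCatching().isClosed
}

fun main() = runBlocking {
    println(closesAfterBuilderFinishes()) // true
}
```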

Channel types

Depending on the capacity size we set, we distinguish four types of channels:

  • Unlimited - channel with capacity Channel.UNLIMITED, which has an unlimited buffer, so send never suspends.
  • Buffered - channel with a concrete capacity or Channel.BUFFERED (which is 64 by default and can be overridden by setting the system property kotlinx.coroutines.channels.defaultBuffer on the JVM).
  • Rendezvous¹ (default) - channel with capacity 0 or Channel.RENDEZVOUS (which is equal to 0), meaning that an exchange can happen only if the sender and the receiver meet. One of them will therefore always be suspended, at least for a moment, while waiting for the other.
  • Conflated - channel with capacity Channel.CONFLATED, which has a buffer of size 1, where each new element replaces the previous one.

Let's now see those capacities in action. We can set them directly on Channel, but we can also set them when we call the produce function.

We will make our producer fast and our receiver slow. With unlimited capacity, the channel should accept all the elements, and then let them be received one after another.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.*

suspend fun main(): Unit = coroutineScope {
    val channel = produce(capacity = Channel.UNLIMITED) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (1 - 4 * 0.1 = 0.6 sec)
// 0
// (1 sec)
// 2
// (1 sec)
// 4
// (1 sec)
// 6
// (1 sec)
// 8
// (1 sec)

With a capacity of concrete size, we will first produce until the buffer is full, and then the producer will need to start waiting for the receiver.

import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.*

suspend fun main(): Unit = coroutineScope {
    val channel = produce(capacity = 3) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (1 - 2 * 0.1 = 0.8 sec)
// 0
// Sent
// (1 sec)
// 2
// Sent
// (1 sec)
// 4
// (1 sec)
// 6
// (1 sec)
// 8
// (1 sec)

With a channel of default (or Channel.RENDEZVOUS) capacity, the producer will wait for a receiver from the beginning.

import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.*

suspend fun main(): Unit = coroutineScope {
    val channel = produce {
        // or produce(capacity = Channel.RENDEZVOUS) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// 0
// Sent
// (1 sec)
// 2
// Sent
// (1 sec)
// 4
// Sent
// (1 sec)
// 6
// Sent
// (1 sec)
// 8
// Sent
// (1 sec)

Finally, with the conflated capacity, we do not store past elements. Each new element replaces the previous one, so we can receive only the most recent element, and all previously unreceived elements are lost.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.*

suspend fun main(): Unit = coroutineScope {
    val channel = produce(capacity = Channel.CONFLATED) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (1 - 4 * 0.1 = 0.6 sec)
// 8
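The same differences can be observed without any timing. The sketch below uses trySend and tryReceive, the non-suspending counterparts of send and receive in kotlinx.coroutines, which this chapter does not otherwise cover. The helper acceptedWithoutReceiver is a hypothetical name. It counts how many elements a channel accepts when nobody is receiving.

```kotlin
import kotlinx.coroutines.channels.Channel

// Counts how many of `attempts` elements the channel accepts
// without suspending and without any receiver present
fun acceptedWithoutReceiver(channel: Channel<Int>, attempts: Int = 5): Int =
    (0 until attempts).count { channel.trySend(it).isSuccess }

fun main() {
    // Rendezvous: nothing is accepted without a waiting receiver
    println(acceptedWithoutReceiver(Channel<Int>()))                  // 0
    // Buffered: accepts elements until the buffer is full
    println(acceptedWithoutReceiver(Channel<Int>(3)))                 // 3
    // Unlimited: accepts everything
    println(acceptedWithoutReceiver(Channel<Int>(Channel.UNLIMITED))) // 5
    // Conflated: accepts everything, but keeps only the last element
    val conflated = Channel<Int>(Channel.CONFLATED)
    println(acceptedWithoutReceiver(conflated))                       // 5
    println(conflated.tryReceive().getOrNull())                       // 4
}
```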

On buffer overflow

To customize channels further we can control what will happen when the buffer is full (onBufferOverflow parameter). There are the following options:

  • SUSPEND (default) - when the buffer is full, suspend on the send method.
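  • DROP_OLDEST - when the buffer is full, drop the oldest element in the buffer to make space for the new one; send does not suspend.
  • DROP_LATEST - when the buffer is full, drop the element that is just being sent, so the buffer contents stay the same; send does not suspend.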

As you might guess, the channel capacity Channel.CONFLATED is the same as setting the capacity to 1 and onBufferOverflow to DROP_OLDEST. Currently, the produce function does not allow us to set a custom onBufferOverflow, so to set it, we need to define a channel using the Channel function².

import kotlinx.coroutines.channels.*
import kotlinx.coroutines.*

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>(
        capacity = 2,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    launch {
        repeat(5) { index ->
            channel.send(index * 2)
            delay(100)
            println("Sent")
        }
        channel.close()
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (0.1 sec)
// Sent
// (1 - 4 * 0.1 = 0.6 sec)
// 6
// (1 sec)
// 8
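For comparison, here is a minimal sketch that fills the same kind of channel with both dropping strategies and then reads what is left. The function keptElements is a hypothetical helper. It is meant only for the two DROP_ strategies, because with SUSPEND the third send would wait forever, as nobody is receiving yet.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Sends 0, 2, 4, 6, 8 to a channel with capacity 2 and returns
// the elements that are still in the buffer afterwards
fun keptElements(overflow: BufferOverflow): List<Int> = runBlocking {
    val channel = Channel<Int>(capacity = 2, onBufferOverflow = overflow)
    repeat(5) { index -> channel.send(index * 2) } // does not suspend for DROP_*
    channel.close()
    val kept = mutableListOf<Int>()
    for (element in channel) kept += element
    kept
}

fun main() {
    println(keptElements(BufferOverflow.DROP_OLDEST)) // [6, 8]
    println(keptElements(BufferOverflow.DROP_LATEST)) // [0, 2]
}
```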

On undelivered element handler

One more Channel function parameter that we should know about is onUndeliveredElement. It is called when an element couldn't be handled for some reason. Most often this means that the channel was closed or cancelled, but it might also happen when send, receive, receiveCatching (receiveOrNull in older versions), or hasNext throws an error. We generally use it to close resources that are sent through the channel.

val channel = Channel<Resource>(capacity) { resource ->
    resource.close()
}
// or
// val channel = Channel<Resource>(
//     capacity,
//     onUndeliveredElement = { resource ->
//         resource.close()
//     }
// )

// Producer code
val resourceToSend = openResource()
channel.send(resourceToSend)

// Consumer code
val resourceReceived = channel.receive()
try {
    // work with received resource
} finally {
    resourceReceived.close()
}
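To see the handler being invoked, consider this minimal sketch, in which plain strings stand in for resources. Two elements are buffered and the channel is then cancelled before anyone receives them. The function undeliveredAfterCancel is a hypothetical name, and the result is sorted because the sketch makes no assumption about the order in which the handler is called.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Returns the elements reported as undelivered after the channel is cancelled
fun undeliveredAfterCancel(): List<String> = runBlocking {
    val undelivered = mutableListOf<String>()
    val channel = Channel<String>(
        capacity = 2,
        onUndeliveredElement = { undelivered += it }
    )
    channel.send("A") // fits in the buffer, so it does not suspend
    channel.send("B")
    channel.cancel()  // both buffered elements will never be received
    undelivered.sorted()
}

fun main() {
    println(undeliveredAfterCancel()) // [A, B]
}
```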

Fan-out

Multiple coroutines can receive from a single channel, but to receive them properly, we should use a for-loop (consumeEach is not safe to use from multiple coroutines).

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce

fun CoroutineScope.produceNumbers() = produce {
    repeat(10) {
        delay(100)
        send(it)
    }
}

fun CoroutineScope.launchProcessor(
    id: Int,
    channel: ReceiveChannel<Int>
) = launch {
    for (msg in channel) {
        println("#$id received $msg")
    }
}

suspend fun main(): Unit = coroutineScope {
    val channel = produceNumbers()
    repeat(3) { id ->
        delay(10)
        launchProcessor(id, channel)
    }
}
// #0 received 0
// #1 received 1
// #2 received 2
// #0 received 3
// #1 received 4
// #2 received 5
// #0 received 6
// ...

The elements are distributed fairly. The channel has a FIFO (first-in-first-out) queue of coroutines waiting for an element. That is why, in the above example, the elements are received by successive coroutines in turn (0, 1, 2, 0, 1, 2, etc.).

Fan-in

Multiple coroutines can send to a single channel. In the example below, you can see two coroutines sending elements to the same channel.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

suspend fun sendString(
    channel: SendChannel<String>,
    text: String,
    time: Long
) {
    while (true) {
        delay(time)
        channel.send(text)
    }
}

fun main() = runBlocking {
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(50) {
        println(channel.receive())
    }
    coroutineContext.cancelChildren()
}
// (200 ms)
// foo
// (200 ms)
// foo
// (100 ms)
// BAR!
// (100 ms)
// foo
// (200 ms)
// ...

Sometimes we need to merge multiple channels into one. For that, you might find the following function useful, as it merges multiple channels using the produce function:

fun <T> CoroutineScope.fanIn(
    channels: List<ReceiveChannel<T>>
): ReceiveChannel<T> = produce {
    for (channel in channels) {
        launch {
            for (elem in channel) {
                send(elem)
            }
        }
    }
}
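A possible usage of this function is sketched below. The fanIn function is repeated so that the example is self-contained, and mergeTwoSources is a hypothetical name. The interleaving of elements from different sources is not guaranteed, so the sketch prints the merged list sorted.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Same function as above, repeated to keep the sketch self-contained
fun <T> CoroutineScope.fanIn(
    channels: List<ReceiveChannel<T>>
): ReceiveChannel<T> = produce {
    for (channel in channels) {
        launch {
            for (elem in channel) send(elem)
        }
    }
}

// Merges two small producers and collects everything they send
suspend fun mergeTwoSources(): List<String> = coroutineScope {
    val letters = produce<String> { listOf("a", "b", "c").forEach { send(it) } }
    val digits = produce<String> { listOf("1", "2").forEach { send(it) } }
    val merged = mutableListOf<String>()
    // The merged channel closes once both source channels are exhausted
    for (elem in fanIn(listOf(letters, digits))) merged += elem
    merged
}

fun main() = runBlocking {
    println(mergeTwoSources().sorted()) // [1, 2, a, b, c]
}
```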

Pipelines

Sometimes we set up two channels in such a way that one produces elements based on those received from the other. We call such a setup a pipeline.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// A channel of numbers from 1 to 3
fun CoroutineScope.numbers(): ReceiveChannel<Int> =
    produce {
        repeat(3) { num ->
            send(num + 1)
        }
    }

fun CoroutineScope.square(numbers: ReceiveChannel<Int>) =
    produce {
        for (num in numbers) {
            send(num * num)
        }
    }

suspend fun main() = coroutineScope {
    val numbers = numbers()
    val squared = square(numbers)
    for (num in squared) {
        println(num)
    }
}
// 1
// 4
// 9

Channels as a communication primitive

Channels are useful when different coroutines need to communicate with each other. They guarantee no conflicts (i.e. no problem with shared state) and fairness.

To see them in action, imagine the following problem, where different baristas are making coffees. Each barista should be a separate coroutine working independently. Different coffee types take different amounts of time to prepare, but we want to handle orders in the order in which they appear. The easiest way to solve this problem is to send both the orders and the resulting coffees through channels. A barista can be defined using the produce builder:

// No suspend modifier is needed: produce only starts a coroutine
fun CoroutineScope.serveOrders(
    orders: ReceiveChannel<Order>,
    baristaName: String
): ReceiveChannel<CoffeeResult> = produce {
    for (order in orders) {
        val coffee = prepareCoffee(order.type)
        send(
            CoffeeResult(
                coffee = coffee,
                customer = order.customer,
                baristaName = baristaName
            )
        )
    }
}

When we set up a pipeline, we can use the previously defined fanIn function (here in a vararg variant) to merge the results produced by different baristas into one channel:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

data class Order(val customer: String, val type: CoffeeType)
enum class CoffeeType { ESPRESSO, LATTE }
class Milk
class GroundCoffee

sealed class Coffee

class Espresso(val ground: GroundCoffee) : Coffee() {
    override fun toString(): String = "Espresso"
}

class Latte(val milk: Milk, val espresso: Espresso) : Coffee() {
    override fun toString(): String = "Latte"
}

suspend fun main() = coroutineScope<Unit> {
    val orders = List(100) {
        Order("Customer$it", CoffeeType.values().random())
    }
    val ordersChannel = produce {
        orders.forEach { send(it) }
    }

    val coffeeResults = fanIn(
        serveOrders(ordersChannel, "Alex"),
        serveOrders(ordersChannel, "Bob"),
        serveOrders(ordersChannel, "Celine"),
    )

    for (coffeeResult in coffeeResults) {
        println("Serving $coffeeResult")
    }
}

fun <T> CoroutineScope.fanIn(
    vararg channels: ReceiveChannel<T>
): ReceiveChannel<T> = produce {
    for (channel in channels) {
        launch {
            for (elem in channel) {
                send(elem)
            }
        }
    }
}

data class CoffeeResult(
    val coffee: Coffee,
    val customer: String,
    val baristaName: String
)

fun CoroutineScope.serveOrders(
    orders: ReceiveChannel<Order>,
    baristaName: String
): ReceiveChannel<CoffeeResult> = produce {
    for (order in orders) {
        val coffee = prepareCoffee(order.type)
        send(CoffeeResult(coffee, order.customer, baristaName))
    }
}

private fun prepareCoffee(type: CoffeeType): Coffee {
    val groundCoffee = groundCoffee()
    val espresso = makeEspresso(groundCoffee)
    val coffee = when (type) {
        CoffeeType.ESPRESSO -> espresso
        CoffeeType.LATTE -> {
            val milk = brewMilk()
            Latte(milk, espresso)
        }
    }
    return coffee
}

fun groundCoffee(): GroundCoffee {
    longOperation()
    return GroundCoffee()
}

fun brewMilk(): Milk {
    longOperation()
    return Milk()
}

fun makeEspresso(ground: GroundCoffee): Espresso {
    longOperation()
    return Espresso(ground)
}

fun longOperation() {
    // val size = 820 // ~1 second on my MacBook
    val size = 350 // ~0.1 second on my MacBook
    val list = List(size) { it }
    val listOfLists = List(size) { list }
    val listOfListsOfLists = List(size) { listOfLists }
    listOfListsOfLists.hashCode()
}

You will find more practical examples in the next chapter.

Practical usage

Channels are typically used when values are produced on one side and we want to process them on the other, for example when responding to user clicks, handling new notifications from a server, or updating search results over time (a good example is SkyScanner, which searches for the cheapest flights by asking multiple airline websites). In most of those cases, however, we would rather use channelFlow or callbackFlow, which are a hybrid of Channel and Flow (we will explain them in the chapter Flow building).

On SkyScanner we can see better and better flight search results as more and more airlines respond.

In their pure form, I find channels useful in some more complex processing cases. For example, let's say that we are maintaining an online shop, like Amazon, and that our service receives a large number of changes concerning sellers, which might affect their offers. For each change, we first need to find a list of offers to update, and then update them one after another.

Doing that the traditional way wouldn't be optimal. One seller might have even hundreds of thousands of offers. Doing it all in a single long process is not the best idea.

First, either an internal exception or a server restart might leave us without knowing where we stopped. Second, one big seller might block the server for a long time, at the cost of small sellers waiting for their changes to apply. Moreover, we should not send too many network requests at the same time, so as not to overload the service that needs to handle them (and our network interface as well).

The solution to this problem might be to set up a pipeline. The first channel could contain the sellers to process, and the second one the offers to update. Both channels would have a buffer. The buffer in the second one could prevent our service from fetching more offers when too many are already waiting. Thanks to that, our server would be able to balance the number of offers being updated at the same time.

We might also easily add some intermediate steps, like removing duplicates. By setting the number of coroutines listening on each channel, we decide how many concurrent requests to the external service we wish to make. Manipulating those parameters gives us a lot of freedom. There are also many improvements that can be added quite easily, like persistence (in case the server is restarted) or element uniqueness (in case the seller makes another change before the previous one is processed).

// A simplified implementation
suspend fun handleOfferUpdates() = coroutineScope {
    val sellerChannel = listenOnSellerChanges()

    val offerToUpdateChannel = produce {
        repeat(NUMBER_OF_CONCURRENT_OFFER_SERVICE_REQUESTS) {
            launch {
                for (seller in sellerChannel) {
                    val offers = offerService
                        .requestOffers(seller.id)
                    offers.forEach { send(it) }
                }
            }
        }
    }

    repeat(NUMBER_OF_CONCURRENT_UPDATE_SENDERS) {
        launch {
            for (offer in offerToUpdateChannel) {
                sendOfferUpdate(offer)
            }
        }
    }
}
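An intermediate step that removes duplicates could look roughly like the sketch below. The names distinct and distinctSellerIds are hypothetical, and plain integers stand in for seller identifiers. Remembering every element in a set that is never cleared is a simplification that a long-running service would need to replace, for example with a bounded or time-limited structure.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Hypothetical pipeline stage: forwards each distinct element only once.
// The `seen` set grows without bound, which is a simplification.
fun <T> CoroutineScope.distinct(
    input: ReceiveChannel<T>
): ReceiveChannel<T> = produce {
    val seen = mutableSetOf<T>()
    for (element in input) {
        // add returns false when the element was already in the set
        if (seen.add(element)) send(element)
    }
}

// Runs a small two-stage pipeline: raw ids -> distinct ids
suspend fun distinctSellerIds(ids: List<Int>): List<Int> = coroutineScope {
    val raw = produce<Int> { ids.forEach { send(it) } }
    val result = mutableListOf<Int>()
    for (id in distinct(raw)) result += id
    result
}

fun main() = runBlocking {
    println(distinctSellerIds(listOf(1, 2, 2, 3, 1))) // [1, 2, 3]
}
```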

Summary

Channel is a powerful inter-coroutine communication primitive. It supports any number of senders and receivers, and every value that is sent to the channel is received once. We often create a channel using the produce builder. Channels can be used to set up a pipeline in which we control the number of coroutines working on some tasks. Nowadays, we most often use channels in connection with Flow, which will be presented later in the book.

1:

The origin is the French word "rendez-vous" that is commonly used for an appointment. This beautiful word crossed borders, and can be found as a less popular English word "rendezvous". Also in the Polish language, there is a word "Randka" that means a date (a romantic appointment).

2:

Channel is an interface, so Channel() is a call of a function pretending to be a constructor.