Channel in Kotlin Coroutines
The Channel API was added as an inter-coroutine communication primitive. Many imagine it as a pipe, but I prefer a different metaphor. Do you know the public bookcases for exchanging books? One person needs to bring a book for another to find it. This is very similar to how Channel works.
Channel supports any number of senders and receivers, and every value that is sent to the channel is received once.
Channel is an interface that extends two other interfaces:
- SendChannel, used to send elements (adding them) and to close the channel;
- ReceiveChannel, used to receive elements (taking them).
Thanks to this distinction, we can expose just the ReceiveChannel or just the SendChannel to restrict the channel entry points.
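As an illustration of restricting entry points, here is a minimal sketch, assuming kotlinx.coroutines is on the classpath. The Notifier class and its publish and notifications members are hypothetical names. The class keeps the full Channel private and exposes only its ReceiveChannel side, so outside code can receive notifications but cannot send to or close the channel.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking

// Hypothetical class: only the owner can send, everyone else can only receive.
class Notifier {
    private val channel = Channel<String>(capacity = Channel.UNLIMITED)

    // Exposed as ReceiveChannel: callers can receive, iterate, or cancel, but not send or close.
    val notifications: ReceiveChannel<String> get() = channel

    suspend fun publish(message: String) {
        channel.send(message)
    }
}

fun main() = runBlocking {
    val notifier = Notifier()
    notifier.publish("Hello")
    println(notifier.notifications.receive()) // Hello
    // notifier.notifications.send("x") would not compile: ReceiveChannel has no send.
}
```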
You might notice that both send and receive are suspending functions. This is an essential feature:
- When we try to receive and there are no elements in the channel, the coroutine is suspended until an element becomes available. As with our metaphorical bookshelf: when someone goes to the shelf to find a book and the shelf is empty, that person needs to suspend until someone puts a book there.
- On the other hand, send will be suspended when the channel reaches its capacity. We will soon see that most channels have a limited capacity. As with our metaphorical bookshelf: when someone tries to put a book on the shelf but the shelf is full, that person needs to suspend until someone takes a book and makes space.
A channel might have any number of senders and receivers, although the most common situation is one coroutine on each side.
To see the simplest example of a channel, we need a producer (sender) and a consumer (receiver) in separate coroutines. The producer will be sending elements, and the consumer will be receiving them. This is how it can be implemented:
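A minimal sketch of such a producer and consumer, assuming kotlinx.coroutines is on the classpath. The function name producerAndConsumer and the 50 ms delay are illustrative choices; the received elements are also collected into a list so the result can be inspected.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// One producer and one consumer, each in its own coroutine, sharing a channel.
suspend fun producerAndConsumer(): List<Int> = coroutineScope {
    val channel = Channel<Int>()
    val received = mutableListOf<Int>()
    launch { // producer (sender)
        repeat(5) { index ->
            delay(50) // pretend that producing takes some time
            println("Producing next one")
            channel.send(index * 2) // suspends until the consumer takes the element
        }
    }
    launch { // consumer (receiver)
        repeat(5) { // it has to know that exactly 5 elements will be sent
            val element = channel.receive() // suspends until an element is available
            println(element)
            received += element
        }
    }
    received // filled by the time coroutineScope returns, since it waits for both children
}

fun main(): Unit = runBlocking { producerAndConsumer() }
```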
Such an implementation is far from perfect. First, the receiver needs to know how many elements will be sent. This is rarely the case, so we would prefer to listen for as long as the sender is willing to send. To receive elements on the channel until it is closed, we could use a for-loop or the consumeEach function (the first option is generally preferable, as consumeEach is not safe to use from multiple coroutines). For this to work, the sender must close the channel once it has finished sending.
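A sketch of the same exchange where the consumer does not need to know the number of elements, assuming kotlinx.coroutines is on the classpath; receiveUntilClosed is a hypothetical name. The producer closes the channel when it is done, and the consumer iterates with a for-loop that ends once the channel is closed and drained.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun receiveUntilClosed(): List<Int> = coroutineScope {
    val channel = Channel<Int>()
    val received = mutableListOf<Int>()
    launch {
        repeat(5) { index ->
            delay(50)
            channel.send(index * 2)
        }
        channel.close() // without this, the for-loop below would wait forever
    }
    launch {
        // The loop ends once the channel is closed and all sent elements were received.
        for (element in channel) {
            println(element)
            received += element
        }
        // Alternative for a single consumer: channel.consumeEach { element -> println(element) }
    }
    received
}

fun main(): Unit = runBlocking { receiveUntilClosed() }
```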
The common problem with this way of sending elements is that it is easy to forget to close the channel, especially in the case of exceptions. If one coroutine stops producing because of an exception, the other will wait for elements forever. It is much more convenient to use the produce function, which is a coroutine builder returning a ReceiveChannel.
The produce function closes the channel whenever the builder coroutine ends in any way (finished, stopped, cancelled). Thanks to that, we will never forget to call close. The produce builder is a very popular way to create a channel, for good reasons: it gives a lot of safety and convenience.
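A minimal sketch of a producer built with produce, assuming kotlinx.coroutines is on the classpath; produceNumbers is a hypothetical name. produce is marked as experimental API in kotlinx.coroutines, hence the opt-in annotation. Note that there is no explicit close call.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// produce starts a coroutine and returns its channel as a ReceiveChannel.
// The channel is closed automatically when this coroutine finishes.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.produceNumbers(max: Int): ReceiveChannel<Int> = produce {
    var x = 0
    while (x < max) {
        delay(50)
        send(x++)
    }
    // no close() needed here
}

fun main() = runBlocking {
    for (element in produceNumbers(5)) println(element)
}
```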
Depending on the capacity size we set, we distinguish four types of channels:
- Unlimited - channel with capacity Channel.UNLIMITED, which has an unlimited capacity buffer, so send never suspends.
- Buffered - channel with a concrete capacity size or Channel.BUFFERED (which is 64 by default and can be overridden by setting a system property on the JVM).
- Rendezvous (default) - channel with capacity 0 or Channel.RENDEZVOUS (which is equal to 0), meaning that an exchange can happen only if the sender and receiver meet. This means that one of them will always be suspended, at least for a moment, while waiting for the other.
- Conflated - channel with capacity Channel.CONFLATED, with a buffer of size 1 where each new element replaces the previous one.
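One quick way to observe these differences without any timing is trySend, which never suspends and reports whether the element was accepted. The sketch below assumes kotlinx.coroutines 1.5 or newer (where trySend and tryReceive exist) and counts how many elements each kind of channel accepts when nobody is receiving; acceptedWithoutReceiver is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.channels.Channel

// trySend never suspends: it either accepts the element immediately or fails.
// Counting successes shows how many elements each channel type takes with no receiver.
fun acceptedWithoutReceiver(channel: Channel<Int>, attempts: Int = 100): Int {
    var accepted = 0
    repeat(attempts) { element ->
        if (channel.trySend(element).isSuccess) accepted++
    }
    return accepted
}

fun main() {
    println(acceptedWithoutReceiver(Channel(Channel.UNLIMITED))) // 100
    println(acceptedWithoutReceiver(Channel(3)))                 // 3
    println(acceptedWithoutReceiver(Channel()))                  // 0 (rendezvous: nobody to meet)

    val conflated = Channel<Int>(Channel.CONFLATED)
    println(acceptedWithoutReceiver(conflated)) // 100, but each element replaced the previous one
    println(conflated.tryReceive().getOrNull()) // 99
}
```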
Let's now see those capacities in action. We can set them directly on Channel, but we can also set them when we call the produce function.
We will make our producer fast and our receiver slow. With unlimited capacity, the channel should accept all the elements, and then let them be received one after another.
With a capacity of concrete size, we will first produce until the buffer is full, and then the producer will need to start waiting for the receiver.
With a channel of default (or Channel.RENDEZVOUS) capacity, the producer will wait for a receiver from the beginning.
Finally, with the conflated capacity we will not be storing past elements. New elements will replace the previous ones, so we will be able to receive only the last one, losing the elements that were not received in time.
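To compare the four capacities side by side, here is a sketch with a fast producer created with produce (using its capacity parameter) and a slow consumer that also starts late. It assumes kotlinx.coroutines is on the classpath; runWithCapacity is a hypothetical helper that records "sent" and "received" events in a list so that their order is visible.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Fast producer, slow consumer: returns a log of "sent N" / "received N" events.
@OptIn(ExperimentalCoroutinesApi::class)
fun runWithCapacity(capacity: Int): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val channel = produce<Int>(capacity = capacity) {
        repeat(5) { index ->
            send(index) // suspends only when the channel cannot accept the element
            log += "sent $index"
        }
    } // produce closes the channel when this block ends
    delay(50) // the consumer shows up late...
    for (element in channel) {
        log += "received $element"
        delay(50) // ...and is slow
    }
    log
}

fun main() {
    listOf(
        "UNLIMITED" to Channel.UNLIMITED,
        "BUFFERED(3)" to 3,
        "RENDEZVOUS" to Channel.RENDEZVOUS,
        "CONFLATED" to Channel.CONFLATED,
    ).forEach { (name, capacity) ->
        println("$name: ${runWithCapacity(capacity)}")
    }
}
```

With UNLIMITED, all five "sent" events should come before any "received" one; with a capacity of 3, the producer gets three elements ahead and then has to wait; with RENDEZVOUS, each send completes only after the consumer takes the element; with CONFLATED, only "received 4" appears.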
On buffer overflow
To customize channels further, we can control what happens when the buffer is full (the onBufferOverflow parameter). There are the following options:
- SUSPEND (default) - when the buffer is full, suspend on the send method.
- DROP_OLDEST - when the buffer is full, drop the oldest element.
- DROP_LATEST - when the buffer is full, drop the latest element (the one being sent), so the buffer contents stay the same.
As you might guess, the Channel.CONFLATED capacity is the same as setting the capacity to 1 and onBufferOverflow to DROP_OLDEST. Currently, the produce function does not allow us to set a custom onBufferOverflow, so to set it, we need to define a channel using the function Channel.
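The sketch below, assuming kotlinx.coroutines 1.5 or newer, creates channels with the Channel function, a capacity of 2 and each onBufferOverflow option, offers five elements with nobody receiving, and then drains what was kept. trySend is used instead of send so that the SUSPEND case does not wait forever; keptAfterOverflow is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel

// Offers 1..5 to a channel of capacity 2 that nobody receives from,
// then drains whatever the buffer kept.
fun keptAfterOverflow(policy: BufferOverflow): List<Int> {
    val channel = Channel<Int>(capacity = 2, onBufferOverflow = policy)
    for (element in 1..5) {
        // trySend never suspends; with SUSPEND it fails when the buffer is full,
        // in the situation where send would suspend.
        channel.trySend(element)
    }
    channel.close() // a closed channel still hands out its buffered elements
    val kept = mutableListOf<Int>()
    while (true) {
        val element = channel.tryReceive().getOrNull() ?: break
        kept += element
    }
    return kept
}

fun main() {
    println(keptAfterOverflow(BufferOverflow.SUSPEND))     // [1, 2]
    println(keptAfterOverflow(BufferOverflow.DROP_OLDEST)) // [4, 5]
    println(keptAfterOverflow(BufferOverflow.DROP_LATEST)) // [1, 2]
}
```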
On undelivered element handler
Another Channel function parameter that we should know about is onUndeliveredElement. It is called when an element couldn't be handled for some reason. Most often this means that the channel was closed or cancelled, but it might also happen when an operation such as hasNext throws an error. We generally use it to close resources that are sent through this channel.
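A minimal sketch of onUndeliveredElement, assuming kotlinx.coroutines is on the classpath. The Resource class is hypothetical and stands in for something that must be closed, such as a file or a connection. Two resources are buffered, nobody receives them, and the channel is cancelled, which drops the buffered elements and passes each of them to the handler.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical resource type, standing in for e.g. an open file or connection.
class Resource(val id: Int) {
    var isClosed = false
        private set

    fun close() {
        isClosed = true
        println("Closed resource $id")
    }
}

fun closeUndelivered(): List<Resource> {
    val channel = Channel<Resource>(
        capacity = 2,
        onUndeliveredElement = { resource -> resource.close() },
    )
    val resources = listOf(Resource(1), Resource(2))
    resources.forEach { channel.trySend(it) } // buffered, but nobody receives them
    channel.cancel() // buffered elements are dropped and handed to onUndeliveredElement
    return resources
}

fun main() {
    closeUndelivered()
}
```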
Multiple coroutines can receive from a single channel; however, to receive elements properly, we should use a for-loop (consumeEach is not safe to use from multiple coroutines).
The elements are distributed fairly. The channel has a FIFO (first-in-first-out) queue of coroutines waiting for an element, so when three coroutines are waiting, the elements are received by each of them in turn (0, 1, 2, 0, 1, 2, etc.).
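A sketch of several receivers sharing one channel, assuming kotlinx.coroutines is on the classpath; produceNumbers, launchProcessor and fanOut are hypothetical names. The producer waits 50 ms before each send, so all three processors are already queued as receivers, and each processor should get every third element.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.produceNumbers(count: Int): ReceiveChannel<Int> = produce {
    repeat(count) {
        delay(50)
        send(it)
    }
}

// Each processor is a separate coroutine reading from the same channel with a for-loop.
fun CoroutineScope.launchProcessor(
    id: Int,
    channel: ReceiveChannel<Int>,
    log: MutableList<Pair<Int, Int>>,
) = launch {
    for (element in channel) log += id to element
}

// Returns (processor id, element) pairs in the order the elements were received.
fun fanOut(): List<Pair<Int, Int>> = runBlocking {
    val log = mutableListOf<Pair<Int, Int>>()
    val channel = produceNumbers(9)
    coroutineScope {
        repeat(3) { id -> launchProcessor(id, channel, log) }
    } // processors finish when the producer ends and the channel gets closed
    log
}

fun main() {
    fanOut().forEach { (id, element) -> println("#$id received $element") }
}
```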
Multiple coroutines can also send to a single channel. For example, two coroutines can send elements to the same channel, and a single receiver gets the elements from both of them.
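A sketch of two senders and one receiver, assuming kotlinx.coroutines is on the classpath; sendString and receiveFromTwoSenders are hypothetical names, and the texts and intervals are arbitrary. Since both senders loop forever, the receiver cancels them after taking the elements it needs.

```kotlin
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Endlessly sends `text`, waiting `time` ms before each send.
suspend fun sendString(channel: SendChannel<String>, text: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(text)
    }
}

// Two senders, one receiver: returns the first `count` received values.
fun receiveFromTwoSenders(count: Int): List<String> = runBlocking {
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 20L) }
    launch { sendString(channel, "BAR!", 50L) }
    val received = mutableListOf<String>()
    repeat(count) { received += channel.receive() }
    coroutineContext.cancelChildren() // both senders loop forever, so we stop them explicitly
    received
}

fun main() {
    receiveFromTwoSenders(6).forEach(::println)
}
```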
Sometimes we need to merge multiple channels into one. For that, we can define a function that merges multiple channels using the produce function.
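A sketch of such a merging function, named fanIn, assuming kotlinx.coroutines is on the classpath; mergeExample is a hypothetical usage helper. Each source channel is forwarded by its own child coroutine, and because a produce coroutine completes only after its children, the merged channel is closed once every source is exhausted.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Merges several channels into one; one forwarding child coroutine per source.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> CoroutineScope.fanIn(channels: List<ReceiveChannel<T>>): ReceiveChannel<T> = produce {
    for (channel in channels) {
        launch {
            for (element in channel) send(element)
        }
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun mergeExample(): List<String> = runBlocking {
    val letters = produce<String> { listOf("a", "b", "c").forEach { send(it) } }
    val digits = produce<String> { listOf("1", "2").forEach { send(it) } }
    val merged = mutableListOf<String>()
    for (element in fanIn(listOf(letters, digits))) merged += element
    merged // the interleaving of the two sources is not guaranteed
}

fun main() {
    println(mergeExample())
}
```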
Sometimes we set up two channels so that one produces elements based on those received from the other. We call such a setup a pipeline.
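A two-stage pipeline sketch, assuming kotlinx.coroutines is on the classpath; numbers, square and squares are hypothetical names. The second stage receives from the first and sends transformed elements further, and each stage's channel is closed by produce when that stage ends.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// First stage: produces 1, 2, 3.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.numbers(): ReceiveChannel<Int> = produce {
    repeat(3) { num -> send(num + 1) }
}

// Second stage: receives from the first stage and sends the squares further.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (num in numbers) send(num * num)
}

fun squares(): List<Int> = runBlocking {
    val result = mutableListOf<Int>()
    for (value in square(numbers())) result += value
    result
}

fun main() {
    println(squares()) // [1, 4, 9]
}
```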
Channels as a communication primitive
Channels are useful when different coroutines need to communicate with each other. They guarantee no conflicts (i.e. no problem with shared state) and fairness.
To see them in action, imagine a problem in which different baristas are making coffees. Each barista should be a separate coroutine working independently. Different coffee types take different amounts of time to prepare, but we want to handle orders in the order in which they arrive. The easiest way to solve this problem is to send both the orders and the resulting coffees through channels. A barista can be defined using the produce builder:
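A minimal sketch of a barista, assuming kotlinx.coroutines is on the classpath. CoffeeType, Order, CoffeeResult, prepareCoffee, serveOrders and serveWithOneBarista are all hypothetical names, and preparation is simulated with delay.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical domain types for this sketch.
enum class CoffeeType(val preparationMillis: Long) { ESPRESSO(20), LATTE(50) }
data class Order(val customer: String, val type: CoffeeType)
data class CoffeeResult(val type: CoffeeType, val customer: String, val baristaName: String)

// Hypothetical stand-in for the real work: different coffee types take different time.
suspend fun prepareCoffee(type: CoffeeType) {
    delay(type.preparationMillis)
}

// A barista: takes orders from one channel and sends finished coffees to another.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.serveOrders(
    orders: ReceiveChannel<Order>,
    baristaName: String,
): ReceiveChannel<CoffeeResult> = produce {
    for (order in orders) {
        prepareCoffee(order.type)
        send(CoffeeResult(order.type, order.customer, baristaName))
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun serveWithOneBarista(orderList: List<Order>): List<CoffeeResult> = runBlocking {
    val orders = produce<Order> { orderList.forEach { send(it) } }
    val results = mutableListOf<CoffeeResult>()
    for (result in serveOrders(orders, "Alex")) results += result
    results
}

fun main() {
    serveWithOneBarista(
        listOf(Order("Ann", CoffeeType.LATTE), Order("Bob", CoffeeType.ESPRESSO)),
    ).forEach(::println)
}
```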
When we set up such a pipeline, we can use the previously defined fanIn function to merge the results produced by different baristas into one:
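A self-contained sketch of several baristas sharing one orders channel, with their results merged by a fanIn function, assuming kotlinx.coroutines is on the classpath. All domain names (CoffeeType, Order, CoffeeResult, prepareCoffee, serveOrders, serveWithBaristas) are hypothetical, and fanIn is redefined here so the block compiles on its own. With coffees taking different times, results may arrive in a different order than the orders.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical domain types for this sketch.
enum class CoffeeType(val preparationMillis: Long) { ESPRESSO(20), LATTE(50) }
data class Order(val customer: String, val type: CoffeeType)
data class CoffeeResult(val type: CoffeeType, val customer: String, val baristaName: String)

suspend fun prepareCoffee(type: CoffeeType) = delay(type.preparationMillis)

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.serveOrders(orders: ReceiveChannel<Order>, baristaName: String): ReceiveChannel<CoffeeResult> =
    produce {
        for (order in orders) {
            prepareCoffee(order.type)
            send(CoffeeResult(order.type, order.customer, baristaName))
        }
    }

// Merges several channels into one; closed once all sources are exhausted.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> CoroutineScope.fanIn(channels: List<ReceiveChannel<T>>): ReceiveChannel<T> = produce {
    for (channel in channels) {
        launch { for (element in channel) send(element) }
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun serveWithBaristas(orderList: List<Order>, baristaNames: List<String>): List<CoffeeResult> =
    runBlocking {
        val orders = produce<Order> { orderList.forEach { send(it) } }
        // Every barista reads from the same orders channel...
        val baristas = baristaNames.map { name -> serveOrders(orders, name) }
        // ...and their results are merged into a single channel.
        val results = mutableListOf<CoffeeResult>()
        for (result in fanIn(baristas)) results += result
        results
    }

fun main() {
    val orders = listOf(
        Order("Ann", CoffeeType.LATTE),
        Order("Bob", CoffeeType.ESPRESSO),
        Order("Cid", CoffeeType.ESPRESSO),
        Order("Dee", CoffeeType.LATTE),
    )
    serveWithBaristas(orders, listOf("Alex", "Bea")).forEach(::println)
}
```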
You will find more practical examples in the next chapter.
A typical case where we use channels is when values are produced on one side and we want to process them on the other, for instance when responding to user clicks, handling new notifications from a server, or updating search results over time (a good example is SkyScanner, which searches for the cheapest flights by asking multiple airline websites). However, in most of those cases we would rather use callbackFlow, which is a hybrid of Channel and Flow (we will explain it in the chapter Flow building).
On SkyScanner we can see better and better flight search results as more and more airlines respond.
In their pure form, I find channels useful in some more complex processing cases. For example, let's say that we are maintaining an online shop, like Amazon, and that our service receives a large number of seller changes, which might affect their offers. For each change, we need to first find a list of offers to update, and then update them one after another.
Doing that the traditional way wouldn't be optimal. One seller might have hundreds of thousands of offers, and doing it all in a single long process is not the best idea.
First, either an internal exception or a server restart might leave us without knowing where we stopped. Second, one big seller might block the server for a long time, leaving small sellers waiting for their changes to be applied. Moreover, we should not send too many network requests at the same time, so as not to overload the service that needs to handle them (and our network interface as well).
The solution to this problem might be to set up a pipeline. The first channel could contain the sellers to process, and the second one the offers to update. Both channels will have a buffer. The buffer in the second one could prevent our service from fetching more offers when too many are already waiting. Thanks to that, our server will be able to balance the number of offers we are updating at the same time.
We can also easily add intermediate steps, like removing duplicates. By choosing the number of coroutines listening on each channel, we decide how many concurrent requests to the external service we make. Manipulating those parameters gives us a lot of freedom. There are also many improvements that can be added quite easily, like persistence (for the case when the server is restarted) or element uniqueness (for the case where the seller made another change before the previous one was processed).
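The pipeline described above could be sketched as follows, assuming kotlinx.coroutines is on the classpath. Everything domain-specific (Seller, Offer, offersOf, updateOffer, offersToUpdate, processSellerChanges) is hypothetical, and the buffer sizes and the number of workers are arbitrary. The point is the shape: a bounded buffer between the stages, and a fixed number of coroutines listening on the offers channel, which caps the number of concurrent update requests.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical domain types and operations standing in for the real shop services.
data class Seller(val id: Int, val offerCount: Int)
data class Offer(val sellerId: Int, val index: Int)

fun offersOf(seller: Seller): List<Offer> = List(seller.offerCount) { Offer(seller.id, it) }

suspend fun updateOffer(offer: Offer) {
    delay(1) // pretend this is a network request to the offers service
}

// Second stage: turns sellers into offers. When 100 offers are already waiting,
// send suspends, so we stop fetching more offers until workers catch up.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.offersToUpdate(sellers: ReceiveChannel<Seller>): ReceiveChannel<Offer> =
    produce(capacity = 100) {
        for (seller in sellers) {
            for (offer in offersOf(seller)) send(offer)
        }
    }

// Returns the number of updated offers; at most `concurrency` updates run at once.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun processSellerChanges(sellers: List<Seller>, concurrency: Int): Int = coroutineScope {
    val sellerChannel = produce<Seller>(capacity = 10) { sellers.forEach { send(it) } }
    val offers = offersToUpdate(sellerChannel)
    val updated = AtomicInteger()
    coroutineScope {
        repeat(concurrency) {
            launch {
                for (offer in offers) {
                    updateOffer(offer)
                    updated.incrementAndGet()
                }
            }
        }
    }
    updated.get()
}

fun main() = runBlocking {
    val sellers = listOf(Seller(1, 250), Seller(2, 3), Seller(3, 40))
    println(processSellerChanges(sellers, concurrency = 4)) // 293
}
```

Raising concurrency allows more simultaneous requests to the external service, while the capacity of the offers channel limits how far the offer-fetching stage can run ahead of the workers.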
Channel is a powerful inter-coroutine communication primitive. It supports any number of senders and receivers, and every value that is sent to the channel is received once. We often create a channel using the produce builder. Channels can be used to set up a pipeline where we control the number of coroutines working on some tasks. Nowadays, we most often use channels in connection with Flow, which will be presented later in the book.
The name "rendezvous" comes from the French word "rendez-vous", which is commonly used for an appointment. This beautiful word crossed borders and can also be found as the less popular English word "rendezvous". In Polish, too, there is the word "randka", which means a date (a romantic appointment).
Channel is an interface, so Channel() is a call to a function pretending to be a constructor.