Channel in Kotlin Coroutines
This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.
The Channel API was added as an inter-coroutine communication primitive. Many imagine a channel as a pipe, but I prefer a different metaphor. Are you familiar with public bookcases for exchanging books? One person needs to bring a book for another person to find it. This is very similar to how Channel from kotlinx.coroutines works.
Channel supports any number of senders and receivers, and every value that is sent to a channel is received only once.
Channel is an interface that extends two other interfaces:
- SendChannel, which is used to send elements (adding them to the channel) and to close the channel;
- ReceiveChannel, which receives (or takes) the elements.
Thanks to this distinction, we can expose just ReceiveChannel or SendChannel in order to restrict the channel entry points.
You might notice that both send and receive are suspending functions. This is an essential feature:
- When we try to receive and there are no elements in the channel, the coroutine is suspended until an element is available. Like in our metaphorical bookshelf, when someone goes to the shelf to find a book but the bookshelf is empty, that person needs to suspend until someone puts a book there.
- On the other hand, send will be suspended when the channel reaches its capacity. We will soon see that most channels have limited capacity. Like in our metaphorical bookshelf, when someone tries to put a book on a shelf but it is full, that person needs to suspend until someone takes a book and makes space.
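The first point can be sketched as follows. The function name emptyShelfLog is made up for this illustration. A receiver arrives at an empty rendezvous channel and stays suspended in receive until a sender shows up. The function records the order of events instead of relying on timing in the output.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Records the order of events when a receiver arrives at an empty channel first.
suspend fun emptyShelfLog(): List<String> = coroutineScope {
    val log = mutableListOf<String>()
    val shelf = Channel<String>() // rendezvous channel: no buffer
    launch {
        log += "receiver waits"
        val book = shelf.receive() // suspends: nothing has been sent yet
        log += "receiver got $book"
    }
    delay(100) // by now the receiver is suspended in receive
    log += "sender arrives"
    shelf.send("Kotlin Coroutines") // a receiver is waiting, so the exchange happens
    log // coroutineScope returns only after the receiver coroutine has finished
}

fun main() = runBlocking {
    emptyShelfLog().forEach { println(it) }
}
```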
If you need to send or receive from a non-suspending function, you can use trySend and tryReceive. Both operations are immediate and return ChannelResult, which contains information about the success or failure of the operation, as well as its result. Use trySend and tryReceive mainly with buffered channels. On a rendezvous channel, they succeed only if a matching receiver or sender is already suspended at that moment, so they usually fail.
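This small sketch calls both functions from a regular function; tryOperations is an illustrative name. It first uses a channel with capacity 2 and then a rendezvous channel with no receiver waiting. Each ChannelResult is inspected with isSuccess and getOrNull.

```kotlin
import kotlinx.coroutines.channels.Channel

// A regular, non-suspending function: only trySend and tryReceive can be used here.
fun tryOperations(): List<Any?> {
    val buffered = Channel<Int>(capacity = 2)
    val first = buffered.trySend(1).isSuccess   // true: the buffer has room
    val second = buffered.trySend(2).isSuccess  // true: the buffer is now full
    val third = buffered.trySend(3).isSuccess   // false: no room and no waiting receiver
    val received = buffered.tryReceive().getOrNull() // 1, the oldest element

    val rendezvous = Channel<Int>() // capacity 0
    val direct = rendezvous.trySend(1).isSuccess // false: no receiver is suspended right now

    return listOf(first, second, third, received, direct)
}

fun main() {
    println(tryOperations()) // [true, true, false, 1, false]
}
```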
A channel might have any number of senders and receivers. However, the most common situation is when there is one coroutine on both sides of the channel.
To see the simplest example of a channel, we need to have a producer (sender) and a consumer (receiver) in separate coroutines. The producer will send elements, and the consumer will receive them.
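Here is a minimal sketch of such a producer and consumer. The consumer is told in advance to expect exactly five elements. The function name fixedCountExchange is illustrative.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun fixedCountExchange(): List<Int> = coroutineScope {
    val channel = Channel<Int>()
    val received = mutableListOf<Int>()
    launch { // producer
        repeat(5) { index ->
            delay(100)
            channel.send(index * 2)
        }
    }
    launch { // consumer: must know that exactly 5 elements will arrive
        repeat(5) {
            received += channel.receive()
        }
    }
    received // returned after both coroutines have completed
}

fun main() = runBlocking {
    println(fixedCountExchange()) // [0, 2, 4, 6, 8]
}
```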
A naive implementation, in which the receiver calls receive a fixed number of times, is far from perfect. The receiver needs to know how many elements will be sent. This is rarely the case, so we would prefer to listen for as long as the sender is willing to send. To receive elements until the channel is closed, we could use a for-loop or the consumeEach function[3]. For this to work, the sender must call close on the channel once it has sent everything.
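The sketch below shows both approaches; the function names are illustrative. In each case, the producer closes the channel after its last element, and that is what ends the loop on the receiving side.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun receiveWithForLoop(): List<Int> = coroutineScope {
    val channel = Channel<Int>()
    launch {
        repeat(3) { channel.send(it) }
        channel.close() // ends the for-loop below; without it the receiver would wait forever
    }
    val received = mutableListOf<Int>()
    for (element in channel) {
        received += element
    }
    received
}

suspend fun receiveWithConsumeEach(): List<Int> = coroutineScope {
    val channel = Channel<Int>()
    launch {
        repeat(3) { channel.send(it * 10) }
        channel.close()
    }
    val received = mutableListOf<Int>()
    channel.consumeEach { received += it }
    received
}

fun main() = runBlocking {
    println(receiveWithForLoop())     // [0, 1, 2]
    println(receiveWithConsumeEach()) // [0, 10, 20]
}
```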
A common problem with this way of sending elements is that it is easy to forget to close the channel, especially in the case of exceptions. If one coroutine stops producing because of an exception, the other will wait for elements forever. It is much more convenient to use the produce function, which is a coroutine builder that returns a ReceiveChannel.
The produce function closes the channel whenever the builder coroutine ends in any way (it finishes normally, fails with an exception, or is cancelled). Thanks to this, we will never forget to call close. The produce builder is a very popular way to create a channel, and for good reason: it offers a lot of safety and convenience.
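Here is a minimal sketch of a producer built with produce; produceNumbers is an illustrative name. There is no close call. The for-loop in main ends because produce closes the channel when its coroutine completes.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// produce starts a coroutine and returns its channel; the channel is closed
// automatically when this coroutine completes.
fun CoroutineScope.produceNumbers(max: Int): ReceiveChannel<Int> = produce {
    var x = 0
    while (x < max) send(x++)
}

fun main() = runBlocking<Unit> {
    for (number in produceNumbers(5)) { // the loop ends once produce closes the channel
        println(number)
    }
}
```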
Channel types
Depending on the capacity we set, we distinguish four types of channels:
- Unlimited - channel with capacity Channel.UNLIMITED, which has an unlimited buffer, so send never suspends.
- Buffered - channel with a concrete capacity or Channel.BUFFERED (which is 64 by default and can be overridden by setting the kotlinx.coroutines.channels.defaultBuffer system property on the JVM).
- Rendezvous[1] (default) - channel with capacity 0 or Channel.RENDEZVOUS (which is equal to 0), meaning that an exchange can happen only if sender and receiver meet (so it is like a book exchange spot rather than a bookshelf).
- Conflated - channel with capacity Channel.CONFLATED, which has a buffer of size 1, where each new element replaces the previous one.
Let's now see these capacities in action. We can set them directly on Channel, but we can also set them when we call the produce function.
We will make our producer fast and our receiver slow. With unlimited capacity, the channel should accept all the elements and then let them be received one after another.
With a capacity of concrete size, we will first produce until the buffer is full, after which the producer will need to start waiting for the receiver.
With a channel of default (or Channel.RENDEZVOUS) capacity, the producer will always wait for a receiver.
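The three cases above can be checked with a sketch like the following. The helper sentBeforeAnyReceive is hypothetical. It starts a fast producer with produce(capacity = ...), lets it run for a moment while nobody receives, and reports how many send calls have completed. It assumes a single-threaded runBlocking caller, as in main.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical helper: how many of 5 sends complete before anyone receives?
suspend fun sentBeforeAnyReceive(capacity: Int): Int = coroutineScope {
    var sent = 0
    val channel = produce<Int>(capacity = capacity) {
        repeat(5) {
            send(it) // suspends once the buffer is full (immediately for rendezvous)
            sent++
        }
    }
    delay(100) // a very slow receiver: nothing is received during this time
    val result = sent
    channel.cancel() // stop the producer, which may still be suspended in send
    result
}

fun main() = runBlocking {
    println(sentBeforeAnyReceive(Channel.UNLIMITED))  // 5: send never suspends
    println(sentBeforeAnyReceive(3))                  // 3: producer waits once the buffer is full
    println(sentBeforeAnyReceive(Channel.RENDEZVOUS)) // 0: producer waits for a receiver
}
```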
Finally, we will not be storing past elements when using the Channel.CONFLATED capacity. New elements replace the previous ones, so a slow receiver gets only the most recent element, and the elements sent before it are lost.
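This sketch shows the behavior, assuming the receiver starts only after the producer has already sent everything. The name receivedFromConflated is illustrative.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.channels.toList
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

suspend fun receivedFromConflated(): List<Int> = coroutineScope {
    val channel = produce<Int>(capacity = Channel.CONFLATED) {
        repeat(5) { send(it) } // never suspends: each element replaces the previous one
    }
    delay(100) // slow receiver: the producer has already finished by now
    channel.toList()
}

fun main() = runBlocking {
    println(receivedFromConflated()) // [4]
}
```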
On buffer overflow
To customize channels further, we can control what happens when the buffer is full (the onBufferOverflow parameter). The following options are available:
- SUSPEND (default) - when the buffer is full, suspend on the send method.
- DROP_OLDEST - when the buffer is full, drop the oldest element.
- DROP_LATEST - when the buffer is full, drop the latest element (the one being sent), leaving the buffer unchanged.
As you might guess, the channel capacity Channel.CONFLATED is the same as setting the capacity to 1 and onBufferOverflow to DROP_OLDEST. Currently, the produce function does not allow us to set a custom onBufferOverflow, so to set it we need to define a channel using the function Channel[2].
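This sketch compares the overflow strategies. It uses the Channel function, because produce does not accept onBufferOverflow. The helper leftAfterOverflow is illustrative: it sends 1 to 5 into a channel nobody is reading yet, closes it, and returns what is left in the buffer. The last call uses capacity 1 with DROP_OLDEST, which is the configuration Channel.CONFLATED corresponds to.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.toList
import kotlinx.coroutines.runBlocking

// Sends 1..5 into a channel that nobody reads yet, then drains whatever is left.
suspend fun leftAfterOverflow(capacity: Int, overflow: BufferOverflow): List<Int> {
    val channel = Channel<Int>(capacity, overflow)
    for (i in 1..5) channel.send(i) // with DROP_* strategies, send never suspends
    channel.close()
    return channel.toList()
}

fun main() = runBlocking {
    println(leftAfterOverflow(2, BufferOverflow.DROP_OLDEST)) // [4, 5]
    println(leftAfterOverflow(2, BufferOverflow.DROP_LATEST)) // [1, 2]
    println(leftAfterOverflow(1, BufferOverflow.DROP_OLDEST)) // [5], like Channel.CONFLATED
}
```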
On undelivered element handler
One more Channel function parameter which we should know about is onUndeliveredElement. It is called when an element couldn't be handled for some reason. Most often this means that a channel was closed or cancelled, but it might also happen when send, receive, receiveOrNull, or hasNext throw an exception. We generally use it to close resources that are sent through this channel.
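Here is a sketch of onUndeliveredElement used for cleanup. It assumes a hypothetical Resource class that must be closed. Three resources are buffered, one is received normally, and the channel is then cancelled. The handler is invoked for the two resources that were never delivered; the sketch does not rely on the order in which it sees them.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical resource that must be released when nobody is going to use it.
class Resource(val id: Int) {
    var isClosed = false
        private set

    fun close() {
        isClosed = true
    }
}

suspend fun closedOnCancel(): List<Int> {
    val closedIds = mutableListOf<Int>()
    val channel = Channel<Resource>(
        capacity = 3,
        onUndeliveredElement = { resource ->
            resource.close() // release a resource that will never reach a receiver
            closedIds += resource.id
        }
    )
    for (id in 1..3) channel.send(Resource(id)) // fits in the buffer, no suspension
    val first = channel.receive() // delivered normally, so the receiver is responsible for it
    channel.cancel() // resources 2 and 3 are still buffered and become undelivered
    first.close()
    return closedIds
}

fun main() = runBlocking {
    println(closedOnCancel().sorted()) // [2, 3]
}
```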
Fan-out
Multiple coroutines can receive from a single channel. However, to receive elements properly, each of them should use a for-loop (consumeEach should not be used by multiple coroutines).
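Here is a sketch of fan-out, with three processors sharing one channel; produceSlowly and fanOut are illustrative names. The producer is deliberately slow, so every processor is already waiting again before the next element arrives. Under runBlocking, this makes the processors take turns.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun CoroutineScope.produceSlowly(count: Int): ReceiveChannel<Int> = produce {
    repeat(count) {
        delay(100)
        send(it)
    }
}

// Three processors share one channel; each element is received by exactly one of them.
suspend fun fanOut(): List<Pair<Int, Int>> = coroutineScope {
    val received = mutableListOf<Pair<Int, Int>>() // (processor id, element)
    val channel = produceSlowly(9)
    repeat(3) { id ->
        launch {
            for (element in channel) { // a for-loop, not consumeEach
                received += (id to element)
            }
        }
    }
    received
}

fun main() = runBlocking<Unit> {
    for ((id, element) in fanOut()) println("#$id received $element")
}
```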
The elements are distributed fairly. The channel has a FIFO (first-in-first-out) queue of coroutines waiting for an element. This is why, when three coroutines receive from the same channel and each one goes back to waiting after handling an element, the elements go to them in turn (coroutine 0, 1, 2, 0, 1, 2, etc.).
To better understand why, imagine kids in a kindergarten queuing for candies. Once they get some, they immediately eat them and go to the last position in the queue. Such distribution is fair (assuming the number of candies is a multiple of the number of kids, and assuming their parents are fine with their children eating candies).
Fan-in
Multiple coroutines can send to a single channel. For example, two coroutines can send elements to the same channel, and a single receiver gets elements from both of them.
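Here is a sketch with two senders and one receiver, adapted so that it terminates. The hypothetical sendString function sends its text a fixed number of times, with a delay before each send. The exact interleaving depends on the delays, so only the set of received elements is meaningful.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends text `times` times, waiting `time` ms before each send.
suspend fun sendString(channel: SendChannel<String>, text: String, time: Long, times: Int) {
    repeat(times) {
        delay(time)
        channel.send(text)
    }
}

suspend fun twoSenders(): List<String> = coroutineScope {
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L, times = 3) }
    launch { sendString(channel, "BAR!", 500L, times = 2) }
    val received = mutableListOf<String>()
    repeat(5) { received += channel.receive() } // one receiver for both senders
    received
}

fun main() = runBlocking {
    twoSenders().forEach { println(it) }
}
```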
Sometimes we need to merge multiple channels into one. For that, you might find it useful to write a function that merges multiple channels using the produce function.
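Here is one possible shape for such a function, called fanIn here (an illustrative name, not a library function). It launches one coroutine per input channel inside produce, and each of those coroutines forwards every element it receives. Because produce completes only after its children, the merged channel is closed once all input channels are exhausted.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.channels.toList
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Merges several channels into one; the result is closed after all forwarders finish.
fun <T> CoroutineScope.fanIn(channels: List<ReceiveChannel<T>>): ReceiveChannel<T> = produce {
    for (channel in channels) {
        launch {
            for (elem in channel) {
                send(elem) // the ProducerScope's send: forwards into the merged channel
            }
        }
    }
}

suspend fun mergedExample(): List<String> = coroutineScope {
    val letters = produce<String> { listOf("a", "b", "c").forEach { send(it) } }
    val digits = produce<String> { listOf("1", "2").forEach { send(it) } }
    fanIn(listOf(letters, digits)).toList()
}

fun main() = runBlocking {
    println(mergedExample()) // all five elements; the interleaving may vary
}
```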
Pipelines
Sometimes we set up two channels so that one produces elements based on those received from the other. We call such an arrangement a pipeline.
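Here is a minimal pipeline sketch with two stages; numbers and square are illustrative names. The first stage produces 1, 2 and 3, and the second sends the square of each element it receives from the first.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.channels.toList
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// First stage: produces 1, 2, 3.
fun CoroutineScope.numbers(): ReceiveChannel<Int> = produce {
    repeat(3) { num -> send(num + 1) }
}

// Second stage: receives from the first stage and sends squares.
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (num in numbers) send(num * num)
}

suspend fun squares(): List<Int> = coroutineScope {
    square(numbers()).toList()
}

fun main() = runBlocking {
    println(squares()) // [1, 4, 9]
}
```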
Practical usage
Channels are useful when different coroutines need to communicate with each other. They guarantee no conflicts (i.e., no problem with the shared state) and fairness. A typical case in which we use channels is when values are produced on one side, and we want to process them on the other side. Channels are used to separate producers and consumers of data. For example, a sender might be a network listener, while a receiver might be a database writer, or a sender might be a UI event listener, while a receiver might be a UI updater. One example might be an application like Skyscanner, which searches for the cheapest flights by querying multiple airline websites. It could use a channel to receive flight offers from different airlines and then send them to the user as they arrive. However, in most of these cases it's better to use channelFlow or callbackFlow, both of which are hybrids of Channel and Flow (we will explain them in the Flow building chapter).
In their pure form, channels are often used as a queue of events that are processed one after another by one or more coroutines.
In some applications, whole pipelines are built using channels. For example, let's say that we are maintaining an online shop, like Amazon, and that our service receives a large number of seller changes that might affect their offers. For each change, we need to first find a list of offers to update, and then update them one after another.
Doing this the traditional way wouldn't be optimal. One seller might even have hundreds of thousands of offers. Doing it all in a single long process is not the best idea.
First, either an internal exception or a server restart might leave us not knowing where we stopped. Second, one big seller might block the server for a long time, thus leaving small sellers waiting for their changes to apply. Moreover, we should not send too many network requests at the same time so as to not overload the service that needs to handle them (and our network interface).
One solution to this problem is to set up a pipeline using channels. The first channel could contain the sellers to process, while the second one would contain the offers to be updated. These channels would have a buffer. The buffer in the second one could prevent our service from getting more offers when too many are already waiting. Thus, our server would be able to balance the number of offers we are updating at the same time.
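Here is a rough sketch of this pipeline, under heavy assumptions. Seller, Offer and fetchOffers are hypothetical stand-ins for the real domain and service, and delay simulates network calls. The buffer sizes of 5 and 10 and the number of workers are arbitrary values chosen for illustration. The bounded buffer on the offers channel is what makes fetching pause while too many offers are waiting.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical domain types for the example.
data class Seller(val id: Int)
data class Offer(val sellerId: Int, val number: Int)

// Hypothetical stand-in for a service call that lists a seller's offers.
suspend fun fetchOffers(seller: Seller): List<Offer> {
    delay(10)
    return List(seller.id) { Offer(seller.id, it) } // here, seller N simply has N offers
}

// Stage 1: sellers to process, behind a small buffer.
fun CoroutineScope.sellersToProcess(ids: List<Int>): ReceiveChannel<Seller> =
    produce(capacity = 5) {
        for (id in ids) send(Seller(id))
    }

// Stage 2: offers to update. When 10 offers are already waiting,
// send suspends, so no more offers are fetched until the workers catch up.
fun CoroutineScope.offersToUpdate(sellers: ReceiveChannel<Seller>): ReceiveChannel<Offer> =
    produce(capacity = 10) {
        for (seller in sellers) {
            for (offer in fetchOffers(seller)) send(offer)
        }
    }

// Stage 3: a fixed number of workers limits concurrent update requests.
suspend fun updateAll(sellerIds: List<Int>, workers: Int): List<Offer> = coroutineScope {
    val offers = offersToUpdate(sellersToProcess(sellerIds))
    // Shared list: safe only with a single-threaded caller such as runBlocking in main.
    val updated = mutableListOf<Offer>()
    repeat(workers) {
        launch {
            for (offer in offers) {
                delay(5) // hypothetical network call updating the offer
                updated += offer
            }
        }
    }
    updated
}

fun main() = runBlocking {
    println(updateAll(listOf(1, 2, 3), workers = 2).size) // 6 offers in total
}
```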
We might also easily add functionalities to our pipeline, like:
- limiting queue sizes (to not let them grow too much);
- defining the number of coroutines that listen on each channel (to decide how many concurrent requests to the external service we wish to make);
- removing duplicates from our queues (to not update the same offer twice, if seller changes are sent too quickly);
- adding persistence (to not lose the state of the pipeline when the server is restarted).
Summary
Channel is a powerful inter-coroutine communication primitive. It supports any number of senders and receivers, and every value that is sent to a channel is received only once. We often create a channel using the produce builder and observe it using a for-loop. Channels can be used to set up a pipeline where we control the number of coroutines working on some tasks. Nowadays, we most often use channels in connection with Flow, which will be presented later in the book.
[1] The origin is the French word "rendez-vous", which commonly means “appointment”. This beautiful word has crossed borders: in English, there is the less popular word "rendezvous"; in Polish, there is the word "randka", which means a date (a romantic appointment).
[2] Channel is an interface, so Channel() is a call of a function that is pretending to be a constructor.
[3] The consumeEach function uses a for-loop under the hood, but it also cancels the channel once it has consumed all its elements (so, once it is closed).