Channel in Kotlin Coroutines
The Channel API was added as an inter-coroutine communication primitive. Many imagine a channel as a pipe, but I prefer a different metaphor. Are you familiar with public bookcases for exchanging books? One person needs to bring a book for another person to find it. This is very similar to how
Channel supports any number of senders and receivers, and every value that is sent to a channel is received only once.
Channel is an interface that implements two other interfaces:
SendChannel, which is used to send elements (adding elements) and to close the channel;
ReceiveChannel, which receives (or takes)the elements.
Thanks to this distinction, we can expose just
SendChannel in order to restrict the channel entry points.
You might notice that both
receive are suspending functions. This is an essential feature:
- When we try to
receiveand there are no elements in the channel, the coroutine is suspended until the element is available. Like in our metaphorical bookshelf, when someone goes to the shelf to find a book but the bookshelf is empty, that person needs to suspend until someone puts an element there.
- On the other hand,
sendwill be suspended when the channel reaches its capacity. We will soon see that most channels have limited capacity. Like in our metaphorical bookshelf, when someone tries to put a book on a shelf but it is full, that person needs to suspend until someone takes a book and makes space.
If you need to send or receive from a non-suspending function, you can use
tryReceive. Both operations are immediate and return
ChannelResult, which contains information about the success or failure of the operation, as well as its result. Use
tryReceiveonly for channels with limited capacity because they will not work for the rendezvous channel.
A channel might have any number of senders and receivers. However, the most common situation is when there is one coroutine on both sides of the channel.
To see the simplest example of a channel, we need to have a producer (sender) and a consumer (receiver) in separate coroutines. The producer will send elements, and the consumer will receive them. This is how it can be implemented:
Such an implementation is far from perfect. First, the receiver needs to know how many elements will be sent; however, this is rarely the case, so we would prefer to listen for as long as the sender is willing to send. To receive elements on the channel until it is closed, we could use a for-loop or
The common problem with this way of sending elements is that it is easy to forget to close the channel, especially in the case of exceptions. If one coroutine stops producing because of an exception, the other will wait for elements forever. It is much more convenient to use the
produce function, which is a coroutine builder that returns
produce function closes the channel whenever the builder coroutine ends in any way (finished, stopped, cancelled). Thanks to this, we will never forget to call
produce builder is a very popular way to create a channel, and for good reason: it offers a lot of safety and convenience.
Depending on the capacity size we set, we distinguish four types of channels:
- Unlimited - channel with capacity
Channel.UNLIMITEDthat has an unlimited capacity buffer, and
- Buffered - channel with concrete capacity size or
Channel.BUFFERED(which is 64 by default and can be overridden by setting the
kotlinx.coroutines.channels.defaultBuffersystem property in JVM).
- Rendezvous1 (default) - channel with capacity 0 or
Channel.RENDEZVOUS(which is equal to
0), meaning that an exchange can happen only if sender and receiver meet (so it is like a book exchange spot, instead of a bookshelf).
- Conflated - channel with capacity
Channel.CONFLATEDwhich has a buffer of size 1, and each new element replaces the previous one.
Let's now see these capacities in action. We can set them directly on
Channel, but we can also set them when we call the
We will make our producer fast and our receiver slow. With unlimited capacity, the channel should accept all the elements and then let them be received one after another.
With a capacity of concrete size, we will first produce until the buffer is full, after which the producer will need to start waiting for the receiver.
With a channel of default (or
Channel.RENDEZVOUS) capacity, the producer will always wait for a receiver.
Finally, we will not be storing past elements when using the
Channel.CONFLATED capacity. New elements will replace the previous ones, so we will be able to receive only the last one, therefore we lose elements that were sent earlier.
On buffer overflow
To customize channels further, we can control what happens when the buffer is full (
onBufferOverflow parameter). There are the following options:
SUSPEND(default) - when the buffer is full, suspend on the
DROP_OLDEST- when the buffer is full, drop the oldest element.
DROP_LATEST- when the buffer is full, drop the latest element.
As you might guess, the channel capacity
Channel.CONFLATED is the same as setting the capacity to 1 and
DROP_OLDEST. Currently, the
produce function does not allow us to set custom
onBufferOverflow, so to set it we need to define a channel using the function
On undelivered element handler
Channel function parameter which we should know about is
onUndeliveredElement. It is called when an element couldn't be handled for some reason. Most often this means that a channel was closed or cancelled, but it might also happen when
hasNext throw an error. We generally use it to close resources that are sent by this channel.
Multiple coroutines can receive from a single channel; however, to receive them properly we should use a for-loop (
consumeEach is not safe to use from multiple coroutines).
The elements are distributed fairly. The channel has a FIFO (first-in-first-out) queue of coroutines waiting for an element. This is why in the above example you can see that the elements are received by the next coroutines (0, 1, 2, 0, 1, 2, etc).
To better understand why, imagine kids in a kindergarten queuing for candies. Once they get some, they immediately eat them and go to the last position in the queue. Such distribution is fair (assuming the number of candies is a multiple of the number of kids, and assuming their parents are fine with their children eating candies).
Multiple coroutines can send to a single channel. In the below example, you can see two coroutines sending elements to the same channel.
Sometimes, we need to merge multiple channels into one. For that, you might find the following function useful as it merges multiple channels using the
Sometimes we set two channels such that one produces elements based on those received from another. In such cases, we call it a pipeline.
Channels as a communication primitive
Channels are useful when different coroutines need to communicate with each other. They guarantee no conflicts (i.e., no problem with the shared state) and fairness.
To see them in action, imagine that different baristas are making coffees. Each barista should be a separate coroutine working independently. Different coffee types take different amounts of time to prepare, but we want to handle orders in the order they appear. The easiest way to solve this problem is by sending both the orders and the resulting coffees in channels. A barista can be defined using the
When we set up a pipeline, we can use the previously defined
fanIn function to merge the results produced by the different baristas into one:
You will find more practical examples in the next chapter.
A typical case in which we use channels is when values are produced on one side, and we want to process them on the other side. Examples include responding to user clicks, new notifications from a server, or updating search results over time (a good example is SkyScanner, which searches for the cheapest flights by querying multiple airline websites). However, in most of these cases it's better to use
callbackFlow, both of which are a hybrid of Channel and Flow (we will explain them in the Flow building chapter).
In their pure form, I find channels useful in some more complex processing cases. For example, let's say that we are maintaining an online shop, like Amazon. Let's say that your service receives a big number of seller changes that might affect their offers. For each change, we need to first find a list of offers to update, and then we update them one after another.
Doing this the traditional way wouldn't be optimal. One seller might even have hundreds of thousands of offers. Doing it all in a single long process is not the best idea.
First, either an internal exception or a server restart might leave us not knowing where we stopped. Second, one big seller might block the server for a long time, thus leaving small sellers waiting for their changes to apply. Moreover, we should not send too many network requests at the same time so as to not overload the service that needs to handle them (and our network interface).
The solution to this problem might be to set up a pipeline. The first channel could contain the sellers to process, while the second one would contain the offers to be updated. These channels would have a buffer. The buffer in the second one could prevent our service from getting more offers when too many are already waiting. Thus, our server would be able to balance the number of offers we are updating at the same time.
We might also easily add some intermediate steps, like removing duplicates. By defining the number of coroutines that listen on each channel, we decide how many concurrent requests to the external service we wish to make. Manipulating these parameters gives us a lot of freedom. There are also many improvements that can be added quite easily, like persistence (for cases where the server is restarted) or element uniqueness (for cases where the seller made another change before the previous one was processed).
Channel is a powerful inter-coroutine communication primitive. It supports any number of senders and receivers, and every value that is sent to a channel is received once. We often create a channel using the
produce builder. Channels can also be used to set up a pipeline where we control the number of coroutines working on some tasks. Nowadays, we most often use channels in connection with Flow, which will be presented later in the book.
The origin is the French word "rendez-vous", which commonly means “appointment”. This beautiful word has crossed borders: in English, there is the less popular word "rendezvous"; in Polish, there is the word "randka", which means a date (a romantic appointment).
Channel is an interface, so
Channel() is a call of a function that is pretending to be a constructor.
consumeEach function uses a for-loop under the hood, but it also cancels the channel once it has consumed all its elements (so, once it is closed).