Kotlin Coroutines dispatchers

This is a chapter from the book Kotlin Coroutines. You can find Early Access on LeanPub.

An important feature the Kotlin Coroutines library offers is letting us decide on which thread (or pool of threads) a coroutine should run (start and resume). This is done using dispatchers.

In the English dictionary, a dispatcher is defined as "a person who is responsible for sending out people or vehicles where they are needed, especially emergency vehicles". In Kotlin coroutines, a dispatcher is a coroutine context element that determines on which thread a certain coroutine will run.
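As a minimal sketch (not one of the book's examples) of how a dispatcher is set: since it is a context element, we simply pass it to a coroutine builder or to withContext.

import kotlinx.coroutines.*

fun main() = runBlocking {
    // Passing a dispatcher to a builder decides where the coroutine runs
    launch(Dispatchers.Default) {
        println("launch runs on ${Thread.currentThread().name}")
    }
    // withContext switches the dispatcher for the duration of its block
    withContext(Dispatchers.Default) {
        println("withContext block runs on ${Thread.currentThread().name}")
    }
}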

Default dispatcher

If you don't set any dispatcher, the one chosen by default is Dispatchers.Default. It is designed to run CPU-intensive operations. It has a pool of threads with a size equal to the number of cores on the machine your code is running on (but not less than two). At least theoretically, this is the optimal number of threads, assuming you are using these threads efficiently - making CPU-intensive calculations and not blocking them.

To see this dispatcher in action, run the following code:

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlin.random.Random

suspend fun main() = coroutineScope {
    repeat(1000) {
        launch { // or launch(Dispatchers.Default) {
            // To make it busy
            List(1000) { Random.nextLong() }.maxOrNull()

            val threadName = Thread.currentThread().name
            println("Running on thread: $threadName")
        }
    }
}

Example result on my machine (I have 12 cores, so there are 12 threads in the pool):

Running on thread: DefaultDispatcher-worker-1
Running on thread: DefaultDispatcher-worker-5
Running on thread: DefaultDispatcher-worker-7
Running on thread: DefaultDispatcher-worker-6
Running on thread: DefaultDispatcher-worker-11
Running on thread: DefaultDispatcher-worker-2
Running on thread: DefaultDispatcher-worker-10
Running on thread: DefaultDispatcher-worker-4
...

Warning: runBlocking sets its own dispatcher if no other one is set, so inside it, Dispatchers.Default is not the one chosen automatically. So, if we used runBlocking instead of coroutineScope in the above example, all the coroutines would run on "main".
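A minimal sketch of this behavior (not the book's original example): the same kind of code wrapped in runBlocking, with no dispatcher set, keeps every coroutine on the calling thread.

import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    repeat(5) {
        launch { // inherits the dispatcher set by runBlocking
            val threadName = Thread.currentThread().name
            println("Running on thread: $threadName")
        }
    }
}
// Running on thread: main
// Running on thread: main
// ...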

Main dispatcher

Android and many other application frameworks have a concept of a main or UI thread. That is generally the most important thread. On Android, it is the only one that can be used to interact with the UI. Because of that, it needs to be used very often, but also with great care. When the Main thread is blocked, the whole application is frozen. To run a coroutine on the Main thread, we use Dispatchers.Main.
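As a rough sketch of how this dispatcher is typically used (the UserView and UserDataRepository types below are hypothetical, and we assume a project where Dispatchers.Main is available), a presenter might start its coroutines on a scope tied to the Main dispatcher:

import kotlinx.coroutines.*

// Hypothetical types, only for this sketch
interface UserView { fun showUserName(name: String) }
interface UserDataRepository { suspend fun fetchUserName(): String }

class UserPresenter(
    private val repository: UserDataRepository,
    private val view: UserView,
) {
    // Coroutines started in this scope resume on the Main thread
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Main)

    fun onCreate() {
        scope.launch {
            val name = repository.fetchUserName() // suspends, does not block Main
            view.showUserName(name)               // UI access is safe here
        }
    }

    fun onDestroy() {
        scope.cancel()
    }
}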

Dispatchers.Main is available on Android if we use the kotlinx-coroutines-android artifact. Similarly, on JavaFX if we use kotlinx-coroutines-javafx, and on Swing if we use kotlinx-coroutines-swing. There are probably some other libraries that set it. If you do not have a dependency that configures the main dispatcher, it is not present and cannot be used.

For unit testing, you can set another dispatcher instead of this one using Dispatchers.setMain(dispatcher) from kotlinx-coroutines-test.

class SomeTest {

    private val dispatcher = Executors
        .newSingleThreadExecutor()
        .asCoroutineDispatcher()

    @Before
    fun setup() {
        Dispatchers.setMain(dispatcher)
    }

    @After
    fun tearDown() {
        // reset main dispatcher to
        // the original Main dispatcher
        Dispatchers.resetMain()
        dispatcher.close()
    }

    @Test
    fun testSomeUI() = runBlocking {
        launch(Dispatchers.Main) {
            // ...
        }
    }
}

The typical practice on Android is to use this dispatcher as the default one. If you use libraries that suspend instead of blocking and you don't do any complex calculations, in practice you can use only Dispatchers.Main. If you do some CPU-intensive operations, you should run them on Dispatchers.Default. These two are enough for many applications, but what if you need to block the thread? For instance, you might need to perform long I/O operations (e.g., read big files) or use a library with blocking functions. You cannot block the Main thread, because your application would freeze. If you block the Default dispatcher, you risk blocking all the threads in its pool, and then you wouldn't be able to do any calculations. This is why we need a dispatcher for such situations, and it is Dispatchers.IO.

IO dispatcher

Dispatchers.IO is designed to be used when we block threads with longer I/O operations - for instance, when we read files, use shared preferences, or call blocking functions. This dispatcher also has a pool of threads, but it is much bigger. Additional threads in this pool are created and shut down on demand. The number of threads used by tasks in this dispatcher is limited by the value of the "kotlinx.coroutines.io.parallelism" (IO_PARALLELISM_PROPERTY_NAME) system property. It defaults to 64 threads (or the number of cores, if there are more).

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

suspend fun main() = coroutineScope {
    repeat(1000) {
        launch(Dispatchers.IO) {
            Thread.sleep(200)
            val threadName = Thread.currentThread().name
            println("Running on thread: $threadName")
        }
    }
}
// Running on thread: DefaultDispatcher-worker-1
// ...
// Running on thread: DefaultDispatcher-worker-53
// Running on thread: DefaultDispatcher-worker-14

This dispatcher shares threads with the Dispatchers.Default dispatcher, so using withContext(Dispatchers.IO) { ... } does not necessarily lead to an actual switch to another thread.

import kotlinx.coroutines.*

suspend fun main(): Unit = coroutineScope {
    launch(Dispatchers.Default) {
        println(Thread.currentThread().name)
        withContext(Dispatchers.IO) {
            println(Thread.currentThread().name)
        }
    }
}
// DefaultDispatcher-worker-2
// DefaultDispatcher-worker-2

As a result of this thread sharing, more than 64 threads (the default IO parallelism) can exist in the shared pool during operations on the IO dispatcher: up to 64 plus the number of threads used by the Default dispatcher. However, Dispatchers.IO will never use more threads than its limit allows, so Dispatchers.Default is guaranteed to have enough threads available.

In our applications, we should rarely block threads, so this dispatcher should not be needed very often. There is no need to use it when we call functions that suspend instead of blocking. We mainly use it when we need to block threads - for instance, when we do operations on files or use blocking libraries.

class DiscUserRepository(
    private val discReader: DiscReader
) : UserRepository {
    override suspend fun getUser(): UserData =
        withContext(Dispatchers.IO) {
            UserData(discReader.read("userName"))
        }
}

Dispatcher with a pool of threads

There is a different situation when blocking threads might happen often - for instance, if we implement a library that makes blocking calls. If all such libraries used the IO dispatcher, its 64 threads (most likely) might all end up blocked. That is why the best practice for libraries that intensively block threads is to define their own dispatchers with independent pools of threads. We can do that in the following way:

val NUMBER_OF_THREADS = 20
val dispatcher = Executors
    .newFixedThreadPool(NUMBER_OF_THREADS)
    .asCoroutineDispatcher()

Such a dispatcher is used when our code might intensively block threads. We might still end up with all these threads blocked, but at least they are different threads from those used by Dispatchers.IO and Dispatchers.Default. Our calls might wait for one another; however, the rest of the application is not affected.

import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import java.util.concurrent.Executors

suspend fun main() = coroutineScope {
    val dispatcher = Executors.newFixedThreadPool(5)
        .asCoroutineDispatcher()

    repeat(1000) {
        launch(dispatcher) {
            Thread.sleep(200)
            val threadName = Thread.currentThread().name
            println("Running on thread: $threadName")
        }
    }

    dispatcher.close() // A dispatcher created with
    // asCoroutineDispatcher needs to be closed.
    // Without it, the program will run forever.
}
// Running on thread: pool-1-thread-1
// Running on thread: pool-1-thread-2
// Running on thread: pool-1-thread-4
// Running on thread: pool-1-thread-5
// Running on thread: pool-1-thread-3
// Running on thread: pool-1-thread-1
// Running on thread: pool-1-thread-2
// ...

The biggest problem with this approach is that a dispatcher created with ExecutorService.asCoroutineDispatcher() needs to be closed with the close function. Developers often forget about this, which leads to leaking threads. Another problem is that a fixed pool of threads is not used efficiently: you keep unused threads alive without sharing them with other services. This approach is useful when we massively block threads, but in many cases there is a better one: a dispatcher with limited parallelism.
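If such a dispatcher is needed only for a particular operation rather than for the whole lifetime of a service, one way to avoid leaking threads is to close it in a finally block. Below is a minimal sketch under that assumption (the function name and pool size are illustrative):

import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.withContext
import java.io.File
import java.util.concurrent.Executors

suspend fun readFilesWithDedicatedPool(paths: List<String>): List<String> {
    // A dedicated pool, independent of Dispatchers.IO and Dispatchers.Default
    val dispatcher = Executors.newFixedThreadPool(4)
        .asCoroutineDispatcher()
    return try {
        withContext(dispatcher) {
            paths.map { File(it).readText() } // blocking reads
        }
    } finally {
        dispatcher.close() // without this, the pool's threads would leak
    }
}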

Limited parallelism

Since Kotlin Coroutines 1.6, we have an option to transform a dispatcher into one with limited parallelism. This operation limits the number of threads that can be used at the same time. For instance, in some repositories, we might use Dispatchers.IO threads, but no more than 5 at the same time.

class DiscUserRepository(
    private val discReader: DiscReader
) : UserRepository {
    private val dispatcher = Dispatchers.IO
        .limitedParallelism(5)

    override suspend fun getUser(): UserData =
        withContext(dispatcher) {
            UserData(discReader.read("userName"))
        }
}

Such a dispatcher properly limits the number of threads we can use. Thanks to that, we are sure that we will not block too many threads from the Dispatchers.IO pool, and we know that there will be no more than 5 concurrent reads. It is a very resource-efficient approach, because we share threads with other services and limit unnecessary re-dispatching. Finally, it is easy to use, because we do not need to close this dispatcher.

The only problem is that in applications that depend extensively on blocking APIs, the default number of threads in the IO dispatcher might not be enough. In properly implemented applications with reasonably distributed thread limits, this should only be an issue when the application handles a huge number of requests. In such cases, consider changing the number of threads using the system property (as explained in the IO dispatcher section).
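A hedged sketch of how that property could be changed (the value 128 is just an example); it can also be passed as a JVM option, e.g. -Dkotlinx.coroutines.io.parallelism=128:

import kotlinx.coroutines.IO_PARALLELISM_PROPERTY_NAME

fun main() {
    // Must be set before Dispatchers.IO is used for the first time;
    // otherwise the default limit stays in effect.
    System.setProperty(IO_PARALLELISM_PROPERTY_NAME, "128")

    // ...start the application here
}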

Dispatcher limited to a single thread

For all the dispatchers using multiple threads, we need to consider the problem of shared state. Notice that in the example below, 10 000 coroutines each increase i by 1, so its value should be 10 000, but it is a smaller number. This is a result of modifying a shared state (the i property) from multiple threads at the same time.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

var i = 0

suspend fun main(): Unit = coroutineScope {
    repeat(10_000) {
        launch(Dispatchers.IO) { // or Default
            i++
        }
    }
    delay(1000)
    println(i) // ~9930
}

There are many ways this problem can be solved (most of them will be described in the chapter The problem with state), but one option is to use a dispatcher with just a single thread. If we use only a single thread at a time, we do not need any other synchronization. The classic way to do this was to create such a dispatcher using Executors.

val dispatcher = Executors.newSingleThreadExecutor()
    .asCoroutineDispatcher()

// previously:
// val dispatcher = newSingleThreadContext("My name")

The problem is that such a dispatcher keeps an extra thread active, and it needs to be closed when it is not used anymore. A modern solution is to use Dispatchers.Default or Dispatchers.IO (if we block threads) with parallelism limited to 1.

import kotlinx.coroutines.*

var i = 0

suspend fun main(): Unit = coroutineScope {
    val dispatcher = Dispatchers.Default
        .limitedParallelism(1)

    repeat(10000) {
        launch(dispatcher) {
            i++
        }
    }
    delay(1000)
    println(i) // 10000
}

The biggest disadvantage is that, because there is only one thread, our calls will be handled sequentially if we block it.

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun main(): Unit = coroutineScope {
    val dispatcher = Dispatchers.Default
        .limitedParallelism(1)

    val job = Job()
    repeat(5) {
        launch(dispatcher + job) {
            Thread.sleep(1000)
        }
    }
    job.complete()
    val time = measureTimeMillis { job.join() }
    println("Took $time") // Took 5006
}

Unconfined dispatcher

The last dispatcher we need to discuss is Dispatchers.Unconfined. This dispatcher differs from the previous ones because it does not change threads at all. When it is started, it runs on the thread on which it was started. If it is resumed, it runs on the thread that resumed it.

import kotlinx.coroutines.*
import kotlin.coroutines.*

suspend fun main(): Unit =
    withContext(newSingleThreadContext("Name1")) {
        var continuation: Continuation<Unit>? = null

        launch(newSingleThreadContext("Name2")) {
            delay(1000)
            continuation?.resume(Unit)
        }

        launch(Dispatchers.Unconfined) {
            println(Thread.currentThread().name) // Name1

            suspendCoroutine<Unit> {
                continuation = it
            }

            println(Thread.currentThread().name) // Name2

            delay(1000)

            println(Thread.currentThread().name)
            // kotlinx.coroutines.DefaultExecutor
            // (used by delay)
        }
    }

It is sometimes useful for unit testing. Imagine that you need to test a function that calls launch. Synchronizing the timing might not be easy. One solution is to use Dispatchers.Unconfined instead of all other dispatchers. If it is used in all scopes, everything runs on the same thread, and we can more easily control the order of operations. This trick is not needed if we use runTest from kotlinx-coroutines-test. We will discuss that later in the book.
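A minimal sketch of why this helps: with Dispatchers.Unconfined, the body of launch starts immediately on the calling thread, so (up to the first suspension point) the order of operations is deterministic.

import kotlinx.coroutines.*

fun main() {
    val scope = CoroutineScope(Dispatchers.Unconfined)
    println("Before launch")
    scope.launch {
        println("Inside launch") // executed immediately, on the same thread
    }
    println("After launch")
}
// Before launch
// Inside launch
// After launch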

From the performance point of view, this dispatcher is the cheapest, as it never requires thread switching. So we might choose it if we do not care at all which thread our code runs on. In practice, however, it is not considered good practice to use it so recklessly. What if, by accident, we miss a blocking call while running on the Main thread on Android? This could block the entire application.

Immediate main dispatching

Dispatching a coroutine has a cost. When withContext is called, the coroutine needs to be suspended and resumed, and on the way it is checked whether the thread we need can be used. This is a small cost, but it is unnecessary if we are already on that thread. Consider the function below:

suspend fun showUser(user: User) =
    withContext(Dispatchers.Main) {
        userNameElement.text = user.name
        // ...
    }

If this function were already called on the Main dispatcher, we would pay the unnecessary cost of re-dispatching. What is more, if there were a long queue for the Main thread, because of withContext the user data might be shown with some delay (the coroutine would need to wait until other coroutines have done their job first). To prevent this, there is Dispatchers.Main.immediate, which dispatches only if it is needed. So, if the function below is called on the Main thread, it won't be re-dispatched; it will be called immediately.

suspend fun showUser(user: User) =
    withContext(Dispatchers.Main.immediate) {
        userNameElement.text = user.name
        // ...
    }

We prefer Dispatchers.Main.immediate as the withContext argument, whenever the function might be called from the main dispatcher already. Currently, the other dispatchers do not support immediate dispatching.

Continuation interceptor

Dispatching works based on the mechanism of continuation interception, which is built into the language. There is a coroutine context element named ContinuationInterceptor, whose interceptContinuation method is used to modify a continuation when a coroutine is suspended (it caches the resulting continuation, so the wrapping does not happen again if the same dispatcher is used). It also has a releaseInterceptedContinuation method that is called when a continuation is completed.

public interface ContinuationInterceptor :
    CoroutineContext.Element {

    companion object Key :
        CoroutineContext.Key<ContinuationInterceptor>

    fun <T> interceptContinuation(
        continuation: Continuation<T>
    ): Continuation<T>

    fun releaseInterceptedContinuation(
        continuation: Continuation<*>
    ) {
    }

    //...
}

The capability to wrap a continuation gives a lot of control. Dispatchers use interceptContinuation to wrap a continuation with DispatchedContinuation, which dispatches it onto a specific pool of threads. This is how dispatchers work.
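A minimal sketch illustrating this relationship (not from the book): a CoroutineDispatcher is itself a ContinuationInterceptor, so it can be found in a coroutine context under that key - which is also what the test below relies on.

import kotlinx.coroutines.*
import kotlin.coroutines.ContinuationInterceptor

suspend fun main(): Unit = withContext(Dispatchers.Default) {
    // The dispatcher is stored under the ContinuationInterceptor key
    val interceptor = coroutineContext[ContinuationInterceptor]
    println(interceptor === Dispatchers.Default) // true
    println(interceptor is CoroutineDispatcher)  // true
}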

The problem is that the same context element is also used by many testing libraries, for instance by runBlockingTest from kotlinx-coroutines-test. So we have a conflict: there can be only one element with a given key in a context. This is why we might need to inject dispatchers, so that we can replace them with a test context in unit tests. We will get back to this topic in the chapter dedicated to coroutine testing.

class DiscUserRepository(
    private val discReader: DiscReader,
    private val dispatcher: CoroutineContext = Dispatchers.IO,
) : UserRepository {
    override suspend fun getUser(): UserData =
        withContext(dispatcher) {
            UserData(discReader.read("userName"))
        }
}

class UserReaderTests {

    @Test
    fun `some test`() = runTest {
        // given
        val discReader = FakeDiscReader()
        val repo = DiscUserRepository(
            discReader,
            // one of coroutines testing practices
            this.coroutineContext[ContinuationInterceptor]!!
        )

        //...
    }
}

Performance of dispatchers against different tasks

To show how different dispatchers perform against different tasks, I made some benchmarks. In each case, the benchmark runs 100 independent coroutines with the same task. Different columns represent different tasks: suspending for a second, blocking for a second, two CPU-intensive operations, and one memory-intensive operation (where the majority of time is spent accessing, allocating, and freeing memory). Different rows represent the dispatchers used to run these coroutines. The table below shows the average execution time in milliseconds.

                      Suspending  Blocking  CPU 1   CPU 2   Memory
Single thread         1 002       100 003   39 103  66 184  94 358
Default (8 threads)   1 002       13 003    8 473   15 222  21 461
IO (64 threads)       1 002       2 003     9 893   15 539  20 776
100 threads           1 002       1 003     16 379  16 078  21 004

There are a few important observations you can make:

  1. When we are just suspending, it doesn't really matter how many threads we are using.
  2. When we are blocking, the more threads we are using, the faster all those coroutines will be finished.
  3. With CPU-intensive operations, Dispatchers.Default is the best option[2].
  4. If we are dealing with a memory-intensive problem, more threads might lead to some (but not significant) improvement.

Here is how the tested functions look[1]:

fun cpu1(order: Order): Coffee {
    var i = Int.MAX_VALUE
    while (i > 0) {
        i -= if (i % 2 == 0) 1 else 2
    }
    return Coffee(order.copy(customer = order.customer + i))
}

fun cpu2(order: Order): Coffee {
    var isPrime = true
    for (numberToCheck in 1..23774) {
        isPrime = true
        for (i in 1..numberToCheck) {
            if (numberToCheck % i == 0) isPrime = false
        }
    }
    return Coffee(
        order.copy(customer = order.customer + isPrime)
    )
}

fun memory(order: Order): Coffee {
    val list = List(1_000) { it }
    val list2 = List(1_000) { list }
    val list3 = List(1_000) { list2 }
    return Coffee(
        order.copy(
            customer = order.customer + list3.hashCode()
        )
    )
}

fun blocking(order: Order): Coffee {
    Thread.sleep(1000)
    return Coffee(order)
}

suspend fun suspending(order: Order): Coffee {
    delay(1000)
    return Coffee(order)
}
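The measuring harness might look roughly like the sketch below (this is not the author's exact benchmark code, which is linked in the footnote): run 100 coroutines with the same task on a given dispatcher and measure how long it takes for all of them to finish.

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun measure(
    dispatcher: CoroutineDispatcher,
    task: suspend () -> Unit,
): Long = measureTimeMillis {
    // coroutineScope waits for all launched children to complete
    coroutineScope {
        repeat(100) {
            launch(dispatcher) { task() }
        }
    }
}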

Summary

Dispatchers determine on which thread (or thread pool) a coroutine runs (starts and resumes). The most important options are:

  • Dispatchers.Default, which we use for CPU-intensive operations;
  • Dispatchers.Main, which we use to access the Main thread on Android, Swing, or JavaFX;
  • Dispatchers.Main.immediate, which runs coroutines on the same thread as Dispatchers.Main but does not re-dispatch them when it is not necessary;
  • Dispatchers.IO, which we use when we need to perform some blocking operations;
  • Dispatchers.IO with limited parallelism, or a custom dispatcher with a pool of threads, which we use when we might have a large number of blocking calls;
  • Dispatchers.Default or Dispatchers.IO with parallelism limited to 1, or a custom dispatcher with a single thread, which we use when we need to secure shared state modifications;
  • Dispatchers.Unconfined, which we use when we do not want any thread switching at all.

[1]: The whole code can be found at https://bit.ly/3vpky9F

[2]: Dispatchers.IO has a similar result to Dispatchers.Default, but it should not be used for CPU-intensive operations: it is meant for blocking operations, and some other code might block all its threads. I am not sure why a dispatcher with 100 threads is so much slower for CPU 1 but not for CPU 2; multiple benchmarks on different machines confirm this result.