article banner (priority)

Kotlin Coroutines dispatchers

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.

An important functionality offered by the Kotlin Coroutines library is letting us decide on which thread (or pool of threads) a coroutine should be running (starting and resuming). This is done using dispatchers.

In the English dictionary, a dispatcher is defined as "a person who is responsible for sending people or vehicles to where they are needed, especially emergency vehicles". In Kotlin coroutines, CoroutineContext determines on which thread a certain coroutine will run.

Dispatchers in Kotlin Coroutines are a similar concept to RxJava Schedulers.

Default dispatcher

If you don't set any dispatcher, the one chosen by default is Dispatchers.Default, which is designed to run CPU-intensive operations. It has a pool of threads with a size equal to the number of cores in the machine your code is running on (but not less than two). At least theoretically, this is the optimal number of threads, assuming you are using these threads efficiently, i.e., performing CPU-intensive calculations and not blocking them.

To see this dispatcher in action, run the following code:

import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import kotlin.random.Random //sampleStart suspend fun main() = coroutineScope { repeat(1000) { launch { // or launch(Dispatchers.Default) { // To make it busy List(1000) { Random.nextLong() }.maxOrNull() val threadName = Thread.currentThread().name println("Running on thread: $threadName") } } } //sampleEnd

Example result on my machine (I have 12 cores, so there are 12 threads in the pool):

Running on thread: DefaultDispatcher-worker-1
Running on thread: DefaultDispatcher-worker-5
Running on thread: DefaultDispatcher-worker-7
Running on thread: DefaultDispatcher-worker-6
Running on thread: DefaultDispatcher-worker-11
Running on thread: DefaultDispatcher-worker-2
Running on thread: DefaultDispatcher-worker-10
Running on thread: DefaultDispatcher-worker-4
...

Warning: runBlocking sets its own dispatcher if no other one is set; so, inside it, the Dispatcher.Default is not the one that is chosen automatically. So, if we used runBlocking instead of coroutineScope in the above example, all coroutines would be running on "main".

Limiting the default dispatcher

Let's say that you have an expensive process, and you suspect that it might use all Dispatchers.Default threads and starve other coroutines using the same dispatcher. In such cases, we can use limitedParallelism on Dispatchers.Default to make a dispatcher that runs on the same threads but is limited to using not more than a certain number of them at the same time.

private val dispatcher = Dispatchers.Default .limitedParallelism(5)

This mechanism is used not to limit Dispatchers.Default but it is worth remembering it, because soon we will present limitedParallelism for Dispatchers.IO, which is much more important and common.

limitedParallelism was introduced in kotlinx-coroutines version 1.6.

Main dispatcher

Android and many other application frameworks have a concept of a main or UI thread, which is generally the most important thread. On Android, it is the only one that can be used to interact with the UI. Therefore, it needs to be used very often but also with great care. When the Main thread is blocked, the whole application is frozen. To run a coroutine on the Main thread, we use Dispatchers.Main.

Dispatchers.Main is available on Android if we use the kotlinx-coroutines-android artifact. Similarly, it’s available on JavaFX if we use kotlinx-coroutines-javafx, and on Swing if we use kotlinx-coroutines-swing. If you do not have a dependency that defines the main dispatcher, it is not available and cannot be used.

Notice that frontend libraries are typically not used in unit tests, so Dispatchers.Main is not defined there. To be able to use it, you need to set a dispatcher using Dispatchers.setMain(dispatcher) from kotlinx-coroutines-test.

class SomeTest { private val dispatcher = Executors .newSingleThreadExecutor() .asCoroutineDispatcher() @Before fun setup() { Dispatchers.setMain(dispatcher) } @After fun tearDown() { // reset main dispatcher to // the original Main dispatcher Dispatchers.resetMain() dispatcher.close() } @Test fun testSomeUI() = runBlocking { launch(Dispatchers.Main) { // ... } } }

On Android, we typically use the Main dispatcher as the default one. If you use libraries that are suspending instead of blocking, and you don't do any complex calculations, in practice you can often use only Dispatchers.Main. If you do some CPU-intensive operations, you should run them on Dispatchers.Default. These two are enough for many applications, but what if you need to block the thread? For instance, if you need to perform long I/O operations (e.g., read big files) or if you need to use a library with blocking functions. You cannot block the Main thread, because your application would freeze. If you block your default dispatcher, you risk blocking all the threads in the thread pool, in which case you wouldn't be able to do any calculations. This is why we need a dispatcher for such a situation, and it is Dispatchers.IO.

IO dispatcher

Dispatchers.IO is designed to be used when we block threads with I/O operations, for instance when we read/write files, use Android shared preferences, or call blocking functions. The code below takes around 1 second because Dispatchers.IO allows more than 50 active threads at the same time.

import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import kotlin.system.measureTimeMillis suspend fun main() { val time = measureTimeMillis { coroutineScope { repeat(50) { launch(Dispatchers.IO) { Thread.sleep(1000) } } } } println(time) // ~1000 }

How does it work? Imagine an unlimited pool of threads. Initially it is empty, but as we need more threads, they are created and kept active until they are not used for some time. Such a pool exists, but it wouldn't be safe to use it directly. With too many active threads, the performance degrades in a slow but unlimited manner, eventually causing out-of-memory errors. This is why we create dispatchers that have a limited number of threads they can use at the same time. Dispatchers.Default is limited by the number of cores in your processor. The limit of Dispatchers.IO is 64 (or the number of cores if there are more).

import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch //sampleStart suspend fun main() = coroutineScope { repeat(1000) { launch(Dispatchers.IO) { Thread.sleep(200) val threadName = Thread.currentThread().name println("Running on thread: $threadName") } } } // Running on thread: DefaultDispatcher-worker-1 //... // Running on thread: DefaultDispatcher-worker-53 // Running on thread: DefaultDispatcher-worker-14 //sampleEnd

As we mentioned, both Dispatchers.Default and Dispatchers.IO share the same pool of threads. This is an important optimization. Threads are reused, and often redispatching is not needed. For instance, let's say you are running on Dispatchers.Default and then execution reaches withContext(Dispatchers.IO) { ... }. Most often, you will stay on the same thread4, but what changes is that this thread counts not towards the Dispatchers.Default limit but towards the Dispatchers.IO limit. Their limits are independent, so they will never starve each other.

import kotlinx.coroutines.* suspend fun main(): Unit = coroutineScope { launch(Dispatchers.Default) { println(Thread.currentThread().name) withContext(Dispatchers.IO) { println(Thread.currentThread().name) } } } // DefaultDispatcher-worker-2 // DefaultDispatcher-worker-2

To see this more clearly, imagine that you use both Dispatchers.Default and Dispatchers.IO to the maximum. As a result, your number of active threads will be the sum of their limits. If you allow 64 threads in Dispatchers.IO and you have 8 cores, you will have 72 active threads in the shared pool. This means we have efficient thread reuse and both dispatchers have strong independence.

The most typical case where we use Dispatchers.IO is when we need to call blocking functions from libraries. The best practice is to wrap them with withContext(Dispatchers.IO) to make them suspending functions. Such functions can be used without any special care: they can be treated like all other properly implemented suspending functions.

class DiscUserRepository( private val discReader: DiscReader ) : UserRepository { override suspend fun getUser(): UserData = withContext(Dispatchers.IO) { UserData(discReader.read("userName")) } }

The only problem is when such functions are blocking too many threads. Dispatchers.IO is limited to 64. One service that is massively blocking threads might make all others wait for their turn. To help us deal with this, we again use limitedParallelism.

IO dispatcher with a custom pool of threads

Dispatchers.IO has a special behavior defined for the limitedParallelism function. It creates a new dispatcher with an independent pool of threads. What is more, this pool is not limited to 64 as we can decide to limit it to as many threads as we want.

For example, imagine you start 100 coroutines, each of which blocks a thread for a second. If you run these coroutines on Dispatchers.IO, it will take 2 seconds. If you run them on Dispatchers.IO with limitedParallelism set to 100 threads, it will take 1 second. Execution time for both dispatchers can be measured at the same time because the limits of these two dispatchers are independent anyway.

import kotlinx.coroutines.* import kotlin.system.measureTimeMillis suspend fun main(): Unit = coroutineScope { launch { printCoroutinesTime(Dispatchers.IO) // Dispatchers.IO took: 2074 } launch { val dispatcher = Dispatchers.IO .limitedParallelism(100) printCoroutinesTime(dispatcher) // LimitedDispatcher@XXX took: 1082 } } suspend fun printCoroutinesTime( dispatcher: CoroutineDispatcher ) { val test = measureTimeMillis { coroutineScope { repeat(100) { launch(dispatcher) { Thread.sleep(1000) } } } } println("$dispatcher took: $test") }

Conceptually, you might imagine it in the following way:

// Dispatcher with an unlimited pool of threads
private val pool = ...

Dispatchers.IO = pool.limitedParallelism(64)
Dispatchers.IO.limitedParallelism(x) =
   pool.limitedParallelism(x)

limitedParallelism used on Dispatchers.Default makes a dispatcher with an additional limit. Using limitedParallelism on Dispatcher.IO makes a dispatcher independent of Dispatcher.IO. However, they all share the same infinite pool of threads.

The best practice for classes that might intensively block threads is to define their own dispatchers that have their own independent limits. How big should this limit be? You need to decide for yourself. Too many threads are an inefficient use of our resources. On the other hand, waiting for an available thread is not good for performance. What is most essential is that this limit is independent from Dispatcher.IO and other dispatchers' limits. Thanks to that, one service will not block another.

class DiscUserRepository( private val discReader: DiscReader ) : UserRepository { private val dispatcher = Dispatchers.IO .limitParallelism(5) override suspend fun getUser(): UserData = withContext(dispatcher) { UserData(discReader.read("userName")) } }

Dispatcher with a fixed pool of threads

Some developers like to have more control over the pools of threads they use, and Java offers a powerful API for that. For example, we can create a fixed or cached pool of threads with the Executors class. These pools implement the ExecutorService or Executor interfaces, which we can transform into a dispatcher using the asCoroutineDispatcher function.

val NUMBER_OF_THREADS = 20 val dispatcher = Executors .newFixedThreadPool(NUMBER_OF_THREADS) .asCoroutineDispatcher()

limitedParallelism was introduced in kotlinx-coroutines version 1.6; in previous versions, we often created dispatchers with independent pools of threads using the Executors class.

The biggest problem with this approach is that a dispatcher created with ExecutorService.asCoroutineDispatcher() needs to be closed with the close function. Developers often forget about this, which leads to leaking threads. Another problem is that when you create a fixed pool of threads, you are not using them efficiently. You will keep unused threads alive without sharing them with other services.

Dispatcher limited to a single thread

For all dispatchers using multiple threads, we need to consider the shared state problem. Notice that in the example below 10,000 coroutines are increasing i by 1. So, its value should be 10,000, but it is a smaller number. This is a result of a shared state (i property) modification on multiple threads at the same time.

import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch //sampleStart var i = 0 suspend fun main(): Unit = coroutineScope { repeat(10_000) { launch(Dispatchers.IO) { // or Default i++ } } delay(1000) println(i) // ~9930 } //sampleEnd

There are many ways to solve this problem (most will be described in the The problem with state chapter), but one option is to use a dispatcher with just a single thread. If we use just a single thread at a time, we do not need any other synchronization. The classic way to do this was to create such a dispatcher using Executors.

val dispatcher = Executors.newSingleThreadExecutor() .asCoroutineDispatcher() // previously: // val dispatcher = newSingleThreadContext("My name")

The problem is that this dispatcher keeps an extra thread active, and it needs to be closed when it is not used anymore. A modern solution is to use Dispatchers.Default or Dispatchers.IO (if we block threads) with parallelism limited to 1.

import kotlinx.coroutines.* //sampleStart var i = 0 suspend fun main(): Unit = coroutineScope { val dispatcher = Dispatchers.Default .limitedParallelism(1) repeat(10000) { launch(dispatcher) { i++ } } delay(1000) println(i) // 10000 } //sampleEnd

The biggest disadvantage is that because we have only one thread, our calls will be handled sequentially if we block it.

import kotlinx.coroutines.* import kotlin.system.measureTimeMillis suspend fun main(): Unit = coroutineScope { val dispatcher = Dispatchers.Default .limitedParallelism(1) val job = Job() repeat(5) { launch(dispatcher + job) { Thread.sleep(1000) } } job.complete() val time = measureTimeMillis { job.join() } println("Took $time") // Took 5006 }

Unconfined dispatcher

The last dispatcher we need to discuss is Dispatchers.Unconfined. This dispatcher is different from the previous one as it does not change any threads. When it is started, it runs on the thread on which it was started. If it is resumed, it runs on the thread that resumed it.

import kotlinx.coroutines.* import kotlin.coroutines.* //sampleStart suspend fun main(): Unit = withContext(newSingleThreadContext("Thread1")) { var continuation: Continuation<Unit>? = null launch(newSingleThreadContext("Thread2")) { delay(1000) continuation?.resume(Unit) } launch(Dispatchers.Unconfined) { println(Thread.currentThread().name) // Thread1 suspendCancellableCoroutine<Unit> { continuation = it } println(Thread.currentThread().name) // Thread2 delay(1000) println(Thread.currentThread().name) // kotlinx.coroutines.DefaultExecutor // (used by delay) } } //sampleEnd

This is sometimes useful for unit testing. Imagine that you need to test a function that calls launch. Synchronizing the time might not be easy. One solution is to use Dispatchers.Unconfined instead of all other dispatchers. If it is used in all scopes, everything runs on the same thread, and we can more easily control the order of operations. This trick is not needed if we use runTest from kotlinx-coroutines-test. We will discuss this later in the book.

From the performance point of view, this dispatcher is the cheapest as it never requires thread switching. So, we might choose it if we do not care at all on which thread our code is running. However, in practice, it is not considered good to use it so recklessly. What if by accident we miss a blocking call and we are running on the Main thread? This could lead to blocking the entire application.

Immediate main dispatching

There is a cost associated with dispatching a coroutine. When withContext is called, the coroutine needs to be suspended, possibly wait in a queue, and then resumed. This is a small but unnecessary cost if we are already on this thread. Look at the function below:

suspend fun showUser(user: User) = withContext(Dispatchers.Main) { userNameElement.text = user.name // ... }

If this function had already been called on the main dispatcher, we would have an unnecessary cost of re-dispatching. What is more, if there were a long queue for the Main thread because of withContext, the user data might be shown with some delay (this coroutine would need to wait for other coroutines to do their job first). To prevent this, there is Dispatchers.Main.immediate, which dispatches only if it is needed. So, if the function below is called on the Main thread, it won't be re-dispatched, it will be called immediately.

suspend fun showUser(user: User) = withContext(Dispatchers.Main.immediate) { userNameElement.text = user.name // ... }

We prefer Dispatchers.Main.immediate as the withContext argument whenever this function might have already been called from the main dispatcher. Currently, the other dispatchers do not support immediate dispatching.

Continuation interceptor

Dispatching works based on the mechanism of continuation interception, which is built into the Kotlin language. There is a coroutine context named ContinuationInterceptor, whose interceptContinuation method is used to modify a continuation when a coroutine is suspended3. It also has a releaseInterceptedContinuation method that is called when a continuation is ended.

public interface ContinuationInterceptor : CoroutineContext.Element { companion object Key : CoroutineContext.Key<ContinuationInterceptor> fun <T> interceptContinuation( continuation: Continuation<T> ): Continuation<T> fun releaseInterceptedContinuation( continuation: Continuation<*> ) { } //... }

The capability to wrap a continuation gives a lot of control. Dispatchers use interceptContinuation to wrap a continuation with DispatchedContinuation, which runs on a specific pool of threads. This is how dispatchers work.

The problem is that the same context is also used by many testing libraries, for instance by runTest from kotlinx-coroutines-test. Each element in a context has to have a unique key. This is why we sometimes inject dispatchers to replace them in unit tests with test dispatchers. We will get back to this topic in the chapter dedicated to coroutine testing.

class DiscUserRepository( private val discReader: DiscReader, private val dispatcher: CoroutineContext = Dispatchers.IO, ) : UserRepository { override suspend fun getUser(): UserData = withContext(dispatcher) { UserData(discReader.read("userName")) } } class UserReaderTests { @Test fun `some test`() = runTest { // given val discReader = FakeDiscReader() val repo = DiscUserRepository( discReader, // one of coroutines testing practices this.coroutineContext[ContinuationInterceptor]!! ) //... } }

Performance of dispatchers against different tasks

To show how different dispatchers perform against different tasks, I made some benchmarks. In all these cases, the task is to run 100 independent coroutines with the same task. Different columns represent different tasks: suspending for a second, blocking for a second, CPU-intensive operation, and memory-intensive operation (where the majority of the time is spent on accessing, allocating, and freeing memory). Different rows represent the different dispatchers used for running these coroutines. The table below shows the average execution time in milliseconds.

SuspendingBlockingCPUMemory
Single thread1 002100 00339 10394 358
Default (8 threads)1 00213 0038 47321 461
IO (64 threads)1 0022 0039 89320 776
100 threads1 0021 00316 37921 004

There are a few important observations you can make:

  • When we are just suspending, it doesn't really matter how many threads we are using.
  • When we are blocking, the more threads we are using, the faster all these coroutines will be finished.
  • With CPU-intensive operations, Dispatchers.Default is the best option2.
  • If we are dealing with a memory-intensive problem, more threads might provide some (but not a significant) improvement.

Here is how the tested functions look1:

fun cpu(order: Order): Coffee { var i = Int.MAX_VALUE while (i > 0) { i -= if (i % 2 == 0) 1 else 2 } return Coffee(order.copy(customer = order.customer + i)) } fun memory(order: Order): Coffee { val list = List(1_000) { it } val list2 = List(1_000) { list } val list3 = List(1_000) { list2 } return Coffee( order.copy( customer = order.customer + list3.hashCode() ) ) } fun blocking(order: Order): Coffee { Thread.sleep(1000) return Coffee(order) } suspend fun suspending(order: Order): Coffee { delay(1000) return Coffee(order) }

Summary

Dispatchers determine on which thread or thread pool a coroutine will be running (starting and resuming). The most important options are:

  • Dispatchers.Default, which we use for CPU-intensive operations;
  • Dispatchers.Main, which we use to access the Main thread on Android, Swing, or JavaFX;
  • Dispatchers.Main.immediate, which runs on the same thread as Dispatchers.Main but is not re-dispatched if it is not necessary;
  • Dispatchers.IO, which we use when we need to do some blocking operations;
  • Dispatchers.IO with limited parallelism or a custom dispatcher with a pool of threads, which we use when we might have a big number of blocking calls;
  • Dispatchers.Default or Dispatchers.IO with parallelism limited to 1, or a custom dispatcher with a single thread, which is used when we need to secure shared state modifications;
  • Dispatchers.Unconfined, which we use when we do not care where the coroutine will be executed.
1:

The whole code can be found on https://bit.ly/3vqMpYf

2:

The main reason is that the more threads we use, the more time the processor needs to spend on switching between them, and so it has less time to do meaningful operations. Also Dispatchers.IO should not be used for CPU-intensive operations because it is used to block operations, and some other process might block all its threads.

3:

Wrapping needs to happen only once per continuation thanks to the caching mechanism.

4:

This mechanism is not deterministic.