Kotlin Coroutines dispatchers

This is a chapter from the book Kotlin Coroutines. You can find Early Access on LeanPub.

An important capability of the Kotlin Coroutines library is letting us decide which thread (or pool of threads) a coroutine runs on, both when it starts and when it resumes. This is done using dispatchers.

In the English dictionary, a dispatcher is defined as "a person who is responsible for sending out people or vehicles where they are needed, especially emergency vehicles". In Kotlin coroutines, it is a CoroutineContext element that determines which thread a coroutine will run on.

Default dispatcher

If you don't set any dispatcher, the one chosen by default is Dispatchers.Default. It is designed to run CPU-intensive operations. It has a pool of threads with a size equal to the number of cores on the machine your code is running on (but not less than two). At least theoretically, this is the optimal number of threads, assuming you are using these threads efficiently - making CPU-intensive calculations and not blocking them.

To see this dispatcher in action, run the following code:

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlin.random.Random

suspend fun main() = coroutineScope {
    repeat(1000) {
        launch { // or launch(Dispatchers.Default) {
            // To make it busy
            List(1000) { Random.nextLong() }.maxOrNull()

            val threadName = Thread.currentThread().name
            println("Running on thread: $threadName")
        }
    }
}
```

Example result on my machine (I have 12 cores, so there are 12 threads in the pool):

Running on thread: DefaultDispatcher-worker-1
Running on thread: DefaultDispatcher-worker-5
Running on thread: DefaultDispatcher-worker-7
Running on thread: DefaultDispatcher-worker-6
Running on thread: DefaultDispatcher-worker-11
Running on thread: DefaultDispatcher-worker-2
Running on thread: DefaultDispatcher-worker-10
Running on thread: DefaultDispatcher-worker-4
...

Warning: runBlocking sets its own dispatcher if no other is set, so inside it, Dispatchers.Default is not the one chosen automatically.
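The warning above can be checked with a minimal sketch like the one below. The helper name ranOnCallerThread is made up for this illustration. It compares Thread objects rather than thread names, because names may get a coroutine suffix when debug mode is on.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reports whether a coroutine launched inside runBlocking
// ran on the calling thread, first with no dispatcher set,
// then with Dispatchers.Default set explicitly.
fun ranOnCallerThread(): Pair<Boolean, Boolean> {
    val caller = Thread.currentThread()
    var inherited = false
    var explicitDefault = false
    runBlocking {
        // No dispatcher set: the one from runBlocking is inherited
        launch { inherited = Thread.currentThread() === caller }
        // Dispatchers.Default set explicitly: runs on a pool thread
        launch(Dispatchers.Default) {
            explicitDefault = Thread.currentThread() === caller
        }
    }
    return inherited to explicitDefault
}

fun main() {
    val (inherited, explicitDefault) = ranOnCallerThread()
    println("No dispatcher, on caller thread: $inherited") // true
    println("Dispatchers.Default, on caller thread: $explicitDefault") // false
}
```

So if CPU-intensive work is started inside runBlocking, Dispatchers.Default has to be requested explicitly.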

Main dispatcher

Android and many other application frameworks have a concept of main or UI thread. That is generally the most important thread. On Android, it is the only one that can be used to interact with the UI. Because of that, it needs to be used very often, but also with great care. When the Main thread is blocked, the whole application is frozen. To run a coroutine on the Main thread, we use Dispatchers.Main.

Dispatchers.Main is available on Android if we use the kotlinx-coroutines-android artifact. Similarly, on JavaFX if we use kotlinx-coroutines-javafx, and on Swing if we use kotlinx-coroutines-swing. There are probably some other libraries that set it. If you don't use any of them, this dispatcher is not configured, and it cannot be used.

For unit testing, you can set another dispatcher instead of this one using Dispatchers.setMain(dispatcher) from kotlinx-coroutines-test.

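```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.setMain
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.concurrent.Executors

class SomeTest {

    private val dispatcher = Executors
        .newSingleThreadExecutor()
        .asCoroutineDispatcher()

    @Before
    fun setup() {
        Dispatchers.setMain(dispatcher)
    }

    @After
    fun tearDown() {
        // reset main dispatcher to
        // the original Main dispatcher
        Dispatchers.resetMain()
        dispatcher.close()
    }

    // Explicit Unit return type, because JUnit 4
    // requires test methods to return void
    @Test
    fun testSomeUI(): Unit = runBlocking {
        launch(Dispatchers.Main) {
            // ...
        }
    }
}
```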

On Android, the typical practice is to use this dispatcher as the default one. If you use libraries that suspend instead of blocking, and you don't do any complex calculations, you can practically use only Dispatchers.Main. If you do some CPU-intensive operations, you should run them on Dispatchers.Default.

Those two are enough for many applications, but what if you need to block a thread? For instance, you might need to perform long I/O operations (e.g. read big files) or use a library with blocking functions. You cannot block the Main thread, because your application would freeze. If you block threads of the default dispatcher, you risk blocking all the threads in its pool, and then you would not be able to do any calculations. That is why we need a dispatcher for such situations, and it is Dispatchers.IO.

IO dispatcher

Dispatchers.IO is designed to be used when we block threads with longer I/O operations, for instance when we read files, read shared preferences, or call blocking functions. This dispatcher also has a pool of threads, but it is much bigger. Additional threads in this pool are created and shut down on demand. The number of threads used by tasks in this dispatcher is limited by the value of the "kotlinx.coroutines.io.parallelism" (IO_PARALLELISM_PROPERTY_NAME) system property. It defaults to a limit of 64 threads (or the number of cores if this number is larger).

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

suspend fun main() = coroutineScope {
    repeat(1000) {
        launch(Dispatchers.IO) {
            Thread.sleep(200)

            val threadName = Thread.currentThread().name
            println("Running on thread: $threadName")
        }
    }
}
// Running on thread: DefaultDispatcher-worker-1
//...
// Running on thread: DefaultDispatcher-worker-53
// Running on thread: DefaultDispatcher-worker-14
```
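One way to observe the parallelism limit is to count how many blocking tasks run at the same moment. The sketch below is only an illustration: the function peakIoParallelism and its parameters are made up for this example, and it assumes the "kotlinx.coroutines.io.parallelism" property has not been changed.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: starts many blocking tasks on Dispatchers.IO
// and returns the highest number of tasks that were running at once.
suspend fun peakIoParallelism(coroutines: Int = 200): Int {
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    coroutineScope {
        repeat(coroutines) {
            launch(Dispatchers.IO) {
                val now = running.incrementAndGet()
                peak.updateAndGet { maxOf(it, now) }
                Thread.sleep(100) // simulated blocking I/O
                running.decrementAndGet()
            }
        }
    }
    return peak.get()
}

fun main() = runBlocking {
    // With the default settings, this should not exceed
    // 64 or the number of cores, whichever is larger
    println("Peak parallelism: ${peakIoParallelism()}")
}
```

Even though 200 coroutines are started, the peak should stay within the limit, and the remaining tasks wait for a free slot.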

This dispatcher shares threads with Dispatchers.Default, so using withContext(Dispatchers.IO) { ... } typically does not lead to an actual switch to another thread.

```kotlin
import kotlinx.coroutines.*

suspend fun main(): Unit = coroutineScope {
    launch(Dispatchers.Default) {
        println(Thread.currentThread().name)
        withContext(Dispatchers.IO) {
            println(Thread.currentThread().name)
        }
    }
}
// DefaultDispatcher-worker-2
// DefaultDispatcher-worker-2
```

As a result of the thread sharing, more than 64 (default parallelism) threads can be created (but not used) during operations over the IO dispatcher (64 plus the number of threads on the default dispatcher). However, Dispatchers.IO will never use more threads than its limit allows, so Dispatchers.Default is guaranteed to have enough threads available.

In our applications, we should rarely block threads, so this dispatcher should not be used often. There is no need to use it when we call functions that suspend instead of blocking. We mainly use it when we need to block threads, for instance when we operate on files or use blocking libraries.

```kotlin
class DiscUserRepository(
    private val discReader: DiscReader
) : UserRepository {
    override suspend fun getUser(): UserData =
        withContext(Dispatchers.IO) {
            UserData(discReader.read("userName"))
        }
}
```

Dispatcher with a pool of threads

The situation is different if threads might be blocked often, for instance if we implement a library that uses blocking calls. If all libraries used Dispatchers.IO, its 64 threads could easily all be blocked. So the best practice for libraries that block threads intensively is to define their own dispatcher with an independent pool of threads. We can do that in the following way:

```kotlin
val NUMBER_OF_THREADS = 20
val dispatcher = Executors
    .newFixedThreadPool(NUMBER_OF_THREADS)
    .asCoroutineDispatcher()
```

Such a dispatcher is used when our code might make several blocking calls. If we use it intensively, we might still have all those threads blocked, but at least they are different threads than those used by Dispatchers.IO and Dispatchers.Default. Our calls might be waiting for one another, but the rest of the application is not affected.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import java.util.concurrent.Executors

suspend fun main() = coroutineScope {
    val dispatcher = Executors.newFixedThreadPool(5)
        .asCoroutineDispatcher()

    repeat(1000) {
        launch(dispatcher) {
            Thread.sleep(200)

            val threadName = Thread.currentThread().name
            println("Running on thread: $threadName")
        }
    }
}
// Running on thread: pool-1-thread-1
// Running on thread: pool-1-thread-2
// Running on thread: pool-1-thread-4
// Running on thread: pool-1-thread-5
// Running on thread: pool-1-thread-3
// Running on thread: pool-1-thread-1
// Running on thread: pool-1-thread-2
// ...
```

Such a dispatcher is also used when we just need a huge pool of threads, because we expect many blocking and time-consuming calls.
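A dispatcher made from an executor owns its threads, so it is worth closing it once it is no longer needed. Threads created by Executors.newFixedThreadPool are not daemon threads, so an unclosed dispatcher can keep the JVM from exiting. Below is a minimal sketch of one way to do that with use (the dispatcher is Closeable). The function countPoolThreads and its parameters are made up for this illustration.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors

// Hypothetical helper: runs blocking tasks on a private pool,
// closes the pool afterwards, and returns how many threads were used.
suspend fun countPoolThreads(poolSize: Int, tasks: Int): Int {
    val threads = ConcurrentHashMap.newKeySet<Thread>()
    Executors.newFixedThreadPool(poolSize)
        .asCoroutineDispatcher()
        .use { dispatcher ->
            // coroutineScope waits for all the tasks,
            // so the pool is closed only after they finish
            coroutineScope {
                repeat(tasks) {
                    launch(dispatcher) {
                        Thread.sleep(50) // simulated blocking call
                        threads.add(Thread.currentThread())
                    }
                }
            }
        }
    return threads.size
}

fun main() = runBlocking {
    // Never more than the pool size (3 here)
    println("Threads used: ${countPoolThreads(poolSize = 3, tasks = 12)}")
}
```

In a library, the dispatcher would more likely live as long as the library itself and be closed when the library is shut down.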

Dispatcher with a single thread

For all the dispatchers using multiple threads, we need to consider the problem of shared state. Notice that in the example below, 10 000 coroutines each increase i by 1. Its value should be 10 000, but it is a smaller number. This is the result of modifying shared state (the i property) on multiple threads at the same time.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

var i = 0

suspend fun main(): Unit = coroutineScope {
    repeat(10_000) {
        launch(Dispatchers.IO) { // or Default
            i++
        }
    }
    delay(1000)
    println(i) // ~9930
}
```

There are many ways to solve this problem (most will be described in the chapter "The problem with the state"), but one of the options is to use a dispatcher with just a single thread. We can make such a dispatcher by creating a single-thread executor and transforming it into a coroutine dispatcher:

```kotlin
val dispatcher = Executors.newSingleThreadExecutor()
    .asCoroutineDispatcher()

// previously:
// val dispatcher = newSingleThreadContext("My name")
```

The biggest advantage of such a dispatcher is that it does not require any synchronization.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import java.util.concurrent.Executors

var i = 0

suspend fun main(): Unit = coroutineScope {
    val dispatcher = Executors.newSingleThreadExecutor()
        .asCoroutineDispatcher()

    repeat(10000) {
        launch(dispatcher) {
            i++
        }
    }
    delay(1000)
    println(i) // 10000
}
```

The biggest disadvantage is that, with only one thread, blocking it makes every other coroutine on this dispatcher wait.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

suspend fun main(): Unit = coroutineScope {
    val dispatcher = Executors.newSingleThreadExecutor()
        .asCoroutineDispatcher()

    val job = Job()
    repeat(5) {
        launch(dispatcher + job) {
            Thread.sleep(1000)
        }
    }
    job.complete()
    val time = measureTimeMillis { job.join() }
    println("Took $time") // Took 5006
}
```
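For contrast, suspending does not occupy the single thread, so coroutines that only suspend can wait at the same time. The sketch below compares the two cases. The helper timeOnSingleThread and the shortened durations are made up for this illustration.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

// Hypothetical helper: runs the same task in several coroutines
// on a single-thread dispatcher and returns the total time in ms.
suspend fun timeOnSingleThread(
    coroutines: Int,
    task: suspend () -> Unit
): Long =
    Executors.newSingleThreadExecutor()
        .asCoroutineDispatcher()
        .use { dispatcher ->
            measureTimeMillis {
                coroutineScope {
                    repeat(coroutines) {
                        launch(dispatcher) { task() }
                    }
                }
            }
        }

fun main() = runBlocking {
    // Blocking: the tasks run one after another (about 5 * 200 ms)
    val blocking = timeOnSingleThread(5) { Thread.sleep(200) }
    // Suspending: the tasks wait concurrently (about 200 ms)
    val suspending = timeOnSingleThread(5) { delay(200) }
    println("Blocking: $blocking ms, suspending: $suspending ms")
}
```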

Unconfined dispatcher

The last dispatcher we need to discuss is Dispatchers.Unconfined. This dispatcher is different from the previous ones, as it does not change threads at all. When a coroutine is started, it runs on the thread that started it. When it is resumed, it runs on the thread that resumed it.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.*

suspend fun main(): Unit =
    withContext(newSingleThreadContext("Name1")) {
        var continuation: Continuation<Unit>? = null

        launch(newSingleThreadContext("Name2")) {
            delay(1000)
            continuation?.resume(Unit)
        }

        launch(Dispatchers.Unconfined) {
            println(Thread.currentThread().name) // Name1

            suspendCoroutine<Unit> {
                continuation = it
            }

            println(Thread.currentThread().name) // Name2

            delay(1000)

            println(Thread.currentThread().name)
            // kotlinx.coroutines.DefaultExecutor
            // (used by delay)
        }
    }
```

It is sometimes useful for unit testing. Imagine that you need to test a function that calls launch. Synchronizing the time might not be easy. One solution is to use Dispatchers.Unconfined instead of all other dispatchers. If it is used in all scopes, everything runs on the same thread, and we can more easily control the order of operations. This trick is not needed if we use runBlockingTest from kotlinx-coroutines-test. We will discuss it later in the book.
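The effect on the order of operations can be seen in a small sketch. A coroutine launched on Dispatchers.Unconfined starts running immediately inside the launch call, while a regular launch inside runBlocking is only scheduled and runs later. The function name orderOfEvents is made up for this illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which code runs.
// Everything here executes on the thread that called runBlocking.
fun orderOfEvents(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    coroutineScope {
        // Runs its body right away, before launch returns
        launch(Dispatchers.Unconfined) { events.add("unconfined body") }
        events.add("after unconfined launch")
        // Only scheduled here, runs once this block is done
        launch { events.add("regular body") }
        events.add("after regular launch")
    }
    events
}

fun main() {
    orderOfEvents().forEach(::println)
    // unconfined body
    // after unconfined launch
    // after regular launch
    // regular body
}
```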

From the performance point of view, this dispatcher is the cheapest, as it never requires thread switching. So we might choose it if we do not care at all on which thread our code is running. In practice, it is not considered good to use it so recklessly. What if by accident we miss a blocking call, and we are running on the Main thread on Android? This could lead to blocking the entire application.

Immediate main dispatching

Dispatching a coroutine costs something. When withContext is called, the coroutine needs to be suspended and resumed, and on the way we check if the thread we are looking for can be used. This is a small cost, but unnecessary if we already are on this thread. Think of the below function:

```kotlin
suspend fun showUser(user: User) =
    withContext(Dispatchers.Main) {
        userNameElement.text = user.name
        // ...
    }
```

If this function were called on the main dispatcher already, we would pay an unnecessary cost of re-dispatching. What is more, if there were a long queue for the Main thread, then because of withContext the user data might be shown with some delay (this coroutine would need to wait until other coroutines do their job first). To prevent that, there is Dispatchers.Main.immediate, which dispatches only if it is needed. So if the function below is called on the Main thread, it won't be re-dispatched; it will be called immediately.

```kotlin
suspend fun showUser(user: User) =
    withContext(Dispatchers.Main.immediate) {
        userNameElement.text = user.name
        // ...
    }
```

We prefer Dispatchers.Main.immediate as the withContext argument whenever the function might already be called from the main dispatcher. Currently, the other dispatchers do not support immediate dispatching.

Continuation interceptor

Dispatching is based on the mechanism of continuation interception, which is built into the language. There is a coroutine context element named ContinuationInterceptor, whose method interceptContinuation is used to modify a continuation when a coroutine is suspended. It also has a method releaseInterceptedContinuation, which is called when the intercepted continuation completes and will not be used anymore.

```kotlin
public interface ContinuationInterceptor :
    CoroutineContext.Element {

    companion object Key :
        CoroutineContext.Key<ContinuationInterceptor>

    fun <T> interceptContinuation(
        continuation: Continuation<T>
    ): Continuation<T>

    fun releaseInterceptedContinuation(
        continuation: Continuation<*>
    ) {
    }

    //...
}
```

The capability to wrap a continuation gives a lot of control. Dispatchers use interceptContinuation to wrap a continuation with DispatchedContinuation, which changes the thread when it is resumed. This is how dispatchers work.
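To make the wrapping idea more concrete, here is a minimal sketch of a custom interceptor that only counts resumptions and never changes threads. It uses only the standard library (kotlin.coroutines), not kotlinx.coroutines. CountingInterceptor and countResumptions are names made up for this illustration, and this is not how DispatchedContinuation is implemented.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical interceptor: wraps each continuation
// and counts how many times it is resumed.
class CountingInterceptor : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*>
        get() = ContinuationInterceptor

    var resumptions = 0
        private set

    override fun <T> interceptContinuation(
        continuation: Continuation<T>
    ): Continuation<T> = object : Continuation<T> {
        override val context: CoroutineContext
            get() = continuation.context

        override fun resumeWith(result: Result<T>) {
            resumptions++ // a dispatcher would switch threads here
            continuation.resumeWith(result)
        }
    }
}

fun countResumptions(): Int {
    val interceptor = CountingInterceptor()
    var saved: Continuation<Unit>? = null
    val body: suspend () -> Unit = {
        suspendCoroutine<Unit> { saved = it } // first suspension
        suspendCoroutine<Unit> { saved = it } // second suspension
    }
    // Starting the coroutine already goes through the interceptor
    body.startCoroutine(Continuation<Unit>(interceptor) { })
    saved?.resume(Unit) // resumes after the first suspension
    saved?.resume(Unit) // resumes after the second suspension
    return interceptor.resumptions
}

fun main() {
    println(countResumptions()) // 3 (one start and two resumptions)
}
```

A dispatcher does the same kind of wrapping, but instead of counting, it passes the resumption to the right thread or thread pool.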

The problem is that the same context key is also used by many testing libraries, for instance by runBlockingTest from kotlinx-coroutines-test. So we have a conflict: there can be only one element with a given key in a context. This is why we might need to inject dispatchers, so that in unit tests we can replace them with the test scope context. We will get back to this topic in the chapter dedicated to coroutines testing.

```kotlin
class DiscUserRepository(
    private val discReader: DiscReader,
    private val dispatcher: CoroutineContext = Dispatchers.IO,
) : UserRepository {
    override suspend fun getUser(): UserData =
        withContext(dispatcher) {
            UserData(discReader.read("userName"))
        }
}

class UserReaderTests {

    @Test
    fun `some test`() = runBlockingTest {
        // given
        val discReader = FakeDiscReader()
        val repo = DiscUserRepository(
            discReader,
            // one of coroutines testing practices
            this.coroutineContext[ContinuationInterceptor]!!
        )
        //...
    }
}
```

Performance of dispatchers against different tasks

To show how different dispatchers perform against different tasks, I made a benchmark on my computer. In all the cases, the task is to run 100 independent coroutines with the same task. Different columns represent different tasks: suspending for a second, blocking for a second, and two CPU-intensive operations. Different rows represent different dispatchers used for running those coroutines. The table shows the average execution time in seconds, rounded to the first decimal place.

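| Dispatcher          | Suspending | Blocking | CPU 1 | CPU 2 |
|---------------------|------------|----------|-------|-------|
| Single thread       | 1.0        | 100.0    | 14.9  | 200.0 |
| Default (8 threads) | 1.0        | 13.0     | 6.3   | 111.0 |
| IO (64 threads)     | 1.0        | 2.0      | 5.4   | 83.3  |
| 100 threads         | 1.0        | 1.0      | 5.7   | 83.3  |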

There are a few important observations you can make:

  1. When we are just suspending, it doesn't really matter how many threads we are using.
  2. When we are blocking, the more threads we are using, the faster all those coroutines will be finished.
  3. With CPU-intensive operations, there is no big difference between Dispatchers.Default and the dispatchers using more threads. The table does show Dispatchers.Default as slower, but I believe this is because there were other threads running on my computer (IntelliJ, which I used to run the benchmark, Chrome, etc.), and a program with more threads could compete more effectively for CPU time. I think you should treat those three options as equally fast for CPU-intensive tasks.
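The "Blocking" column can be reproduced in a simplified form with a sketch like the one below. This is not the benchmark used for the table: measureBlocking is a made-up helper, it uses fixed thread pools instead of the built-in dispatchers, and the number of coroutines and the blocking time are reduced to keep the run short.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

// Hypothetical helper: runs blocking coroutines on a pool of
// the given size and returns the total time in milliseconds.
suspend fun measureBlocking(
    threads: Int,
    coroutines: Int,
    blockMillis: Long
): Long =
    Executors.newFixedThreadPool(threads)
        .asCoroutineDispatcher()
        .use { dispatcher ->
            measureTimeMillis {
                coroutineScope {
                    repeat(coroutines) {
                        launch(dispatcher) { Thread.sleep(blockMillis) }
                    }
                }
            }
        }

fun main() = runBlocking {
    for (threads in listOf(1, 8, 20)) {
        val time = measureBlocking(
            threads,
            coroutines = 20,
            blockMillis = 100
        )
        println("$threads thread(s): $time ms")
    }
}
```

With 20 coroutines blocking for 100 ms each, the expected times are roughly 2000 ms on one thread, 300 ms on 8 threads, and 100 ms on 20 threads, which follows the same pattern as the second observation above.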

Summary

Dispatchers determine which thread or thread pool a coroutine will run on (both when starting and when resuming). The most important dispatchers are:

  • Dispatchers.Default, that we use for CPU-intensive operations;
  • Dispatchers.Main, that we use to access the Main thread on Android, Swing, or JavaFX;
  • Dispatchers.Main.immediate, that runs on the same thread as Dispatchers.Main, but does not re-dispatch if it is not necessary;
  • Dispatchers.IO, that we use when we need to do some blocking operations;
  • custom dispatcher with a pool of threads, that we use when we might intensively block threads;
  • custom dispatcher with a single thread, that is used as one of the solutions to the problem of shared state;
  • Dispatchers.Unconfined, that we use when we want to have no thread switching at all.