
Coroutines under the hood

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub.

There is a certain kind of person who cannot accept that a car can just be driven. They need to open its hood to understand how it works. I am one of these people, so I just had to find out how coroutines work. If you’re like this too, you will enjoy this chapter. If not, you can just skip it.

This chapter won't introduce any new tools that you might use. It is purely explanatory. It tries to explain to a satisfactory level how coroutines work. The key lessons are:

  • Suspending functions are like state machines, with a possible state at the beginning of the function and after each suspending function call.
  • Both the number identifying the state and the local data are kept in the continuation object.
  • Continuation of a function decorates a continuation of its caller function; as a result, all these continuations represent a call stack that is used when we resume or a resumed function completes.

If you are interested in learning some internals (simplified, of course), read on.

Continuation-passing style

There are a few ways in which suspending functions could have been implemented, but the Kotlin team decided on an option called continuation-passing style. This means that continuations (explained in the previous chapter) are passed from function to function as arguments. By convention, a continuation takes the last parameter position.

suspend fun getUser(): User?
suspend fun setUser(user: User)
suspend fun checkAvailability(flight: Flight): Boolean

// under the hood is
fun getUser(continuation: Continuation<*>): Any?
fun setUser(user: User, continuation: Continuation<*>): Any
fun checkAvailability(
    flight: Flight,
    continuation: Continuation<*>
): Any

You might have also noticed that the result type under the hood is different from the originally declared one. It has changed to Any or Any?. Why so? The reason is that a suspending function might be suspended, and so it might not return a declared type. In such a case, it returns a special COROUTINE_SUSPENDED marker, which we will later see in practice. For now, just notice that since getUser might return User? or COROUTINE_SUSPENDED (which is of type Any), its result type must be the closest supertype of User? and Any, so it is Any?. Perhaps one day Kotlin will introduce union types, in which case we will have User? | COROUTINE_SUSPENDED instead.
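To make the "value or marker" return type concrete, here is a minimal hand-written sketch of a continuation-passing function. It is not compiler output: `SUSPENDED_MARKER`, `loadName`, `finishLoading`, and the cache are hypothetical names invented for this illustration, and the marker only stands in for the real COROUTINE_SUSPENDED.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume

// Stand-in for the real COROUTINE_SUSPENDED marker (an assumption of this sketch).
val SUSPENDED_MARKER = Any()

// Hypothetical storage: a cached name and the continuations waiting for it.
var cachedName: String? = null
val waiting = mutableListOf<Continuation<String>>()

// Hand-written continuation-passing form of `suspend fun loadName(): String`.
fun loadName(continuation: Continuation<String>): Any {
    val cached = cachedName
    if (cached != null) return cached // no suspension: the declared type is returned
    waiting += continuation           // remember whom to resume later
    return SUSPENDED_MARKER           // suspended: only the marker is returned
}

// Simulates the slow load finishing and resumes every waiting caller.
fun finishLoading(name: String) {
    cachedName = name
    val toResume = waiting.toList()
    waiting.clear()
    toResume.forEach { it.resume(name) }
}

// Calls loadName twice: once before the value exists, once after.
fun demoLoadName(): List<String> {
    val received = mutableListOf<String>()
    val caller = Continuation<String>(EmptyCoroutineContext) {
        received += "resumed with ${it.getOrThrow()}"
    }
    received += if (loadName(caller) === SUSPENDED_MARKER) "suspended" else "returned directly"
    finishLoading("Ann")
    received += "returned ${loadName(caller)}"
    return received
}
```

The first call has nothing to return, so it hands back the marker and delivers the value later through the continuation. The second call finds the cached value and returns it directly, which is why the declared result type has to cover both cases.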

A very simple function

To dive deeper, let's start with a very simple function that prints something before and after a delay.

suspend fun myFunction() {
    println("Before")
    delay(1000) // suspending
    println("After")
}

You can already deduce how the myFunction function signature will look under the hood:

fun myFunction(continuation: Continuation<*>): Any

The next thing is that this function needs its own continuation in order to remember its state. Let's name it MyFunctionContinuation (the actual continuation is an object expression and has no name, but it will be easier to explain this way). At the beginning of its body, myFunction will wrap the continuation (the parameter) with its own continuation (MyFunctionContinuation).

val continuation = MyFunctionContinuation(continuation)

This should be done only if the continuation isn't wrapped already. If it is, this is part of the resume process, and we should keep the continuation unchanged[1] (this might be confusing now, but later you will better see why).

val continuation =
    if (continuation is MyFunctionContinuation) continuation
    else MyFunctionContinuation(continuation)

This condition can be simplified to:

val continuation = continuation as? MyFunctionContinuation
    ?: MyFunctionContinuation(continuation)

Finally, let's talk about the body of our function.

suspend fun myFunction() {
    println("Before")
    delay(1000) // suspending
    println("After")
}

The function could be started from two places: either from the beginning (in the case of a first call) or from the point after suspension (in the case of resuming from continuation). To identify the current state, we use a field called label. At the start, it is 0, therefore the function will start from the beginning. However, it is set to the next state before each suspension point so that we start from just after the suspension point after a resume.

// A simplified picture of how myFunction looks under the hood
fun myFunction(continuation: Continuation<Unit>): Any {
    val continuation = continuation as? MyFunctionContinuation
        ?: MyFunctionContinuation(continuation)

    if (continuation.label == 0) {
        println("Before")
        continuation.label = 1
        if (delay(1000, continuation) == COROUTINE_SUSPENDED){
            return COROUTINE_SUSPENDED
        }
    }
    if (continuation.label == 1) {
        println("After")
        return Unit
    }
    error("Impossible")
}

The last important piece is also presented in the snippet above. When delay is suspended, it returns COROUTINE_SUSPENDED, then myFunction returns COROUTINE_SUSPENDED; the same is done by the function that called it, and the function that called this function, and all other functions until the top of the call stack[4]. This is how a suspension ends all these functions and leaves the thread available for other runnables (including coroutines).

Before we go any further, let's analyze the code above. What would happen if this delay call didn’t return COROUTINE_SUSPENDED? What if it just returned Unit instead (we know it won't, but let's hypothesize)? Notice that if the delay just returned Unit, we would just move to the next state, and the function would behave like any other.
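This hypothetical case can be tried out with a small sketch that has the same shape as the simplified myFunction. The names `STEP_SUSPENDED`, `StepState`, `fakeDelay`, and `twoSteps` are made up for this illustration, and the output is collected in a list rather than printed so that both paths are easy to compare.

```kotlin
// Stand-in for COROUTINE_SUSPENDED (an assumption of this sketch).
val STEP_SUSPENDED = Any()

// Minimal stand-in for a continuation: it only remembers the label.
class StepState { var label = 0 }

// Hypothetical callee that either suspends or finishes immediately with Unit.
fun fakeDelay(suspends: Boolean): Any = if (suspends) STEP_SUSPENDED else Unit

// Same shape as the simplified myFunction, but it records lines instead of printing.
fun twoSteps(state: StepState, suspends: Boolean, out: MutableList<String>): Any {
    if (state.label == 0) {
        out += "Before"
        state.label = 1
        if (fakeDelay(suspends) === STEP_SUSPENDED) return STEP_SUSPENDED
    }
    if (state.label == 1) {
        out += "After"
        return Unit
    }
    error("Impossible")
}
```

When `fakeDelay` returns Unit, a single call to `twoSteps` records both "Before" and "After" and returns Unit, just like a regular function. When it returns the marker, the first call records only "Before", and a second call with the same `StepState` (playing the role of a resume) skips the first block and records "After".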

Now, let's talk about the continuation, which is implemented as an anonymous class. Simplified, it looks like this:

cont = object : ContinuationImpl(continuation) {
    var result: Any? = null
    var label = 0

    override fun invokeSuspend(`$result`: Any?): Any? {
        this.result = `$result`;
        return myFunction(this);
    }
};

To improve the readability of our function, I decided to represent it as a class named MyFunctionContinuation. I also decided to hide the inheritance by inlining the ContinuationImpl body. The resulting class is simplified: I’ve skipped many optimizations and functionalities so as to keep only what is essential.

On the JVM, type arguments are erased during compilation; so, for instance, both Continuation<Unit> and Continuation<String> become just Continuation. Since everything we present here is a Kotlin representation of JVM bytecode, you should not worry about these type arguments at all.

The code below presents a complete simplification of how our function looks under the hood:

import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume

fun myFunction(continuation: Continuation<Unit>): Any {
    val continuation = continuation as? MyFunctionContinuation
        ?: MyFunctionContinuation(continuation)

    if (continuation.label == 0) {
        println("Before")
        continuation.label = 1
        if (delay(1000, continuation) == COROUTINE_SUSPENDED){
            return COROUTINE_SUSPENDED
        }
    }
    if (continuation.label == 1) {
        println("After")
        return Unit
    }
    error("Impossible")
}

class MyFunctionContinuation(
    val completion: Continuation<Unit>
) : Continuation<Unit> {
    override val context: CoroutineContext
        get() = completion.context

    var label = 0
    var result: Result<Any>? = null

    override fun resumeWith(result: Result<Unit>) {
        this.result = result
        val res = try {
            val r = myFunction(this)
            // Suspended again: nothing to pass to the caller yet
            if (r == COROUTINE_SUSPENDED) return
            Result.success(r as Unit)
        } catch (e: Throwable) {
            Result.failure(e)
        }
        completion.resumeWith(res)
    }
}

private val executor = Executors
    .newSingleThreadScheduledExecutor {
        Thread(it, "scheduler").apply { isDaemon = true }
    }

fun delay(timeMillis: Long, continuation: Continuation<Unit>): Any {
    executor.schedule({
        continuation.resume(Unit)
    }, timeMillis, TimeUnit.MILLISECONDS)
    return COROUTINE_SUSPENDED
}

fun main() {
    val EMPTY_CONTINUATION = object : Continuation<Unit> {
        override val context: CoroutineContext =
            EmptyCoroutineContext

        override fun resumeWith(result: Result<Unit>) {
            // This is root coroutine, we don't need anything in this example
        }
    }
    myFunction(EMPTY_CONTINUATION)
    Thread.sleep(2000)
    // Needed to prevent main() from finishing immediately.
}

val COROUTINE_SUSPENDED = Any()

If you want to analyze by yourself what suspending functions are under the hood, open the function in IntelliJ IDEA, use Tools > Kotlin > Show Kotlin bytecode, and click the "Decompile" button. As a result, you will see this code decompiled to Java (so more or less how this code would look if it were written in Java).

How to show the bytecode generated from the file.
The bytecode generated from the file. Notice the "Decompile" button, which lets us decompile this bytecode to Java.
Bytecode from the Kotlin suspending function decompiled into Java.

A function with a state

If a function has some state (like local variables or parameters) that needs to be restored after suspension, this state needs to be kept in this function's continuation. Let's consider the following function:

suspend fun myFunction() {
    println("Before")
    var counter = 0
    delay(1000) // suspending
    counter++
    println("Counter: $counter")
    println("After")
}

Here counter is needed in two states (for a label equal to 0 and 1), so it needs to be kept in the continuation. It will be stored right before suspension. Restoring these kinds of properties happens at the beginning of the function. So, this is how the (simplified) function looks under the hood:

import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.*

fun myFunction(continuation: Continuation<Unit>): Any {
    val continuation = continuation as? MyFunctionContinuation
        ?: MyFunctionContinuation(continuation)

    var counter = continuation.counter

    if (continuation.label == 0) {
        println("Before")
        counter = 0
        continuation.counter = counter
        continuation.label = 1
        if (delay(1000, continuation) == COROUTINE_SUSPENDED){
            return COROUTINE_SUSPENDED
        }
    }
    if (continuation.label == 1) {
        counter = (counter as Int) + 1
        println("Counter: $counter")
        println("After")
        return Unit
    }
    error("Impossible")
}

class MyFunctionContinuation(
    val completion: Continuation<Unit>
) : Continuation<Unit> {
    override val context: CoroutineContext
        get() = completion.context

    var result: Result<Unit>? = null
    var label = 0
    var counter = 0

    override fun resumeWith(result: Result<Unit>) {
        this.result = result
        val res = try {
            val r = myFunction(this)
            if (r == COROUTINE_SUSPENDED) return
            Result.success(r as Unit)
        } catch (e: Throwable) {
            Result.failure(e)
        }
        completion.resumeWith(res)
    }
}

private val executor = Executors.newSingleThreadScheduledExecutor {
    Thread(it, "scheduler").apply { isDaemon = true }
}

fun delay(timeMillis: Long, continuation: Continuation<Unit>): Any {
    executor.schedule({
        continuation.resume(Unit)
    }, timeMillis, TimeUnit.MILLISECONDS)
    return COROUTINE_SUSPENDED
}

fun main() {
    val EMPTY_CONTINUATION = object : Continuation<Unit> {
        override val context: CoroutineContext =
            EmptyCoroutineContext

        override fun resumeWith(result: Result<Unit>) {
            // This is root coroutine, we don't need anything in this example
        }
    }
    myFunction(EMPTY_CONTINUATION)
    Thread.sleep(2000)
    // Needed to prevent main() from finishing immediately.
}

private val COROUTINE_SUSPENDED = Any()

A function resumed with a value

The situation is slightly different if we actually expect some data from the suspension. Let’s analyze the function below:

suspend fun printUser(token: String) {
    println("Before")
    val userId = getUserId(token) // suspending
    println("Got userId: $userId")
    val userName = getUserName(userId, token) // suspending
    println(User(userId, userName))
    println("After")
}

Here there are two suspending functions: getUserId and getUserName. We also added a parameter token, and our suspending function also returns some values. This all needs to be stored in the continuation:

  • token, because it is needed in states 0 and 1,
  • userId, because it is needed in states 1 and 2,
  • result of type Result, which represents how this function was resumed.

If the function was resumed with a value, the result will be Result.success(value). In such a case, we can get and use this value. If it was resumed with an exception, the result will be Result.failure(exception). In such a case, this exception will be thrown.
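The two ways of resuming can be observed in isolation with a short sketch that uses only the standard library. The function name `recordResults` and the messages are made up for this illustration; `resume` and `resumeWithException` are the standard extension functions that call `resumeWith` with a successful or failed Result.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Resumes a plain continuation both ways and records the Result it receives.
fun recordResults(): List<String> {
    val seen = mutableListOf<String>()
    val continuation = Continuation<String>(EmptyCoroutineContext) { result ->
        seen += if (result.isSuccess) {
            "success: ${result.getOrNull()}"
        } else {
            "failure: ${result.exceptionOrNull()?.message}"
        }
    }
    // Equivalent to resumeWith(Result.success("SomeId"))
    continuation.resume("SomeId")
    // Equivalent to resumeWith(Result.failure(...))
    continuation.resumeWithException(IllegalStateException("Fake API exception"))
    return seen
}
```

Calling `getOrThrow()` on the second result would rethrow the exception, which is what the state machine relies on when it reads the result at the beginning of a state.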

import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.*

fun printUser(
    token: String,
    continuation: Continuation<*>
): Any {
    val continuation = continuation as? PrintUserContinuation
        ?: PrintUserContinuation(
            continuation as Continuation<Unit>,
            token
        )

    var result: Result<Any>? = continuation.result
    var userId: String? = continuation.userId
    val userName: String

    if (continuation.label == 0) {
        println("Before")
        continuation.label = 1
        val res = getUserId(token, continuation)
        if (res == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
        result = Result.success(res)
    }
    if (continuation.label == 1) {
        userId = result!!.getOrThrow() as String
        println("Got userId: $userId")
        continuation.label = 2
        continuation.userId = userId
        // token is passed here, as in the original suspending function
        val res = getUserName(userId, token, continuation)
        if (res == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
        result = Result.success(res)
    }
    if (continuation.label == 2) {
        userName = result!!.getOrThrow() as String
        println(User(userId as String, userName))
        println("After")
        return Unit
    }
    error("Impossible")
}

class PrintUserContinuation(
    val completion: Continuation<Unit>,
    val token: String
) : Continuation<String> {
    override val context: CoroutineContext
        get() = completion.context

    var label = 0
    var result: Result<Any>? = null
    var userId: String? = null

    override fun resumeWith(result: Result<String>) {
        this.result = result
        val res = try {
            val r = printUser(token, this)
            if (r == COROUTINE_SUSPENDED) return
            Result.success(r as Unit)
        } catch (e: Throwable) {
            Result.failure(e)
        }
        completion.resumeWith(res)
    }
}

fun main() {
    toStart()
}

private val executor = Executors.newSingleThreadScheduledExecutor {
    Thread(it, "scheduler").apply { isDaemon = true }
}

data class User(val id: String, val name: String)

object ApiException : Throwable("Fake API exception")

fun getUserId(token: String, continuation: Continuation<String>): Any {
    executor.schedule({
        continuation.resume("SomeId")
    }, 1000, TimeUnit.MILLISECONDS)
    return COROUTINE_SUSPENDED
}

fun getUserName(
    userId: String,
    token: String,
    continuation: Continuation<String>
): Any {
    executor.schedule({
        continuation.resume("SomeName")
        // continuation.resumeWithException(ApiException)
    }, 1000, TimeUnit.MILLISECONDS)
    return COROUTINE_SUSPENDED
}

fun toStart() {
    val EMPTY_CONTINUATION = object : Continuation<Unit> {
        override val context: CoroutineContext =
            EmptyCoroutineContext

        override fun resumeWith(result: kotlin.Result<Unit>) {
            if (result.isFailure) {
                result.exceptionOrNull()?.printStackTrace()
            }
        }
    }
    printUser("SomeToken", EMPTY_CONTINUATION)
    Thread.sleep(3000)
    // Needed to prevent the function from finishing immediately.
}

private fun Result<*>.throwOnFailure() {
    if (isFailure) throw exceptionOrNull()!!
}

private val COROUTINE_SUSPENDED = Any()

The call stack

When function a calls function b, the virtual machine needs to store the state of a somewhere, as well as the address where execution should return once b is finished. All this is stored in a structure called the call stack[2]. The problem is that when we suspend, we free a thread; as a result, we clear our call stack. Therefore, the call stack is not useful when we resume. Instead, the continuations serve as a call stack. Each continuation keeps the state where we suspended (as a label), the function's local variables and parameters (as fields), and the reference to the continuation of the function that called this function. One continuation references another, which references another, etc. As a result, our continuation is like a huge onion: it keeps everything that is generally kept on the call stack. Take a look at the following example:

suspend fun a() {
    val user = readUser()
    b()
    b()
    b()
    println(user)
}

suspend fun b() {
    for (i in 1..10) {
        c(i)
    }
}

suspend fun c(i: Int) {
    delay(i * 100L)
    println("Tick")
}

A sample continuation could be represented as follows:

CContinuation(
  i = 4,
  label = 1,
  completion = BContinuation(
      i = 4,
      label = 1,
      completion = AContinuation(
          label = 2,
          user = User@1234,
          completion = ...
      )
  )
)

Looking at the above representation, how many times was "Tick" already printed (assume readUser is not a suspending function)[3]?
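The idea that linked continuations can be read like a call stack can be sketched with a tiny model. `ContinuationModel` and `describeStack` are hypothetical names for this illustration; real continuations carry more than a name and a label.

```kotlin
// Hypothetical model of a continuation: the function it belongs to, its label,
// and a link to the continuation of the function that called it.
class ContinuationModel(
    val function: String,
    val label: Int,
    val completion: ContinuationModel?
)

// Follows the completion links from the innermost continuation outwards,
// which yields the same order as a stack trace.
fun describeStack(top: ContinuationModel): List<String> =
    generateSequence(top) { it.completion }
        .map { "${it.function} (label=${it.label})" }
        .toList()

// Rebuilds the shape of the sample continuation shown above.
fun sampleStack(): List<String> {
    val a = ContinuationModel("a", label = 2, completion = null)
    val b = ContinuationModel("b", label = 1, completion = a)
    val c = ContinuationModel("c", label = 1, completion = b)
    return describeStack(c)
}
```

Here `sampleStack()` lists c first, then b, then a, which is the order in which the functions will be resumed.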

When a continuation is resumed, it first calls its function; once this function is done, the continuation resumes the continuation of the function that called it. That continuation calls its own function, and the process repeats until the top of the stack is reached.

override fun resumeWith(result: Result<String>) {
    this.result = result
    val res = try {
        val r = printUser(token, this)
        if (r == COROUTINE_SUSPENDED) return
        Result.success(r as Unit)
    } catch (e: Throwable) {
        Result.failure(e)
    }
    completion.resumeWith(res)
}

For example, think of a situation where function a calls function b, which calls function c, which is suspended. During resuming, the c continuation first resumes the c function. Once this function is done, the c continuation resumes the b continuation that calls the b function. Once it is done, the b continuation resumes the a continuation, which calls the a function.

The whole process can be visualized with the following sketch:

It’s similar with exceptions: an uncaught exception is caught in resumeWith and then wrapped with Result.failure(e), and then the function that called our function is resumed with this result.
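Both paths can be observed with real suspending functions and only the standard library. The sketch below is an illustration, not the book's code: the names `chainLog`, `suspendedInC`, and `runChain` are made up, and the functions a, b, and c are minimal stand-ins for the ones discussed above.

```kotlin
import kotlin.coroutines.*

val chainLog = mutableListOf<String>()
var suspendedInC: Continuation<String>? = null

// c suspends and stores its continuation so that it can be resumed from outside.
suspend fun c(): String = suspendCoroutine<String> { suspendedInC = it }

suspend fun b(): String {
    val value = c()
    chainLog += "b resumed with $value"
    return "$value!"
}

suspend fun a() {
    val value = b()
    chainLog += "a resumed with $value"
}

// Starts a, which suspends inside c, then resumes c with a value or an exception.
fun runChain(fail: Boolean): List<String> {
    chainLog.clear()
    val block: suspend () -> Unit = { a() }
    block.startCoroutine(Continuation(EmptyCoroutineContext) { result ->
        chainLog += if (result.isSuccess) {
            "completed"
        } else {
            "failed: ${result.exceptionOrNull()?.message}"
        }
    })
    // The suspension ended a, b and c, so control is back in the caller.
    chainLog += "start returned to caller"
    val continuation = suspendedInC!!
    if (fail) {
        continuation.resumeWithException(IllegalStateException("boom"))
    } else {
        continuation.resume("ok")
    }
    return chainLog.toList()
}
```

With `fail = false`, b is resumed first, then a, and finally the completion passed to `startCoroutine`. With `fail = true`, neither b nor a gets to log anything: the exception travels through their continuations as a failed Result until it reaches the completion.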

I hope this all gives you some picture of what is going on when we suspend. The state needs to be stored in a continuation, and the suspension mechanism needs to be supported. When we resume, we need to restore the state from the continuation and either use the result or throw an exception.

The actual code

The actual code that continuations and suspending functions are compiled to is more complicated as it includes optimizations and some additional mechanisms, like:

  • constructing a better exceptions stack trace;
  • adding coroutine suspension interception (we will talk about this feature later);
  • optimizations on different levels (like removing unused variables or tail-call optimization).

Here is a part of the BaseContinuationImpl from Kotlin version "1.5.30"; it shows the actual resumeWith implementation (other methods and some comments skipped):

internal abstract class BaseContinuationImpl(
    val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to
    // unroll resumeWith recursion.
    final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in
        // current.resumeWith(param) to make saner and
        // shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed
            // continuation, so that a debugging library
            // infrastructure can precisely track what part
            // of suspended call stack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast
                // when trying to resume continuation
                // without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED)
                            return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted()
                // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached --
                    // invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

    // ...
}

As you can see, it uses a loop instead of recursion. This change allows the actual code to make some optimizations and simplifications.
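The difference between the two approaches can be sketched on a much simpler model. `Link`, `resumeRecursively`, `resumeInLoop`, and `chainOf` are hypothetical names for this illustration; each `Link` stands in for a continuation whose `step` plays the role of invokeSuspend.

```kotlin
// Hypothetical stand-in for a continuation: it transforms a value and
// knows the link of the function that called it.
class Link(val completion: Link?, val step: (Int) -> Int)

// Recursive version: each link resumes its caller from inside its own call,
// so the thread's stack grows with the length of the chain.
fun resumeRecursively(link: Link, param: Int): Int {
    val outcome = link.step(param)
    val completion = link.completion ?: return outcome
    return resumeRecursively(completion, outcome)
}

// Loop version: the same walk over the chain without nesting calls.
fun resumeInLoop(link: Link, param: Int): Int {
    var current: Link? = link
    var value = param
    while (current != null) {
        value = current.step(value)
        current = current.completion
    }
    return value
}

// Builds a chain of the given depth in which every step adds one.
fun chainOf(depth: Int): Link {
    var link = Link(completion = null) { value -> value + 1 }
    repeat(depth - 1) {
        link = Link(completion = link) { value -> value + 1 }
    }
    return link
}
```

Both functions give the same result on a short chain, but only the loop keeps the stack depth constant, so it also handles very long chains and produces shorter stack traces.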

The performance of suspending functions

What is the cost of using suspending functions instead of regular ones? When looking under the hood, many people might get the impression that the cost is significant, but this is not true. Dividing a function into states is cheap, as number comparison and execution jumping cost nearly nothing. Saving a state in a continuation is also cheap. We do not copy local variables: we make new variables point to the same points in memory. The only operation that costs something is creating a continuation class, but this is still not a big deal. If you are not worried about the performance of RxJava or callbacks, you should definitely not worry about the performance of suspending functions.

Summary

What is actually under the hood is more complicated than what I’ve described, but I hope that you’ve got some idea of the internals of coroutines. The key lessons are:

  • Suspending functions are like state machines, with a possible state at the beginning of the function and after each suspending function call.
  • Both the label identifying the state and the local data are kept in the continuation object.
  • Continuation of one function decorates a continuation of its caller function; as a result, all these continuations represent a call stack that is used when we resume or a resumed function completes.
1:

The actual mechanism here is a little more complicated as the first bit of the label is also changed, and this change is checked by the suspending function. This mechanism is needed for suspending functions in order to support recursion. This has been skipped for the sake of simplicity.
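A rough sketch of why such a flag helps, under the assumption that the flag is the highest bit of the label: in a recursive call, the caller's continuation has the same class as the callee's own, so a type check alone cannot distinguish "I am being resumed" from "I am being called by myself". The names `RESUMED_FLAG`, `RecursiveContinuation`, `obtainContinuation`, and `markResumed` are made up for this illustration.

```kotlin
// Assumed flag: the highest bit of the label.
val RESUMED_FLAG = Int.MIN_VALUE

// Simplified continuation of a recursive function.
class RecursiveContinuation(val completion: Any?) {
    var label = 0
}

// What the continuation would do just before calling its function again on resume.
fun markResumed(continuation: RecursiveContinuation) {
    continuation.label = continuation.label or RESUMED_FLAG
}

// What the function would do at its beginning: reuse the continuation only when
// it is flagged as resumed; otherwise treat it as the caller's and wrap it.
fun obtainContinuation(incoming: Any?): RecursiveContinuation {
    if (incoming is RecursiveContinuation && (incoming.label and RESUMED_FLAG) != 0) {
        incoming.label -= RESUMED_FLAG // clear the flag, keep the state number
        return incoming
    }
    return RecursiveContinuation(incoming)
}
```

A continuation passed in by a recursive caller has no flag set, so it gets wrapped in a fresh continuation; the same object arriving after `markResumed` is reused with its original label.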

2:

The call stack has limited space. When it has all been used, StackOverflowError occurs. Does this remind you of some popular website we use to ask or answer technical questions?

3:

The answer is 13. Since the label on AContinuation is 2, one b function call has already finished (this means 10 ticks). Since i equals 4, three ticks have already been printed in this b function.

4:

More concretely, COROUTINE_SUSPENDED is propagated until it reaches either the builder function or the 'resume' function.