Coroutines under the hood

This is a chapter from the book Kotlin Coroutines. You can find Early Access on LeanPub.

There is a certain kind of person who cannot just accept that a car can be driven. They need to open its hood to understand how it works. I am one of those people, so I just had to find out how coroutines work. If you are one too, you will enjoy this chapter. If you are not, you can just skip it.

This chapter won't introduce any new tools that you might use. It is purely explanatory. It tries to explain how coroutines work, to a satisfactory level. The key lessons are:

  • Suspending functions are like state machines, with a possible state at the beginning of the function and after each suspending function call.
  • Both the number identifying the state, and the local data are kept in the continuation object.
  • Continuation of one function decorates a continuation of another, and as a result, all those continuations represent a call stack that is used when we resume.

If you are interested in learning some internals (in some simplification, of course), follow me.

Continuation-passing style

There are a few ways in which suspending functions could have been implemented, but the Kotlin team decided on an option called continuation-passing style. This means that continuations (explained in the previous chapter) are passed from function to function as arguments. By convention, a continuation takes the last parameter position.

suspend fun getUser(): User?
suspend fun setUser(user: User)
suspend fun checkAvailability(flight: Flight): Boolean

// under the hood is
fun getUser(continuation: Continuation<*>): Any?
fun setUser(user: User, continuation: Continuation<*>): Any
fun checkAvailability(
    flight: Flight,
    continuation: Continuation<*>
): Any

You might have also noticed that the result type under the hood is different from the one originally declared. It changed from Unit and Boolean to Any, and from User? to Any?. Why so? The reason is that a suspending function might be suspended, and so it might not return the declared type. In such a case, it returns a special marker, COROUTINE_SUSPENDED. We will later see it in practice. For now, just notice that since getUser might return User? or COROUTINE_SUSPENDED (which is of type Any), its result type must be the closest supertype of User? and Any, so it is Any?. Perhaps one day Kotlin will introduce union types, and here we will have User? | COROUTINE_SUSPENDED instead.
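To make the two conventions above concrete, here is a minimal hand-written sketch. It is not what the compiler generates: SUSPENDED_MARKER is a hypothetical stand-in for COROUTINE_SUSPENDED, and the parked variable and the cached parameter are assumptions made only for this illustration. The function takes the continuation as its last parameter and returns Any?, because it returns either a User? or the marker.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume

// Hypothetical stand-in for the real COROUTINE_SUSPENDED marker.
val SUSPENDED_MARKER = Any()

data class User(val name: String)

// Continuation parked by the last call that "suspended" (illustration only).
var parked: Continuation<User?>? = null

// Hand-written continuation-passing style: the continuation is the last
// parameter, and the result type is Any? because the function returns
// either a User? or the marker.
fun getUser(cached: User?, continuation: Continuation<User?>): Any? {
    if (cached != null) return cached // finished without suspending
    parked = continuation             // the value will be delivered later
    return SUSPENDED_MARKER
}

fun main() {
    val printer = Continuation<User?>(EmptyCoroutineContext) { result ->
        println("Resumed with ${result.getOrNull()}")
    }
    println(getUser(User("Ann"), printer))               // User(name=Ann)
    println(getUser(null, printer) === SUSPENDED_MARKER) // true
    parked?.resume(User("Bob"))                          // Resumed with User(name=Bob)
}
```

The caller has to compare the returned value with the marker before treating it as a User?, which is exactly why the declared type cannot be kept.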

A very simple function

To dive deeper, let's start with a very simple function that prints something before and after delay.

suspend fun myFunction() {
    println("Before")
    delay(1000) // suspending
    println("After")
}

You can already deduce what its signature will look like under the hood:

fun myFunction(continuation: Continuation<*>): Any

The next thing is that this function needs its own continuation, to remember its state. Let's name it MyFunctionContinuation (the actual continuation is an object expression and has no name, but this way it will be easier to explain). At the beginning of its body, myFunction will wrap the continuation (the parameter) with its own continuation (MyFunctionContinuation).

val continuation = MyFunctionContinuation(continuation)

This should be done only if the continuation isn't wrapped already. If it is, this is part of the resume process, and we should keep the continuation unchanged¹ (it might be confusing now, but later you will better see why).

val continuation =
    if (continuation is MyFunctionContinuation) continuation
    else MyFunctionContinuation(continuation)

This condition can be simplified to:

val continuation = continuation as? MyFunctionContinuation
    ?: MyFunctionContinuation(continuation)
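The effect of this condition can be checked in isolation. The sketch below is a simplification under stated assumptions: MyFunctionContinuation is reduced to a hand-written class that only forwards to its completion, and wrap is a hypothetical helper that holds nothing but the expression above.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Simplified stand-in for the continuation described in the text.
class MyFunctionContinuation(
    val completion: Continuation<Unit>
) : Continuation<Unit> {
    override val context: CoroutineContext
        get() = completion.context

    var label = 0

    override fun resumeWith(result: Result<Unit>) =
        completion.resumeWith(result)
}

// Hypothetical helper: wrap only if the continuation is not wrapped already.
fun wrap(continuation: Continuation<Unit>): MyFunctionContinuation =
    continuation as? MyFunctionContinuation
        ?: MyFunctionContinuation(continuation)

fun main() {
    val caller = Continuation<Unit>(EmptyCoroutineContext) { }
    val first = wrap(caller)  // first call: a new wrapper is created
    first.label = 1           // pretend we suspended in state 1
    val second = wrap(first)  // resume: the same wrapper is reused
    println(first === second) // true
    println(second.label)     // 1
}
```

Because the wrapper is reused on resume, the label (and any other stored state) survives between calls.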

Finally, let's talk about the body of our function.

suspend fun myFunction() {
    println("Before")
    delay(1000) // suspending
    println("After")
}

The function could be started from two places: either from the beginning (in the case of a first call) or from the point after suspension (in the case of resuming from a continuation). To identify the current state, we use a field called label. At the start, it is 0, so the function will start from the beginning. However, before each suspension point, it is set to the next state, so that after resuming we start right after the suspension point.

// A simplified picture of how myFunction looks under the hood
fun myFunction(continuation: Continuation<Unit>): Any {
    val continuation = continuation as? MyFunctionContinuation
        ?: MyFunctionContinuation(continuation)

    if (continuation.label == 0) {
        println("Before")
        continuation.label = 1
        if (delay(1000, continuation) == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
    }
    if (continuation.label == 1) {
        println("After")
        return Unit
    }
    error("Impossible")
}

The last important piece is also presented in the snippet above. When delay is suspended, it returns COROUTINE_SUSPENDED, then myFunction returns COROUTINE_SUSPENDED, the same is done by the function that called it, and the function that called this function, and all other functions until the top of the call stack. This is how a suspension ends all those functions, and leaves the thread available for other runnables (including coroutines) to be used.

Before we go any further, let's analyze the code above. What would happen if this delay call did not return COROUTINE_SUSPENDED? What if it just returned Unit instead (we know it won't, but let's hypothesize)? Notice that if delay just returned Unit, we would simply move to the next state, and the function would behave like any other.
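Both cases can be observed with the standard library alone. The following is a sketch, not the chapter's own code: pause, trace, log and saved are hypothetical names, and pause uses the real suspendCoroutineUninterceptedOrReturn intrinsic so that it can either return COROUTINE_SUSPENDED or return Unit right away.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine

val log = mutableListOf<String>()
var saved: Continuation<Unit>? = null

// A suspension point that either really suspends or returns Unit immediately.
suspend fun pause(reallySuspend: Boolean): Unit =
    suspendCoroutineUninterceptedOrReturn<Unit> { cont ->
        if (reallySuspend) {
            saved = cont        // keep the continuation for a later resume
            COROUTINE_SUSPENDED // tell the caller that we suspended
        } else {
            Unit                // no suspension: behave like a regular call
        }
    }

suspend fun myFunction(reallySuspend: Boolean) {
    log += "Before"
    pause(reallySuspend)
    log += "After"
}

// Starts myFunction, records when the caller regains control,
// then resumes the saved continuation (if there is one).
fun trace(reallySuspend: Boolean): List<String> {
    log.clear()
    saved = null
    val block: suspend () -> Unit = { myFunction(reallySuspend) }
    block.startCoroutine(Continuation(EmptyCoroutineContext) { log += "Completed" })
    log += "Caller continues"
    saved?.resume(Unit)
    return log.toList()
}

fun main() {
    println(trace(reallySuspend = true))
    // [Before, Caller continues, After, Completed]
    println(trace(reallySuspend = false))
    // [Before, After, Completed, Caller continues]
}
```

In the first run the marker travels up to startCoroutine, so the caller continues before "After" is logged. In the second run nothing suspends, and the function completes like an ordinary one.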

Now, let's talk about the continuation. The actual MyFunctionContinuation implementation would be quite short thanks to inheritance from ContinuationImpl.

class MyFunctionContinuation(
    continuation: Continuation<*>
) : ContinuationImpl(continuation) {
    var result: Any? = null // in this case it is not needed,
    // as we resume with Unit, but later we will see how it
    // is useful
    var label = 0

    override fun invokeSuspend(`$result`: Any?): Any? {
        this.result = `$result`
        return myFunction(this)
    }
}

You might have noticed that our continuation extends ContinuationImpl. This class, together with its superclasses, is responsible for the whole resuming process. In their full form, they are quite complicated. We will see closer and closer approximations over time, but for now, we will just make the minimal continuation required for our simple function (this is why, in later examples, our continuation will implement Continuation instead of extending ContinuationImpl). In this next simplification, we assume that:

  • this continuation needs no state except for where we suspended (label),
  • the value passed to resume is not important (it is Unit),
  • we know that continuation will not be resumed with an exception.

The code below presents the complete simplified example.

import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume

fun myFunction(continuation: Continuation<Unit>): Any {
    val continuation = continuation as? MyFunctionContinuation
        ?: MyFunctionContinuation(continuation)

    if (continuation.label == 0) {
        println("Before")
        continuation.label = 1
        if (delay(1000, continuation) == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
    }
    if (continuation.label == 1) {
        println("After")
        return Unit
    }
    error("Impossible")
}

class MyFunctionContinuation(
    val completion: Continuation<Unit>
) : Continuation<Unit> {
    override val context: CoroutineContext
        get() = completion.context

    var label = 0

    override fun resumeWith(result: Result<Unit>) {
        if (result.isSuccess) {
            val res = myFunction(this)
            completion.resume(res as Unit)
        }
        // ... (we will talk about it later)
    }
}

private val executor = Executors
    .newSingleThreadScheduledExecutor {
        Thread(it, "scheduler").apply { isDaemon = true }
    }

fun delay(timeMillis: Long, continuation: Continuation<Unit>): Any {
    executor.schedule({
        continuation.resume(Unit)
    }, timeMillis, TimeUnit.MILLISECONDS)
    return COROUTINE_SUSPENDED
}

fun main() {
    val EMPTY_CONTINUATION = object : Continuation<Unit> {
        override val context: CoroutineContext =
            EmptyCoroutineContext

        override fun resumeWith(result: Result<Unit>) {
            // This is the root coroutine, we don't need anything in this example
        }
    }
    myFunction(EMPTY_CONTINUATION)
    Thread.sleep(2000)
    // Needed to prevent main from finishing immediately.
}

val COROUTINE_SUSPENDED = Any()

If you want to analyze what suspending functions are under the hood, open the function in IntelliJ IDEA, use Tools > Kotlin > Show Kotlin bytecode, and click the "Decompile" button. As a result, you will see this code decompiled to Java (so more or less what this code would look like if it were written in Java).

[Figure: How to show the bytecode generated from the file.]

[Figure: The bytecode generated from the file. Notice the "Decompile" button, which lets us decompile this bytecode to Java.]

[Figure: Bytecode from a Kotlin suspending function decompiled into Java.]

A function with a state

If a function has some state (like local variables) that needs to be restored after suspension, this state needs to be kept in its continuation. Let's consider the following function:

suspend fun myFunction() {
    println("Before")
    var counter = 0
    delay(1000) // suspending
    counter++
    println("Counter: $counter")
    println("After")
}

Here counter is needed in two states (for label equal to 0 and 1), so it needs to be kept in the continuation. It will be stored right before suspension. Restoring this kind of property happens at the beginning of the function. So this is what the function (simplified) will look like under the hood:

import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.*

fun myFunction(continuation: Continuation<Unit>): Any {
    val continuation = continuation as? MyFunctionContinuation
        ?: MyFunctionContinuation(continuation)

    var counter = continuation.counter

    if (continuation.label == 0) {
        println("Before")
        counter = 0
        continuation.counter = counter
        continuation.label = 1
        if (delay(1000, continuation) == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
    }
    if (continuation.label == 1) {
        counter = (counter as Int) + 1
        println("Counter: $counter")
        println("After")
        return Unit
    }
    error("Impossible")
}

class MyFunctionContinuation(
    val completion: Continuation<Unit>
) : Continuation<Unit> {
    override val context: CoroutineContext
        get() = completion.context

    var label = 0
    var counter: Int? = null

    override fun resumeWith(result: Result<Unit>) {
        if (result.isSuccess) {
            val res = myFunction(this)
            completion.resume(res as Unit)
        }
        // ... (we will talk about it later)
    }
}

private val executor = Executors.newSingleThreadScheduledExecutor {
    Thread(it, "scheduler").apply { isDaemon = true }
}

fun delay(timeMillis: Long, continuation: Continuation<Unit>): Any {
    executor.schedule({
        continuation.resume(Unit)
    }, timeMillis, TimeUnit.MILLISECONDS)
    return COROUTINE_SUSPENDED
}

fun main() {
    val EMPTY_CONTINUATION = object : Continuation<Unit> {
        override val context: CoroutineContext =
            EmptyCoroutineContext

        override fun resumeWith(result: Result<Unit>) {
            // This is the root coroutine, we don't need anything in this example
        }
    }
    myFunction(EMPTY_CONTINUATION)
    Thread.sleep(2000)
    // Needed to prevent main from finishing immediately.
}

private val COROUTINE_SUSPENDED = Any()

A function resumed with a value

The situation is slightly different if we actually expect some data from the suspension. Let's analyze the function below:

suspend fun printUser(token: String) {
    println("Before")
    val userId = getUserId(token) // suspending
    println("Got userId: $userId")
    val userName = getUserName(userId) // suspending
    println(User(userId, userName))
    println("After")
}

Here there are two suspending functions: getUserId and getUserName. We also added a parameter token, and our suspending function also returns some values. This all needs to be stored in the continuation:

  • token, that is required in the continuation, because it needs to be used when the printUser function is called,
  • userId, because it is needed in other states straight after resume,
  • result of type Result, that represents how this function was resumed.

If the function was resumed with a value, the result will be Result.Success(value). In such a case, we can get and use this value. If it was resumed with an exception, the result will be Result.Failure(exception). In such a case, we will throw this exception.

import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.*

fun printUser(
    token: String,
    continuation: Continuation<Nothing>
): Any {
    val continuation =
        if (continuation is MyFunctionContinuation) continuation
        else MyFunctionContinuation(
            continuation as Continuation<Unit>,
            token
        )

    var result: Result<Any>? = continuation.result
    var userId: String? = continuation.userId
    val userName: String

    if (continuation.label == 0) {
        println("Before")
        continuation.label = 1
        val res = getUserId(token, continuation)
        if (res == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
        result = Result.success(res)
    }
    if (continuation.label == 1) {
        // result is never null here: it was set above or in resumeWith
        userId = result!!.getOrThrow() as String
        println("Got userId: $userId")
        continuation.label = 2
        continuation.userId = userId
        val res = getUserName(userId, continuation)
        if (res == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
        result = Result.success(res)
    }
    if (continuation.label == 2) {
        userName = result!!.getOrThrow() as String
        println(User(userId as String, userName))
        println("After")
        return Unit
    }
    error("Impossible")
}

class MyFunctionContinuation(
    val completion: Continuation<Unit>,
    val token: String
) : Continuation<String> {
    override val context: CoroutineContext
        get() = completion.context

    var label = 0
    var result: Result<Any>? = null
    var userId: String? = null

    override fun resumeWith(result: Result<String>) {
        this.result = result
        val res = try {
            val r = printUser(token, this)
            if (r == COROUTINE_SUSPENDED) return
            Result.success(r as Unit)
        } catch (e: Throwable) {
            Result.failure(e)
        }
        completion.resumeWith(res)
    }
}

fun main() {
    toStart()
}

private val executor = Executors.newSingleThreadScheduledExecutor {
    Thread(it, "scheduler").apply { isDaemon = true }
}

data class User(val id: String, val name: String)

object ApiException : Throwable("Fake API exception")

fun getUserId(token: String, continuation: Continuation<String>): Any {
    executor.schedule({
        continuation.resume("SomeId")
    }, 1000, TimeUnit.MILLISECONDS)
    return COROUTINE_SUSPENDED
}

fun getUserName(userId: String, continuation: Continuation<String>): Any {
    executor.schedule({
        // continuation.resume("SomeName")
        continuation.resumeWithException(ApiException)
    }, 1000, TimeUnit.MILLISECONDS)
    return COROUTINE_SUSPENDED
}

fun toStart() {
    val EMPTY_CONTINUATION = object : Continuation<Unit> {
        override val context: CoroutineContext =
            EmptyCoroutineContext

        override fun resumeWith(result: kotlin.Result<Unit>) {
            if (result.isFailure) {
                result.exceptionOrNull()?.printStackTrace()
            }
        }
    }
    printUser("SomeToken", EMPTY_CONTINUATION)
    Thread.sleep(3000)
    // Needed to prevent main from finishing immediately.
}

private fun Result<*>.throwOnFailure() {
    if (isFailure) throw exceptionOrNull()!!
}

private val COROUTINE_SUSPENDED = Any()
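The two ways of resuming used in the code above can also be tried in isolation. This is a small sketch using only the standard library; describe is a hypothetical helper that does what the state machine does with its result: it uses the value on success and lets getOrThrow rethrow the exception on failure.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical helper: unwrap a Result the way a resumed function would.
fun describe(result: Result<String>): String =
    try {
        "value: " + result.getOrThrow() // rethrows on failure
    } catch (e: Throwable) {
        "failed: " + e.message
    }

fun main() {
    val received = mutableListOf<Result<String>>()
    // A continuation that only records what it was resumed with.
    val recorder = Continuation<String>(EmptyCoroutineContext) { received += it }

    recorder.resume("SomeId") // arrives as a success
    recorder.resumeWithException(IllegalStateException("Fake API exception")) // arrives as a failure

    received.forEach { println(describe(it)) }
    // value: SomeId
    // failed: Fake API exception
}
```

Both resume and resumeWithException are thin wrappers that call resumeWith with the corresponding Result.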

Call stack

When function a calls function b, the virtual machine needs to store somewhere the state of a and the address where execution should return once b is finished. All this is stored in a structure called the call stack². The problem is that when we suspend, we free a thread, and as a result we clear our call stack. Therefore, the call stack is not useful when we resume. Instead, the continuations serve as a call stack. Each continuation keeps the state where we suspended (as a label), the function's local variables and parameters (as fields), and the reference to the continuation of the function that called us. One continuation references another, which references another, etc. As a result, our continuation is like a huge onion, keeping everything that is generally kept on the call stack. Take a look at the following example:

suspend fun a() {
    val user = readUser()
    b()
    b()
    b()
    println(user)
}

suspend fun b() {
    for (i in 1..10) {
        c(i)
    }
}

suspend fun c(i: Int) {
    delay(i * 100L)
    println("Tick")
}

A sample continuation could be represented as follows:

CContinuation(
   label = 1,
   completion = BContinuation(
       i = 4,
       label = 1,
       completion = AContinuation(
           label = 2,
           user = User@1234
       )
   )
)

Looking at the above representation, how many times has "Tick" already been printed (assume readUser is not a suspending function)³?
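The representation above is essentially a linked list, and walking it through the completion references recovers something very close to a stack trace. The sketch below models this with a hypothetical Frame class (not a real coroutine type), where locals stands for the fields a continuation would hold.

```kotlin
// Hypothetical model of one continuation in the chain.
class Frame(
    val function: String,
    val label: Int,
    val locals: Map<String, Any?>,
    val completion: Frame? // continuation of the function that called us
)

// Walks from the innermost continuation to the outermost one.
fun callStack(top: Frame): List<String> =
    generateSequence(top) { it.completion }
        .map { "${it.function}: label=${it.label}, locals=${it.locals}" }
        .toList()

fun main() {
    val a = Frame("a", label = 2, locals = mapOf("user" to "User@1234"), completion = null)
    val b = Frame("b", label = 1, locals = mapOf("i" to 4), completion = a)
    val c = Frame("c", label = 1, locals = emptyMap(), completion = b)

    callStack(c).forEach { println(it) }
    // c: label=1, locals={}
    // b: label=1, locals={i=4}
    // a: label=2, locals={user=User@1234}
}
```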

When a continuation is resumed, each continuation first calls its function, and once it is done, it resumes the continuation of the function that called it. This one calls its function, and the process repeats until the top of the stack.

override fun resumeWith(result: Result<String>) {
    this.result = result
    val res = try {
        val r = printUser(token, this)
        if (r == COROUTINE_SUSPENDED) return
        Result.success(r as Unit)
    } catch (e: Throwable) {
        Result.failure(e)
    }
    completion.resumeWith(res)
}

The whole process can be visualized with the following sketch:

Exceptions behave similarly: they are thrown from each function, one by one, unless they are caught somewhere.
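This behavior can be observed with real suspending functions and the standard library only. In the sketch below, fetch, inner, outer, pending and startAndFail are hypothetical names chosen for the illustration. The coroutine suspends in fetch and is then resumed with an exception, which travels through inner and either reaches the final completion or is caught in outer.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Continuation of the currently suspended fetch call (illustration only).
var pending: Continuation<String>? = null

suspend fun fetch(): String = suspendCoroutine { pending = it }

suspend fun inner(): String = "inner got " + fetch()

suspend fun outer(): String =
    try {
        inner()
    } catch (e: IllegalStateException) {
        "outer caught: ${e.message}"
    }

// Starts the block, resumes the suspended fetch with an exception,
// and reports how the whole coroutine completed.
fun startAndFail(block: suspend () -> String): String {
    var outcome = "not completed"
    block.startCoroutine(Continuation<String>(EmptyCoroutineContext) { result ->
        outcome = result.fold({ "success: $it" }, { "failure: ${it.message}" })
    })
    pending?.resumeWithException(IllegalStateException("boom"))
    return outcome
}

fun main() {
    println(startAndFail { inner() }) // failure: boom
    println(startAndFail { outer() }) // success: outer caught: boom
}
```

In the first call nobody catches the exception, so it passes through inner and ends up in the completion as a failure. In the second call it passes through inner and is caught in outer, which then completes normally.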

I hope this all gives you some picture of what is going on when we suspend. The state needs to be stored in a continuation, and the suspension mechanism needs to be supported. When we resume, we need to restore the state from the continuation and either use the result or throw an exception.

The actual code

The actual continuations and suspending functions code is more complicated, as it includes optimizations and some additional mechanisms, like:

  • constructing a better exceptions stack trace,
  • adding coroutine suspension interception (we will talk about this feature later).

Here is a part of the BaseContinuationImpl from Kotlin version "1.5.30", that shows the actual resumeWith implementation (other methods and some comments skipped):

internal abstract class BaseContinuationImpl(
    val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to
    // unroll resumeWith recursion.
    final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in
        // current.resumeWith(param) to make saner and
        // shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed
            // continuation, so that a debugging library
            // infrastructure can precisely track what part
            // of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast
                // when trying to resume continuation
                // without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED)
                            return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted()
                // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached --
                    // invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

    // ...
}

As you can see, it uses a loop instead of recursion. This change allows the actual code to make some optimizations and simplifications.
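The difference between the two approaches can be shown on a toy model. This is only a sketch of the idea, not the library's code: Node is a hypothetical stand-in for one suspended function, step plays the role of invokeSuspend, and no suspension is modeled. Both variants compute the same result, but the loop keeps the JVM stack depth constant no matter how long the chain is.

```kotlin
// Hypothetical toy model: each Node stands for one suspended function.
class Node(val completion: Node?, val step: (Int) -> Int)

// Recursive resume: every node resumes its caller from inside its own call,
// so the JVM stack grows with the length of the chain.
fun resumeRecursively(node: Node, value: Int): Int {
    val outcome = node.step(value)
    val next = node.completion ?: return outcome
    return resumeRecursively(next, outcome)
}

// Loop-based resume: the same traversal with a constant stack depth.
fun resumeInLoop(start: Node, value: Int): Int {
    var current = start
    var param = value
    while (true) {
        val outcome = current.step(param)
        current = current.completion ?: return outcome
        param = outcome
    }
}

// Builds a chain of the given length in which every node adds 1.
fun chainOf(length: Int): Node {
    require(length > 0) { "length must be positive" }
    var node: Node? = null
    repeat(length) { node = Node(node) { value -> value + 1 } }
    return node!!
}

fun main() {
    println(resumeRecursively(chainOf(3), 0))    // 3
    println(resumeInLoop(chainOf(3), 0))         // 3
    println(resumeInLoop(chainOf(1_000_000), 0)) // 1000000
}
```

With a chain of a million nodes, the recursive variant would be likely to fail with StackOverflowError on a default-sized stack, which is why it is not called with such a chain here.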

Summary

What is actually under the hood is more complicated, but I hope that you end up with some idea of the internals of coroutines. The key lessons are:

  • Suspending functions are like state machines, with a possible state at the beginning of the function and after each suspending function call.
  • Both the label identifying the state, and the local data are kept in the continuation object.
  • Continuation of one function decorates a continuation of another, and as a result, all those continuations represent a call stack that is used when we resume.
1:

The actual mechanism here is a bit more complicated, as the first bit of the label is also changed, and this change is checked by the suspending function. This mechanism is needed for suspending functions to support recursion. It has been skipped here for the sake of simplicity.

2:

Call stack has limited space. When it is all used, we are dealing with StackOverflowError. Does it remind you of some popular website we use, to ask or answer technical questions?

3:

The answer is 13. Since the label on AContinuation is 2, one b call has finished already (this means 10 ticks). Since i equals 4, three ticks have already been printed in the current b call.