
Job and children awaiting in Kotlin Coroutines

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.

When a coroutine is suspended, the only thing that remains is its continuation. This continuation includes references to local variables, labels marking where each suspending function has stopped, and this coroutine context. That is it. However, coroutines need to keep more information than that. They need to know their state, their relationships (parent and children), and more. So they keep it in the special context called Job.

Job is the most important context for each coroutine. Every coroutine has its own job, and it is the only context not inherited from the parent. It cannot be inherited because every coroutine has its own state and its own relationships, so a Job cannot be shared. It also cannot be set from the outside, as every coroutine builder creates and controls its own job.

Job is a context identified by the Job key and implementing the Job interface. It is a cancellable thing with a lifecycle. It has a state, and it can be used to cancel coroutines, track their state, and much more. It is really important, so this and the next two chapters are dedicated to the Job context and the essential Kotlin Coroutines mechanisms that are connected to it.

Job and relationships

Every coroutine has its own job, and it can be accessed from its context using the Job key.

import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
    print(coroutineContext[Job]?.isActive) // true
}

There is also an extension property job, which lets us access the job more easily.

// extension
val CoroutineContext.job: Job
    get() = get(Job) ?: error("Current context doesn't...")

// usage
fun main(): Unit = runBlocking {
    print(coroutineContext.job.isActive) // true
}

Asynchronous coroutine builders return their jobs, so they can be used elsewhere. This is clearly visible for launch, where Job is the explicit result type.

import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
    val job: Job = launch {
        delay(1000)
        println("Test")
    }
}

The type returned by the async function is Deferred<T>, and Deferred<T> also implements the Job interface, so it can also be used in the same way.

import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
    val deferred: Deferred<String> = async {
        delay(1000)
        "Test"
    }
    val job: Job = deferred
}

Job is the only coroutine context that a coroutine does not inherit from its parent coroutine. Every coroutine creates its own Job, and the job from an argument or parent coroutine is used as the parent of this new job [0].

import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
    val name = CoroutineName("Some name")
    val job = Job()

    launch(name + job) {
        val childName = coroutineContext[CoroutineName]
        println(childName == name) // true
        val childJob = coroutineContext[Job]
        println(childJob == job) // false
        println(childJob == job.children.first()) // true
    }
}

The parent can reference all its children, and the children can refer to the parent. This parent-child relationship enables the implementation of cancellation and exception handling inside a coroutine’s scope. In most cases, Job is passed implicitly through the scope of a coroutine builder. In the example below, runBlocking is the parent of launch because launch finds its parent job in the scope provided by runBlocking.

import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
    val job: Job = launch {
        delay(1000)
    }

    val parentJob: Job = coroutineContext.job
    println(job == parentJob) // false
    val parentChildren: Sequence<Job> = parentJob.children
    println(parentChildren.first() == job) // true
}

Structured concurrency mechanisms will not work if a new Job context replaces the one from the parent.

import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
    launch(Job()) { // the new job replaces one from parent
        delay(1000)
        println("Will not be printed")
    }
}
// (prints nothing, finishes immediately)

In the above example, runBlocking does not wait for launch, because it has no relation with it. This is because launch uses the job from the argument as a parent.

When a coroutine has its own (independent) job, it has nearly no relation to its parent. It inherits other contexts, but other consequences of the parent-child relationship will not apply. This causes us to lose structured concurrency, which is a problematic situation that should be avoided.
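One consequence that is easy to observe is that cancelling the parent no longer reaches such a coroutine. The following is a minimal sketch, assuming everything runs on the single thread of runBlocking; the function name survivors is invented for illustration.

```kotlin
import kotlinx.coroutines.*

// Returns the labels of the coroutines that finished even though
// their enclosing coroutine was cancelled.
fun survivors(): List<String> = runBlocking {
    val finished = mutableListOf<String>()
    val parent = launch {
        launch { // regular child: the parent's job becomes its parent
            delay(200)
            finished += "child"
        }
        launch(Job()) { // independent job: no parent-child relation
            delay(200)
            finished += "independent"
        }
        delay(1000)
    }
    delay(100)
    parent.cancel() // cancels "child", but not "independent"
    delay(300)      // give the independent coroutine time to finish
    finished
}

fun main() {
    println(survivors()) // [independent]
}
```

The coroutine started with Job() still inherits the dispatcher from its scope, but nothing cancels it or waits for it, which is exactly the situation structured concurrency is meant to prevent.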

Coroutine lifecycle

Every coroutine has its own state, and this state is managed by its job. State lifecycle is essential for the basic mechanisms of coroutines, like cancellation and synchronization. Here is a graph of states and the transitions between them:

A diagram of job (so also coroutine) states.

In the "Active" state, a job is running. If the job was created with a coroutine builder, this is the state in which the body of the coroutine is executed. In this state, we can start child coroutines. Most coroutines start in the "Active" state. Only those that are started lazily start in the "New" state, and they need to be started in order to move to the "Active" state.

When a coroutine is executing its body, it is certainly in the "Active" state. When the body finishes, the state changes to "Completing", in which the coroutine waits for its children to complete. Once all its children have completed, the job (coroutine) changes its state to "Completed", which is a terminal state.

Alternatively, if a job is cancelled or fails while in the "Active" or "Completing" state, its state changes to "Cancelling". In this state, we have the last chance to do some clean-up, like closing connections or freeing resources (we will see how to do this in the next chapter). Once this is done, the job moves to the "Cancelled" state.

The state is displayed in a job’s toString [2]. In the example below, we see different jobs as their states change. The last one is started lazily, which means it does not start automatically. All the others immediately become active once created.

import kotlinx.coroutines.*

suspend fun main() = coroutineScope {
    // Job created with the Job() factory function is active
    val job = Job()
    println(job) // JobImpl{Active}@ADD
    // until we complete it with a method
    job.complete()
    println(job) // JobImpl{Completed}@ADD

    // launch is initially active by default
    val activeJob = launch {
        delay(1000)
    }
    println(activeJob) // StandaloneCoroutine{Active}@ADD
    // here we wait until this job is done
    activeJob.join() // (1 sec)
    println(activeJob) // StandaloneCoroutine{Completed}@ADD

    // launch started lazily is in New state
    val lazyJob = launch(start = CoroutineStart.LAZY) {
        delay(1000)
    }
    println(lazyJob) // LazyStandaloneCoroutine{New}@ADD
    // we need to start it, to make it active
    lazyJob.start()
    println(lazyJob) // LazyStandaloneCoroutine{Active}@ADD
    lazyJob.join() // (1 sec)
    println(lazyJob) // LazyStandaloneCoroutine{Completed}@ADD
}

To check the state in code, we use the properties isActive, isCompleted, and isCancelled.

State                            isActive  isCompleted  isCancelled
New (optional initial state)     false     false        false
Active (default initial state)   true      false        false
Completing (transient state)     true      false        false
Cancelling (transient state)     false     false        true
Cancelled (final state)          false     true         true
Completed (final state)          false     true         false
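The rows of this table can be observed directly. The following is a rough sketch rather than a definitive recipe: the helper flags and the function observeStates are names invented here, and the short delays are only an assumed way to catch the transient states on the single thread of runBlocking.

```kotlin
import kotlinx.coroutines.*

// Helper invented for this sketch: renders the three state properties.
fun flags(job: Job): String =
    "isActive=${job.isActive}, isCompleted=${job.isCompleted}, " +
        "isCancelled=${job.isCancelled}"

fun observeStates(): List<String> = runBlocking {
    val seen = mutableListOf<String>()

    val lazyJob = launch(start = CoroutineStart.LAZY) { delay(100) }
    seen += "New: " + flags(lazyJob)
    lazyJob.start()
    seen += "Active: " + flags(lazyJob)
    lazyJob.join()
    seen += "Completed: " + flags(lazyJob)

    // The body ends at once, but the child keeps the parent in "Completing".
    val parent = launch { launch { delay(200) } }
    delay(100)
    seen += "Completing: " + flags(parent)
    parent.join()

    val cancelled = launch { delay(1000) }
    delay(50) // let the coroutine start and suspend in delay
    cancelled.cancel()
    seen += "Cancelling: " + flags(cancelled)
    cancelled.join()
    seen += "Cancelled: " + flags(cancelled)

    seen
}

fun main() {
    observeStates().forEach(::println)
}
// New: isActive=false, isCompleted=false, isCancelled=false
// Active: isActive=true, isCompleted=false, isCancelled=false
// Completed: isActive=false, isCompleted=true, isCancelled=false
// Completing: isActive=true, isCompleted=false, isCancelled=false
// Cancelling: isActive=false, isCompleted=false, isCancelled=true
// Cancelled: isActive=false, isCompleted=true, isCancelled=true
```

Note that "Active" and "Completing" cannot be told apart using these three properties alone.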

The Job interface also offers some useful functions that can be used to interact with the job. Let's start with join, which is used to wait for the job to complete.

Awaiting job completion

A coroutine's job can be used to wait until the coroutine completes. For that, we use the join method, which suspends until the given job reaches a final state (either "Cancelled" or "Completed").

import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
    val job1 = launch {
        delay(1000)
        println("Test1")
    }
    val job2 = launch {
        delay(2000)
        println("Test2")
    }

    job1.join()
    job2.join()
    println("All tests are done")
}
// (1 sec)
// Test1
// (1 sec)
// Test2
// All tests are done

The Job interface also exposes a children property that lets us reference all its children. We might as well use it to wait until all children are in a final state.

import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
    launch {
        delay(1000)
        println("Test1")
    }
    launch {
        delay(2000)
        println("Test2")
    }

    val children = coroutineContext[Job]?.children
    val childrenNum = children?.count()
    println("Number of children: $childrenNum")
    children?.forEach { it.join() }
    println("All tests are done")
}
// Number of children: 2
// (1 sec)
// Test1
// (1 sec)
// Test2
// All tests are done

It is not uncommon to use join for synchronizing coroutines. For instance, you could use it to make sure that a coroutine is done before you start another one:

// `scope` is assumed to be a CoroutineScope property of the enclosing class
private var refreshJob: Job? = null

suspend fun refresh() {
    refreshJob?.join() // wait for the previous refresh, if any
    refreshJob = scope.launch {
        refreshData()
    }
}

This use-case can also be solved using Mutex.
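A rough sketch of the Mutex variant could look like the following. The class Refresher, its log, and the body of refreshData are invented for illustration. Unlike the join-based version, refresh here suspends until the work is done instead of starting a new coroutine.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical class; refreshData stands in for the real work.
class Refresher {
    private val mutex = Mutex()
    val log = mutableListOf<String>()

    private suspend fun refreshData(id: Int) {
        log += "start $id"
        delay(100)
        log += "end $id"
    }

    // Only one caller at a time can be inside withLock.
    suspend fun refresh(id: Int) = mutex.withLock {
        refreshData(id)
    }
}

fun runRefreshes(): List<String> = runBlocking {
    val refresher = Refresher()
    coroutineScope {
        launch { refresher.refresh(1) }
        launch { refresher.refresh(2) }
    }
    refresher.log
}

fun main() {
    println(runRefreshes()) // [start 1, end 1, start 2, end 2]
}
```

The second refresh starts only after the first one has ended, even though both coroutines were launched at the same time.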

We can also use it to cancel the previous job before starting a new one. This is especially useful when we want to make sure that only one coroutine is running at a time.

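// `scope` is assumed to be a CoroutineScope property of the enclosing class
private var refreshJob: Job? = null

suspend fun refresh() {
    refreshJob?.cancel() // request cancellation of the previous refresh
    refreshJob?.join()   // and wait until it has actually finished
    refreshJob = scope.launch {
        refreshData()
    }
}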

Here is an example of a more complex use-case. We have an order that needs to be completed. We need to create an order, create an invoice, deliver the order, and send an email. We want to make sure that the order is created before we mark it as invoiced. We also want to make sure that the order is marked as invoiced before we mark it as delivered, and that it is marked as both invoiced and delivered before we send an email. We can use join to synchronize these operations.

suspend fun completeOrder(order: Order) = coroutineScope {
    val createOrderJob = launch {
        orderService.createOrder(order)
    }
    val invoiceJob = launch {
        val invoiceId = invoiceService.createInvoice(order)
        createOrderJob.join()
        orderService.markOrderAsInvoiced(order, invoiceId)
    }
    val deliveryJob = launch {
        val deliveryId = deliveryService.orderDelivery(order)
        invoiceJob.join()
        orderService.markOrderAsDelivered(order, deliveryId)
    }
    invoiceJob.join()
    deliveryJob.join()
    sendEmail(order)
}

Instead of using join, you might also use await on the Deferred returned by async to wait for a coroutine. The main difference is that await returns the result of the coroutine (and rethrows its exception if the coroutine failed), while join only waits and returns Unit.
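To compare the two, here is a minimal sketch; the function name joinVersusAwait and the logged messages are invented for illustration. It also shows what happens when the awaited coroutine fails: join resumes normally, while await rethrows the exception. supervisorScope is used only so that the failing coroutine does not cancel the rest of the example.

```kotlin
import kotlinx.coroutines.*

fun joinVersusAwait(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    val deferred: Deferred<Int> = async {
        delay(100)
        42
    }
    deferred.join() // only waits, returns Unit
    log += "after join: isCompleted=${deferred.isCompleted}"
    log += "await returned ${deferred.await()}"

    // supervisorScope keeps one child's failure from cancelling the scope
    supervisorScope {
        val failing: Deferred<Int> = async {
            throw IllegalStateException("boom")
        }
        failing.join() // resumes normally despite the failure
        log += "join finished, isCancelled=${failing.isCancelled}"
        try {
            failing.await() // rethrows the coroutine's exception
        } catch (e: IllegalStateException) {
            log += "await threw ${e.javaClass.simpleName}"
        }
    }
    log
}

fun main() {
    joinVersusAwait().forEach(::println)
}
// after join: isCompleted=true
// await returned 42
// join finished, isCancelled=true
// await threw IllegalStateException
```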

Job factory function

A Job can be created without a coroutine using the Job() factory function. Job() creates a job that isn't associated with any coroutine and can be used as a context. This also means that we can use such a job as a parent of many coroutines. However, using such a job as a parent is tricky, and I recommend avoiding it.

A common mistake is to create a job using the Job() factory function, use it as a parent for some coroutines, and then use join on the job. Such a program will never end because Job is still in the "Active" state, even when all its children are finished. This is because this context is still ready to be used by other coroutines.

import kotlinx.coroutines.*

suspend fun main(): Unit = coroutineScope {
    val job = Job()
    launch(job) { // the new job replaces one from parent
        delay(1000)
        println("Text 1")
    }
    launch(job) { // the new job replaces one from parent
        delay(2000)
        println("Text 2")
    }
    job.join() // Here we will await forever
    println("Will not be printed")
}
// (1 sec)
// Text 1
// (1 sec)
// Text 2
// (runs forever)

A better approach would be to join all the current children of the job.

import kotlinx.coroutines.*

suspend fun main(): Unit = coroutineScope {
    val job = Job()
    launch(job) { // the new job replaces one from parent
        delay(1000)
        println("Text 1")
    }
    launch(job) { // the new job replaces one from parent
        delay(2000)
        println("Text 2")
    }
    job.children.forEach { it.join() }
}
// (1 sec)
// Text 1
// (1 sec)
// Text 2

Job() is an example of the fake constructor pattern [1]. At first, you might think that you're calling a constructor of Job, but you might then realize that Job is an interface, and interfaces cannot have constructors. The reality is that it is a simple function that looks like a constructor. Moreover, the actual type returned by this function is not a Job but its subinterface CompletableJob.

public fun Job(parent: Job? = null): CompletableJob

The CompletableJob interface extends the functionality of the Job interface by providing two additional methods:

  • complete(): Boolean - used to change this job’s state to "Completing". In this state, the job waits for all its children to complete, and once they are done, it changes its state to "Completed". Once a coroutine is "Completing" or "Completed", it cannot move back to "Active" state. The result of complete is true if this job was completed as a result of this invocation; otherwise, it is false (if it was already completed).

import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val job = Job()

    launch(job) {
        repeat(5) { num ->
            delay(200)
            println("Rep$num")
        }
    }

    launch {
        delay(500)
        job.complete()
    }

    job.join()

    launch(job) {
        println("Will not be printed")
    }

    println("Done")
}
// Rep0
// Rep1
// Rep2
// Rep3
// Rep4
// Done

  • completeExceptionally(exception: Throwable): Boolean - completes this job with a given exception. This means that all children will be cancelled immediately (with a CancellationException wrapping the exception provided as an argument). The result of completeExceptionally is true if this job was completed as a result of this invocation; otherwise, it is false (if it was already completed).

import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.lang.Error

fun main() = runBlocking {
    val job = Job()

    launch(job) {
        repeat(5) { num ->
            delay(200)
            println("Rep$num")
        }
    }

    launch {
        delay(500)
        job.completeExceptionally(Error("Some error"))
    }

    job.join()

    launch(job) {
        println("Will not be printed")
    }

    println("Done")
}
// Rep0
// Rep1
// Done

The complete function can be used after we start the last coroutine on a job. Thanks to this, we can just wait for the job completion using the join function.

import kotlinx.coroutines.*

suspend fun main(): Unit = coroutineScope {
    val job = Job()
    launch(job) { // the new job replaces one from parent
        delay(1000)
        println("Text 1")
    }
    launch(job) { // the new job replaces one from parent
        delay(2000)
        println("Text 2")
    }
    job.complete()
    job.join()
}
// (1 sec)
// Text 1
// (1 sec)
// Text 2

You can pass a reference to the parent as an argument of the Job function. Thanks to this, such a job will be cancelled when the parent is.

import kotlinx.coroutines.*

suspend fun main(): Unit = coroutineScope {
    val parentJob = Job()
    val job = Job(parentJob)
    launch(job) {
        delay(1000)
        println("Text 1")
    }
    launch(job) {
        delay(2000)
        println("Text 2")
    }
    delay(1100)
    parentJob.cancel()
    job.children.forEach { it.join() }
}
// Text 1

Synchronizing coroutines

It is not uncommon to use join from Job for synchronizing coroutines. For instance, if you want to make sure that an operation is started after another coroutine is finished, you can use join from the job of the first coroutine.

class SomeService(
    private val scope: CoroutineScope
) {
    fun startTasks() {
        val job = scope.launch {
            // ...
        }
        scope.launch {
            // ...
            job.join()
            // ...
        }
    }
}

In a similar way, we could gather a whole collection of jobs and wait for all of them to finish. The same can be done with async and await. The result of async is a Deferred, which is a subtype of Job, so we can also use join, but more often we use await, which additionally returns the result of the coroutine.
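Waiting for a whole collection can be sketched with the joinAll and awaitAll extension functions from kotlinx.coroutines. The function name waitForAll and the delays below are invented for illustration.

```kotlin
import kotlinx.coroutines.*

fun waitForAll(): List<Int> = runBlocking {
    // A collection of jobs: joinAll suspends until every one has finished.
    val jobs: List<Job> = List(3) { i ->
        launch { delay(100L * (i + 1)) }
    }
    jobs.joinAll()
    check(jobs.all { it.isCompleted })

    // A collection of deferred values: awaitAll also collects the results,
    // in the order of the collection.
    val deferreds: List<Deferred<Int>> = List(3) { i ->
        async {
            delay(100)
            i * 10
        }
    }
    deferreds.awaitAll()
}

fun main() {
    println(waitForAll()) // [0, 10, 20]
}
```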

An exceptionally useful type for synchronizing coroutines is CompletableDeferred. It represents a deferred value with a completion function. It is like a box for a value that can be completed with a value (complete) or an exception (completeExceptionally), and that has a waiting point where a coroutine can wait using await until this CompletableDeferred is completed.

import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
    val deferred = CompletableDeferred<String>()
    launch {
        println("Starting first")
        delay(1000)
        deferred.complete("Test")
        delay(1000)
        println("First done")
    }
    launch {
        println("Starting second")
        println(deferred.await()) // Wait for deferred to complete
        println("Second done")
    }
}
// Starting first
// Starting second
// (1 sec)
// Test
// Second done
// (1 sec)
// First done

CompletableDeferred is useful when some coroutines need to await some value or event, that is produced by another coroutine. CompletableDeferred accepts only one value that can be awaited multiple times by multiple coroutines. If you want to have multiple values, you should use Channel instead. Channel is explained in a dedicated chapter.
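The single-value behaviour can be illustrated with a small sketch in which several coroutines await the same CompletableDeferred and a second attempt to complete it is ignored. The Outcome type and the function shareOneValue are names invented here.

```kotlin
import kotlinx.coroutines.*

// Result type invented for this sketch.
data class Outcome(
    val completions: List<Boolean>,
    val received: List<String>
)

fun shareOneValue(): Outcome = runBlocking {
    val config = CompletableDeferred<String>()
    val completions = mutableListOf<Boolean>()
    val received = mutableListOf<String>()

    coroutineScope {
        // Three consumers wait for the same value.
        repeat(3) {
            launch { received += config.await() }
        }
        launch {
            delay(100)
            completions += config.complete("v1") // true: this call completed it
            completions += config.complete("v2") // false: already completed
        }
    }
    Outcome(completions, received)
}

fun main() {
    println(shareOneValue())
    // Outcome(completions=[true, false], received=[v1, v1, v1])
}
```

Every consumer receives the first value, and the second complete call has no effect, which is why a stream of values calls for a different tool.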

Summary

In this chapter, we learned that:

  • Job is the most important context for each coroutine. It is a cancellable thing with a lifecycle. It has a state, and it can be used to cancel coroutines, track their state, and much more.
  • Every coroutine has its own job, and it is the only context not inherited from the parent. Job from an argument or parent coroutine is used as a parent of this new job.
  • Coroutines can be in one of the following states: "New", "Active", "Completing", "Completed", "Cancelling", and "Cancelled". Regular coroutines start in the "Active" state. When they finish executing their body, they move to the "Completing" state, and once their children are completed, they move to the "Completed" state.
  • You should avoid using Job() as an explicit parent of coroutines, as it can lead to unexpected behavior.
  • Job can be used to synchronize coroutines. We can use join to wait for a coroutine to complete, or we can use CompletableDeferred to wait for a value produced by another coroutine.

The next two chapters describe cancellation and exception handling in Kotlin Coroutines. These two important mechanisms fully depend on the child-parent relationship created using Job.

[0] Yes, I repeat myself, but if there is one thing that I want you to remember, it is that Job is not inherited.

[1] A pattern that is well described in Effective Kotlin Item 32: Consider factory functions instead of constructors.

[2] I hope I do not need to remind the reader that toString should be used for debugging and logging purposes; it should not be parsed in code as this would break this function’s contract, as I described in Effective Kotlin.