
Job and children awaiting in Kotlin Coroutines

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub.

In the Structured Concurrency chapter, we mentioned the following consequences of the parent-child relationship:

  • children inherit context from their parent;
  • a parent suspends until all the children are finished;
  • when the parent is cancelled, its child coroutines are also cancelled;
  • when a child fails with an uncaught exception, it also cancels the parent.

The fact that a child inherits its context from its parent is a basic part of a coroutine builder's behavior.

    import kotlinx.coroutines.*

    fun main(): Unit = runBlocking(CoroutineName("main")) {
        val name = coroutineContext[CoroutineName]?.name
        println(name) // main
        launch {
            delay(1000)
            val name = coroutineContext[CoroutineName]?.name
            println(name) // main
        }
    }

The other three important consequences of structured concurrency depend fully on the Job context. Furthermore, Job can be used to cancel coroutines, track their state, and much more. It is really important and useful, so this and the next two chapters are dedicated to the Job context and the essential Kotlin Coroutine mechanisms that are connected to it.

What is Job?

Conceptually, a job represents a cancellable thing with a lifecycle. Formally, Job is an interface, but it has a concrete contract and state, so it might be treated similarly to an abstract class.

A job lifecycle is represented by its state. Here is a graph of states and the transitions between them:

A diagram of job (so also coroutine) states.

In the "Active" state, a job is running and doing its work. If the job was created with a coroutine builder, this is the state in which the body of the coroutine is executed. In this state, we can start child coroutines. Most coroutines start in the "Active" state; only those that are started lazily start in the "New" state, and they need to be started in order to move to the "Active" state.

When a coroutine is executing its body, it is certainly in the "Active" state. When the body is done, the state changes to "Completing", where the job waits for its children. Once all its children are done, the job changes its state to "Completed", which is a terminal state.

Alternatively, if a job is cancelled or fails while running (in the "Active" or "Completing" state), its state changes to "Cancelling". In this state, we have the last chance to do some clean-up, like closing connections or freeing resources (we will see how to do this in the next chapter). Once this is done, the job moves to the "Cancelled" state.

The state is displayed in a job's toString (see footnote 2). In the example below, we see different jobs as their states change. The last one is started lazily, which means it does not start automatically. All the others will immediately become active once created.

The code below presents Job in different states. I use join to await coroutine completion. This will be explained later.

    import kotlinx.coroutines.*

    suspend fun main() = coroutineScope {
        // A job created with the Job() factory function is active
        val job = Job()
        println(job) // JobImpl{Active}@ADD
        // until we complete it with a method
        job.complete()
        println(job) // JobImpl{Completed}@ADD

        // launch is initially active by default
        val activeJob = launch {
            delay(1000)
        }
        println(activeJob) // StandaloneCoroutine{Active}@ADD
        // here we wait until this job is done
        activeJob.join() // (1 sec)
        println(activeJob) // StandaloneCoroutine{Completed}@ADD

        // launch started lazily is in New state
        val lazyJob = launch(start = CoroutineStart.LAZY) {
            delay(1000)
        }
        println(lazyJob) // LazyStandaloneCoroutine{New}@ADD
        // we need to start it, to make it active
        lazyJob.start()
        println(lazyJob) // LazyStandaloneCoroutine{Active}@ADD
        lazyJob.join() // (1 sec)
        println(lazyJob) // LazyStandaloneCoroutine{Completed}@ADD
    }

To check the state in code, we use the properties isActive, isCompleted, and isCancelled.

State                             isActive   isCompleted   isCancelled
New (optional initial state)      false      false         false
Active (default initial state)    true       false         false
Completing (transient state)      true       false         false
Cancelling (transient state)      false      false         true
Cancelled (final state)           false      true          true
Completed (final state)           false      true          false
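The state properties can be observed directly on a job as it moves through its lifecycle. The following is only a minimal sketch: the helpers `flags` and `observeStates` are hypothetical names introduced for illustration, and the delays are shortened so the example finishes quickly. It records the three properties for a lazily started job and for a job that is cancelled before it finishes.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: formats the three state properties of a job.
fun flags(job: Job): String =
    "isActive=${job.isActive}, " +
        "isCompleted=${job.isCompleted}, " +
        "isCancelled=${job.isCancelled}"

// Hypothetical helper: collects the observed properties at a few lifecycle points.
fun observeStates(): List<String> = runBlocking {
    val observed = mutableListOf<String>()

    // A lazily started coroutine begins in the "New" state
    val lazyJob = launch(start = CoroutineStart.LAZY) { delay(100) }
    observed += "New: " + flags(lazyJob)

    // start() moves it to the "Active" state
    lazyJob.start()
    observed += "Active: " + flags(lazyJob)

    // join() suspends until the job reaches a final state
    lazyJob.join()
    observed += "Completed: " + flags(lazyJob)

    // A job cancelled before it finishes ends in the "Cancelled" state
    val cancelledJob = launch { delay(10_000) }
    cancelledJob.cancel()
    cancelledJob.join()
    observed += "Cancelled: " + flags(cancelledJob)

    observed
}

fun main() {
    observeStates().forEach(::println)
}
```

The transient "Completing" and "Cancelling" states are not recorded here because they are harder to observe reliably from the outside.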

As mentioned above, each coroutine has its own job. Let's see how we can access and use it.

Coroutine builders create their jobs based on their parent job

Every coroutine builder from the Kotlin Coroutines library creates its own job. Most coroutine builders return their jobs, so they can be used elsewhere. This is clearly visible for launch, where Job is an explicit result type.

    import kotlinx.coroutines.*

    fun main(): Unit = runBlocking {
        val job: Job = launch {
            delay(1000)
            println("Test")
        }
    }

The type returned by the async function is Deferred<T>, and Deferred<T> also implements the Job interface, so it can also be used in the same way.

    import kotlinx.coroutines.*

    fun main(): Unit = runBlocking {
        val deferred: Deferred<String> = async {
            delay(1000)
            "Test"
        }
        val job: Job = deferred
    }

Since Job is a coroutine context, we can access it using coroutineContext[Job]. However, there is also an extension property job, which lets us access the job more easily.

    // extension
    val CoroutineContext.job: Job
        get() = get(Job) ?: error("Current context doesn't...")

    // usage
    fun main(): Unit = runBlocking {
        print(coroutineContext.job.isActive) // true
    }

There is a very important rule: Job is the only coroutine context that is not inherited by a coroutine from its parent. Every coroutine creates its own Job, and the job from an argument or parent coroutine is used as a parent of this new job.

    import kotlinx.coroutines.*

    fun main(): Unit = runBlocking {
        val name = CoroutineName("Some name")
        val job = Job()

        launch(name + job) {
            val childName = coroutineContext[CoroutineName]
            println(childName == name) // true
            val childJob = coroutineContext[Job]
            println(childJob == job) // false
            println(childJob == job.children.first()) // true
        }
    }

The parent can reference all its children, and the children can refer to the parent. This parent-child relationship (Job reference storing) enables the implementation of cancellation and exception handling inside a coroutine’s scope.

    import kotlinx.coroutines.*

    fun main(): Unit = runBlocking {
        val job: Job = launch {
            delay(1000)
        }

        val parentJob: Job = coroutineContext.job
        // or coroutineContext[Job]!!
        println(job == parentJob) // false
        val parentChildren: Sequence<Job> = parentJob.children
        println(parentChildren.first() == job) // true
    }

Structured concurrency mechanisms will not work if a new Job context replaces the one from the parent. To see this, we might use the Job() function, which creates a Job context (this will be explained later).

    import kotlinx.coroutines.*

    fun main(): Unit = runBlocking {
        launch(Job()) { // the new job replaces one from parent
            delay(1000)
            println("Will not be printed")
        }
    }
    // (prints nothing, finishes immediately)

In the above example, runBlocking does not wait for the launched coroutine because the two have no relation. The launched coroutine uses the job passed as an argument as its parent, not the job of runBlocking.

When a coroutine has its own (independent) job, it has nearly no relation to its parent. It only inherits other contexts, but other results of the parent-child relationship will not apply. This causes us to lose structured concurrency, which is a problematic situation that should be avoided.
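To make the difference concrete, here is a minimal sketch that compares a regular child with a coroutine started with its own Job(). The names `Relations` and `inspectRelations` are hypothetical and introduced only for this illustration. The sketch checks whether each coroutine appears among the children of runBlocking, and which CoroutineName each of them sees.

```kotlin
import kotlinx.coroutines.*

// Hypothetical holder for what this sketch observes.
data class Relations(
    val attachedIsChild: Boolean,
    val detachedIsChild: Boolean,
    val attachedName: String?,
    val detachedName: String?,
)

// Hypothetical helper: starts two coroutines and inspects their relation to runBlocking.
fun inspectRelations(): Relations = runBlocking(CoroutineName("parent")) {
    // Regular child: its job becomes a child of the runBlocking job
    val attached = async { coroutineContext[CoroutineName]?.name }
    // Job() replaces the parent job, so this coroutine is not a child of runBlocking
    val detached = async(Job()) { coroutineContext[CoroutineName]?.name }

    // Snapshot of the current children of the runBlocking job
    val children = coroutineContext.job.children.toList()

    Relations(
        attachedIsChild = attached in children,
        detachedIsChild = detached in children,
        // Other context elements, like CoroutineName, are still inherited
        attachedName = attached.await(),
        detachedName = detached.await(),
    )
}

fun main() {
    println(inspectRelations())
}
```

Note that the sketch awaits the detached coroutine explicitly. Without that, runBlocking would have no reason to wait for it.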

Waiting for children

The first important advantage of a job is that it can be used to wait until the coroutine is completed. For that, we use the join method. This is a suspending function that suspends until a concrete job reaches a final state (either Completed or Cancelled).

    import kotlinx.coroutines.*

    fun main(): Unit = runBlocking {
        val job1 = launch {
            delay(1000)
            println("Test1")
        }
        val job2 = launch {
            delay(2000)
            println("Test2")
        }

        job1.join()
        job2.join()
        println("All tests are done")
    }
    // (1 sec)
    // Test1
    // (1 sec)
    // Test2
    // All tests are done

The Job interface also exposes a children property that lets us reference all its children. We might as well use it to wait until all children are in a final state.

    import kotlinx.coroutines.*

    fun main(): Unit = runBlocking {
        launch {
            delay(1000)
            println("Test1")
        }
        launch {
            delay(2000)
            println("Test2")
        }

        val children = coroutineContext[Job]
            ?.children

        val childrenNum = children?.count()
        println("Number of children: $childrenNum")
        children?.forEach { it.join() }
        println("All tests are done")
    }
    // Number of children: 2
    // (1 sec)
    // Test1
    // (1 sec)
    // Test2
    // All tests are done
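When the jobs to wait for are already at hand, joining them one by one can be shortened with the joinAll function from kotlinx.coroutines, which suspends until all the given jobs are complete. The sketch below is only an illustration: the helper name `runTests` is hypothetical, the delays are shortened, and the messages are collected into a list instead of being printed inside the coroutines.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs two coroutines and waits for both with joinAll.
fun runTests(): List<String> = runBlocking {
    // Safe without synchronization: everything runs on the runBlocking thread
    val log = mutableListOf<String>()

    val job1 = launch {
        delay(100)
        log += "Test1"
    }
    val job2 = launch {
        delay(200)
        log += "Test2"
    }

    // Suspends until both jobs reach a final state
    joinAll(job1, job2)
    log += "All tests are done"
    log
}

fun main() {
    runTests().forEach(::println)
}
```

There is also a joinAll extension on Collection<Job>, so a snapshot of children can be awaited with children.toList().joinAll().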

Job factory function

A Job can be created without a coroutine using the Job() factory function. It creates a job that isn't associated with any coroutine and can be used as a context. This also means that we can use such a job as a parent of many coroutines.

A common mistake is to create a job using the Job() factory function, use it as a parent for some coroutines, and then use join on the job. Such a program will never end because Job is still in an active state, even when all its children are finished. This is because this context is still ready to be used by other coroutines.

    import kotlinx.coroutines.*

    suspend fun main(): Unit = coroutineScope {
        val job = Job()
        launch(job) { // the new job replaces one from parent
            delay(1000)
            println("Text 1")
        }
        launch(job) { // the new job replaces one from parent
            delay(2000)
            println("Text 2")
        }
        job.join() // Here we will await forever
        println("Will not be printed")
    }
    // (1 sec)
    // Text 1
    // (1 sec)
    // Text 2
    // (runs forever)

A better approach would be to join all the current children of the job.

    import kotlinx.coroutines.*

    suspend fun main(): Unit = coroutineScope {
        val job = Job()
        launch(job) { // the new job replaces one from parent
            delay(1000)
            println("Text 1")
        }
        launch(job) { // the new job replaces one from parent
            delay(2000)
            println("Text 2")
        }
        job.children.forEach { it.join() }
    }
    // (1 sec)
    // Text 1
    // (1 sec)
    // Text 2

Job() is a great example of a factory function. At first, you might think that you're calling a constructor of Job, but you might then realize that Job is an interface, and interfaces cannot have constructors. The reality is that it is a fake constructor (see footnote 1): a simple function that looks like a constructor. Moreover, the actual type returned by this function is not a Job but its subinterface CompletableJob.

public fun Job(parent: Job? = null): CompletableJob

The CompletableJob interface extends the functionality of the Job interface by providing two additional methods:

  • complete(): Boolean - used to complete a job. Once it is used, all the child coroutines will keep running until they are all done, but new coroutines cannot be started in this job. The result is true if this job was completed as a result of this invocation; otherwise, it is false (if it was already completed).
    import kotlinx.coroutines.Job
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking

    fun main() = runBlocking {
        val job = Job()

        launch(job) {
            repeat(5) { num ->
                delay(200)
                println("Rep$num")
            }
        }

        launch {
            delay(500)
            job.complete()
        }

        job.join()

        launch(job) {
            println("Will not be printed")
        }

        println("Done")
    }
    // Rep0
    // Rep1
    // Rep2
    // Rep3
    // Rep4
    // Done
  • completeExceptionally(exception: Throwable): Boolean - completes this job with a given exception. This means that all children will be cancelled immediately (with CancellationException wrapping the exception provided as an argument). The result, just like in the above function, answers the question: "Was this job finished because of the invocation?".
    import kotlinx.coroutines.Job
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    import java.lang.Error

    fun main() = runBlocking {
        val job = Job()

        launch(job) {
            repeat(5) { num ->
                delay(200)
                println("Rep$num")
            }
        }

        launch {
            delay(500)
            job.completeExceptionally(Error("Some error"))
        }

        job.join()

        launch(job) {
            println("Will not be printed")
        }

        println("Done")
    }
    // Rep0
    // Rep1
    // Done
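The Boolean results of both methods can be checked on a job without any children, since such a job completes as soon as complete is called. This is a minimal sketch, and the helper name `completeTwice` is hypothetical: only the first call actually completes the job, so every later call reports false.

```kotlin
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.Job

// Hypothetical helper: returns the results of three consecutive completion attempts.
fun completeTwice(): List<Boolean> {
    // Job() returns CompletableJob, so complete() is available
    val job: CompletableJob = Job()

    // The job has no children, so this call completes it immediately
    val first = job.complete()
    // The job is already completed, so this call changes nothing
    val second = job.complete()
    // The same applies to completing exceptionally after completion
    val third = job.completeExceptionally(IllegalStateException("Too late"))

    return listOf(first, second, third)
}

fun main() {
    println(completeTwice()) // [true, false, false]
}
```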

The complete function is often used after we start the last coroutine on a job. Thanks to this, we can just wait for the job completion using the join function.

    import kotlinx.coroutines.*

    suspend fun main(): Unit = coroutineScope {
        val job = Job()
        launch(job) { // the new job replaces one from parent
            delay(1000)
            println("Text 1")
        }
        launch(job) { // the new job replaces one from parent
            delay(2000)
            println("Text 2")
        }
        job.complete()
        job.join()
    }
    // (1 sec)
    // Text 1
    // (1 sec)
    // Text 2

You can pass a reference to the parent as an argument of the Job function. Thanks to this, such a job will be cancelled when the parent is.

    import kotlinx.coroutines.*

    suspend fun main(): Unit = coroutineScope {
        val parentJob = Job()
        val job = Job(parentJob)
        launch(job) {
            delay(1000)
            println("Text 1")
        }
        launch(job) {
            delay(2000)
            println("Text 2")
        }
        delay(1100)
        parentJob.cancel()
        job.children.forEach { it.join() }
    }
    // Text 1

The next two chapters describe cancellation and exception handling in Kotlin Coroutines. These two important mechanisms fully depend on the child-parent relationship created using Job.

1: A pattern that is well described in Effective Kotlin, Item 33: Consider factory functions instead of constructors.

2: I hope I do not need to remind the reader that toString should be used for debugging and logging purposes; it should not be parsed in code as this would break this function's contract, as I described in Effective Kotlin.