Job and children awaiting in Kotlin Coroutines

This is a chapter from the book Kotlin Coroutines. You can find Early Access on LeanPub.

In the chapter Structured Concurrency, we mentioned the following consequences of the parent-child relationship:

  • children inherit context from their parent,
  • a parent suspends until all the children are finished,
  • when the parent is canceled, its child coroutines are canceled too,
  • when a child is destroyed, it destroys its parent as well.

Inheriting the context from parent to child is part of the basic behavior of coroutine builders.

    import kotlinx.coroutines.*

    fun main(): Unit = runBlocking(CoroutineName("main")) {
        val name = coroutineContext[CoroutineName]?.name
        println(name) // main
        launch {
            delay(1000)
            val name = coroutineContext[CoroutineName]?.name
            println(name) // main
        }
    }

The other three consequences of structured concurrency depend entirely on the Job context. Furthermore, a Job can be used to cancel coroutines, track their state, and much more. Because Job is so central, this and the next two chapters are dedicated to it and to the essential Kotlin Coroutines mechanisms connected to it.

What is Job?

Conceptually, a job represents a cancellable thing with a lifecycle. Formally, Job is an interface, but it has a concrete contract and state, so it might be treated similarly to an abstract class.

A job lifecycle is represented by its state. There are the following states and transitions:

A diagram of job (so also coroutine) states.

In the "Active" state, a job is running and doing its job. If the job was created with a coroutine builder, this is the state in which the body of the coroutine is executed. In this state, we can start child coroutines. Most coroutines start in the "Active" state. Only those that are started lazily start in the "New" state and need to be started to move to the "Active" state. When the body finishes, the job moves to the "Completing" state, where it waits for its children. Once all the children are done, the job changes its state to "Completed", which is a terminal state. Alternatively, if a job is canceled or fails while running (in the "Active" or "Completing" state), its state changes to "Cancelling". In this state, we have the last chance to do some clean-up, like closing connections or freeing resources (we will see how to do this in the next chapter). Once this is done, the job moves to the "Cancelled" state.

The state is shown by a job's toString[2]. In the example below, we see different jobs as their states change. The last one is started lazily, which means it does not start automatically. All the others become active immediately once created.

    import kotlinx.coroutines.CoroutineStart
    import kotlinx.coroutines.Job
    import kotlinx.coroutines.coroutineScope
    import kotlinx.coroutines.launch

    suspend fun main() = coroutineScope {
        val job = Job()
        println(job) // JobImpl{Active}@ADD
        job.complete()
        println(job) // JobImpl{Completed}@ADD

        val activeJob = launch {
            // no-op
        }
        println(activeJob) // StandaloneCoroutine{Active}@ADD
        activeJob.join()
        println(activeJob) // StandaloneCoroutine{Completed}@ADD

        val lazyJob = launch(start = CoroutineStart.LAZY) {
            // no-op
        }
        println(lazyJob) // LazyStandaloneCoroutine{New}@ADD
        lazyJob.start()
        println(lazyJob) // LazyStandaloneCoroutine{Active}@ADD
        lazyJob.join()
        println(lazyJob) // LazyStandaloneCoroutine{Completed}@ADD
    }

To check the state in code, we use the properties: isActive, isCompleted, and isCancelled.

    State                             isActive   isCompleted   isCancelled
    New (optional initial state)      false      false         false
    Active (default initial state)    true       false         false
    Completing (transient state)      true       false         false
    Cancelling (transient state)      false      false         true
    Cancelled (final state)           false      true          true
    Completed (final state)           false      true          false
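
As a rough illustration of the table above, the sketch below walks one lazily started job through the New, Active, Cancelling, and Cancelled states and prints the three properties at each step. The flags extension function is a hypothetical helper introduced only for this example. The sketch assumes it runs inside runBlocking, where the started coroutine has not yet had a chance to execute before cancel is called.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper (not part of the library): formats the three state properties.
fun Job.flags(): String =
    "isActive=$isActive, isCompleted=$isCompleted, isCancelled=$isCancelled"

fun main(): Unit = runBlocking {
    val lazyJob = launch(start = CoroutineStart.LAZY) { delay(1000) }
    println(lazyJob.flags()) // New: isActive=false, isCompleted=false, isCancelled=false
    lazyJob.start()
    println(lazyJob.flags()) // Active: isActive=true, isCompleted=false, isCancelled=false
    lazyJob.cancel()
    println(lazyJob.flags()) // Cancelling: isActive=false, isCompleted=false, isCancelled=true
    lazyJob.join() // suspends until the job reaches a final state
    println(lazyJob.flags()) // Cancelled: isActive=false, isCompleted=true, isCancelled=true
}
```

Note that "Active" and "Completing" cannot be told apart with these properties alone, which matches the two identical rows in the table.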

Coroutine builders create their jobs based on their parent job

Every coroutine builder from the Kotlin Coroutines library creates its own job. Most of them return it, so it can be used elsewhere. This is clearly visible for launch, where Job is the explicit result type.

    import kotlinx.coroutines.*

    fun main(): Unit = runBlocking {
        val job: Job = launch {
            delay(1000)
            println("Test")
        }
    }

The type returned by async is Deferred<T>, but it also implements the Job interface, so it can be used in the same way.

    import kotlinx.coroutines.*

    fun main(): Unit = runBlocking {
        val ret: Deferred<String> = async {
            delay(1000)
            "Test"
        }
        val job: Job = ret
    }

Since Job is a coroutine context, we can access it using coroutineContext[Job]. There is also an extension property, job, that lets us access the job more easily.

    val CoroutineContext.job: Job
        get() = get(Job) ?: error("Current context doesn't...")

A job is passed from parent to child differently than other contexts. Other coroutine contexts are simply passed unchanged. Job is not: every coroutine builder creates its own Job, which has a relationship to the parent's Job. The parent can reference all its children, and the children can refer to the parent. This is how waiting for child coroutines, cancellation, and exception handling are implemented.

    import kotlinx.coroutines.*

    fun main(): Unit = runBlocking {
        val job: Job = launch {
            delay(1000)
        }

        val parentJob: Job = coroutineContext.job
        // or coroutineContext[Job]!!
        println(job == parentJob) // false
        val parentChildren: Sequence<Job> = parentJob.children
        println(parentChildren.first() == job) // true
    }

Structured concurrency mechanisms will not work if a new Job context replaces the one from the parent. To see that, we might use the Job() function, which creates a Job context (it will be explained later).

    import kotlinx.coroutines.*

    suspend fun main(): Unit = coroutineScope {
        launch(Job()) { // the new job replaces one from parent
            delay(1000)
            println("Will not be printed")
        }
    }
    // (prints nothing, finishes immediately)

In the above example, the parent does not wait for the launched coroutine because it has no relation to it. The coroutine uses the job passed as an argument as its parent, and this job replaces the one from coroutineScope.

When a coroutine is cut off from its parent job, nearly no relation remains, except for the other contexts being passed. We lose structured concurrency, so we need to be very careful when doing this.
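
To make the loss of structured concurrency more tangible, here is a minimal sketch showing that cancellation no longer reaches a coroutine started with its own Job(). The function name detachedDemo, the log list, and the concrete delays are illustrative assumptions, not part of the original example.

```kotlin
import kotlinx.coroutines.*

// Hypothetical demo function: returns the messages that were actually recorded.
fun detachedDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>() // fine here: everything runs on the runBlocking thread
    val parent = launch {
        launch {
            delay(200)
            log.add("Regular child") // never reached: canceled together with the parent
        }
        launch(Job()) { // detached: its parent is the new Job, not the outer launch
            delay(200)
            log.add("Detached coroutine") // still runs after parent.cancel()
        }
    }
    delay(50)
    parent.cancel()
    parent.join()
    delay(300) // runBlocking does not wait for the detached coroutine, so we wait by hand
    log
}

fun main() {
    println(detachedDemo()) // [Detached coroutine]
}
```

The manual delay at the end is exactly the kind of bookkeeping that structured concurrency normally does for us.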

Children awaiting

The first important advantage of a job is that it can be used to wait until a coroutine is completed. For that, we use the join method. It is a suspending function that suspends until the given job reaches a final state (either Completed or Cancelled).

    import kotlinx.coroutines.*

    fun main(): Unit = runBlocking {
        val job1 = launch {
            delay(1000)
            println("Test1")
        }
        val job2 = launch {
            delay(1000)
            println("Test2")
        }

        job1.join()
        job2.join()
        println("All tests are done")
    }
    // Test1
    // Test2
    // All tests are done

The Job interface also exposes a children property that lets us reference all its children. We can use it to wait until all children are in a final state.

    import kotlinx.coroutines.*

    fun main(): Unit = runBlocking {
        launch {
            delay(1000)
            println("Test1")
        }
        launch {
            delay(1000)
            println("Test2")
        }

        coroutineContext[Job]
            ?.children
            ?.forEach { it.join() }
        println("All tests are done")
    }
    // Test1
    // Test2
    // All tests are done
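
When we hold references to the jobs, the kotlinx.coroutines library also offers joinAll, which joins several jobs in one call. This chapter does not cover it, so treat the following as a small side sketch rather than part of the original example set.

```kotlin
import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
    val job1 = launch {
        delay(1000)
        println("Test1")
    }
    val job2 = launch {
        delay(1000)
        println("Test2")
    }

    joinAll(job1, job2) // vararg overload; listOf(job1, job2).joinAll() also exists
    println("All tests are done")
}
// Test1
// Test2
// All tests are done
```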

Job factory function

A Job can also be created without a coroutine, using the Job() factory function. This creates a job that isn't associated with any coroutine and can be used as a context. This also means that we can use such a job as the parent of many coroutines.

A common mistake is to create a job using the Job() factory function, start some coroutines on it, and then use join on the job. Such a program will never end, because Job is still in an active state, even when its children are finished. It is because this context is still ready to be used by other coroutines.

    import kotlinx.coroutines.*

    suspend fun main(): Unit = coroutineScope {
        val job = Job()
        launch(job) { // the new job replaces one from parent
            delay(1000)
            println("Text 1")
        }
        launch(job) { // the new job replaces one from parent
            delay(2000)
            println("Text 2")
        }
        job.join() // Here we will await forever
    }
    // (1 sec)
    // Text 1
    // (1 sec)
    // Text 2
    // (runs forever)

A better approach would be to join on all the current children of the job.

    import kotlinx.coroutines.*

    suspend fun main(): Unit = coroutineScope {
        val job = Job()
        launch(job) { // the new job replaces one from parent
            delay(1000)
            println("Text 1")
        }
        launch(job) { // the new job replaces one from parent
            delay(2000)
            println("Text 2")
        }
        job.children.forEach { it.join() }
    }
    // (1 sec)
    // Text 1
    // (1 sec)
    // Text 2

Job() is a great example of a factory function. At first, you might think that you're calling a constructor of the Job class. In reality, it is a fake constructor[1], a simple function that looks like a constructor. Moreover, the actual type returned by this function is not Job but its subinterface CompletableJob.

    public fun Job(parent: Job? = null): CompletableJob

The CompletableJob interface adds two methods to Job:

  • complete(): Boolean - used to complete a job. Once it is used, all the child coroutines will keep running until they are all done, but new coroutines cannot be started on this job. The result is true if this job was completed as a result of this invocation and false otherwise (if it was already completed, for instance, because of an exception in a child).
  • completeExceptionally(exception: Throwable): Boolean - completes this job exceptionally with the given exception. This means that all children will be canceled immediately (with CancellationException wrapping the exception provided as an argument). The result, just like in the function above, answers the question: “Was this job finished because of this invocation?”.

    import kotlinx.coroutines.Job
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking

    fun main() = runBlocking {
        val job = Job()

        launch(job) {
            repeat(5) { num ->
                delay(200)
                println("Rep$num")
            }
        }

        launch {
            delay(500)
            job.complete()
        }

        job.join()

        launch(job) {
            println("Will not be printed")
        }

        println("Done")
    }
    // Rep0
    // Rep1
    // Rep2
    // Rep3
    // Rep4
    // Done

    import kotlinx.coroutines.Job
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    import java.lang.Error

    fun main() = runBlocking {
        val job = Job()

        launch(job) {
            repeat(5) { num ->
                delay(200)
                println("Rep$num")
            }
        }

        launch {
            delay(500)
            job.completeExceptionally(Error("Some error"))
        }

        job.join()

        launch(job) {
            println("Will not be printed")
        }

        println("Done")
    }
    // Rep0
    // Rep1
    // Done
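
The Boolean results of both methods can be observed in isolation. The sketch below uses standalone jobs without children, so each call takes effect immediately. The variable names and the error messages are made up for illustration.

```kotlin
import kotlinx.coroutines.Job

fun main() {
    val job = Job()
    println(job.complete()) // true: this call completed the job
    println(job.complete()) // false: the job was already completed
    println(job.completeExceptionally(Error("Too late"))) // false: nothing left to complete
    println(job.isCompleted) // true
    println(job.isCancelled) // false

    val failedJob = Job()
    println(failedJob.completeExceptionally(Error("Some error"))) // true
    println(failedJob.isCancelled) // true: exceptional completion counts as cancellation
}
```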

The function complete is often used after we start the last coroutine on a job. Thanks to that, we can just wait for the job completion using the join function.

    import kotlinx.coroutines.*

    suspend fun main(): Unit = coroutineScope {
        val job = Job()
        launch(job) { // the new job replaces one from parent
            delay(1000)
            println("Text 1")
        }
        launch(job) { // the new job replaces one from parent
            delay(2000)
            println("Text 2")
        }
        job.complete()
        job.join()
    }
    // (1 sec)
    // Text 1
    // (1 sec)
    // Text 2

You can pass a reference to the parent as an argument of the Job function. Thanks to that, for instance, such a job will be canceled when the parent is.

    import kotlinx.coroutines.*

    suspend fun main(): Unit = coroutineScope {
        val parentJob = Job()
        val job = Job(parentJob)
        launch(job) {
            delay(1000)
            println("Text 1")
        }
        launch(job) {
            delay(2000)
            println("Text 2")
        }
        delay(1100)
        parentJob.cancel()
        job.children.forEach { it.join() }
    }
    // Text 1
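
The same relationship can be checked without any coroutines. The sketch below only inspects the two jobs, assuming both are created with the Job() factory function as in the example above.

```kotlin
import kotlinx.coroutines.Job

fun main() {
    val parentJob = Job()
    val job = Job(parentJob)

    println(parentJob.children.contains(job)) // true: job is registered as a child
    parentJob.cancel()
    println(job.isCancelled) // true: cancellation propagated from the parent
}
```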

The next two chapters describe cancellation and exception handling in Kotlin Coroutines. Those two important mechanisms fully depend on the child-parent relationship created using Job.

1:

A pattern described well in Effective Kotlin, Item 33: Consider factory functions instead of constructors.

2:

I hope I do not need to remind you that toString should be used for debugging and logging purposes and should not be parsed in code. Parsing it would break the contract of this function, as I have described in Effective Kotlin.