
Hot and cold data sources

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.

Kotlin Coroutines initially had only Channel, but its creators noticed that this was not enough. Channels are a hot stream of values, but we often need a stream that is cold.

Understanding the difference between hot and cold streams of data is useful software-craftsmanship knowledge because most data sources you use daily fall into one of these two categories. Collections (List, Set, etc.) are hot, while Sequence and Java Stream are cold. Channel is hot, while Flow and RxJava streams (Observable, Single, etc.) are cold[1].

Hot | Cold
Collections (List, Set) | Sequence, Stream
Channel | Flow, RxJava streams

Hot vs cold

Hot data streams are eager, produce elements independently of their consumption, and store the elements. Cold data streams are lazy, perform their operations on demand, and store nothing.

We can observe these differences when we use lists (hot) and sequences (cold). Builders and operations on hot data streams start immediately. On cold data streams, they are not started until the elements are needed.

fun main() {
    // Hot: buildList and map run immediately
    val l = buildList {
        repeat(3) {
            println("L: Adding User$it")
            add("User$it")
        }
    }
    val l2 = l.map {
        println("L: Processing $it")
        "Processed $it"
    }

    // Cold: sequence and map only define the processing; nothing runs yet
    val s = sequence {
        repeat(3) {
            println("S: Adding User$it")
            yield("User$it")
        }
    }
    val s2 = s.map {
        println("S: Processing $it")
        "Processed $it"
    }
}
// L: Adding User0
// L: Adding User1
// L: Adding User2
// L: Processing User0
// L: Processing User1
// L: Processing User2

As a result, cold data streams (like Sequence, Stream or Flow):

  • can be infinite;
  • do a minimal number of operations;
  • use less memory (no need to allocate all the intermediate collections).
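As a minimal sketch of the first point, the standard library's generateSequence can describe an infinite sequence, and take stops the processing once enough elements have been found. The function name firstSquaresDivisibleBy3 is hypothetical, chosen for this illustration. The same chain could not be written eagerly on a List, because an infinite list cannot be built.

```kotlin
// Hypothetical helper: the first `count` squares that are divisible by 3.
// The source sequence is infinite; laziness makes this terminate.
fun firstSquaresDivisibleBy3(count: Int): List<Int> =
    generateSequence(1) { it + 1 }   // 1, 2, 3, ... (never ends on its own)
        .map { it * it }             // 1, 4, 9, 16, ...
        .filter { it % 3 == 0 }      // keep only squares divisible by 3
        .take(count)                 // stop after `count` matching elements
        .toList()                    // terminal operation: does all the work

fun main() {
    println(firstSquaresDivisibleBy3(3)) // [9, 36, 81]
}
```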

Sequence processing does fewer operations because it processes elements lazily. The way it works is very simple. Each intermediate operation (like map or filter) just decorates the previous sequence with a new operation. The terminal operation[2] does all the work. Consider the example below. In the case of a sequence, find asks the result of map for the first element. map in turn asks the sequence returned from sequenceOf (which returns 1), maps it (to 1), and returns it to find. find checks whether this element fulfills its predicate. If it does not, find asks again and again until a matching element is found.

This is very different from list processing, which at every intermediate step calculates and returns a fully processed collection. This is why elements are processed in a different order, and why collection processing takes more memory and might require more operations (as in the example below).

fun m(i: Int): Int {
    print("m$i ")
    return i * i
}

fun f(i: Int): Boolean {
    print("f$i ")
    return i >= 10
}

fun main() {
    listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { m(it) }
        .find { f(it) }
        .let { print(it) }
    // m1 m2 m3 m4 m5 m6 m7 m8 m9 m10 f1 f4 f9 f16 16

    println()

    sequenceOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { m(it) }
        .find { f(it) }
        .let { print(it) }
    // m1 f1 m2 f4 m3 f9 m4 f16 16
}
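The decorating behavior described above can be modeled with a hand-written sequence. MappingSequence and mapLazily are hypothetical names, and this is a simplified sketch of how a lazy map can work, not the standard library's actual implementation. Creating the decorator does nothing; only the terminal operation (find) pulls elements through it, one at a time.

```kotlin
// Hypothetical, simplified decorator (not the standard library's code).
// It wraps a source sequence and applies `transform` only when an element is requested.
class MappingSequence<T, R>(
    private val source: Sequence<T>,
    private val transform: (T) -> R
) : Sequence<R> {
    override fun iterator(): Iterator<R> = object : Iterator<R> {
        private val sourceIterator = source.iterator()
        override fun hasNext(): Boolean = sourceIterator.hasNext()
        override fun next(): R = transform(sourceIterator.next())
    }
}

// Hypothetical extension that only builds the decorator; it computes nothing.
fun <T, R> Sequence<T>.mapLazily(transform: (T) -> R): Sequence<R> =
    MappingSequence(this, transform)

fun main() {
    val log = mutableListOf<String>()
    val mapped = sequenceOf(1, 2, 3, 4, 5).mapLazily { log += "m$it"; it * it }
    println(log) // [] (nothing has been computed yet)
    // find is the terminal operation: it asks for elements one by one
    println(mapped.find { log += "f$it"; it >= 10 }) // 16
    println(log) // [m1, f1, m2, f4, m3, f9, m4, f16]
}
```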

This means that a list is a collection of elements, but a sequence is just a definition of how these elements should be calculated. Hot data streams:

  • are always ready to be used (each operation can be a terminal operation);
  • create a new collection at each operation;
  • do not need to recalculate the result when used multiple times.

fun m(i: Int): Int {
    print("m$i ")
    return i * i
}

fun main() {
    val l = listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { m(it) } // m1 m2 m3 m4 m5 m6 m7 m8 m9 m10

    println(l) // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
    println(l.find { it > 10 }) // 16
    println(l.find { it > 10 }) // 16
    println(l.find { it > 10 }) // 16

    val s = sequenceOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { m(it) }

    println(s.toList())
    // m1 m2 m3 m4 m5 m6 m7 m8 m9 m10 [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
    println(s.find { it > 10 }) // m1 m2 m3 m4 16
    println(s.find { it > 10 }) // m1 m2 m3 m4 16
    println(s.find { it > 10 }) // m1 m2 m3 m4 16
}
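To make the difference in recalculation measurable, here is a small sketch that counts how many times the mapping lambda runs when the same query is repeated three times. The function names are hypothetical; asSequence is the standard library function that turns an Iterable into a Sequence.

```kotlin
// Hot: the list is computed once, eagerly; later finds reuse the stored elements.
fun countListEvaluations(): Int {
    var calls = 0
    val list = (1..10).map { calls++; it * it } // all 10 elements mapped here
    repeat(3) { list.find { it > 10 } }        // no further mapping
    return calls
}

// Cold: the sequence is only a definition; every find recomputes what it needs.
fun countSequenceEvaluations(): Int {
    var calls = 0
    val seq = (1..10).asSequence().map { calls++; it * it }
    repeat(3) { seq.find { it > 10 } }         // maps 1, 2, 3, 4 on each call
    return calls
}

fun main() {
    println(countListEvaluations())     // 10
    println(countSequenceEvaluations()) // 12 (4 elements per find, 3 times)
}
```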

Java Stream shares characteristics with Kotlin's Sequence. They are both cold streams of values.
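The same laziness can be observed in Java Stream when it is used from Kotlin on the JVM. In this sketch (the function name firstSquareAtLeast10 is hypothetical), map does not run when the stream is defined, and findFirst stops pulling elements as soon as one passes filter, so only the first four elements are mapped.

```kotlin
import java.util.stream.Stream

// Hypothetical helper: records each mapping step in `log` to show when work happens.
fun firstSquareAtLeast10(log: MutableList<String>): Int? {
    val stream = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { log += "m$it"; it * it }   // intermediate: only describes the work
    log += "stream defined"              // nothing has been mapped at this point
    return stream
        .filter { it >= 10 }
        .findFirst()                     // terminal and short-circuiting
        .orElse(null)
}

fun main() {
    val log = mutableListOf<String>()
    println(firstSquareAtLeast10(log)) // 16
    println(log) // [stream defined, m1, m2, m3, m4]
}
```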

Hot channels, cold flow

Time to get back to coroutines. The most typical way to create a flow is with a builder called flow, which is similar to the produce function.

// computeNextValue() stands for any value-producing logic
val channel = produce {
    while (true) {
        val x = computeNextValue()
        send(x)
    }
}

val flow = flow {
    while (true) {
        val x = computeNextValue()
        emit(x)
    }
}
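Since computeNextValue is left undefined in the snippet above, here is a finite, runnable variant of both builders, assuming kotlinx.coroutines is on the classpath. numbersChannel, numbersFlow, and receiveAll are hypothetical names, and repeat(3) stands in for the infinite loop.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// produce starts a new coroutine in the given scope right away.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.numbersChannel(): ReceiveChannel<Int> = produce {
    repeat(3) { send(it) }
}

// flow only defines the production; it runs when collected.
fun numbersFlow(): Flow<Int> = flow {
    repeat(3) { emit(it) }
}

// Receives until the producer finishes and the channel is closed.
suspend fun receiveAll(channel: ReceiveChannel<Int>): List<Int> {
    val received = mutableListOf<Int>()
    for (x in channel) received += x
    return received
}

fun main() = runBlocking {
    println(receiveAll(numbersChannel())) // [0, 1, 2]
    println(numbersFlow().toList())       // [0, 1, 2]
}
```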

These builders are conceptually equivalent, but since channels and flows behave very differently, there are also important differences between these two functions. Take a look at the example below. Channels are hot, so they immediately start calculating values. This calculation starts in a separate coroutine, which is why produce needs to be a coroutine builder, defined as an extension function on CoroutineScope. The calculation starts immediately, but since the default buffer size is 0 (rendezvous), in the example below it is soon suspended until a receiver is ready.

Note that stopping production when there is no receiver is not the same as producing on demand. Channels, as hot data streams, produce elements independently of their consumption and then keep them. They do not care how many receivers there are. Since each element can be received only once, after the first receiver consumes all the elements, the second one finds a channel that is already empty and closed. This is why it receives no elements at all.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

private fun CoroutineScope.makeChannel() = produce {
    println("Channel started")
    for (i in 1..3) {
        delay(1000)
        send(i)
    }
}

suspend fun main(): Unit = coroutineScope {
    val channel = makeChannel()

    delay(1000)
    println("Calling channel...")
    for (value in channel) {
        println(value)
    }
    println("Consuming again...")
    for (value in channel) {
        println(value)
    }
}
// Channel started
// (1 sec)
// Calling channel...
// 1
// (1 sec)
// 2
// (1 sec)
// 3
// Consuming again...
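The statement that each element can be received only once can also be checked with two receivers reading from the same channel at the same time. In this sketch (drain and receiveWithTwoConsumers are hypothetical names), the elements end up split between the two receivers, and together they receive every element exactly once. How the elements are split between them should not be relied on.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: receives until the channel is closed.
suspend fun drain(channel: ReceiveChannel<Int>): List<Int> {
    val received = mutableListOf<Int>()
    for (x in channel) received += x
    return received
}

// Two coroutines consume the same channel; each element goes to only one of them.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun receiveWithTwoConsumers(n: Int): Pair<List<Int>, List<Int>> = coroutineScope {
    val channel = produce { for (i in 1..n) send(i) } // closed when the block ends
    val a = async { drain(channel) }
    val b = async { drain(channel) }
    a.await() to b.await()
}

fun main() = runBlocking {
    val (a, b) = receiveWithTwoConsumers(10)
    println("A: $a, B: $b")   // the split between A and B is not guaranteed
    println((a + b).sorted()) // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
}
```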

The same processing using Flow is very different. Since a flow is a cold data source, production happens on demand. This means that flow is not a coroutine builder and does no processing on its own. It is only a definition of how elements should be produced, which is used when a terminal operation (like collect) is called. This is why the flow builder does not need a CoroutineScope. It runs in the coroutine that executes the terminal operation (it takes the coroutine context from the suspending function's continuation, just like coroutineScope and other coroutine scope functions). Each terminal operation on a flow starts processing from scratch. Compare the examples above and below, because they show the key differences between Channel and Flow.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

private fun makeFlow() = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}

suspend fun main(): Unit = coroutineScope {
    val flow = makeFlow()

    delay(1000)
    println("Calling flow...")
    flow.collect { value -> println(value) }
    println("Consuming again...")
    flow.collect { value -> println(value) }
}
// (1 sec)
// Calling flow...
// Flow started
// (1 sec)
// 1
// (1 sec)
// 2
// (1 sec)
// 3
// Consuming again...
// Flow started
// (1 sec)
// 1
// (1 sec)
// 2
// (1 sec)
// 3
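The claim that a flow runs in the coroutine of its terminal operation can be illustrated by reading the collector's CoroutineName from inside the flow body. This is a minimal sketch; nameReporter and collectAs are hypothetical names, while currentCoroutineContext, CoroutineName, withContext, and first come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// The flow body has no scope of its own: it sees the context of whoever collects it.
val nameReporter = flow<String?> {
    emit(currentCoroutineContext()[CoroutineName]?.name)
}

// Hypothetical helper: collects the flow inside a coroutine with the given name.
suspend fun collectAs(name: String): String? =
    withContext(CoroutineName(name)) { nameReporter.first() }

fun main() = runBlocking {
    println(collectAs("first collector"))  // first collector
    println(collectAs("second collector")) // second collector
}
```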

RxJava streams share most characteristics with Kotlin's Flow. Some even say that Flow could be called "RxCoroutines"[3].

Summary

Most data sources are either hot or cold:

  • Hot data sources are eager. They produce elements as soon as possible and store them. They create elements independently of their consumption. These are collections (List, Set) and Channel.
  • Cold data sources are lazy. They process elements on demand, during the terminal operation. All intermediate functions just define what should be done (most often using the Decorator pattern). They generally do not store elements and create them on demand. They do the minimal number of operations and can be infinite. Their elements are typically created and processed as part of consumption. Examples are Sequence, Java Stream, Flow, and RxJava streams (Observable, Single, etc.).

This explains the essential difference between Channel and Flow. Now it is time to discuss all the different features supported by the latter.

1:

This is true in general, but there are exceptions. Some functions and builders, like buffer or channelFlow, introduce some hotness into Flow. Also, SharedFlow and StateFlow are hot.

2:

An operation on a sequence that returns a different type, typically find or toList.

3:

I first heard this from Alex Piotrowski during Kotlin/Everywhere Warsaw on 21.11.2019, https://youtu.be/xV1XRakSoWI. Who knows, maybe he is the one who popularized this term.