Hot and cold data sources

This is a chapter from the book Kotlin Coroutines, which is available in Early Access on LeanPub.

Kotlin Coroutines initially had only Channel, but its creators noticed that this was not enough. Channels are hot streams of values, but we often need a stream that is cold.

Understanding the difference between hot and cold data streams is useful software-craftsmanship knowledge, because most data sources you use daily fall into one of these two categories. Collections (List, Set, etc.) are hot, while Sequence and Java Stream are cold. Channel is hot, while Flow and RxJava streams (Observable, Single, etc.) are cold.

| Hot | Cold |
|---|---|
| Collections (List, Set) | Sequence, Stream |
| Channel | Flow, RxJava streams |

Hot vs cold

The first and biggest difference is that hot data streams are eager, while cold ones are lazy. Builders and operations on hot data streams start immediately, while on cold data streams they do not start until their results are needed. Take a look at the examples below, remembering that lists are hot and sequences are cold.

```kotlin
@OptIn(ExperimentalStdlibApi::class)
fun main() {
    val l = buildList {
        repeat(3) {
            add("User$it")
            println("L: Added User")
        }
    }
    // L: Added User
    // L: Added User
    // L: Added User

    val l2 = l.map {
        println("L: Processing")
        "Processed $it"
    }
    // L: Processing
    // L: Processing
    // L: Processing

    val s = sequence {
        repeat(3) {
            yield("User$it")
            println("S: Added User")
        }
    }

    val s2 = s.map {
        println("S: Processing")
        "Processed $it"
    }
    // Nothing is printed for the sequence: no terminal operation was called
}
```
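The sequence above does no work because nothing ever asks it for elements. The following is a small sketch, not part of the original sample, that builds the same pipeline but adds toList as the terminal operation and records each step in a list instead of printing it. The helper name runColdPipeline is hypothetical. Notice that the sequence builder and map interleave, element by element, and only after the pipeline has been defined.

```kotlin
// Hypothetical helper: builds the same sequence pipeline as above,
// but records each step in `log` so the order can be inspected.
fun runColdPipeline(log: MutableList<String>): List<String> {
    val s = sequence {
        repeat(3) {
            yield("User$it")
            log += "S: Added User"
        }
    }
    val s2 = s.map {
        log += "S: Processing"
        "Processed $it"
    }
    log += "Pipeline defined" // nothing from the sequence has run yet
    return s2.toList() // terminal operation: only now are elements produced
}

fun main() {
    val log = mutableListOf<String>()
    val result = runColdPipeline(log)
    log.forEach(::println)
    // Pipeline defined
    // S: Processing
    // S: Added User
    // S: Processing
    // S: Added User
    // S: Processing
    // S: Added User
    println(result) // [Processed User0, Processed User1, Processed User2]
}
```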

As a result, cold data streams (like Sequence, Stream or Flow):

  • can be infinite,
  • perform the minimal number of operations,
  • use less memory (there is no need to allocate all the intermediate collections).
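The first point deserves a quick illustration. Because a cold stream produces elements only when asked, it can describe an infinite series, as long as the terminal operation stops after finitely many elements. Here is a sketch using the standard library's generateSequence; the helper name firstSquaresAbove is hypothetical.

```kotlin
// An infinite, cold sequence of natural numbers: 1, 2, 3, ...
// Nothing is computed when it is defined.
val naturals: Sequence<Int> = generateSequence(1) { it + 1 }

// Hypothetical helper: the first `count` squares greater than `limit`.
// take(count) makes the pipeline finite; toList() is the terminal operation.
fun firstSquaresAbove(limit: Int, count: Int): List<Int> =
    naturals
        .map { it * it }
        .filter { it > limit }
        .take(count)
        .toList()

fun main() {
    println(firstSquaresAbove(limit = 10, count = 3)) // [16, 25, 36]
}
```

With a List, the same pipeline could not even be built, since an infinite list would have to be allocated up front.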

Take a look at the example below. Sequence processing performs fewer operations because it processes elements one by one. As a consequence, the order of operations is also different.

```kotlin
fun m(i: Int): Int {
    print("m$i ")
    return i * i
}

fun f(i: Int): Boolean {
    print("f$i ")
    return i >= 10
}

fun main() {
    listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { m(it) }
        .find { f(it) }
        .let { print(it) }
    // m1 m2 m3 m4 m5 m6 m7 m8 m9 m10 f1 f4 f9 f16 16

    println()

    sequenceOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { m(it) }
        .find { f(it) }
        .let { print(it) }
    // m1 f1 m2 f4 m3 f9 m4 f16 16
}
```

Cold data streams are implemented so that all the work is executed by the last, terminal operation. Intermediate operations only decorate the previous stream with a new operation.
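A simplified sketch of this idea follows. It is not the actual standard library implementation: a hypothetical MappingSequence class wraps the previous sequence, and its transformation runs only when a terminal operation pulls elements through the iterator. The names MappingSequence and lazyMap are made up for illustration.

```kotlin
// Hypothetical, simplified decorator: not how kotlin.sequences is implemented.
class MappingSequence<T, R>(
    private val source: Sequence<T>,
    private val transform: (T) -> R,
) : Sequence<R> {
    override fun iterator(): Iterator<R> = object : Iterator<R> {
        private val inner = source.iterator()
        override fun hasNext(): Boolean = inner.hasNext()
        // The transformation runs only when an element is requested.
        override fun next(): R = transform(inner.next())
    }
}

// Hypothetical intermediate operation: it only wraps the source, doing no work.
fun <T, R> Sequence<T>.lazyMap(transform: (T) -> R): Sequence<R> =
    MappingSequence(this, transform)

fun main() {
    var calls = 0
    val mapped = sequenceOf(1, 2, 3).lazyMap { calls++; it * 10 }
    println(calls) // 0: defining the pipeline did no work
    println(mapped.first()) // 10: the terminal operation pulled one element
    println(calls) // 1
}
```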

On the other hand, hot data streams:

  • are always ready to be used (each operation can be a terminal operation),
  • do not need to recalculate the result when used multiple times.

```kotlin
fun m(i: Int): Int {
    print("m$i ")
    return i * i
}

fun main() {
    val l = listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { m(it) } // m1 m2 m3 m4 m5 m6 m7 m8 m9 m10

    println()
    println(l.find { it > 10 }) // 16
    println(l.find { it > 10 }) // 16
    println(l.find { it > 10 }) // 16

    val s = sequenceOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { m(it) }

    println(s.find { it > 10 }) // m1 m2 m3 m4 16
    println(s.find { it > 10 }) // m1 m2 m3 m4 16
    println(s.find { it > 10 }) // m1 m2 m3 m4 16
}
```

Java Stream shares characteristics with Kotlin Sequence. They are both cold streams of values.
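As a quick check of this, here is a sketch that calls java.util.stream.Stream from Kotlin on the JVM. Like Sequence operations, Java Stream intermediate operations do nothing until a terminal operation such as findFirst is called, and then elements are processed one by one. The helper name firstSquareAboveThree is hypothetical.

```kotlin
import java.util.stream.Stream

// Hypothetical helper: returns the first square greater than 3 and how many
// times the map lambda ran before that result was found.
fun firstSquareAboveThree(): Pair<Int, Int> {
    var mapCalls = 0
    val stream = Stream.of(1, 2, 3, 4, 5)
        .map { mapCalls++; it * it } // intermediate: nothing runs yet
        .filter { it > 3 }
    check(mapCalls == 0) // still lazy: no terminal operation has been called
    val first = stream.findFirst().get() // terminal: pulls elements one by one
    return first to mapCalls
}

fun main() {
    println(firstSquareAboveThree()) // (4, 2)
}
```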

Hot channels, cold flow

Time to get back to coroutines. We create a flow using a builder similar to the one used for a channel.

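```kotlin
// produce is an extension function on CoroutineScope,
// so it must be called inside a scope
val channel = produce {
    while (true) {
        val x = computeNextValue()
        send(x)
    }
}

// flow is a regular function: no scope is needed to define it
val flow = flow {
    while (true) {
        val x = computeNextValue()
        emit(x)
    }
}
```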

However, they work differently. Channels are hot, so they immediately start calculating values. This calculation runs in a separate coroutine, which is why produce needs to be a coroutine builder, defined as an extension function on CoroutineScope. Since channels have a buffer size of 0 by default (rendezvous), the producer will soon suspend until a receiver appears. However, suspending production while there is no receiver is different from producing on demand. Finally, as a hot data stream, a channel does not produce elements again when another consumer starts observing it. The process of production is independent of the process of consumption.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

private fun CoroutineScope.makeChannel() = produce {
    println("Channel started")
    for (i in 1..3) {
        delay(1000)
        send(i)
    }
}

suspend fun main() = coroutineScope {
    val channel = makeChannel()

    delay(1000)
    println("Calling channel...")
    channel.consumeEach { value -> println(value) }
    println("Consuming again...")
    channel.consumeEach { value -> println(value) }
}
// Channel started
// (1 sec)
// Calling channel...
// 1
// (1 sec)
// 2
// (1 sec)
// 3
// Consuming again...
```
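The rendezvous behavior of produce can also be observed directly. The sketch below, which assumes kotlinx.coroutines is on the classpath, counts how many send calls complete before anyone receives. It uses runBlocking so that everything runs on one thread and the result is deterministic. With the default rendezvous capacity, the producer suspends on its very first send; with a buffer of 2, it fills the buffer and then suspends. The helper name sentBeforeReceiving is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: how many elements the producer manages to send
// before anyone receives, for a given channel capacity.
fun sentBeforeReceiving(capacity: Int): Int = runBlocking {
    var sent = 0
    val channel = produce<Int>(capacity = capacity) {
        repeat(5) { i ->
            send(i) // suspends when the buffer is full and no receiver is waiting
            sent++
        }
    }
    delay(100) // the producer runs during this delay; nobody receives yet
    val result = sent
    channel.cancel() // cancel the producer so runBlocking can complete
    result
}

fun main() {
    println(sentBeforeReceiving(Channel.RENDEZVOUS)) // 0
    println(sentBeforeReceiving(2)) // 2
}
```

In both cases the producer stops because it cannot hand elements over, not because someone asked for them, which is exactly the difference from producing on demand.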

The same processing using Flow is very different. Since Flow is a cold data stream, production happens on demand: it starts separately for each consumer, so each consumer gets the most recent data. The flow builder does not need a CoroutineScope, because its body runs in the coroutine that calls the terminal operation (collect in this case). It takes the coroutine context from the suspending function's continuation, just like coroutineScope does.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

private fun makeFlow() = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}

suspend fun main() = coroutineScope {
    val flow = makeFlow()

    delay(1000)
    println("Calling flow...")
    flow.collect { value -> println(value) }
    println("Consuming again...")
    flow.collect { value -> println(value) }
}
// (1 sec)
// Calling flow...
// Flow started
// (1 sec)
// 1
// (1 sec)
// 2
// (1 sec)
// 3
// Consuming again...
// Flow started
// (1 sec)
// 1
// (1 sec)
// 2
// (1 sec)
// 3
```
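To illustrate that a flow body takes its context from the collector, here is a small sketch, again assuming kotlinx.coroutines is available. The same flow is collected twice, from coroutines with different (hypothetical) CoroutineName labels, and the body reports the name it sees via currentCoroutineContext. The helper name namesSeenByFlow is also hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects the same flow from two coroutines with
// different names and returns the name the flow body saw each time.
fun namesSeenByFlow(): List<String?> = runBlocking {
    val nameFlow = flow<String?> {
        // The body runs in the coroutine that calls the terminal operation
        emit(currentCoroutineContext()[CoroutineName]?.name)
    }
    val first = withContext(CoroutineName("first collector")) { nameFlow.first() }
    val second = withContext(CoroutineName("second collector")) { nameFlow.first() }
    listOf(first, second)
}

fun main() {
    println(namesSeenByFlow()) // [first collector, second collector]
}
```

Each call to first starts the flow body anew, in the caller's coroutine, so no separate scope for the producer is needed.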

RxJava streams share most characteristics with Kotlin Flow. Some even say that Flow could be called "RxCoroutines".

Summary

Most data sources are either hot or cold:

  • Hot data sources are eager: they produce elements as soon as possible and store them. These include collections (List, Set) and Channel.
  • Cold data sources are lazy: they process elements on demand, in the terminal operation. Intermediate functions only define what should be done (most often using the Decorator pattern). Cold data sources generally do not store elements; they create them on demand. They perform the minimal number of operations and can be infinite. Sequence, Java Stream, Flow, and RxJava streams (Observable, Single, etc.) are cold.

This explains the essential difference between Channel and Flow. Now it is time to discuss all the different features supported by the latter.