To change domain objects to use primitives and inline classes where possible.
Optimizing observeUpdates to filter only if necessary, and to use a set to store tickers.
In my case, the code initially took around 20 671 ms, after changing domain objects to use primitives and inline classes it took around 18 750 ms, and after optimizing observeUpdates it took around 14 128 ms
import effective.efficient.Filter.*
import effective.efficient.Filter.Relation.*
import effective.efficient.Filter.SnapshotPart.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import java.util.concurrent.ConcurrentHashMap
import kotlin.random.Random
import kotlin.system.measureTimeMillis
data class TickerSnapshot(
val ticker: Ticker,
val snapshot: Snapshot,
)
data class Snapshot(
var bid: PriceSizeTime?,
var ask: PriceSizeTime?,
var last: PriceSizeTime?,
)
data class PriceSizeTime(
val price: Price,
val size: Int = -1,
val time: Long = -1,
)
@JvmInline
value class Ticker(val value: String)
@JvmInline
value class Price(val value: Float)
sealed interface Event {
val ticker: String
}
data class BidEvent(
override val ticker: String,
val price: Float = Float.NaN,
val size: Int = -1,
val time: Long = -1
) : Event
data class AskEvent(
override val ticker: String,
val price: Float = Float.NaN,
val size: Int = -1,
val time: Long = -1
) : Event
data class TradeEvent(
override val ticker: String,
val price: Float = Float.NaN,
val size: Int = -1,
val time: Long = -1
) : Event
val tickers = List(1000) { Ticker("Ticker$it") }
// Do not touch this one
class MarketClient {
fun observe() = flow {
val random = Random(123456789)
while (true) {
val event = when ((0..2).random(random)) {
0 -> BidEvent(
tickers.random(random).value,
if (random.nextInt(100) == 1) Float.NaN else (0..100).random(random).toFloat(),
if (random.nextInt(100) == 1) -1 else (0..100).random(random),
if (random.nextInt(100) == 1) -1 else System.currentTimeMillis()
)
1 -> AskEvent(
tickers.random(random).value,
if (random.nextInt(100) == 1) Float.NaN else (0..100).random(random).toFloat(),
if (random.nextInt(100) == 1) -1 else (0..100).random(random),
if (random.nextInt(100) == 1) -1 else System.currentTimeMillis()
)
else -> TradeEvent(
tickers.random(random).value,
if (random.nextInt(100) == 1) Float.NaN else (0..100).random(random).toFloat(),
if (random.nextInt(100) == 1) -1 else (0..100).random(random),
if (random.nextInt(100) == 1) -1 else System.currentTimeMillis()
)
}
emit(event)
}
}
}
class MarketRepository(
private val client: MarketClient,
backgroundScope: CoroutineScope,
) {
private val snapshots = ConcurrentHashMap<Ticker, TickerSnapshot>()
private val updates = MutableSharedFlow<TickerSnapshot>()
fun observeUpdates() = updates
.onStart { snapshots.forEach { emit(it.value) } }
init {
backgroundScope.launch {
client.observe().collect {
when (it) {
is BidEvent -> {
val snapshot = snapshots.getOrPut(Ticker(it.ticker)) { TickerSnapshot(Ticker(it.ticker), Snapshot(null, null, null)) }
snapshot.snapshot.bid = PriceSizeTime(Price(it.price), it.size, it.time)
val ticker = Ticker(it.ticker)
snapshots[ticker] = snapshot
updates.emit(snapshot)
}
is AskEvent -> {
val snapshot = snapshots.getOrPut(Ticker(it.ticker)) { TickerSnapshot(Ticker(it.ticker), Snapshot(null, null, null)) }
snapshot.snapshot.ask = PriceSizeTime(Price(it.price), it.size, it.time)
val ticker = Ticker(it.ticker)
snapshots[ticker] = snapshot
updates.emit(snapshot)
}
is TradeEvent -> {
val snapshot = snapshots.getOrPut(Ticker(it.ticker)) { TickerSnapshot(Ticker(it.ticker), Snapshot(null, null, null)) }
snapshot.snapshot.last = PriceSizeTime(Price(it.price), it.size, it.time)
val ticker = Ticker(it.ticker)
snapshots[ticker] = snapshot
updates.emit(snapshot)
}
}
}
}
}
}
sealed class Filter {
data object All : Filter()
class Or(val filters: List<Filter>) : Filter()
class And(val filters: List<Filter>) : Filter()
class PrizeCondition(
val snapshotPart: SnapshotPart,
val relation: Relation,
val value: Float,
) : Filter()
class TickerIs(val tickers: List<Ticker>) : Filter()
class Not(val filter: Filter) : Filter()
enum class SnapshotPart {
Ask, Bid, Last, Spread
}
enum class Relation {
GreaterThan, LessThan, Equal
}
fun check(tickerSnapshot: TickerSnapshot): Boolean = when (this) {
All -> true
is Or -> filters.any { it.check(tickerSnapshot) }
is And -> filters.all { it.check(tickerSnapshot) }
is PrizeCondition -> run {
val snapshotPrize = when (snapshotPart) {
Ask -> tickerSnapshot.snapshot.ask?.price?.value.takeUnless { it == Float.NaN } ?: return@run false
Bid -> tickerSnapshot.snapshot.bid?.price?.value.takeUnless { it == Float.NaN } ?: return@run false
Last -> tickerSnapshot.snapshot.last?.price?.value.takeUnless { it == Float.NaN } ?: return@run false
Spread -> {
val bid =
tickerSnapshot.snapshot.bid?.price?.value.takeUnless { it == Float.NaN } ?: return@run false
val ask =
tickerSnapshot.snapshot.ask?.price?.value.takeUnless { it == Float.NaN } ?: return@run false
ask - bid
}
}
when (relation) {
GreaterThan -> snapshotPrize > value
LessThan -> snapshotPrize < value
Equal -> snapshotPrize == value
}
}
is TickerIs -> tickers.contains(tickerSnapshot.ticker)
is Not -> !filter.check(tickerSnapshot)
}
}
class TradeService(
private val repository: MarketRepository,
) {
fun observeUpdates(
filter: Filter,
tickers: List<Ticker>? = null,
): Flow<TickerSnapshot> {
val tickersSet = tickers.orEmpty().toSet()
return repository.observeUpdates()
.filerIf(tickers != null) { it.ticker in tickersSet }
.filter { filter.check(it) }
}
}
inline fun <T> Flow<T>.filerIf(addFilterIf: Boolean, crossinline condition: suspend (T) -> Boolean) =
if (!addFilterIf) this else this.filter(condition)
suspend fun main() {
val client = MarketClient()
val repository = MarketRepository(client, backgroundScope = CoroutineScope(SupervisorJob()))
val service = TradeService(repository)
val filter = Or(
listOf(
And(listOf(TickerIs(tickers.take(1)), PrizeCondition(Ask, GreaterThan, 99f))),
And(listOf(PrizeCondition(Spread, GreaterThan, 99f))),
)
)
measureTimeMillis {
service.observeUpdates(
filter = filter,
tickers = tickers.take(70)
).take(1_000)
.collect { println(it) }
}.let { println("Took $it") }
}
Beyond that, there are some changes that can be applied, but they would mean possible safety issues, that are are only partially possible to secure. Those possibilities include:
Marcin Moskala is a highly experienced developer and Kotlin instructor as the founder of Kt. Academy, an official JetBrains partner specializing in Kotlin training, Google Developers Expert, known for his significant contributions to the Kotlin community. Moskala is the author of several widely recognized books, including "Effective Kotlin," "Kotlin Coroutines," "Functional Kotlin," "Advanced Kotlin," "Kotlin Essentials," and "Android Development with Kotlin."
Beyond his literary achievements, Moskala is the author of the largest Medium publication dedicated to Kotlin. As a respected speaker, he has been invited to share his insights at numerous programming conferences, including events such as Droidcon and the prestigious Kotlin Conf, the premier conference dedicated to the Kotlin programming language.