Solution: Market data processing optimization

The key optimizations that should be applied are:

  • Changing domain objects to use primitives and inline classes where possible.
  • Optimizing observeUpdates to filter only if necessary, and to use a set to store tickers.

In my case, the code initially took around 20 671 ms. After changing the domain objects to use primitives and inline classes, it took around 18 750 ms, and after optimizing observeUpdates it took around 14 128 ms.

import effective.efficient.Filter.*
import effective.efficient.Filter.Relation.*
import effective.efficient.Filter.SnapshotPart.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import java.util.concurrent.ConcurrentHashMap
import kotlin.random.Random
import kotlin.system.measureTimeMillis

/** The latest known [snapshot] for a single [ticker]. */
data class TickerSnapshot(
    val ticker: Ticker,
    val snapshot: Snapshot,
)

/**
 * Most recent bid, ask and last-trade quotes; each is `null` until the first event of that kind arrives.
 *
 * The fields are mutable: [MarketRepository] updates them in place on the instance it has already handed out.
 */
data class Snapshot(
    var bid: PriceSizeTime?,
    var ask: PriceSizeTime?,
    var last: PriceSizeTime?,
)

/** A quote. [size] and [time] default to `-1`, the same sentinel [MarketClient] sends for a missing value. */
data class PriceSizeTime(
    val price: Price,
    val size: Int = -1,
    val time: Long = -1,
)

/** Type-safe wrapper around a ticker symbol. */
@JvmInline
value class Ticker(val value: String)

/** Type-safe wrapper around a price; [Float.NaN] is what [MarketClient] sends for a missing price. */
@JvmInline
value class Price(val value: Float)

/** A raw market event for one [ticker]. */
sealed interface Event {
    val ticker: String
}

/** A new bid quote. */
data class BidEvent(
    override val ticker: String,
    val price: Float = Float.NaN,
    val size: Int = -1,
    val time: Long = -1
) : Event

/** A new ask quote. */
data class AskEvent(
    override val ticker: String,
    val price: Float = Float.NaN,
    val size: Int = -1,
    val time: Long = -1
) : Event

/** A new trade; stored as the snapshot's `last` quote. */
data class TradeEvent(
    override val ticker: String,
    val price: Float = Float.NaN,
    val size: Int = -1,
    val time: Long = -1
) : Event

/** The 1000 tickers that [MarketClient] draws from. */
val tickers = List(1000) { Ticker("Ticker$it") }

// Do not touch this one
class MarketClient {
    fun observe() = flow {
        val random = Random(123456789)
        while (true) {
            val event = when ((0..2).random(random)) {
                0 -> BidEvent(
                    tickers.random(random).value,
                    if (random.nextInt(100) == 1) Float.NaN else (0..100).random(random).toFloat(),
                    if (random.nextInt(100) == 1) -1 else (0..100).random(random),
                    if (random.nextInt(100) == 1) -1 else System.currentTimeMillis()
                )
                1 -> AskEvent(
                    tickers.random(random).value,
                    if (random.nextInt(100) == 1) Float.NaN else (0..100).random(random).toFloat(),
                    if (random.nextInt(100) == 1) -1 else (0..100).random(random),
                    if (random.nextInt(100) == 1) -1 else System.currentTimeMillis()
                )
                else -> TradeEvent(
                    tickers.random(random).value,
                    if (random.nextInt(100) == 1) Float.NaN else (0..100).random(random).toFloat(),
                    if (random.nextInt(100) == 1) -1 else (0..100).random(random),
                    if (random.nextInt(100) == 1) -1 else System.currentTimeMillis()
                )
            }
            emit(event)
        }
    }
}

/**
 * Keeps one [TickerSnapshot] per ticker up to date from the [client] event stream and
 * re-publishes every update.
 *
 * Collection starts in `init`, inside [backgroundScope], and runs until that scope is cancelled
 * or the client flow fails.
 */
class MarketRepository(
    private val client: MarketClient,
    backgroundScope: CoroutineScope,
) {
    private val snapshots = ConcurrentHashMap<Ticker, TickerSnapshot>()
    private val updates = MutableSharedFlow<TickerSnapshot>()

    /**
     * Stream of snapshot updates. A new collector first receives every snapshot currently stored,
     * then each subsequent update.
     *
     * NOTE(review): the emitted [TickerSnapshot] instances are the stored ones and are mutated in
     * place by later events, so a slow collector may observe newer values than the update it was
     * sent for - confirm this is acceptable for callers.
     */
    fun observeUpdates() = updates
        .onStart { snapshots.forEach { emit(it.value) } }

    init {
        backgroundScope.launch {
            client.observe().collect { event ->
                // Wrap the ticker once and reuse it for both the lookup and the new entry.
                val ticker = Ticker(event.ticker)
                // getOrPut already stores a freshly created entry, so no second put is needed.
                val tickerSnapshot = snapshots.getOrPut(ticker) {
                    TickerSnapshot(ticker, Snapshot(null, null, null))
                }
                val snapshot = tickerSnapshot.snapshot
                when (event) {
                    is BidEvent -> {
                        snapshot.bid = PriceSizeTime(Price(event.price), event.size, event.time)
                    }
                    is AskEvent -> {
                        snapshot.ask = PriceSizeTime(Price(event.price), event.size, event.time)
                    }
                    is TradeEvent -> {
                        snapshot.last = PriceSizeTime(Price(event.price), event.size, event.time)
                    }
                }
                updates.emit(tickerSnapshot)
            }
        }
    }
}

/** A composable predicate over [TickerSnapshot]; evaluate it with [check]. */
sealed class Filter {
    /** Matches every snapshot. */
    data object All : Filter()

    /** Matches when at least one of [filters] matches; never matches when [filters] is empty. */
    class Or(val filters: List<Filter>) : Filter()

    /** Matches when every one of [filters] matches; always matches when [filters] is empty. */
    class And(val filters: List<Filter>) : Filter()

    /**
     * Compares the price selected by [snapshotPart] against [value] using [relation].
     * Never matches when a required price is absent or NaN.
     */
    class PrizeCondition(
        val snapshotPart: SnapshotPart,
        val relation: Relation,
        val value: Float,
    ) : Filter()

    /** Matches when the snapshot's ticker is one of [tickers]. */
    class TickerIs(val tickers: List<Ticker>) : Filter()

    /** Matches exactly when [filter] does not. */
    class Not(val filter: Filter) : Filter()

    /** Which price a [PrizeCondition] inspects; [Spread] is ask price minus bid price. */
    enum class SnapshotPart { Ask, Bid, Last, Spread }

    /** Comparison applied by a [PrizeCondition]. */
    enum class Relation { GreaterThan, LessThan, Equal }

    /**
     * Evaluates this filter against [tickerSnapshot].
     *
     * @return `true` when the snapshot satisfies the filter.
     */
    fun check(tickerSnapshot: TickerSnapshot): Boolean = when (this) {
        All -> true
        is Or -> filters.any { it.check(tickerSnapshot) }
        is And -> filters.all { it.check(tickerSnapshot) }
        is PrizeCondition -> {
            val observed = priceOf(snapshotPart, tickerSnapshot.snapshot)
            observed != null && when (relation) {
                GreaterThan -> observed > value
                LessThan -> observed < value
                Equal -> observed == value
            }
        }
        is TickerIs -> tickers.contains(tickerSnapshot.ticker)
        is Not -> !filter.check(tickerSnapshot)
    }

    /** Returns the price selected by [part], or `null` when any price it needs is missing or NaN. */
    private fun priceOf(part: SnapshotPart, snapshot: Snapshot): Float? = when (part) {
        Ask -> snapshot.ask.usablePrice()
        Bid -> snapshot.bid.usablePrice()
        Last -> snapshot.last.usablePrice()
        Spread -> {
            val bid = snapshot.bid.usablePrice()
            val ask = snapshot.ask.usablePrice()
            if (bid == null || ask == null) null else ask - bid
        }
    }

    /**
     * The quote's price, or `null` when the quote is absent or its price is NaN.
     *
     * Uses `isNaN()` on purpose: `x == Float.NaN` is always `false` under IEEE 754 comparison,
     * so an equality test can never detect NaN.
     */
    private fun PriceSizeTime?.usablePrice(): Float? =
        this?.price?.value?.takeUnless { it.isNaN() }
}

/** Exposes repository updates narrowed by a [Filter] and an optional ticker allow-list. */
class TradeService(
    private val repository: MarketRepository,
) {
    /**
     * Observes snapshot updates that satisfy [filter].
     *
     * @param filter predicate every emitted snapshot must pass.
     * @param tickers when non-null, only snapshots of these tickers are considered; `null` means no
     *   ticker restriction, while an empty list lets nothing through.
     */
    fun observeUpdates(
        filter: Filter,
        tickers: List<Ticker>? = null,
    ): Flow<TickerSnapshot> {
        // A set gives constant-time membership checks on the per-update hot path.
        val tickersSet = tickers.orEmpty().toSet()
        return repository.observeUpdates()
            .filerIf(tickers != null) { it.ticker in tickersSet }
            .filter { filter.check(it) }
    }
}

/**
 * Applies [condition] as a filter only when [addFilterIf] is `true`; otherwise returns the
 * receiver flow unchanged.
 *
 * The name is spelled `filerIf` (not `filterIf`) and is kept as-is so existing callers still compile.
 */
inline fun <T> Flow<T>.filerIf(addFilterIf: Boolean, crossinline condition: suspend (T) -> Boolean) =
    if (!addFilterIf) this else this.filter(condition)

/** Benchmark entry point: prints the first 1 000 matching updates and the elapsed time in milliseconds. */
suspend fun main() {
    val client = MarketClient()
    val repository = MarketRepository(client, backgroundScope = CoroutineScope(SupervisorJob()))
    val service = TradeService(repository)
    val filter = Or(
        listOf(
            And(listOf(TickerIs(tickers.take(1)), PrizeCondition(Ask, GreaterThan, 99f))),
            And(listOf(PrizeCondition(Spread, GreaterThan, 99f))),
        )
    )
    measureTimeMillis {
        service.observeUpdates(
            filter = filter,
            tickers = tickers.take(70)
        ).take(1_000)
            .collect { println(it) }
    }.let { println("Took $it") }
}

Beyond that, there are some further changes that could be applied, but they would introduce possible safety issues that can only be partially mitigated. Those possibilities include:

  • Using MutableMap instead of ConcurrentHashMap.
  • Making domain objects mutable.