Coroutines and the problem with shared mutable state

This is a chapter from the book Kotlin Coroutines. You can find the Early Access edition on LeanPub.

Before we start, take a look at the class UserDownloader below. It lets us fetch a user by id or get all the users that have been downloaded so far. What is wrong with this implementation?

class UserDownloader(
    private val api: NetworkService
) {
    private val users = mutableListOf<User>()

    fun downloaded(): List<User> = users.toList()

    suspend fun fetchUser(id: Int) {
        val newUser = api.fetchUser(id)
        users.add(newUser)
    }
}

Notice the use of the defensive copy toList. It avoids a conflict between reading the list returned by downloaded and adding an element to the mutable list. We could also represent users using a read-only list (List<User>) held in a read-write property (var). Then we would not need to make a defensive copy, and downloaded would not need to be protected at all, but adding an element to the collection would become slower. I personally prefer the second approach, but I decided to show the one using a mutable collection, as I see it more often in real-life projects.
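To make the alternative concrete, here is a minimal sketch of the second approach: a read-only List<User> held in a var. The suspend modifier and the network call are omitted to keep the sketch self-contained; this is not the book's implementation, just an illustration of the pattern. Note that it still has the concurrency problem discussed next, since the assignment to users can conflict with other assignments.

```kotlin
class User(val name: String)

class UserDownloader {
    // A read-only list in a read-write property.
    private var users = listOf<User>()

    // No defensive copy needed: the returned list can never change.
    fun downloaded(): List<User> = users

    fun fetchUser(id: Int) {
        // `+` creates a new list each time, which is why additions
        // are slower than with a mutable list. Concurrent calls can
        // still lose updates here.
        users = users + User("User$id")
    }
}

fun main() {
    val downloader = UserDownloader()
    downloader.fetchUser(1)
    downloader.fetchUser(2)
    println(downloader.downloaded().size) // 2
}
```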

This implementation is not prepared for concurrent use. Each fetchUser call modifies users, and this can conflict with other calls of the same function running on other threads. The problem is presented below:

import kotlinx.coroutines.*

class UserDownloader(
    private val api: NetworkService
) {
    private val users = mutableListOf<User>()

    fun downloaded(): List<User> = users.toList()

    suspend fun fetchUser(id: Int) {
        val newUser = api.fetchUser(id)
        users += newUser
    }
}

class User(val name: String)

interface NetworkService {
    suspend fun fetchUser(id: Int): User
}

class FakeNetworkService : NetworkService {
    override suspend fun fetchUser(id: Int): User {
        delay(2)
        return User("User$id")
    }
}

suspend fun main() {
    val downloader = UserDownloader(FakeNetworkService())
    coroutineScope {
        repeat(1_000_000) {
            launch {
                downloader.fetchUser(it)
            }
        }
    }
    print(downloader.downloaded().size) // ~998242
}

Because of conflicts, the above code will print a number smaller than 1,000,000 (998,242, for example), or it might throw an exception:

Exception in thread "main" 
java.lang.ArrayIndexOutOfBoundsException: 22
    at java.util.ArrayList.add(ArrayList.java:463)
    ...

Here we are facing the shared state problem. To see it more clearly, we will test our solutions using the massiveRun function. Below you can see how using it without any synchronization leads to many increments being lost.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

var counter = 0

fun main() = runBlocking {
    massiveRun {
        counter++
    }
    println(counter) // ~567231
}

suspend fun massiveRun(action: suspend () -> Unit) =
    withContext(Dispatchers.Default) {
        repeat(1000) {
            launch {
                repeat(1000) { action() }
            }
        }
    }

Blocking synchronization

The problem can be solved using the classic tools we know from Java, like the synchronized block or synchronized collections.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

var counter = 0

fun main() = runBlocking {
    val lock = Any()
    massiveRun {
        synchronized(lock) { // We are blocking threads!
            counter++
        }
    }
    println("Counter = $counter") // 1000000
}

suspend fun massiveRun(action: suspend () -> Unit) =
    withContext(Dispatchers.Default) {
        repeat(1000) {
            launch {
                repeat(1000) { action() }
            }
        }
    }

This solution works, but there is a problem: inside a synchronized block we are blocking threads while they wait for their turn. I hope that after the chapter about dispatchers you understand that we do not want to block threads. What if it is the Main thread? What if we have only a limited pool of threads? Why waste those resources? We should use coroutine-specific tools instead: ones that suspend or avoid the conflict instead of blocking. So let's set this solution aside and explore some others.

Atomics

There is another Java solution that can help us in simple cases. Java has a set of atomic values, whose operations are fast and guaranteed to be "thread-safe". These operations are implemented at a low level, without locks, so this solution is efficient and appropriate for us. There are different atomic values we can use. For our case, we can use AtomicInteger.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.atomic.AtomicInteger

private val counter = AtomicInteger()

fun main() = runBlocking {
    massiveRun {
        counter.incrementAndGet()
    }
    println(counter.get()) // 1000000
}

suspend fun massiveRun(action: suspend () -> Unit) =
    withContext(Dispatchers.Default) {
        repeat(1000) {
            launch {
                repeat(1000) { action() }
            }
        }
    }

It works perfectly here, but in general the utility of atomic values is limited. We need to be careful: knowing that a single operation is atomic does not help us when we have a bundle of operations that must behave atomically together.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.atomic.AtomicInteger

private val counter = AtomicInteger()

fun main() = runBlocking {
    massiveRun {
        counter.set(counter.get() + 1)
    }
    println(counter.get()) // ~430467
}

suspend fun massiveRun(action: suspend () -> Unit) =
    withContext(Dispatchers.Default) {
        repeat(1000) {
            launch {
                repeat(1000) { action() }
            }
        }
    }

We often use atomics to secure a single primitive or a single reference, but for more complicated cases we still need better tools.
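One way to stretch atomics a bit further is to hold an immutable list in an AtomicReference and modify it with getAndUpdate, which retries the update atomically until it succeeds. The sketch below is my illustration, not code from the book; it uses plain threads (instead of coroutines) and a simplified store so it stays self-contained.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlin.concurrent.thread

// A simplified stand-in for the downloaded-users list:
// an immutable list guarded by an AtomicReference.
class UserStore {
    private val users = AtomicReference(listOf<String>())

    fun downloaded(): List<String> = users.get()

    fun add(user: String) {
        // getAndUpdate runs a compare-and-set loop, so concurrent
        // additions cannot be lost, even without any lock.
        users.getAndUpdate { it + user }
    }
}

fun main() {
    val store = UserStore()
    val threads = List(10) { t ->
        thread {
            repeat(1000) { store.add("User$t-$it") }
        }
    }
    threads.forEach { it.join() }
    println(store.downloaded().size) // 10000
}
```

The price is that every addition copies the list, so this scales poorly for large collections, which is one reason we still need the tools below.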

Single thread dispatcher

We've met a dispatcher with a single thread in the chapter Dispatchers. This is the easiest solution for most problems with shared state.

import kotlinx.coroutines.*
import java.util.concurrent.Executors

val dispatcher = Executors.newSingleThreadExecutor()
    .asCoroutineDispatcher()

var counter = 0

fun main() = runBlocking {
    massiveRun {
        withContext(dispatcher) {
            counter++
        }
    }
    println(counter) // 1000000
}

suspend fun massiveRun(action: suspend () -> Unit) =
    withContext(Dispatchers.Default) {
        repeat(1000) {
            launch {
                repeat(1000) { action() }
            }
        }
    }

In practice, this approach can be used in two ways.

The first approach is known as coarse-grained thread confinement. It is the easy approach: we just wrap the whole function in withContext with a single-threaded dispatcher. This solution is simple and leads to correct applications, but the problem is that we lose multithreading capabilities for the whole function. Let's take a look at the example below. api.fetchUser(id) could run concurrently on many threads, but it will not, because the whole body runs on a single-threaded dispatcher. This would make a bigger difference if the call were blocking or CPU-intensive.

import kotlinx.coroutines.*
import java.util.concurrent.Executors

class UserDownloader(
    private val api: NetworkService
) {
    private val users = mutableListOf<User>()
    private val dispatcher = Executors
        .newSingleThreadExecutor()
        .asCoroutineDispatcher()

    suspend fun downloaded(): List<User> =
        withContext(dispatcher) {
            users.toList()
        }

    suspend fun fetchUser(id: Int) = withContext(dispatcher) {
        val newUser = api.fetchUser(id)
        users += newUser
    }
}

class User(val name: String)

interface NetworkService {
    suspend fun fetchUser(id: Int): User
}

class FakeNetworkService : NetworkService {
    override suspend fun fetchUser(id: Int): User {
        delay(2)
        return User("User$id")
    }
}

suspend fun main() {
    val downloader = UserDownloader(FakeNetworkService())
    coroutineScope {
        repeat(1_000_000) {
            launch {
                downloader.fetchUser(it)
            }
        }
    }
    print(downloader.downloaded().size) // ~1000000
}

The second approach is known as fine-grained thread confinement. Here we wrap in withContext (with a single-threaded dispatcher) only those statements that modify the state. In our example, these are all the lines where users is used. This approach is more demanding, but in return it offers better performance if the functions kept off the single-threaded dispatcher (like fetchUser in our example) are blocking or CPU-intensive. If they are just plain suspending functions, the performance improvement is unlikely to be seen, and might even be negative (because of the additional cost of thread switching).

import kotlinx.coroutines.*
import java.util.concurrent.Executors

class UserDownloader(
    private val api: NetworkService
) {
    private val users = mutableListOf<User>()
    private val dispatcher = Executors
        .newSingleThreadExecutor()
        .asCoroutineDispatcher()

    suspend fun downloaded(): List<User> =
        withContext(dispatcher) {
            users.toList()
        }

    suspend fun fetchUser(id: Int) {
        val newUser = api.fetchUser(id)
        withContext(dispatcher) {
            users += newUser
        }
    }
}

class User(val name: String)

interface NetworkService {
    suspend fun fetchUser(id: Int): User
}

class FakeNetworkService : NetworkService {
    override suspend fun fetchUser(id: Int): User {
        delay(2)
        return User("User$id")
    }
}

suspend fun main() {
    val downloader = UserDownloader(FakeNetworkService())
    coroutineScope {
        repeat(1_000_000) {
            launch {
                downloader.fetchUser(it)
            }
        }
    }
    print(downloader.downloaded().size) // ~1000000
}

Mutex

The last popular approach is to use a Mutex. You can imagine it as a room with a single key (or maybe a toilet in a cafeteria). Its most important function is lock. When the first coroutine calls it, it takes the key and passes through lock without suspension. If another coroutine then calls lock, it is suspended until the first one calls unlock (like a person waiting for the key to the toilet1). If yet another coroutine reaches lock, it is suspended and waits in the queue, just after the second coroutine. When the first coroutine finally calls unlock, it gives the key back, so the second coroutine (the first one in the queue) is resumed and can finally pass through lock. As a result, only one coroutine at a time can be between lock and unlock.

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex

suspend fun main() = coroutineScope {
    repeat(5) {
        launch {
            delayAndPrint()
        }
    }
}

val mutex = Mutex()

suspend fun delayAndPrint() {
    mutex.lock()
    delay(1000)
    println("Done")
    mutex.unlock()
}
// (1 sec)
// Done
// (1 sec)
// Done
// (1 sec)
// Done
// (1 sec)
// Done
// (1 sec)
// Done

Using lock and unlock directly is risky, as any exception (or premature return) in between would lead to the key never being given back (unlock never being called), and as a result no other coroutine would be able to pass through lock. This is a serious problem known as a deadlock (imagine a toilet that cannot be used, because someone in a hurry forgot to give back the key). Instead, we can use the withLock function, which starts with lock but calls unlock in a finally block, so that any exception thrown inside the block still releases the lock. In use, it is similar to the synchronized block or a single-threaded dispatcher.

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

val mutex = Mutex()

var counter = 0

fun main() = runBlocking {
    massiveRun {
        mutex.withLock {
            counter++
        }
    }
    println(counter) // 1000000
}

suspend fun massiveRun(action: suspend () -> Unit) =
    withContext(Dispatchers.Default) {
        repeat(1000) {
            launch {
                repeat(1000) { action() }
            }
        }
    }

The important advantage of a mutex compared to a synchronized block is that we suspend a coroutine instead of blocking a thread. This is a safer and lighter approach. Compared to a dispatcher with a single thread, a mutex is lighter, so it can be preferred when we optimize for performance. On the other hand, it is also harder to use properly. It has one important danger: a coroutine cannot pass the lock twice (maybe the key stays in the door, so another door requiring the same key would be impossible to pass). This is why the code below will never finish (it is in a state known as a deadlock).

import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

suspend fun main() {
    val mutex = Mutex()
    println("Started")
    mutex.withLock {
        mutex.withLock {
            println("Will never be printed")
        }
    }
}
// Started
// (runs forever)

This is why we avoid using a mutex to wrap whole functions (the coarse-grained approach). If we do, we need to do it with great care, because if one such function calls another, a deadlock happens.

class MongoUserRepository(
    //...
) : UserRepository {
    private val mutex = Mutex()

    override suspend fun updateUser(
        userId: String,
        userUpdate: UserUpdate
    ): Unit = mutex.withLock {
        // Yes, update should happen on db,
        // not via multiple functions,
        // this is just an example.
        val currentUser = getUser(userId) // Deadlock!
        deleteUser(userId) // Deadlock!
        addUser(currentUser.updated(userUpdate)) // Deadlock!
    }

    override suspend fun getUser(
        userId: String
    ): User = mutex.withLock {
        // ...
    }

    override suspend fun deleteUser(
        userId: String
    ): Unit = mutex.withLock {
        // ...
    }

    override suspend fun addUser(
        user: User
    ): User = mutex.withLock {
        // ...
    }
}

Fine-grained confinement (wrapping only the places where we modify the shared state) would help, but in the above example I would prefer to use a single-threaded dispatcher.
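To show what fine-grained confinement with a mutex looks like, here is a sketch of UserDownloader where only the lines touching users hold the lock, so the network calls can still run concurrently. This is my illustration under simplifying assumptions: NetworkService is replaced by a plain suspending function type, and the fake "network" is just a delay.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class User(val name: String)

class UserDownloader(
    // Simplified stand-in for NetworkService.
    private val api: suspend (Int) -> User
) {
    private val mutex = Mutex()
    private val users = mutableListOf<User>()

    suspend fun downloaded(): List<User> =
        mutex.withLock { users.toList() }

    suspend fun fetchUser(id: Int) {
        // The fetch runs outside the lock, so many fetches
        // can be in flight at the same time.
        val newUser = api(id)
        // Only the state modification is protected.
        mutex.withLock { users += newUser }
    }
}

suspend fun main() {
    val downloader = UserDownloader { id ->
        delay(1)
        User("User$id")
    }
    coroutineScope {
        repeat(1_000) {
            launch { downloader.fetchUser(it) }
        }
    }
    println(downloaded(downloader)) // 1000
}

private suspend fun downloaded(d: UserDownloader) = d.downloaded().size
```

Since fetchUser never calls another withLock-protected function while holding the lock, the reentrancy deadlock from the example above cannot occur here.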

Summary

There are many ways coroutines can be orchestrated to avoid conflicts when modifying shared state. The most important solution is to modify shared state on a dispatcher with a single thread. This can be fine-grained thread confinement, wrapping only the concrete places where synchronization is needed, or coarse-grained thread confinement, wrapping whole functions. The second approach is easier, but might be slower. We can also use atomic values or a mutex. These approaches can be better in terms of performance, but they are also more demanding and require more care.

1: To avoid giving a false impression of my country: asking for a key to the toilet is something I mainly experience outside of Poland. In Poland, practically every petrol station has a toilet available to everyone, no key required (and they are generally clean and nice-looking). In many other European countries, those toilets are better protected from people who might try to use them without buying anything.