
Coroutines' answer to the problem of mutable state

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.

Before we start, take a look at the UserDownloader class below. It allows us to fetch a user by id or to get all the users that were downloaded before. What is wrong with this implementation?

class UserDownloader(
    private val api: NetworkService
) {
    private val users = mutableListOf<User>()

    fun downloaded(): List<User> = users.toList()

    suspend fun fetchUser(id: Int) {
        val newUser = api.fetchUser(id)
        users.add(newUser)
    }
}

Notice the use of the defensive copy toList. This is done to avoid a conflict between reading the object returned by downloaded and adding an element to the mutable list. We could also represent users using a read-only list (List<User>) stored in a read-write property (var). Then we would not need to make a defensive copy, and downloaded would not need to be protected at all, but adding elements to the collection would become less performant. I personally prefer the second approach, but I decided to show the one using a mutable collection because I see it more often in real-life projects.
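For comparison, the read-only-list variant mentioned above could look like the sketch below (User and NetworkService are the same types as in the later examples). Note that, just like the version with a mutable list, it is not yet prepared for concurrent use:

```kotlin
class User(val name: String)

interface NetworkService {
    suspend fun fetchUser(id: Int): User
}

// Sketch of the alternative: a read-only list in a read-write property.
// downloaded() needs no defensive copy, because the stored list is
// never mutated; fetchUser creates a new list on each addition instead.
class UserDownloader(
    private val api: NetworkService
) {
    private var users = listOf<User>()

    fun downloaded(): List<User> = users

    suspend fun fetchUser(id: Int) {
        val newUser = api.fetchUser(id)
        users = users + newUser // allocates a new list
    }
}
```

The trade-off is visible in the last line: each addition copies the whole list, which is why this variant is slower at adding elements.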

The above implementation is not prepared for concurrent use. Each fetchUser call modifies users, which is fine as long as the function is never called from more than one thread at the same time. However, it can be, so users is a shared state and needs to be secured, because concurrent modifications can lead to conflicts. This problem is presented below:

import kotlinx.coroutines.*

class UserDownloader(
    private val api: NetworkService
) {
    private val users = mutableListOf<User>()

    fun downloaded(): List<User> = users.toList()

    suspend fun fetchUser(id: Int) {
        val newUser = api.fetchUser(id)
        users += newUser
    }
}

class User(val name: String)

interface NetworkService {
    suspend fun fetchUser(id: Int): User
}

class FakeNetworkService : NetworkService {
    override suspend fun fetchUser(id: Int): User {
        delay(2)
        return User("User$id")
    }
}

suspend fun main() {
    val downloader = UserDownloader(FakeNetworkService())
    coroutineScope {
        repeat(1_000_000) {
            launch {
                downloader.fetchUser(it)
            }
        }
    }
    print(downloader.downloaded().size) // ~998242
}

Because there are multiple threads interacting with the same instance, the above code will print a number smaller than 1,000,000 (like 998,242 for example), or it might throw an exception.

Exception in thread "main"
java.lang.ArrayIndexOutOfBoundsException: 22
    at java.util.ArrayList.add(ArrayList.java:463)
    ...

This is a typical problem with shared state modifications. To see it more clearly, I will present a simpler example: multiple threads incrementing an integer. I am using massiveRun to call an operation 1,000 times on 1,000 coroutines using Dispatchers.Default. After these operations, the number should be 1,000,000 (1,000 * 1,000). However, without any synchronization, the real result will be smaller because of the conflicts.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

var counter = 0

fun main() = runBlocking {
    massiveRun {
        counter++
    }
    println(counter) // ~567231
}

suspend fun massiveRun(action: suspend () -> Unit) =
    withContext(Dispatchers.Default) {
        repeat(1000) {
            launch {
                repeat(1000) { action() }
            }
        }
    }

To understand why the result is not 1,000,000, imagine a scenario in which two threads try to increment the same number at the same time. Let's say the initial value is 0. The first thread reads the current value 0, and then the processor decides to switch to the second thread. The second thread also reads 0, increments it to 1, and stores it in the variable. We then switch back to the first thread, which resumes where it left off: it still holds 0, so it increments it to 1 and stores it. As a result, the variable is 1, but it should be 2. This is how some operations get lost.
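This interleaving is not specific to coroutines. The sketch below (my own illustration, not an example from the original) reproduces the same loss with plain JVM threads:

```kotlin
import kotlin.concurrent.thread

// counter++ is a read-modify-write sequence, not a single atomic step,
// so concurrent threads can overwrite each other's increments.
var counter = 0

fun main() {
    val threads = List(4) {
        thread {
            repeat(250_000) { counter++ }
        }
    }
    threads.forEach { it.join() }
    println(counter) // typically smaller than 1,000,000
}
```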

The consequences of conflicts can be more severe than just losing some operations. They can lead to data corruption, exceptions, or even deadlocks. That is why we need to secure access to shared state.

Blocking synchronization

The problem above can be solved using classic tools we know from Java, such as the synchronized block or concurrent collections (like ConcurrentHashMap)2.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

var counter = 0

fun main() = runBlocking {
    val lock = Any()
    massiveRun {
        synchronized(lock) { // We are blocking threads!
            counter++
        }
    }
    println("Counter = $counter") // 1000000
}

suspend fun massiveRun(action: suspend () -> Unit) =
    withContext(Dispatchers.Default) {
        repeat(1000) {
            launch {
                repeat(1000) { action() }
            }
        }
    }

The synchronized block is a classic tool for securing shared state. It is based on a lock: when a thread enters the block, it takes the lock and can execute the code inside. If another thread tries to enter the block, it is blocked until the first thread leaves. This is a simple and effective solution, but it has a few disadvantages. The biggest one is that it blocks threads, so if our code is executed on Dispatchers.Main or Dispatchers.Default, we might block threads that should not be blocked. The second disadvantage is that we cannot call suspending functions inside a synchronized block. It is good that we cannot, because doing so would block other threads waiting for the lock, but it effectively means that synchronized must be used in a fine-grained manner (precisely around shared state modifications), which is harder to implement correctly. Finally, the synchronized block introduces costs related to context switching and thread blocking. Most of the tools designed for coroutines can be more efficient if used properly.

Atomics

There is another Java solution that can help us in some simple cases: atomic values. All their operations are fast and guaranteed to be thread-safe, and they are implemented at a low level without locks, so this solution is efficient and appropriate for us. There are different atomic values we can use; for our case, we can use AtomicInteger.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.atomic.AtomicInteger

private var counter = AtomicInteger()

fun main() = runBlocking {
    massiveRun {
        counter.incrementAndGet()
    }
    println(counter.get()) // 1000000
}

suspend fun massiveRun(action: suspend () -> Unit) =
    withContext(Dispatchers.Default) {
        repeat(1000) {
            launch {
                repeat(1000) { action() }
            }
        }
    }

It works perfectly here, but the utility of atomic values is generally very limited, so we need to be careful: knowing that each single operation is atomic does not help us when we have a bundle of operations.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.atomic.AtomicInteger

private var counter = AtomicInteger()

fun main() = runBlocking {
    massiveRun {
        counter.set(counter.get() + 1)
    }
    println(counter.get()) // ~430467
}

suspend fun massiveRun(action: suspend () -> Unit) =
    withContext(Dispatchers.Default) {
        repeat(1000) {
            launch {
                repeat(1000) { action() }
            }
        }
    }

To secure our UserDownloader, we could use an AtomicReference wrapping a read-only list of users, and the atomic getAndUpdate function to update its value without conflicts. This way we can properly implement the UserDownloader class.

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import java.util.concurrent.atomic.AtomicReference

class UserDownloader(
    private val api: NetworkService
) {
    private val users = AtomicReference(listOf<User>())

    fun downloaded(): List<User> = users.get()

    suspend fun fetchUser(id: Int) {
        val newUser = api.fetchUser(id)
        users.getAndUpdate { it + newUser }
    }
}

class User(val name: String)

interface NetworkService {
    suspend fun fetchUser(id: Int): User
}

class FakeNetworkService : NetworkService {
    override suspend fun fetchUser(id: Int): User {
        delay(2)
        return User("User$id")
    }
}

suspend fun main() {
    val downloader = UserDownloader(FakeNetworkService())
    coroutineScope {
        repeat(1_000_000) {
            launch {
                downloader.fetchUser(it)
            }
        }
    }
    print(downloader.downloaded().size) // 1000000
}

We often use atomics to secure a single primitive or a single reference, but for more complicated cases we still need better tools.

A dispatcher limited to a single thread

We saw a dispatcher with parallelism limited to a single thread in the Dispatchers chapter. This is the easiest solution to most problems with shared state.

import kotlinx.coroutines.*

val dispatcher = Dispatchers.IO
    .limitedParallelism(1)

var counter = 0

fun main() = runBlocking {
    massiveRun {
        withContext(dispatcher) {
            counter++
        }
    }
    println(counter) // 1000000
}

suspend fun massiveRun(action: suspend () -> Unit) =
    withContext(Dispatchers.Default) {
        repeat(1000) {
            launch {
                repeat(1000) { action() }
            }
        }
    }

In practice, this approach can be used in two ways. The first is known as coarse-grained thread confinement: we simply wrap the whole function with withContext, using a dispatcher limited to a single thread. This solution is simple and effectively eliminates conflicts; the only problem is that we lose the multithreading capabilities of the whole function. Let's take a look at the example below. api.fetchUser(id) could be started concurrently on many threads, but its body will run on a dispatcher limited to a single thread. That is typically not a problem: execution is still concurrent, because when one coroutine is suspended, another one can run. We just lose the ability to use multiple threads at the same time, so we might lose some performance, but that is rarely an issue in classes that modify shared state.

import kotlinx.coroutines.*

class UserDownloader(
    private val api: NetworkService
) {
    private val users = mutableListOf<User>()
    private val dispatcher = Dispatchers.IO
        .limitedParallelism(1)

    suspend fun downloaded(): List<User> =
        withContext(dispatcher) {
            users.toList()
        }

    suspend fun fetchUser(id: Int) = withContext(dispatcher) {
        val newUser = api.fetchUser(id)
        users += newUser
    }
}

class User(val name: String)

interface NetworkService {
    suspend fun fetchUser(id: Int): User
}

class FakeNetworkService : NetworkService {
    override suspend fun fetchUser(id: Int): User {
        delay(2)
        return User("User$id")
    }
}

suspend fun main() {
    val downloader = UserDownloader(FakeNetworkService())
    coroutineScope {
        repeat(1_000_000) {
            launch {
                downloader.fetchUser(it)
            }
        }
    }
    print(downloader.downloaded().size) // ~1000000
}

The second approach is known as fine-grained thread confinement. In this approach, we wrap only those statements which modify the state. In our example, these are all the lines where users is used. This approach is more demanding, but it offers us better performance if the functions excluded from our critical section (like fetchUser in our example) are blocking or CPU-intensive. If they are just plain suspending functions, the performance improvement is unlikely to be seen.

import kotlinx.coroutines.*

class UserDownloader(
    private val api: NetworkService
) {
    private val users = mutableListOf<User>()
    private val dispatcher = Dispatchers.IO
        .limitedParallelism(1)

    suspend fun downloaded(): List<User> =
        withContext(dispatcher) {
            users.toList()
        }

    suspend fun fetchUser(id: Int) {
        val newUser = api.fetchUser(id)
        withContext(dispatcher) {
            users += newUser
        }
    }
}

class User(val name: String)

interface NetworkService {
    suspend fun fetchUser(id: Int): User
}

class FakeNetworkService : NetworkService {
    override suspend fun fetchUser(id: Int): User {
        delay(2)
        return User("User$id")
    }
}

suspend fun main() {
    val downloader = UserDownloader(FakeNetworkService())
    coroutineScope {
        repeat(1_000_000) {
            launch {
                downloader.fetchUser(it)
            }
        }
    }
    print(downloader.downloaded().size) // ~1000000
}

In most cases, using a dispatcher with a single thread is not only easy but also efficient, thanks to the fact that the standard dispatchers share the same pool of threads.

Mutex

The last option is using Mutex. It is a more advanced tool that must be used with care. Mutex is efficient and allows us to implement patterns that are hard or impossible to implement using other tools, but it is also easy to make mistakes when using it. That is why Mutex is primarily used in libraries, not in business logic.

You can imagine Mutex as a room with a single key (or maybe a toilet at a cafeteria). Its most important function is lock. When the first coroutine calls it, it takes the key and passes through lock without suspension. If another coroutine then calls lock, it will be suspended until the first coroutine calls unlock (like a person waiting for a key to the toilet1). If another coroutine reaches the lock function, it is suspended and put in a queue, just after the second coroutine. When the first coroutine finally calls the unlock function, it gives back the key, so the second coroutine (the first one in the queue) is now resumed and can finally pass through the lock function. This way only one coroutine is allowed between lock and unlock calls.

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex

suspend fun main() = coroutineScope {
    repeat(5) {
        launch {
            delayAndPrint()
        }
    }
}

val mutex = Mutex()

suspend fun delayAndPrint() {
    mutex.lock()
    delay(1000)
    println("Done")
    mutex.unlock()
}
// (1 sec)
// Done
// (1 sec)
// Done
// (1 sec)
// Done
// (1 sec)
// Done
// (1 sec)
// Done

Using lock and unlock directly is risky, as any exception (or premature return) in between would lead to the key never being given back (unlock never being called); as a result, no other coroutine would be able to pass through the lock. This is a serious problem known as a deadlock (imagine a toilet that cannot be used because someone was in a hurry and forgot to give back the key). Instead, we can use the withLock function, which starts with lock but calls unlock in a finally block so that any exception thrown inside the block will still release the lock. In use, it is similar to a synchronized block.
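Conceptually, withLock can be pictured as the sketch below. This is a simplified illustration of the lock/try/finally pattern, not the real library implementation (which, among other details, takes a non-suspending lambda); the name withLockSketch is mine:

```kotlin
import kotlinx.coroutines.sync.Mutex

// Simplified sketch of what withLock guarantees: unlock is called in
// finally, so the lock is released even if action throws or returns early.
suspend fun <T> Mutex.withLockSketch(action: suspend () -> T): T {
    lock()
    try {
        return action()
    } finally {
        unlock()
    }
}
```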

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    massiveRun {
        mutex.withLock {
            counter++
        }
    }
    println(counter) // 1000000
}

suspend fun massiveRun(action: suspend () -> Unit) =
    withContext(Dispatchers.Default) {
        repeat(1000) {
            launch {
                repeat(1000) { action() }
            }
        }
    }

The important advantage of a mutex over a synchronized block is that we suspend a coroutine instead of blocking a thread. This is a safer and lighter approach. Compared to using a dispatcher with parallelism limited to a single thread, a mutex is lighter, and in some cases it might offer better performance. On the other hand, it is also harder to use properly. It has one important danger: a coroutine cannot get past the lock twice (maybe the key stays in the door, so a second door requiring the same key would be impossible to pass). Execution of the code below results in a program state called a deadlock: it will be blocked forever. Such situations are common when developers use a mutex incorrectly.

import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

suspend fun main() {
    val mutex = Mutex()
    println("Started")
    mutex.withLock {
        mutex.withLock {
            println("Will never be printed")
        }
    }
}
// Started
// (runs forever)

The second problem with mutex is that it is not unlocked when a coroutine is suspended. Take a look at the code below. It takes over 5 seconds because mutex is still locked during delay. Mutex makes the section of code between lock and unlock synchronous, so it is not a good idea to use it with suspending functions, unless that is our goal—to synchronize the execution of coroutines.

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.system.measureTimeMillis

class MessagesRepository {
    private val messages = mutableListOf<String>()
    private val mutex = Mutex()

    suspend fun add(message: String) = mutex.withLock {
        delay(1000) // we simulate network call
        messages.add(message)
    }
}

suspend fun main() {
    val repo = MessagesRepository()
    val timeMillis = measureTimeMillis {
        coroutineScope {
            repeat(5) {
                launch {
                    repo.add("Message$it")
                }
            }
        }
    }
    println(timeMillis) // ~5120
}

When we use a dispatcher that is limited to a single thread, we don’t have such a problem. When a delay or a network call suspends a coroutine, the thread can be used by other coroutines.

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

class MessagesRepository {
    private val messages = mutableListOf<String>()
    private val dispatcher = Dispatchers.IO
        .limitedParallelism(1)

    suspend fun add(message: String) = withContext(dispatcher) {
        delay(1000) // we simulate network call
        messages.add(message)
    }
}

suspend fun main() {
    val repo = MessagesRepository()
    val timeMillis = measureTimeMillis {
        coroutineScope {
            repeat(5) {
                launch {
                    repo.add("Message$it")
                }
            }
        }
    }
    println(timeMillis) // 1058
}

This is why we avoid using a mutex to wrap whole functions (the coarse-grained approach). If we decide to use one, we need to do so with great care to avoid locking twice and to avoid calling suspending functions while holding the lock.

class MongoUserRepository(
    //...
) : UserRepository {
    private val mutex = Mutex()

    override suspend fun updateUser(
        userId: String,
        userUpdate: UserUpdate
    ): Unit = mutex.withLock {
        val currentUser = getUser(userId) // Deadlock!
        deleteUser(userId) // Deadlock!
        addUser(currentUser.updated(userUpdate)) // Deadlock!
    }

    override suspend fun getUser(
        userId: String
    ): User = mutex.withLock {
        // ...
    }

    override suspend fun deleteUser(
        userId: String
    ): Unit = mutex.withLock {
        // ...
    }

    override suspend fun addUser(
        user: User
    ): User = mutex.withLock {
        // ...
    }
}

Of course, in real-life applications we would prefer to make compound transformations at the database level, using one request (not three); the above example just shows the problem with mutex.

Fine-grained thread confinement (wrapping only the place where we modify the shared state) would help, but in the above example, I would prefer to use a dispatcher that is limited to a single thread.
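To make that preference concrete, here is a sketch of such a repository with an in-memory map standing in for the database. The types and the storage are hypothetical stand-ins, since the Mongo details are elided in the example above:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.withContext

// Hypothetical minimal types so the sketch is self-contained.
data class User(val id: String, val name: String) {
    fun updated(update: UserUpdate) = copy(name = update.name)
}

data class UserUpdate(val name: String)

// All state-touching functions run on the same single-threaded
// dispatcher. Nested withContext calls on the same dispatcher do not
// deadlock, unlike nested mutex.withLock calls.
@OptIn(ExperimentalCoroutinesApi::class)
class InMemoryUserRepository {
    private val dispatcher = Dispatchers.IO.limitedParallelism(1)
    private val users = mutableMapOf<String, User>()

    suspend fun updateUser(userId: String, userUpdate: UserUpdate) {
        withContext(dispatcher) {
            val currentUser = getUser(userId) // no deadlock
            deleteUser(userId)
            addUser(currentUser.updated(userUpdate))
        }
    }

    suspend fun getUser(userId: String): User =
        withContext(dispatcher) { users.getValue(userId) }

    suspend fun deleteUser(userId: String) {
        withContext(dispatcher) { users.remove(userId) }
    }

    suspend fun addUser(user: User): User =
        withContext(dispatcher) {
            users[user.id] = user
            user
        }
}
```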

Semaphore

Since we mentioned Mutex, we should also mention Semaphore, which works in a similar way but can have more than one permit. With Mutex, we speak of a single lock, so it has the functions lock, unlock, and withLock. With Semaphore, we speak of permits, so it has the functions acquire, release, and withPermit.

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*

suspend fun main() = coroutineScope {
    val semaphore = Semaphore(2)

    repeat(5) {
        launch {
            semaphore.withPermit {
                delay(1000)
                print(it)
            }
        }
    }
}
// 01
// (1 sec)
// 23
// (1 sec)
// 4

A semaphore with more than one permit does not help us with the problem of shared state, but it can be used to limit the number of concurrent requests, i.e., to implement rate limiting or throttling. For example, we can limit the number of concurrent requests to a network service to avoid overloading it.

class LimitedNetworkUserRepository(
    private val api: UserApi
) {
    // We limit to 10 concurrent requests
    private val semaphore = Semaphore(10)

    suspend fun requestUser(userId: String) =
        semaphore.withPermit {
            api.requestUser(userId)
        }
}

Summary

There are many ways in which coroutines can be orchestrated to avoid conflicts when modifying a shared state. The most practical solution is to modify a shared state in a dispatcher that is limited to a single thread. This can be a fine-grained thread confinement that only wraps concrete places where synchronization is needed; alternatively, it can be a coarse-grained thread confinement that wraps the whole function. The second approach is easier, but it might be slower. We might also use atomic values, concurrent collections or other tools from Java stdlib. If we need more advanced synchronization, we can use Mutex or Semaphore.

1:

To avoid giving a false impression of my home country, asking for the key to a toilet is something I mainly experience outside of Poland. For instance, in Poland practically every petrol station has a toilet available for everyone, no key required (and they are generally clean and tidy). However, in many other European countries, especially in Switzerland, toilets are better protected from people who might try to use them without buying anything.

2:

I explain how to use those tools in Effective Kotlin, Item 2: Eliminate critical sections, but they are not the best choice for coroutines, because they are based on blocking threads. Let's see how we can use synchronized block to secure the counter variable.