Suspending functions or flows into callbacks
Many legacy APIs use callbacks to report results of asynchronous operations. In Kotlin, we can use suspending functions or flows to achieve the same effect in a more readable way. However, sometimes we need to convert these suspending functions or flows into callback-based APIs, especially when integrating with existing libraries or frameworks that expect callbacks. In this article, we will explore how to do that.
This article is part of a series about interoperability between Kotlin Coroutines and other libraries.
Suspending functions must be started in a coroutine. The simplest way to start a coroutine is to use launch on a coroutine scope. Once the suspending function completes with a result, we can call a callback with this result. If it throws an exception, we can call a callback with the exception instead. To allow the process started by the suspending function to be cancelled, we can return the Job from the launch call, which can be used to cancel the coroutine. This is the most basic way to convert a suspending function to a callback-based API.
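import kotlinx.coroutines.*

suspend fun fetchData(): String {
    // ...
}

val scope = CoroutineScope(SupervisorJob())

// Callback-based variant: starts the suspending function in a coroutine
// and returns the Job, so the caller can cancel the operation.
fun fetchData(
    onSuccess: (String) -> Unit,
    onError: (Throwable) -> Unit,
): Job = scope.launch {
    try {
        val result = fetchData()
        onSuccess(result)
    } catch (e: Throwable) {
        // Also receives CancellationException when the job is cancelled
        onError(e)
    }
}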
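A caller may not want cancellation reported as an error. Because the wrapper above catches Throwable, onError can also receive the CancellationException thrown when the returned Job is cancelled. The following is a minimal sketch of one possible variant that rethrows cancellation instead. The names fetchGreeting, fetchGreetingAsync and demoScope are hypothetical and used only for illustration.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical suspending function standing in for fetchData().
suspend fun fetchGreeting(): String {
    delay(100)
    return "Hello"
}

// Hypothetical scope for the demo; real code would usually inject it.
val demoScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

// Same shape as the wrapper above, but cancellation is rethrown
// instead of being reported through onError.
fun fetchGreetingAsync(
    onSuccess: (String) -> Unit,
    onError: (Throwable) -> Unit,
): Job = demoScope.launch {
    try {
        onSuccess(fetchGreeting())
    } catch (e: CancellationException) {
        throw e // let the coroutine finish as cancelled, without a callback
    } catch (e: Throwable) {
        onError(e)
    }
}

fun main() = runBlocking {
    // Callbacks run on a background thread, so use a thread-safe list.
    val events = CopyOnWriteArrayList<String>()

    // First call runs to completion and reports a result.
    fetchGreetingAsync(
        onSuccess = { events += "success: $it" },
        onError = { events += "error: $it" },
    ).join()

    // Second call is cancelled before it can finish: no callback is invoked.
    fetchGreetingAsync(
        onSuccess = { events += "success: $it" },
        onError = { events += "error: $it" },
    ).cancelAndJoin()

    println(events) // [success: Hello]
}
```

Whether cancellation should reach onError is a design choice. Some callback-based APIs expect to be notified about it, in which case the single catch on Throwable is the simpler option.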
In real-life applications, the scope is usually provided by a dependency injection framework. We also often wrap the Job in an interface that exposes a cancellation method. We might also define a wrapper function that transforms a suspending function into a callback-based API.
class AnkiConnectorCallback(
    private val connector: AnkiConnector,
    private val scope: CoroutineScope,
) {
    fun checkConnection(
        onSuccess: (Boolean) -> Unit,
        onError: (Throwable) -> Unit,
    ): Cancellable = scope.asyncWithCallback(onSuccess, onError) {
        connector.checkConnection()
    }

    fun pushDeck(
        deckName: String,
        markdown: String,
        onSuccess: (AnkiConnectorResult) -> Unit,
        onError: (Throwable) -> Unit,
    ): Cancellable = scope.asyncWithCallback(onSuccess, onError) {
        connector.pushDeck(deckName, markdown)
    }

    // ...
}

fun <T> CoroutineScope.asyncWithCallback(
    onSuccess: (T) -> Unit,
    onError: (Throwable) -> Unit,
    body: suspend () -> T
): Cancellable {
    val job = launch {
        try {
            val result = body()
            onSuccess(result)
        } catch (t: Throwable) {
            onError(t)
        }
    }
    return Cancellable(job)
}

class Cancellable(private val job: Job) {
    fun cancel() {
        job.cancel()
    }
}
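To show how such a wrapper might be called, here is a minimal, self-contained sketch. It repeats compact copies of asyncWithCallback and Cancellable, and uses FakeConnector, a hypothetical stand-in for AnkiConnector, whose real definition is not shown in this article.

```kotlin
import kotlinx.coroutines.*

// Compact copies of the helpers above, so the sketch is self-contained.
class Cancellable(private val job: Job) {
    fun cancel() = job.cancel()
}

fun <T> CoroutineScope.asyncWithCallback(
    onSuccess: (T) -> Unit,
    onError: (Throwable) -> Unit,
    body: suspend () -> T,
): Cancellable = Cancellable(launch {
    try {
        onSuccess(body())
    } catch (t: Throwable) {
        onError(t)
    }
})

// Hypothetical stand-in for AnkiConnector (an assumption for this demo).
class FakeConnector {
    suspend fun checkConnection(): Boolean {
        delay(50)
        return true
    }
}

fun main() = runBlocking {
    val connector = FakeConnector()
    // Bridges the callbacks back into the demo so main can wait for them.
    val result = CompletableDeferred<Boolean>()

    // `this` is the runBlocking scope; production code would inject a scope.
    asyncWithCallback<Boolean>(
        onSuccess = { result.complete(it) },
        onError = { result.completeExceptionally(it) },
    ) { connector.checkConnection() }

    println(result.await()) // true
}
```

The returned Cancellable is ignored here for brevity. A real caller would typically keep it and call cancel() when the result is no longer needed.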
We observe a flow using the collect method, which is a suspending function, so it cannot be called from non-suspending code. To convert a flow to a callback-based API, we can use the following wrapper function:
/**
 * This function allows you to subscribe to a Flow with a CoroutineScope.
 *
 * Flow gets completed when the coroutine scope is cancelled or when the Flow completes.
 * Flow completes when it has an exception.
 *
 * @param scope The CoroutineScope in which the Flow will be collected.
 * @param onEach A lambda function that will be called for each emitted item from the Flow.
 * @param onError A lambda function that will be called if an error occurs during the collection of the Flow.
 * @param onStart A lambda function that will be called when the Flow starts collecting.
 * @param onCompletion A lambda function that will be called when the Flow completes (either successfully or with an error).
 */
fun <T> Flow<T>.subscribe(
    scope: CoroutineScope,
    onEach: ((T) -> Unit)? = null,
    onError: ((Throwable) -> Unit)? = null,
    onStart: (() -> Unit)? = null,
    onCompletion: (() -> Unit)? = null,
): Job {
    return this
        .let { flow -> onEach?.let { flow.onEach { onEach(it) } } ?: flow }
        .let { flow -> onStart?.let { flow.onStart { onStart() } } ?: flow }
        .let { flow -> onCompletion?.let { flow.onCompletion { onCompletion() } } ?: flow }
        .let { flow -> onError?.let { flow.catch { onError(it) } } ?: flow }
        .launchIn(scope)
}
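The order in which the callbacks fire follows from the order of the operators in the chain. As a rough illustration, the sketch below repeats the wrapper without its documentation comment and subscribes to a hypothetical flow that emits two values and then fails.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Copy of the wrapper above, so the sketch is self-contained.
fun <T> Flow<T>.subscribe(
    scope: CoroutineScope,
    onEach: ((T) -> Unit)? = null,
    onError: ((Throwable) -> Unit)? = null,
    onStart: (() -> Unit)? = null,
    onCompletion: (() -> Unit)? = null,
): Job {
    return this
        .let { flow -> onEach?.let { flow.onEach { onEach(it) } } ?: flow }
        .let { flow -> onStart?.let { flow.onStart { onStart() } } ?: flow }
        .let { flow -> onCompletion?.let { flow.onCompletion { onCompletion() } } ?: flow }
        .let { flow -> onError?.let { flow.catch { onError(it) } } ?: flow }
        .launchIn(scope)
}

fun main() = runBlocking {
    val events = mutableListOf<String>()

    // Hypothetical flow used only for this demo: two values, then a failure.
    val failing = flow<Int> {
        emit(1)
        emit(2)
        throw IllegalStateException("boom")
    }

    failing.subscribe(
        scope = this,
        onEach = { events += "each $it" },
        onError = { events += "error ${it.message}" },
        onStart = { events += "start" },
        onCompletion = { events += "completion" },
    ).join()

    println(events)
    // [start, each 1, each 2, completion, error boom]
}
```

Because catch is applied after onCompletion, the completion callback runs before the error callback when the flow fails.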
You can also define a wrapper class for a flow, to simplify the usage of this function.
class FlowCallback<T>(
    private val flow: Flow<T>,
    private val scope: CoroutineScope,
) {
    // The flow is collected in the scope passed to the constructor,
    // so callers only need to provide the callbacks.
    fun subscribe(
        onEach: ((T) -> Unit)? = null,
        onError: ((Throwable) -> Unit)? = null,
        onStart: (() -> Unit)? = null,
        onCompletion: (() -> Unit)? = null,
    ): Job {
        return flow
            .let { flow -> onEach?.let { flow.onEach { onEach(it) } } ?: flow }
            .let { flow -> onStart?.let { flow.onStart { onStart() } } ?: flow }
            .let { flow -> onCompletion?.let { flow.onCompletion { onCompletion() } } ?: flow }
            .let { flow -> onError?.let { flow.catch { onError(it) } } ?: flow }
            .launchIn(scope)
    }
}
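The Job returned from subscribe is what lets a callback-based consumer stop observing. The sketch below is a simplified variant of the wrapper class, assuming it always collects in the scope passed to its constructor and exposes only two of the callbacks. The ticks function is a hypothetical endless source used only for this demo.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Simplified variant of the wrapper class (an assumption for this demo):
// it collects in the constructor scope and exposes only two callbacks.
class FlowCallback<T>(
    private val flow: Flow<T>,
    private val scope: CoroutineScope,
) {
    fun subscribe(
        onEach: (T) -> Unit,
        onError: (Throwable) -> Unit = {},
    ): Job = flow
        .onEach { onEach(it) }
        .catch { onError(it) }
        .launchIn(scope)
}

// Hypothetical endless source: emits 0, 1, 2, ... every 10 ms.
fun ticks(): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(i++)
        delay(10)
    }
}

fun main() = runBlocking {
    val received = mutableListOf<Int>()
    val gotThree = CompletableDeferred<Unit>()

    val subscription = FlowCallback(ticks(), this).subscribe(
        onEach = {
            received += it
            if (received.size == 3) gotThree.complete(Unit)
        },
    )

    gotThree.await()
    subscription.cancelAndJoin() // stops the otherwise endless collection
    println(received.take(3)) // [0, 1, 2]
}
```

Without the cancelAndJoin call, runBlocking would never return here, because it waits for the collecting coroutine, which is its child.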
In this article, we explored how to convert suspending functions and flows into callback-based APIs. This is useful when integrating with legacy code or libraries that expect callbacks. We also discussed how to handle errors and cancellation in these conversions. By using the provided examples, you can easily adapt your suspending functions and flows to work with callback-based APIs in Kotlin.

Marcin Moskala is a highly experienced developer and Kotlin instructor, the founder of Kt. Academy (an official JetBrains partner specializing in Kotlin training), and a Google Developers Expert known for his significant contributions to the Kotlin community. Moskala is the author of several widely recognized books, including "Effective Kotlin," "Kotlin Coroutines," "Functional Kotlin," "Advanced Kotlin," "Kotlin Essentials," and "Android Development with Kotlin."
Beyond his literary achievements, Moskala is the author of the largest Medium publication dedicated to Kotlin. As a respected speaker, he has been invited to share his insights at numerous programming conferences, including Droidcon and KotlinConf, the premier conference dedicated to the Kotlin programming language.