
Suspending functions or flows into callbacks

Many legacy APIs use callbacks to report results of asynchronous operations. In Kotlin, we can use suspending functions or flows to achieve the same effect in a more readable way. However, sometimes we need to convert these suspending functions or flows into callback-based APIs, especially when integrating with existing libraries or frameworks that expect callbacks. In this article, we will explore how to do that.

This article is a part of a series about interoperability between Kotlin Coroutines and other libraries.

Converting a suspending function to a callback

Suspending functions must be started in a coroutine. The simplest way to start a coroutine is to call launch on a coroutine scope. Once the suspending function completes with a result, we pass that result to a success callback. If it throws an exception, we pass the exception to an error callback instead. To let the caller cancel the process started by the suspending function, we return the Job produced by launch, which can be used to cancel the coroutine. This is the most basic way to convert a suspending function to a callback-based API.

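```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch

suspend fun fetchData(): String {
    // ...
}

val scope = CoroutineScope(SupervisorJob())

// Callback-based overload; the returned Job lets the caller cancel the request.
fun fetchData(
    onSuccess: (String) -> Unit,
    onError: (Throwable) -> Unit,
): Job = scope.launch {
    try {
        val result = fetchData()
        onSuccess(result)
    } catch (e: Throwable) {
        onError(e)
    }
}
```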
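To see both callback paths in action, here is a minimal runnable sketch of the same pattern. The names `loadGreeting`, `loadGreetingWithCallbacks`, `demoScope` and `recordCallbacks` are hypothetical and exist only for this illustration; the suspending function fails on demand so that each callback fires once.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical suspending function that fails when asked to.
suspend fun loadGreeting(fail: Boolean): String {
    delay(50) // simulate asynchronous work
    if (fail) throw IllegalStateException("backend unavailable")
    return "Hello"
}

// Assumption: a standalone scope; real code would usually inject it.
val demoScope = CoroutineScope(SupervisorJob())

// Callback-based wrapper following the pattern described above.
fun loadGreetingWithCallbacks(
    fail: Boolean,
    onSuccess: (String) -> Unit,
    onError: (Throwable) -> Unit,
): Job = demoScope.launch {
    try {
        onSuccess(loadGreeting(fail))
    } catch (e: Throwable) {
        onError(e)
    }
}

// Runs one successful and one failing call and records which callback fired.
fun recordCallbacks(): List<String> = runBlocking {
    val events = CopyOnWriteArrayList<String>()
    loadGreetingWithCallbacks(
        fail = false,
        onSuccess = { events.add("success: $it") },
        onError = { events.add("error: ${it.message}") },
    ).join()
    loadGreetingWithCallbacks(
        fail = true,
        onSuccess = { events.add("success: $it") },
        onError = { events.add("error: ${it.message}") },
    ).join()
    events.toList()
}

fun main() {
    println(recordCallbacks()) // [success: Hello, error: backend unavailable]
}
```

The returned Job is used here only to wait for completion with join; a callback-based caller would typically keep it just to call cancel.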

In real-life applications, the scope is usually provided by a dependency injection framework. We also often wrap the Job in a type that exposes only a cancellation method. We might also define a helper function that transforms any suspending function into a callback-based API.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch

class AnkiConnectorCallback(
    private val connector: AnkiConnector,
    private val scope: CoroutineScope,
) {
    fun checkConnection(
        onSuccess: (Boolean) -> Unit,
        onError: (Throwable) -> Unit,
    ): Cancellable = scope.asyncWithCallback(onSuccess, onError) {
        connector.checkConnection()
    }

    fun pushDeck(
        deckName: String,
        markdown: String,
        onSuccess: (AnkiConnectorResult) -> Unit,
        onError: (Throwable) -> Unit,
    ): Cancellable = scope.asyncWithCallback(onSuccess, onError) {
        connector.pushDeck(deckName, markdown)
    }

    // ...
}

// Starts body in a new coroutine and reports its outcome through the callbacks.
fun <T> CoroutineScope.asyncWithCallback(
    onSuccess: (T) -> Unit,
    onError: (Throwable) -> Unit,
    body: suspend () -> T
): Cancellable {
    val job = launch {
        try {
            val result = body()
            onSuccess(result)
        } catch (t: Throwable) {
            onError(t)
        }
    }
    return Cancellable(job)
}

// Hides the Job so that callers can only cancel it.
class Cancellable(private val job: Job) {
    fun cancel() {
        job.cancel()
    }
}
```
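One detail worth knowing: a catch block for Throwable also catches the CancellationException that a suspended coroutine receives when it is cancelled, so cancelling can end up invoking onError. If that is not wanted, one option is to rethrow cancellation before the generic catch. The sketch below shows this variant together with a cancellation handle expressed as an interface. `CancellableHandle`, `launchWithCallbacks` and `cancelSlowTask` are hypothetical names used only here, not part of any library.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.job
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical interface: callers can cancel, but never see the Job.
fun interface CancellableHandle {
    fun cancel()
}

// Hypothetical variant of the helper that does not report cancellation.
fun <T> CoroutineScope.launchWithCallbacks(
    onSuccess: (T) -> Unit,
    onError: (Throwable) -> Unit,
    body: suspend () -> T,
): CancellableHandle {
    val job = launch {
        try {
            onSuccess(body())
        } catch (e: CancellationException) {
            throw e // let cancellation propagate instead of calling onError
        } catch (t: Throwable) {
            onError(t)
        }
    }
    return CancellableHandle { job.cancel() }
}

// Starts a slow task, cancels it, and returns the callbacks that fired.
fun cancelSlowTask(): List<String> {
    val events = CopyOnWriteArrayList<String>()
    val scope = CoroutineScope(SupervisorJob())
    val handle = scope.launchWithCallbacks(
        onSuccess = { value: String -> events.add("success: $value") },
        onError = { events.add("error: $it") },
    ) {
        delay(10_000) // long-running work that will be cancelled
        "finished"
    }
    handle.cancel()
    // Wait until the cancelled coroutine has fully finished.
    runBlocking { scope.coroutineContext.job.children.toList().joinAll() }
    return events.toList()
}

fun main() {
    println(cancelSlowTask()) // []
}
```

Whether cancellation should be silent or reported is a design choice; some callback APIs prefer a dedicated onCancelled callback instead.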

Converting a flow to a callback function

We observe a flow using the collect method, which is a suspending function, so it cannot be called from non-suspending code. To convert a flow to a callback-based API, we can use the following wrapper function:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart

/**
 * This function allows you to subscribe to a Flow with a CoroutineScope.
 *
 * Collection ends when the coroutine scope is cancelled or when the Flow completes.
 * The Flow also completes when it fails with an exception.
 *
 * @param scope The CoroutineScope in which the Flow will be collected.
 * @param onEach A lambda function that will be called for each emitted item from the Flow.
 * @param onError A lambda function that will be called if an error occurs during the collection of the Flow.
 * @param onStart A lambda function that will be called when the Flow starts collecting.
 * @param onCompletion A lambda function that will be called when the Flow completes (either successfully or with an error).
 */
fun <T> Flow<T>.subscribe(
    scope: CoroutineScope,
    onEach: ((T) -> Unit)? = null,
    onError: ((Throwable) -> Unit)? = null,
    onStart: (() -> Unit)? = null,
    onCompletion: (() -> Unit)? = null,
): Job {
    return this
        .let { flow -> onEach?.let { flow.onEach { onEach(it) } } ?: flow }
        .let { flow -> onStart?.let { flow.onStart { onStart() } } ?: flow }
        .let { flow -> onCompletion?.let { flow.onCompletion { onCompletion() } } ?: flow }
        .let { flow -> onError?.let { flow.catch { onError(it) } } ?: flow }
        .launchIn(scope)
}
```
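The order in which the operators are chained determines the order in which the callbacks fire. The following sketch applies the same operator chain directly to a small failing flow and records the events. The flow contents and the `observedEvents` function are assumptions made for this illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.runBlocking

// Collects a failing flow with the same operator order as the wrapper
// and returns the recorded callback events.
fun observedEvents(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val failing: Flow<Int> = flow {
        emit(1)
        emit(2)
        throw IllegalStateException("boom")
    }
    failing
        .onEach { events.add("onEach: $it") }
        .onStart { events.add("onStart") }
        .onCompletion { cause -> events.add("onCompletion: ${cause?.message}") }
        .catch { events.add("onError: ${it.message}") }
        .launchIn(this) // runBlocking's scope stands in for the injected scope
        .join()
    events
}

fun main() {
    // [onStart, onEach: 1, onEach: 2, onCompletion: boom, onError: boom]
    println(observedEvents())
}
```

Because catch is placed after onCompletion, the completion callback runs before the error callback when the flow fails, and the exception never reaches the scope.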

You can also define a wrapper class for a flow, to simplify the usage of this function.

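```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart

class FlowCallback<T>(
    private val flow: Flow<T>,
    private val scope: CoroutineScope,
) {
    // Uses the scope from the constructor, so callers pass only callbacks.
    fun subscribe(
        onEach: ((T) -> Unit)? = null,
        onError: ((Throwable) -> Unit)? = null,
        onStart: (() -> Unit)? = null,
        onCompletion: (() -> Unit)? = null,
    ): Job {
        return flow
            .let { flow -> onEach?.let { flow.onEach { onEach(it) } } ?: flow }
            .let { flow -> onStart?.let { flow.onStart { onStart() } } ?: flow }
            .let { flow -> onCompletion?.let { flow.onCompletion { onCompletion() } } ?: flow }
            .let { flow -> onError?.let { flow.catch { onError(it) } } ?: flow }
            .launchIn(scope)
    }
}
```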

Conclusion

In this article, we explored how to convert suspending functions and flows into callback-based APIs. This is useful when integrating with legacy code or libraries that expect callbacks. We also discussed how to handle errors and cancellation in these conversions. By using the provided examples, you can easily adapt your suspending functions and flows to work with callback-based APIs in Kotlin.