article banner

Using coroutines from other languages

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.

Different languages have different approaches to concurrency. In Java, it is common to start and block threads, but this practically never happens in JavaScript, where async processes are represented by promises and async functions. This means we need to learn how to define an API in Kotlin for developers from different platforms, but we also need to learn some of the limitations of certain platforms. In this chapter, we will learn which Kotlin features are platform-specific and how to adjust Kotlin coroutines for platform specificity if you want to expose an API to other popular languages.

Threads on different platforms

JavaScript operates on a single thread1, so you can't force a thread to sleep. This is why there is no runBlocking function in Kotlin/JS. If you want to run your Kotlin code on the JS platform, you must forget about blocking threads. You can still use runTest for testing, which returns a promise in Kotlin/JS.

For the same reason, we cannot use Dispatchers.IO in Kotlin/JS because Dispatchers.IO is currently Kotlin/JVM-specific. However, I don't find this problematic: we should use Dispatchers.IO when we need to block threads, and that should only happen when we interoperate with JVM or native APIs.

Transforming suspending into non-suspending functions

We already know what to do when we need to transform blocking or callback functions into suspending functions. But what if we need to do the opposite and use our suspending functions from languages other than Kotlin? As we've seen in the Coroutines under the hood chapter, suspending functions need Continuation, which must be provided by Kotlin. We need to find a way to overcome this for other languages, and since Kotlin is a multiplatform language, these other languages might include all JVM languages (Java, Scala, Groovy, Clojure, ...), JavaScript languages (JavaScript, TypeScript), or native languages (Swift, C++, C, ...). Let's see the most important options that are available to us.

Let's say that you implement a multiplatform library using suspending functions, and as its API you expose the following façade2:

class AnkiConnector( // ... ) { suspend fun checkConnection(): Boolean = ... suspend fun getDeckNames(): List<String> = ... suspend fun pushDeck( deckName: String, markdown: String ): AnkiConnectorResult = ... suspend fun pullDeck( deckName: String, currentMarkdown: String ): AnkiConnectorResult = ... }

We might assume that we cannot call suspending functions from languages other than Kotlin, so if you want your library to support other languages, you need to specify an alternative.

Transforming suspend functions into blocking functions

On Kotlin/JVM and Kolin/Native, the easiest option is to transform our suspending functions into blocking functions using runBlocking. This isn't the best solution, but it is the easiest.

When you define a library, you can provide a wrapper class defining blocking functions for the classes you want to expose in your API.

class AnkiConnectorBlocking { private val connector = AnkiConnector(/*...*/) fun checkConnection(): Boolean = runBlocking { connector.checkConnection() } fun getDeckNames(): List<String> = runBlocking { connector.getDeckNames() } fun pushDeck( deckName: String, markdown: String ): AnkiConnectorResult = runBlocking { connector.pushDeck(deckName, markdown) } fun pullDeck( deckName: String, currentMarkdown: String ): AnkiConnectorResult = runBlocking { connector.pullDeck(deckName, currentMarkdown) } }

If you want to write classes that might be used by Kotlin and other languages, you might define your blocking variants next to suspend variants.

class AnkiConnector( // ... ) { suspend fun checkConnection(): Boolean = ... fun checkConnectionBlocking(): Boolean = runBlocking { connector.checkConnection() } // ... }

You can already find Kotlin plugins that generate such blocking variants implicitly for properly annotated functions. An example of such a plugin is kotlin-jvm-blocking-bridge.

class AnkiConnector( // ... ) { @JvmBlockingBridge suspend fun checkConnection(): Boolean = ... }
// Java
class JavaTest {
    public static void main(String[] args) {
        AnkiConnector connector = new AnkiConnector();
        boolean connection = connector.checkConnection();
        // ...
    }
}

Transforming suspend functions into callback functions

Another popular option is to transform suspending functions into callback functions. For that, we need to define a scope and use it to start coroutines for each function. In the example below, I call my callbacks with Result, which represents a success when the wrapper function has finished, or it represents a failure in the case of an exception. I also made this callback function return Cancellable, which can be used to cancel the execution of a particular function.

class AnkiConnectorCallback { private val connector = AnkiConnector(/*...*/) private val scope = CoroutineScope(SupervisorJob()) fun checkConnection( callback: (Result<Boolean>) -> Unit ): Cancellable = toCallback(callback) { connector.checkConnection() } fun getDeckNames( callback: (Result<List<String>>) -> Unit ): Cancellable = toCallback(callback) { connector.getDeckNames() } fun pushDeck( deckName: String, markdown: String, callback: (Result<AnkiConnectorResult>) -> Unit ): Cancellable = toCallback(callback) { connector.pushDeck(deckName, markdown) } fun pullDeck( deckName: String, currentMarkdown: String, callback: (Result<AnkiConnectorResult>) -> Unit ): Cancellable = toCallback(callback) { connector.pullDeck(deckName, currentMarkdown) } fun <T> toCallback( callback: (Result<T>) -> Unit, body: suspend () -> T ): Cancellable { val job = scope.launch { try { val result = body() callback(Result.success(result)) } catch (t: Throwable) { callback(Result.failure(t)) } } return Cancellable(job) } class Cancellable(private val job: Job) { fun cancel() { job.cancel() } } }

Platform-specific options

Many platforms have their own ways of representing references to asynchronous tasks. In JavaScript, for instance, this is a Promise. To start a suspending function and expose its return value as a Promise which can be used in JavaScript to await this task, we can use the promise coroutine builder. Of course, this only works in Kotlin/JS.

@JsExport @JsName("AnkiConnector") class AnkiConnectorJs { private val connector = AnkiConnector(/*...*/) private val scope = CoroutineScope(SupervisorJob()) fun checkConnection(): Promise<Boolean> = scope.promise { connector.checkConnection() } fun getDeckNames(): Promise<Array<String>> = scope.promise { connector.getDeckNames().toTypedArray() } fun pushDeck( deckName: String, markdown: String ): Promise<AnkiConnectorResult> = scope.promise { connector.pushDeck(deckName, markdown) } fun pullDeck( deckName: String, currentMarkdown: String ): Promise<AnkiConnectorResult> = scope.promise { connector.pullDeck(deckName, currentMarkdown) } }

In Kotlin/JVM, we might want to return a CompletableFuture from Java stdlib. For that, we should use the future coroutine builder.

class AnkiConnectorFuture { private val connector = AnkiConnector(/*...*/) private val scope = CoroutineScope(SupervisorJob()) fun checkConnection(): CompletableFuture<Boolean> = scope.future { connector.checkConnection() } fun getDeckNames(): CompletableFuture<List<String>> = scope.future { connector.getDeckNames() } fun pushDeck( deckName: String, markdown: String ): CompletableFuture<AnkiConnectorResult> = scope.future { connector.pushDeck(deckName, markdown) } fun pullDeck( deckName: String, currentMarkdown: String ): CompletableFuture<AnkiConnectorResult> = scope.future { connector.pullDeck(deckName, currentMarkdown) } }

We might also transform our functions into objects from external libraries, like RxJava, which is very popular on the JVM. To transform suspending functions into RxJava objects, like Single or Observable, Kotlin provides a set of dependencies. For instance, to transform a suspending function into a Single from RxJava 3.x, we can use the kotlinx-coroutines-rx3 dependency and the rxSingle function.

class AnkiConnectorBlocking { private val connector = AnkiConnector(/*...*/) fun checkConnection(): Single<Boolean> = rxSingle { connector.checkConnection() } fun getDeckNames(): Single<List<String>> = rxSingle { connector.getDeckNames() } fun pushDeck( deckName: String, markdown: String ): Single<AnkiConnectorResult> = rxSingle { connector.pushDeck(deckName, markdown) } fun pullDeck( deckName: String, currentMarkdown: String ): Single<AnkiConnectorResult> = rxSingle { connector.pullDeck(deckName, currentMarkdown) } }

Calling suspending functions from other languages

In some of the projects I've worked on, previous developers decided to write tests using the Spock framework and Groovy, but this became problematic when we started using Kotlin Coroutines in those projects. We could not avoid calling suspending functions in unit tests because those were often the functions we were testing. It was also too hard to migrate all those tests from Groovy to Kotlin.

The easiest solution to this problem is to use runBlocking to construct a continuation in the other language, such as Java. This can be done in the following way.

// Java
public class MainJava {
   public static void main(String[] args) {
       AnkiConnector connector = new AnkiConnector();
       boolean connected;
       try {
           connected = BuildersKt.runBlocking(
                   EmptyCoroutineContext.INSTANCE,
                   (s, c) -> connector.checkConnection(c)
           );
       } catch (InterruptedException e) {
           throw new RuntimeException(e);
       }
       // ...
   }
}

Three things make this function inconvenient to use:

  • BuildersKt is maybe not the best name.
  • We need to specify some context, even though EmptyCoroutineContext suffices in our case.
  • We always need to deal with the InterruptedException that runBlocking throws if the blocked thread is interrupted.

To help us improve this code, we can implement a wrapper function.

// Java
class SuspendUtils {
   public static <T> T runBlocking(
       Function<Continuation<? super T>, T> func
   ) {
       try {
           return BuildersKt.runBlocking(
               EmptyCoroutineContext.INSTANCE,
               (s, c) -> func.apply(c)
           );
       } catch (InterruptedException e) {
           throw new RuntimeException(e);
       }
   }
}

public class MainJava {
   public static void main(String[] args) {
       AnkiConnector connector = new AnkiConnector();
       boolean connected = SuspendUtils.runBlocking((c) ->
           connector.checkConnection(c)
       );
       // ...
   }
}

Flow and Reactive Streams

We've already discussed this topic in the Flow building chapter, but you can use simple conversion functions if you need to transform between Flow and libraries representing reactive streams (like Reactor, RxJava 2.x. or RxJava 3.x).

All objects like Flux, Flowable, or Observable implement the Publisher interface from the Java standard library, which can be converted to Flow with the asFlow function from the kotlinx-coroutines-reactive library.

suspend fun main() = coroutineScope { Flux.range(1, 5).asFlow() .collect { print(it) } // 12345 Flowable.range(1, 5).asFlow() .collect { print(it) } // 12345 Observable.range(1, 5).asFlow() .collect { print(it) } // 12345 }

To convert the other way around, you need more specific libraries. With kotlinx-coroutines-reactor, you can convert Flow to Flux. With kotlinx-coroutines-rx3 (or kotlinx-coroutines-rx2), you can convert Flow to Flowable or Observable.

suspend fun main(): Unit = coroutineScope { val flow = flowOf(1, 2, 3, 4, 5) flow.asFlux() .doOnNext { print(it) } // 12345 .subscribe() flow.asFlowable() .subscribe { print(it) } // 12345 flow.asObservable() .subscribe { print(it) } // 12345 }

You can also define simple functions to observe a flow on the JVM or to block the current thread until a flow is completed.

// Kotlin object FlowUtils { private val scope = CoroutineScope(SupervisorJob()) @JvmStatic @JvmOverloads fun <T> observe( flow: Flow<T>, onNext: OnNext<T>? = null, onError: OnError? = null, onCompletion: OnCompletion? = null, ) { scope.launch { flow.onCompletion { onCompletion?.handle() } .onEach { onNext?.handle(it) } .catch { onError?.handle(it) } .collect() } } @JvmStatic @JvmOverloads fun <T> observeBlocking( flow: Flow<T>, onNext: OnNext<T>? = null, onError: OnError? = null, onCompletion: OnCompletion? = null, ) = runBlocking { flow.onCompletion { onCompletion?.handle() } .onEach { onNext?.handle(it) } .catch { onError?.handle(it) } .collect() } fun interface OnNext<T> { fun handle(value: T) } fun interface OnError { fun handle(value: Throwable) } fun interface OnCompletion { fun handle() } } class FlowTest { fun test(): Flow<String> = flow { emit("A") delay(1000) emit("B") } }
// Java
public class Main {
    public static void main(String[] args) {
        FlowTest obj = new FlowTest();
        FlowUtils.observeBlocking(
                obj.test(),
                System.out::println
        );
    }
}
// A
// (1 sec)
// B

I don't see much utility in converting Flow into Promise or CompletableFuture. Flow can represent many values, but Promise or CompletableFuture can represent only one.

Summary

In this section, we've learned about some limitations of coroutines on different platforms. We've also learned how to transform Kotlin Coroutine concepts so they can be used in other languages. The two most basic techniques are transforming suspending functions into either blocking or callback functions. A flow can be transformed into an object from a library like RxJava or Reactor, or it can be wrapped with a class that can be used in other languages. However, there are also many platform-specific options. I hope you will find this chapter useful when you need to use Kotlin Coroutines from other languages.

1:

This statement is not completely true because multiple threads can be used on npm, and there is the Web Workers API for browsers to start processes on different threads; however, JavaScript can generally be considered single-threaded.

2:

This is a simplified facade from my AnkiMarkdown project.