
Kotlin Flow to RxJava or Reactor and vice versa

Kotlin Flow is a powerful tool for handling asynchronous data streams, and it is quickly replacing RxJava and Reactor in many applications. However, many applications still use RxJava or Reactor, and projects that migrate to Kotlin Coroutines often contain legacy classes built on them. In such cases, it is often necessary to convert between Kotlin Flow and RxJava or Reactor streams. Let's learn how to do that.

This article is part of a series about interoperability between Kotlin Coroutines and other libraries.

Converting RxJava or Reactor to Kotlin Flow

Reactor's Flux and Mono, as well as RxJava's Flowable, implement the Publisher interface, which is a part of the Reactive Streams specification. That is why a single extension function, asFlow(), can convert all of them to Kotlin Flow. To use it, you need the kotlinx-coroutines-reactive dependency in your project. RxJava's Observable does not implement Publisher; for it, kotlinx-coroutines-rx2 and kotlinx-coroutines-rx3 provide their own asFlow() extension.

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$kotlinxCoroutinesVersion")

Then you can use asFlow() on any Publisher:

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import org.reactivestreams.Publisher

fun userUpdates(publisher: Publisher<User>): Flow<User> =
    publisher.asFlow()

The same library also provides other useful extensions, like asPublisher(), which converts a Flow to a Publisher. You can also await the first or last value from a Publisher using the awaitFirst() and awaitLast() suspending extensions.

val first = publisher.awaitFirst()
val last = publisher.awaitLast()
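
The extensions above can be tried without any RxJava or Reactor dependency. The sketch below is only an illustration: the numbers() function is a hypothetical stand-in for a Publisher coming from reactive code, built here with asPublisher() so that kotlinx-coroutines-reactive is the only library needed.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitLast
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Hypothetical stand-in for a Publisher exposed by RxJava or Reactor code.
fun numbers(): Publisher<Int> = flowOf(1, 2, 3).asPublisher()

fun main() = runBlocking {
    val publisher = numbers()

    // Publisher -> Flow; every collection subscribes to the publisher again.
    val flow: Flow<Int> = publisher.asFlow()
    println(flow.toList())          // [1, 2, 3]

    // Suspend until the first or the last element arrives.
    println(publisher.awaitFirst()) // 1
    println(publisher.awaitLast())  // 3
}
```

Note that awaitFirst() and awaitLast() each make their own subscription, so on a cold publisher they restart the stream rather than share one run.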

Converting Kotlin Flow to RxJava or Reactor

To convert Kotlin Flow to RxJava or Reactor, you can use specific libraries that provide extensions for this purpose. For RxJava, you can use the kotlinx-coroutines-rx2 or kotlinx-coroutines-rx3 library, depending on the version of RxJava you are using. For Reactor, you can use the kotlinx-coroutines-reactor library.

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-rx2:$kotlinxCoroutinesVersion") // for RxJava 2
// or
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-rx3:$kotlinxCoroutinesVersion") // for RxJava 3
// or
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$kotlinxCoroutinesVersion") // for Reactor

In all those cases you can convert a flow into the appropriate reactive stream using asFlowable(), asObservable() or asFlux().

val flow: Flow<User> = getUserUpdates()
val flowable: Flowable<User> = flow.asFlowable()
val observable: Observable<User> = flow.asObservable()
val flux: Flux<User> = flow.asFlux()
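
To see these conversions end to end, here is a minimal sketch. It assumes RxJava 3 (so the imports come from kotlinx-coroutines-rx3 and io.reactivex.rxjava3) together with Reactor on the classpath; the letters() function is a hypothetical source used only for illustration.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Observable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.rx3.asFlowable
import kotlinx.coroutines.rx3.asObservable
import reactor.core.publisher.Flux

// Hypothetical flow standing in for getUserUpdates().
fun letters(): Flow<String> = flowOf("A", "B", "C")

fun main() {
    val flux: Flux<String> = letters().asFlux()
    val flowable: Flowable<String> = letters().asFlowable()
    val observable: Observable<String> = letters().asObservable()

    // Blocking calls are used here only to print the results from main.
    println(flux.collectList().block())        // [A, B, C]
    println(flowable.toList().blockingGet())   // [A, B, C]
    println(observable.toList().blockingGet()) // [A, B, C]
}
```

With RxJava 2, the same extension names should be available from the kotlinx.coroutines.rx2 package instead.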

A scheduler can also be turned into a coroutine dispatcher using asCoroutineDispatcher(). There is no direct way to convert a Flow to a Single, Maybe or Completable, because Flow is designed to represent a stream of values; on the coroutines side, those types correspond to suspending functions that return a single value, a nullable value (or Result) and Unit, respectively. However, if you need such a conversion, you can use the builders that create RxJava or Reactor streams from coroutines.
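
One possible way to bridge that gap is to wrap a terminal Flow operation in a coroutine builder such as rxSingle, rxMaybe or rxCompletable. The sketch below assumes RxJava 3 with kotlinx-coroutines-rx3; the helper names firstAsSingle, firstAsMaybe and completionAsCompletable are hypothetical and not part of any library. It also shows a scheduler used as a dispatcher.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.rx3.asCoroutineDispatcher
import kotlinx.coroutines.rx3.rxCompletable
import kotlinx.coroutines.rx3.rxMaybe
import kotlinx.coroutines.rx3.rxSingle
import kotlinx.coroutines.withContext

// Hypothetical helper: first element as a Single; an empty flow makes it fail.
fun <T : Any> Flow<T>.firstAsSingle(): Single<T> {
    val source = this
    return rxSingle { source.first() }
}

// Hypothetical helper: first element as a Maybe; an empty flow completes empty.
fun <T : Any> Flow<T>.firstAsMaybe(): Maybe<T> {
    val source = this
    return rxMaybe { source.firstOrNull() }
}

// Hypothetical helper: ignores elements and signals only completion or error.
fun Flow<*>.completionAsCompletable(): Completable {
    val source = this
    return rxCompletable { source.collect() }
}

fun main() {
    val flow = flowOf(1, 2, 3)
    println(flow.firstAsSingle().blockingGet()) // 1
    println(flow.firstAsMaybe().blockingGet())  // 1
    flow.completionAsCompletable().blockingAwait()

    // An RxJava scheduler used as a coroutine dispatcher.
    val dispatcher = Schedulers.io().asCoroutineDispatcher()
    val threadName = runBlocking {
        withContext(dispatcher) { Thread.currentThread().name }
    }
    println(threadName) // name of a thread owned by the io scheduler
}
```

Which terminal operation to wrap (first(), single(), last() and so on) is a design decision that depends on what the single value is supposed to mean.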

RxJava or Reactor streams from coroutines

The libraries mentioned above also provide builders that create RxJava or Reactor streams by starting a coroutine that sends values to them. This is useful when the data in a stream is produced by suspending code.

To create a Publisher, use publish from kotlinx-coroutines-reactive. Inside its block, you can emit values using send:

fun userUpdates(): Publisher<User> = publish {
    var page = 0
    while (true) {
        val users = getUsers(page++) // move to the next page on each iteration
        if (users.isEmpty()) break
        for (user in users) {
            send(user)
        }
    }
}
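
To check that such a paging loop terminates, it can be run against an in-memory source. The following is a sketch under stated assumptions: User, the pages list and getUsers are hypothetical stand-ins for a real data source, and pagedUsers mirrors the builder shown above.

```kotlin
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.publish
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

data class User(val name: String)

// Hypothetical paged data: two pages with users, then nothing.
private val pages = listOf(
    listOf(User("Alice"), User("Bob")),
    listOf(User("Carol")),
)

// Hypothetical suspending fetch; returns an empty list past the last page.
suspend fun getUsers(page: Int): List<User> =
    pages.getOrElse(page) { emptyList() }

fun pagedUsers(): Publisher<User> = publish {
    var page = 0
    while (true) {
        val users = getUsers(page++)
        if (users.isEmpty()) break // no more pages, so the publisher completes
        for (user in users) send(user)
    }
}

fun main() = runBlocking {
    val names = pagedUsers().asFlow().toList().map { it.name }
    println(names) // [Alice, Bob, Carol]
}
```

Because send suspends until the subscriber requests more elements, the next page is fetched only when downstream demand allows it.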

To create a Flux in Reactor, or an Observable or Flowable in RxJava, use flux (from kotlinx-coroutines-reactor), or rxObservable or rxFlowable (from kotlinx-coroutines-rx2 or kotlinx-coroutines-rx3). Inside each builder, you can emit elements using send:

fun userUpdates(): Flowable<User> = rxFlowable {
    var page = 0
    while (true) {
        val users = getUsers(page++) // move to the next page on each iteration
        if (users.isEmpty()) break
        for (user in users) {
            send(user)
        }
    }
}

To create a Mono, Single or Maybe, use mono (from kotlinx-coroutines-reactor), or rxSingle or rxMaybe (from kotlinx-coroutines-rx2 or kotlinx-coroutines-rx3). The value returned from the block becomes the single result:

fun userUpdate(userId: String): Mono<User> = mono {
    getUserById(userId)
}
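
A detail worth illustrating is how mono treats a null result: the resulting Mono completes without a value. The sketch below assumes Reactor and kotlinx-coroutines-reactor on the classpath; findUserName and its hard-coded lookup are hypothetical.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.reactor.mono
import reactor.core.publisher.Mono

// Hypothetical lookup: only the id "1" is known.
fun findUserName(id: String): Mono<String> = mono {
    delay(10) // stands in for a suspending database or network call
    if (id == "1") "Alice" else null // null makes the Mono complete empty
}

fun main() {
    // block() is used only to print the results from main.
    println(findUserName("1").block()) // Alice
    println(findUserName("2").block()) // null
}
```

An exception thrown inside the block is delivered to subscribers as an error signal instead.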

You can also create a Completable in RxJava using rxCompletable from kotlinx-coroutines-rx2 or kotlinx-coroutines-rx3, which lets you perform an action without returning a value:

fun updateUser(user: User): Completable = rxCompletable {
    updateUserInDatabase(user)
}

Conclusion

Kotlin Coroutines provide a powerful way to work with asynchronous data streams, and converting between Kotlin Flow and RxJava or Reactor streams is straightforward with the right libraries. Whether you are migrating from RxJava or Reactor to Kotlin Flow or need to integrate with existing code, these conversions allow you to leverage the strengths of both paradigms effectively. By using the provided extensions and coroutine builders, you can create reactive streams that fit your application's needs.