Kotlin Flow to RxJava or Reactor and vice versa
Kotlin Flow is a powerful tool for handling asynchronous data streams, and it is quickly replacing RxJava and Reactor in many applications. However, plenty of applications still use RxJava or Reactor, and projects migrating to Kotlin Coroutines often contain legacy classes built on them. In such cases, it is often necessary to convert between Kotlin Flow and RxJava or Reactor streams. Let's learn how to do that.
This article is part of a series about interoperability between Kotlin Coroutines and other libraries.
Converting RxJava or Reactor to Kotlin Flow
Flowable from RxJava, as well as Flux and Mono from Reactor, implement the Publisher interface, which is a part of the Reactive Streams specification. That is why a single method, asFlow(), is enough to convert all those streams to Kotlin Flow. To use it, you need the kotlinx-coroutines-reactive dependency in your project. RxJava's Observable does not implement Publisher, so for it use the asFlow() extension from kotlinx-coroutines-rx2 or kotlinx-coroutines-rx3.
Then you can use asFlow() on any Publisher.
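As an illustration, here is a minimal sketch of asFlow() applied to a Reactor Flux and an RxJava 3 Flowable. It assumes kotlinx-coroutines-reactive, reactor-core and RxJava 3 are on the classpath; the function names fluxAsFlow and flowableAsFlow are made up for this example.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Flux

// Flux and Flowable both implement org.reactivestreams.Publisher,
// so the same asFlow() extension applies to each of them.
fun fluxAsFlow(): Flow<Int> = Flux.range(1, 3).asFlow()

fun flowableAsFlow(): Flow<String> = Flowable.just("A", "B", "C").asFlow()

fun main() = runBlocking {
    println(fluxAsFlow().toList())     // [1, 2, 3]
    println(flowableAsFlow().toList()) // [A, B, C]
}
```

The resulting flows are cold: each collection subscribes to the underlying Publisher again.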
The same library also provides other useful extensions, such as asPublisher(), which converts a Flow to a Publisher. You can also await the first or last value from a Publisher using the awaitFirst() and awaitLast() suspending extensions.
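A short sketch of these extensions, assuming only kotlinx-coroutines-reactive is available; lettersPublisher is a hypothetical helper introduced for the example.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitLast
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// Wraps a cold flow as a Reactive Streams Publisher.
fun lettersPublisher(): Publisher<String> = flowOf("A", "B", "C").asPublisher()

fun main() = runBlocking {
    val publisher = lettersPublisher()
    // Each await call subscribes separately, so the flow is collected twice.
    println(publisher.awaitFirst()) // A
    println(publisher.awaitLast())  // C
}
```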
Converting Kotlin Flow to RxJava or Reactor
To convert Kotlin Flow to RxJava or Reactor, you can use dedicated libraries that provide extensions for this purpose. For RxJava, use the kotlinx-coroutines-rx2 or kotlinx-coroutines-rx3 library, depending on the version of RxJava you are using. For Reactor, use the kotlinx-coroutines-reactor library.
In all those cases, you can convert a flow into the appropriate reactive stream using asFlowable(), asObservable() or asFlux().
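The three conversions could look roughly like this. The sketch assumes RxJava 3 with kotlinx-coroutines-rx3 and Reactor with kotlinx-coroutines-reactor; with RxJava 2, the same extensions live in the kotlinx.coroutines.rx2 package. The function names are illustrative only.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Observable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.rx3.asFlowable
import kotlinx.coroutines.rx3.asObservable
import reactor.core.publisher.Flux

val numbers: Flow<Int> = flowOf(1, 2, 3)

fun numbersFlowable(): Flowable<Int> = numbers.asFlowable()     // RxJava, with backpressure
fun numbersObservable(): Observable<Int> = numbers.asObservable() // RxJava, no backpressure
fun numbersFlux(): Flux<Int> = numbers.asFlux()                 // Reactor

fun main() {
    println(numbersFlowable().toList().blockingGet())   // [1, 2, 3]
    println(numbersObservable().toList().blockingGet()) // [1, 2, 3]
    println(numbersFlux().collectList().block())        // [1, 2, 3]
}
```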
A scheduler can also be turned into a coroutine dispatcher using asCoroutineDispatcher. There is no simple way to convert a Flow to Single, Maybe or Completable, since Flow is designed to represent a stream of values. For those cases, we should rather use suspending functions that return a single value, a nullable value (or Result), and Unit, respectively. However, if you need to make such conversions, it might be helpful to use methods that create RxJava or Reactor streams from coroutines.
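One possible workaround, sketched below under the assumption that RxJava 3 and kotlinx-coroutines-rx3 are used, is to collect the flow inside an rxSingle, rxMaybe or rxCompletable builder. The helper names (firstAsSingle, firstAsMaybe, completeWhenCollected) are hypothetical and not part of any library; the sketch also shows an RxJava scheduler reused as a coroutine dispatcher.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.rx3.asCoroutineDispatcher
import kotlinx.coroutines.rx3.rxCompletable
import kotlinx.coroutines.rx3.rxMaybe
import kotlinx.coroutines.rx3.rxSingle

// An RxJava scheduler reused as a coroutine dispatcher.
val ioDispatcher = Schedulers.io().asCoroutineDispatcher()

// Hypothetical helpers: each one runs a terminal Flow operation in a coroutine.
fun <T : Any> firstAsSingle(flow: Flow<T>): Single<T> =
    rxSingle(ioDispatcher) { flow.first() }       // fails if the flow is empty

fun <T : Any> firstAsMaybe(flow: Flow<T>): Maybe<T> =
    rxMaybe(ioDispatcher) { flow.firstOrNull() }  // empty Maybe if the flow is empty

fun completeWhenCollected(flow: Flow<*>): Completable =
    rxCompletable(ioDispatcher) { flow.collect() } // ignores the values

fun main() {
    val flow = flowOf(1, 2, 3)
    println(firstAsSingle(flow).blockingGet()) // 1
    println(firstAsMaybe(flow).blockingGet())  // 1
    completeWhenCollected(flow).blockingAwait()
}
```

Which terminal operation to use (first, single, last and so on) depends on what the single value is supposed to mean, which is why there is no one-size-fits-all conversion.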
RxJava or Reactor streams from coroutines
The libraries mentioned above also provide ways to create RxJava or Reactor streams by starting coroutines that send values to them. This is useful when you want to create a stream of data that is produced by coroutines.
To create a Publisher, use publish from kotlinx-coroutines-reactive, which allows you to send values using send.
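A minimal sketch of the publish builder, assuming kotlinx-coroutines-reactive is on the classpath; countdown is a made-up example function.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.publish
import kotlinx.coroutines.runBlocking
import org.reactivestreams.Publisher

// A coroutine is started for every subscriber; the Publisher completes
// when the block finishes.
fun countdown(from: Int): Publisher<Int> = publish {
    for (i in from downTo 1) {
        send(i)   // suspends until the subscriber has requested an element
        delay(10) // any suspending call is allowed inside the builder
    }
}

fun main() = runBlocking {
    println(countdown(3).asFlow().toList()) // [3, 2, 1]
}
```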
To create a Flux in Reactor, or an Observable or Flowable in RxJava, you can use flux (from kotlinx-coroutines-reactor), rxObservable or rxFlowable (from kotlinx-coroutines-rx2 or kotlinx-coroutines-rx3), where you can send elements using send.
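The three builders can be sketched side by side as follows, assuming RxJava 3 with kotlinx-coroutines-rx3 and Reactor with kotlinx-coroutines-reactor; the squares functions are illustrative names only.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Observable
import kotlinx.coroutines.reactor.flux
import kotlinx.coroutines.rx3.rxFlowable
import kotlinx.coroutines.rx3.rxObservable
import reactor.core.publisher.Flux

// Each builder starts a coroutine per subscriber and emits via send.
fun squaresFlux(n: Int): Flux<Int> = flux {
    for (i in 1..n) send(i * i)
}

fun squaresObservable(n: Int): Observable<Int> = rxObservable {
    for (i in 1..n) send(i * i)
}

fun squaresFlowable(n: Int): Flowable<Int> = rxFlowable {
    for (i in 1..n) send(i * i)
}

fun main() {
    println(squaresFlux(3).collectList().block())         // [1, 4, 9]
    println(squaresObservable(3).toList().blockingGet())  // [1, 4, 9]
    println(squaresFlowable(3).toList().blockingGet())    // [1, 4, 9]
}
```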
To create a Mono, Single or Maybe, use mono (from kotlinx-coroutines-reactor), rxSingle or rxMaybe (from kotlinx-coroutines-rx2/kotlinx-coroutines-rx3), which allow you to return a single value.
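For instance, a suspending function can be exposed through each of these types. This sketch assumes RxJava 3 with kotlinx-coroutines-rx3 and Reactor with kotlinx-coroutines-reactor; findUserName and the wrapper functions are hypothetical.

```kotlin
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.core.Single
import kotlinx.coroutines.delay
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.rx3.rxMaybe
import kotlinx.coroutines.rx3.rxSingle
import reactor.core.publisher.Mono

// Hypothetical suspending lookup; only id 1 is known.
suspend fun findUserName(id: Int): String? {
    delay(10)
    return if (id == 1) "Alice" else null
}

// Returning null from mono or rxMaybe produces an empty Mono or Maybe.
fun userNameMono(id: Int): Mono<String> = mono { findUserName(id) }

fun userNameMaybe(id: Int): Maybe<String> = rxMaybe { findUserName(id) }

// Single must always produce a value, so a fallback is supplied here.
fun userNameSingle(id: Int): Single<String> = rxSingle { findUserName(id) ?: "unknown" }

fun main() {
    println(userNameMono(1).block())         // Alice
    println(userNameMaybe(2).blockingGet())  // null, because the Maybe is empty
    println(userNameSingle(2).blockingGet()) // unknown
}
```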
You can also create a Completable in RxJava using rxCompletable from kotlinx-coroutines-rx2 or kotlinx-coroutines-rx3, which allows you to perform an action without returning a value.
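A small sketch of rxCompletable, assuming RxJava 3 with kotlinx-coroutines-rx3; clearCache and the cacheCleared flag are invented for the example.

```kotlin
import io.reactivex.rxjava3.core.Completable
import kotlinx.coroutines.delay
import kotlinx.coroutines.rx3.rxCompletable
import java.util.concurrent.atomic.AtomicBoolean

val cacheCleared = AtomicBoolean(false)

// Hypothetical suspending action with a side effect and no result.
suspend fun clearCache() {
    delay(10)
    cacheCleared.set(true)
}

// The Completable completes when the block returns and fails if it throws.
fun clearCacheCompletable(): Completable = rxCompletable { clearCache() }

fun main() {
    clearCacheCompletable().blockingAwait()
    println(cacheCleared.get()) // true
}
```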
Conclusion
Kotlin Coroutines provide a powerful way to work with asynchronous data streams, and converting between Kotlin Flow and RxJava or Reactor streams is straightforward with the right libraries. Whether you are migrating from RxJava or Reactor to Kotlin Flow or need to integrate with existing code, these conversions allow you to leverage the strengths of both paradigms effectively. By using the provided extensions and coroutine builders, you can create reactive streams that fit your application's needs.