Combining Streams using Reactor 3 + Kotlin
I spent a day migrating a service that does a lot of aggregation from RxJava 1 to Reactor 3. The migration went pretty smooth, with the Reactor API being more elegant compared to RxJava in most cases.
However, I struggled migrating the large zip()
operations, where we merge async data from various sources into one big Observable. RxJava has zip
that works like this:
While I think from a maintenance perspective, this is far from ideal, as a user, the resulting code becomes quite readable:
In Reactor, the API unfortunately isn’t as user-friendly. In the combinator function, you lose all type information which results in a long, ugly Function where we’re forced to do manual casting.
There is also the when
approach, which returns a Tuple
that you can then transform. This approach is slightly more maintainable as it does not require manual casting, but is not that readable:
In Kotlin, we can improve this by using the Tuple Extension Functions for Kotlin provided by Reactor. These Kotlin extension functions provide component()
methods to the Reactor Tuples which allow us to use Kotlin’s Destructuring Declarations syntax. We need to manually import these extension functions, in my experience IDEA does not do it for you.