Wyko Rijnsburger

Combining Streams using Reactor 3 + Kotlin

I spent a day migrating a service that does a lot of aggregation from RxJava 1 to Reactor 3. The migration went pretty smoothly, and in most cases the Reactor API turned out to be more elegant than RxJava's.

However, I struggled to migrate the large zip() operations, where we merge async data from various sources into one big Observable. RxJava's Single has zip overloads like this one:

public static <T1,T2,T3,T4,T5,R> Single<R> zip(Single<? extends T1> o1,
                                   Single<? extends T2> o2,
                                   Single<? extends T3> o3,
                                   Single<? extends T4> o4,
                                   Single<? extends T5> o5,
                                   Func5<? super T1,? super T2,? super T3,? super T4,? super T5,? extends R> zipFunction)

From a maintenance perspective, I think a separate overload per number of sources is far from ideal. As a user, though, the resulting code is quite readable:

Single.zip(singleA, singleB, singleC, singleD, singleE, 
           (a, b, c, d, e) -> new Aggregate(a, b, c, d, e))

In Reactor, the API unfortunately isn’t as user-friendly. The combinator function receives a plain Object[], so you lose all type information and end up with a long, ugly Function full of manual casts:

Mono.zip(array -> {
    // array holds the results in the same order as the Monos passed below
    A a = (A) array[0];
    B b = (B) array[1];
    C c = (C) array[2];
    D d = (D) array[3];
    E e = (E) array[4];

    return new Aggregate(a, b, c, d, e);
}, monoA, monoB, monoC, monoD, monoE)
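
One way to contain the casting, sketched here under a few assumptions, is to hide it once in a small adapter that turns a typed five-argument function into the Function<Object[], R> that the combinator variant of Mono.zip expects. The names Function5 and ZipAdapters.typed are hypothetical and not part of Reactor, and the sketch uses only the JDK so it can be run without Reactor on the classpath.

```java
import java.util.function.Function;

final class ZipAdapters {

    /** Hypothetical five-argument function type; not part of Reactor. */
    @FunctionalInterface
    public interface Function5<A, B, C, D, E, R> {
        R apply(A a, B b, C c, D d, E e);
    }

    /**
     * Wraps a typed function as an Object[] combinator.
     * The unchecked casts live here, in one place. A wrong source order is
     * still not caught by the compiler and typically surfaces as a
     * ClassCastException at runtime.
     */
    @SuppressWarnings("unchecked")
    public static <A, B, C, D, E, R> Function<Object[], R> typed(
            Function5<A, B, C, D, E, R> fn) {
        return array -> {
            // Fail early with a clear message if the arity does not match.
            if (array.length != 5) {
                throw new IllegalArgumentException(
                        "expected 5 values, got " + array.length);
            }
            return fn.apply((A) array[0], (B) array[1], (C) array[2],
                            (D) array[3], (E) array[4]);
        };
    }
}
```

Assuming Aggregate has a matching five-argument constructor, the call above should then shrink to something like Mono.zip(ZipAdapters.typed(Aggregate::new), monoA, monoB, monoC, monoD, monoE). The casts have not gone away, they are just written once instead of at every call site.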

There is also Mono.when, which returns a typed Tuple that you can then transform. This approach is slightly more maintainable because it requires no manual casting, but it is still not very readable:

Mono.when(monoA, monoB, monoC, monoD, monoE)
    .map(tuple -> new Aggregate(tuple.getT1(), tuple.getT2(), tuple.getT3(), tuple.getT4(), tuple.getT5()))

In Kotlin, we can improve on this with the Tuple extension functions that Reactor provides. They add componentN() operator functions to the Reactor Tuples, which lets us use Kotlin’s destructuring declarations. The extension functions have to be imported manually; in my experience, IDEA does not do it for you.

import reactor.util.function.component1
import reactor.util.function.component2
import reactor.util.function.component3
import reactor.util.function.component4
import reactor.util.function.component5

// `when` is a keyword in Kotlin, so the method name has to be escaped with backticks
Mono.`when`(monoA, monoB, monoC, monoD, monoE)
    .map { (a, b, c, d, e) -> Aggregate(a, b, c, d, e) }