I'm trying to extract some common logic, based on RxJava2, into reusable components. Let's imagine I have the following piece of code:
someSingle
    .doOnSuccess { /* update UI based on side effect */ }
    .subscribeOn(...)
    .observeOn(...)
    .subscribe(
        { value -> /* update UI based on value */ },
        { throwable -> /* handle error */ }
    )
I want to wrap this into a reusable component that exposes a method returning a Flowable of events. Clients will receive the events and update the UI accordingly. My goal is to keep any reference to the view out of the reusable component. I want the method to look something like this:
fun reusableMethod(...) : Flowable<Event> { ... }
Event is a sealed class with two subtypes: SideEffectEvent and ValueEvent.
What is the best way to transform the stream from the first snippet so that both the side effect and the value are emitted as items of the Flowable?
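To make the event types concrete, here is a minimal sketch of what such a hierarchy and a client-side handler might look like. The `String` payload of `ValueEvent` and the `describe` function are assumptions for illustration only; the real payload and UI calls are not given here.

```kotlin
// Hypothetical shapes for the two event types; the String payload is an assumption.
sealed class Event
class SideEffectEvent : Event()
data class ValueEvent(val value: String) : Event()

// Hypothetical client-side handler: an exhaustive `when` maps each event
// to the UI update it should trigger (represented here by a plain String).
fun describe(event: Event): String = when (event) {
    is SideEffectEvent -> "side effect"
    is ValueEvent -> "value: ${event.value}"
}
```

Because Event is sealed, the compiler checks that the `when` covers every subtype, so the client cannot silently ignore a new event type.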
Currently, I have the following solution, but I'm not very happy with it, because it looks a bit clunky and complex:
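private val sideEffectEvents = PublishProcessor.create<SideEffectEvent>()
fun reusableMethod(...): Flowable<Event> =
    Flowable.merge(
        someSingle
            .doOnSuccess { sideEffectEvents.onNext(SideEffectEvent()) }
            .map<Event> { ValueEvent(it) } // assumes ValueEvent wraps the emitted value
            .toFlowable(), // merge() accepts Publishers, and a Single is not one
        sideEffectEvents
    )
    .subscribeOn(...)
    .observeOn(...)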
I have also considered some alternatives:
- Notify the client of SideEffectEvents through a callback passed to someReusableMethod(). This looks very unnatural, and exposing both a callback and a stream to subscribe to is not good code style.
- Use a single PublishProcessor. Post side effects to it and use it to subscribe to the original Single. Expose a cleanUp() method in the reusable component so the client can dispose of the stream when it decides to.
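The second alternative could look roughly like the sketch below. It is only an illustration under several assumptions: the `ReusableComponent` class, its `events()` and `start()` methods, and the `String` payload are hypothetical names, and the Single's result is forwarded to the processor by hand rather than by subscribing the processor directly, so the stream stays open after the value arrives.

```kotlin
import io.reactivex.Flowable
import io.reactivex.Single
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.processors.PublishProcessor

// Hypothetical event hierarchy; the String payload is an assumption.
sealed class Event
class SideEffectEvent : Event()
data class ValueEvent(val value: String) : Event()

// Hypothetical component that holds no reference to the view.
class ReusableComponent(private val someSingle: Single<String>) {

    private val events = PublishProcessor.create<Event>()
    private val disposables = CompositeDisposable()

    // Clients should subscribe before calling start(): a PublishProcessor
    // does not replay items emitted before subscription.
    fun events(): Flowable<Event> = events.hide()

    fun start() {
        disposables.add(
            someSingle
                .doOnSuccess { events.onNext(SideEffectEvent()) }
                .subscribe(
                    { value -> events.onNext(ValueEvent(value)) },
                    { error -> events.onError(error) }
                )
        )
    }

    // Disposes of the subscription to the original Single.
    fun cleanUp() = disposables.clear()
}
```

The cost of this variant is that the component now owns a subscription, which is why the client has to remember to call cleanUp().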
I'm looking forward to suggestions and ideas.