This is the solution we ended up with:
import java.util.concurrent.Semaphore;

import rx.Observable;

public class OnErrorRetryCache<T> {
    public static <T> Observable<T> from(Observable<T> source) {
        return new OnErrorRetryCache<>(source).deferred;
    }
    private final Observable<T> deferred;
    private final Semaphore singlePermit = new Semaphore(1);
    private Observable<T> cache = null;
    private Observable<T> inProgress = null;
    private OnErrorRetryCache(Observable<T> source) {
        deferred = Observable.defer(() -> createWhenObserverSubscribes(source));
    }
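    private Observable<T> createWhenObserverSubscribes(Observable<T> source) {
        // Blocks the subscribing thread while another request is in progress.
        singlePermit.acquireUninterruptibly();
        Observable<T> cached = cache;
        if (cached != null) {
            // A previous request completed successfully: reuse its replayed result.
            singlePermit.release();
            return cached;
        }
        // No cached result yet: start a new request. The permit stays held
        // until the source terminates (see onTermination).
        inProgress = source
                .doOnCompleted(this::onSuccess)
                .doOnTerminate(this::onTermination)
                .replay()
                .autoConnect();
        return inProgress;
    }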
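    // Runs only on successful completion, so a failed request is never cached.
    private void onSuccess() {
        cache = inProgress;
    }
    // Runs on completion or error; lets the next waiting subscriber proceed.
    private void onTermination() {
        inProgress = null;
        singlePermit.release();
    }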
}
We needed to cache the result of an HTTP request from Retrofit, so this was written with an observable that emits a single item in mind.
If an observer subscribes while the HTTP request is still executing, we want it to wait rather than execute the request a second time, unless the in-progress request fails. To achieve that, the semaphore allows only one subscriber at a time into the block that creates or returns the cached observable. If a new observable is created, the permit is held until that observable terminates, so other subscribers wait until then.
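To make the semaphore logic easier to follow in isolation, here is a minimal sketch of the same idea without RxJava. It is a blocking analogue, not the class above: `RetryOnErrorCacheSketch` and `BlockingRetryCache` are hypothetical names, and a plain `Callable` stands in for the HTTP request. A successful result is cached, a failure is not, and concurrent callers wait on the single permit instead of starting a second request.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryOnErrorCacheSketch {

    // Hypothetical blocking analogue of OnErrorRetryCache (assumption, not the Rx version).
    static final class BlockingRetryCache<T> {
        private final Callable<T> source;
        private final Semaphore singlePermit = new Semaphore(1);
        private T cached;
        private boolean hasValue;

        BlockingRetryCache(Callable<T> source) {
            this.source = source;
        }

        T get() throws Exception {
            // Only one caller at a time may create or read the cached value.
            singlePermit.acquireUninterruptibly();
            try {
                if (hasValue) {
                    return cached;
                }
                // If call() throws, nothing is cached and the next caller retries.
                T value = source.call();
                cached = value;
                hasValue = true;
                return value;
            } finally {
                // Released on success and on failure, like onTermination above.
                singlePermit.release();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Simulated request: fails on the first attempt, succeeds afterwards.
        BlockingRetryCache<String> cache = new BlockingRetryCache<>(() -> {
            if (calls.incrementAndGet() == 1) {
                throw new IllegalStateException("simulated failure");
            }
            return "response";
        });

        try {
            cache.get();
        } catch (IllegalStateException e) {
            System.out.println("first attempt failed");
        }
        System.out.println(cache.get());                    // retried, now cached
        System.out.println(cache.get());                    // served from the cache
        System.out.println("source calls: " + calls.get()); // prints "source calls: 2"
    }
}
```

The main difference from the Rx version is that here the caller's thread runs the request itself, whereas in `OnErrorRetryCache` the permit is released from the source's termination callback.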