For arbitrary promise implementation, the deferred pattern (not to be confused with antipattern) may may look like:
const deferred = new Deferred;
...
// scopes where `deferred` object reference was passed before promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
...
deferred.resolve(...);
// doesn't affect promise state
deferred.reject();
...
// after promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
deferred object holds unsettled promise that can be passed to other function scopes by reference. All promise chains will be executed on promise settlement, it doesn't matter if deferred.promise was settled before chaining with then or after. The state of promise cannot be changed after it was settled.
As the answer suggests, the initial choices are ReplaySubject and AsyncSubject.
For the given setup (a demo)
var subject = new Rx.AsyncSubject;
var deferred = subject.first();
deferred.subscribe(
console.log.bind(console, 'Early result'),
console.log.bind(console, 'Early error')
);
setTimeout(() => {
deferred.subscribe(
console.log.bind(console, 'Late result'),
console.log.bind(console, 'Late error')
);
});
This results in desirable behaviour:
subject.error('one');
subject.next('two');
Early error one
Late error one
This results in undesirable behaviour:
subject.error('one');
subject.next('two');
subject.complete();
Early error one
Late result two
This results in undesirable behaviour:
subject.next('two');
subject.complete();
subject.next('three');
Early result two
Late result three
The results from ReplaySubject differ but are still inconsistent with expected results. next values and error errors are treated separately, and complete doesn't prevent the observers from receiving new data. This may work for single next/error, the problem is that next or error may be called multiple times unintentionally.
The reason why first() is used is because subscribes are one-time subscriptions, and I would like to remove them to avoid leaks.
How should it be implemented with RxJS observables?