I have an IObservable<int> sequence that emits a single item the first 9 times it is subscribed, and on further subscriptions it emits nothing and completes immediately:
int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
if (++counter < 10)
return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
else
return Observable.Empty<int>();
});
Now I want to repeat this sequence until it is completed. So I used the Repeat operator:
source
.Repeat()
.Do(x => Console.WriteLine(x), () => Console.WriteLine("Completed"))
.Wait();
The problem is that this query never completes. The Repeat keeps subscribing to the source sequence again and again for an eternity. Even worse, when the source has stopped producing elements, the query enters in a merciless tight loop of death that hijacks one core of the CPU (my quad-core machine reports continuous CPU utilization 25%). Here is the output of the above code:
1
2
3
4
5
6
7
8
9
What I want is a variant of the Repeat operator that stops repeating the source when the source has stopped producing elements. Searching through the built-in Rx operators I can see a RepeatWhen operator, but apparently this can be used only for starting faster the next repetition, not for stopping the repeating altogether:
// Repeatedly resubscribes to the source observable after a normal completion and
// when the observable returned by a handler produces an arbitrary item.
public static IObservable<TSource> RepeatWhen<TSource, TSignal>(
this IObservable<TSource> source,
Func<IObservable<object>, IObservable<TSignal>> handler);
I am not 100% sure though, because the description of the handler parameter is quite obscure, so I might be missing something:
The function that is called for each observer and takes an observable sequence objects. It should return an observable of arbitrary items that should signal that arbitrary item in response to receiving the completion signal from the source observable. If this observable signals a terminal event, the sequence is terminated with that signal instead.
My question is: how can I implement a RepeatUntilEmpty operator that repeats the source sequence until it's empty? Is it possible to implement it based on the aforementioned RepeatWhen operator? If not, should I go low level (Observable.Create) and reimplement the basic Repeat functionality from scratch? Or can I use the Materialize operator to my advantage, combining it somehow with the existing Repeat? I am out of ideas at the moment. I am willing to accept any kind of solution, either high or low lever.
public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
// What to do?
}
Replacing the Repeat with the RepeatUntilEmpty in my original code, should have the effect of making the query complete immediately after emitting the 9 element.