The Rx library includes operators that accept lambda parameters, and some of these lambdas are passed a CancellationToken that is controlled by the library itself. Examples of such operators are FromAsync, StartAsync and Create:
// Converts an asynchronous action into an observable sequence. Each subscription
// to the resulting sequence causes the action to be started. The CancellationToken
// passed to the asynchronous action is tied to the observable sequence's subscription
// that triggered the action's invocation and can be used for best-effort cancellation.
public static IObservable<Unit> FromAsync(Func<CancellationToken, Task> actionAsync);
I was under the impression that the Rx library does a good job of managing the lifecycle of the CancellationTokenSources that it obviously has to create behind the scenes, but I am not so sure any more. Let me first point out that the documentation strongly insists that CancellationTokenSources should be disposed of:
This type implements the IDisposable interface. When you have finished using an instance of the type, you should dispose of it either directly or indirectly. To dispose of the type directly, call its Dispose method in a try/catch block. To dispose of it indirectly, use a language construct such as using (in C#) or Using (in Visual Basic).
Also from here:
Always call Dispose before you release your last reference to the CancellationTokenSource. Otherwise, the resources it is using will not be freed until the garbage collector calls the CancellationTokenSource object's Finalize method.
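For comparison, this is roughly what the disposal pattern described in those passages looks like when the CancellationTokenSource is one I create myself. It is only a minimal sketch: the 100 ms timeout and the `canceled` flag are made up for illustration, and it says nothing about how Rx manages its sources internally.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

bool canceled = false; // hypothetical flag, only used to observe the outcome

// The using statement guarantees that Dispose is called on the source
// when the block is exited, even if an exception is thrown.
using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)))
{
    try
    {
        // Waits until the source cancels itself after the timeout.
        await Task.Delay(Timeout.Infinite, cts.Token);
    }
    catch (OperationCanceledException)
    {
        canceled = true;
        Console.WriteLine("Canceled");
    }
} // cts.Dispose() runs here
```

What I would like to know is whether Rx does something equivalent for the sources it creates on my behalf.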
I ran the experiment below to test my assumptions. It uses reflection to read the private fields _source and _disposed of the types CancellationToken and CancellationTokenSource respectively (.NET 5).
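using System;
using System.Reactive.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

// Holds the token that Rx passes to the lambda, so it can be inspected later.
CancellationToken capturedToken = default;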
var subscription = Observable.FromAsync(async token =>
{
    capturedToken = token;
    token.Register(() => Console.WriteLine("Token canceled"));
    await Task.Delay(Timeout.Infinite, token);
})
.TakeUntil(Observable.Timer(TimeSpan.FromMilliseconds(500)))
.Finally(() => Console.WriteLine("The observable was terminated"))
.Subscribe();
Thread.Sleep(1000);
var cts = (CancellationTokenSource)(typeof(CancellationToken)
    .GetField("_source", BindingFlags.NonPublic | BindingFlags.Instance)
    .GetValue(capturedToken));
bool disposed = (bool)(typeof(CancellationTokenSource)
    .GetField("_disposed", BindingFlags.NonPublic | BindingFlags.Instance)
    .GetValue(cts));
Console.WriteLine($"IsCancellationRequested: {cts.IsCancellationRequested}");
Console.WriteLine($"IsDisposed: {disposed}");
Output:
Token canceled
The observable was terminated
IsCancellationRequested: True
IsDisposed: False
Try it on Fiddle (.NET Framework version, having differently named private fields)
The captured CancellationToken is inspected half a second after the asynchronous operation has been canceled and the observable has terminated. The _disposed field has the value false, indicating that the Dispose method of the associated CancellationTokenSource has not been invoked. Am I doing something wrong, or does the Rx library indeed omit disposing of the CancellationTokenSources it creates?
.NET 5.0.1, System.Reactive 5.0.0, C# 9