Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DO NOT MERGE - An illustration of how to observe post-cancel task activity #3556

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions src/ReactiveUI.Tests/Commands/ReactiveCommandTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
Expand Down Expand Up @@ -1200,6 +1201,103 @@ public async Task ReactiveCommandCreateFromTaskHandlesTaskExceptionAsync()
Assert.Equal("break execution", fail?.Message);
}

[Fact]
public async Task ReactiveCommandCreateFromTaskThenCancelSetsIsExecutingFalseOnlyAfterCancellationCompleteAsync()
{
// This tests for the problem described at https://github.com/reactiveui/ReactiveUI/issues/2153
// The exact sequence of events is important here. In particular, we need the test to be able
// to make observations while a task is in progress, while it is in the process of being cancelled,
// and after it has finished. This requires some careful sequencing. The System.Threading.Barrier
// class is designed for managing precisely this kind lock-step progress. Unfortunately, it
// doesn't directly intrinsically support async/await. Its SignalAndWait blocks the calling
// thread, which is a problem for async UI code, since that typically uses a single thread for
// most work. Calling SignalAndWait on the UI thread (or in this case, the test thread, which
// is effectively a stand-in for the UI thread) deadlocks, because the matching call to
// SignalAndWait that it's waiting can't happen until the UI thread becomes available.
// So we wrap the use of this in an async-friendly helper that calls SignalAndWait on a
// thread pool thread.
// https://learn.microsoft.com/en-us/dotnet/api/microsoft.visualstudio.threading.asyncbarrier
// would arguably be a better solution, but due to some slightly unfortunate accidents of
// history, it and a whole load of other highly useful async synchronization primitives
// ended up in a DLL whose name makes it sound a lot like it will only work in Visual Studio.
// I didn't want to be the one to introduce a dependency on that component, hence this
// ad hoc wrapper instead, but I would recommend at least considering using the very
// misleadingly-named https://www.nuget.org/packages/Microsoft.VisualStudio.Threading.
using var phaseSync = new Barrier(2);
Task AwaitTestPhaseAsync() => Task.Run(() => phaseSync.SignalAndWait(CancellationToken.None));

var fixture = ReactiveCommand.CreateFromTask(async (token) =>
{
// Phase 1: command execution has begun.
await AwaitTestPhaseAsync();

Debug.WriteLine("started command");
try
{
await Task.Delay(10000, token);
}
catch (OperationCanceledException)
{
// Phase 2: command task has detected cancellation request.
await AwaitTestPhaseAsync();

// Phase 3: test has observed IsExecuting while cancellation is in progress.
await AwaitTestPhaseAsync();

////Debug.WriteLine("starting cancelling command");
////await Task.Delay(5000, CancellationToken.None);
////Debug.WriteLine("finished cancelling command");
throw;
}

Debug.WriteLine("finished command");
});

// This test needs to check the latest value emitted by IsExecuting at various points.
// The obvious way to do this would be with "await fixture.IsExecuting", but that ends
// up involving various bits of Rx scheduling machinery, which can interfere with the
// sequencing this test requires. (For example, "await fixture.IsExecuting" can end up
// waiting until after the entire Task we're testing here has actually completed!)
// So we just keep a variable up to date with the most recently observed value, enabling
// the test to inspect that at any time without an await.
var latestIsExecutingValue = false;
fixture.IsExecuting.Subscribe(isExecuting =>
{
Debug.WriteLine($"command executing = {isExecuting}");
Volatile.Write(ref latestIsExecutingValue, isExecuting);
});

var disposable = fixture.Execute().Subscribe();

// Phase 1: command execution has begun.
await AwaitTestPhaseAsync();

Assert.True(Volatile.Read(ref latestIsExecutingValue), "IsExecuting should be true when execution is underway");

disposable.Dispose();

// Phase 2: command task has detected cancellation request.
await AwaitTestPhaseAsync();

Assert.True(Volatile.Read(ref latestIsExecutingValue), "IsExecuting should remain true while cancellation is in progress");

// Phase 3: test has observed IsExecuting while cancellation is in progress.
await AwaitTestPhaseAsync();

// Finally, we need to wait for the task to complete. We can't directly observe this,
// because once the task has actually completed, it can't give us any sort of notification.
// If it were able to do something to notify us, then that would mean it was still
// running.
// So instead, we're just going to wait for IsExecuting to become false.
var start = Environment.TickCount;
while (unchecked(Environment.TickCount - start) < 1000 && Volatile.Read(ref latestIsExecutingValue))
{
await Task.Yield();
}

Assert.False(Volatile.Read(ref latestIsExecutingValue), "IsExecuting should be false once cancellation completes");
}

[Fact]
public async Task ReactiveCommandExecutesFromInvokeCommand()
{
Expand Down
122 changes: 122 additions & 0 deletions src/ReactiveUI/ObservableEx.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright (c) 2023 .NET Foundation and Contributors. All rights reserved.
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ReactiveUI;

/// <summary>
/// Observables not already built into Rx.
/// </summary>
internal static class ObservableEx
{
/// <summary>
/// Adapts a factory of <see cref="Task"/> in a way that enables observation to continue after
/// cancellation has been requested.
/// </summary>
/// <param name="actionAsync">
/// The factory method that will be invoked to start a task each time an observer subscribes.
/// </param>
/// <returns>
/// An observable source which executes the factory method each time an observer subscribes, and
/// which provides a single item: a tuple containing an <see cref="IObservable{T}"/> representing
/// the outcome of the task, and a callback that can be invoked to cancel this particular
/// invocation of the task.
/// </returns>
/// <remarks>
/// <para>
/// This is conceptually similar to <see cref="Observable.FromAsync(Func{CancellationToken, Task})"/>,
/// except it better supports scenarios in which application code wants to observe what the task
/// does even after cancellation.
/// </para>
/// <para>
/// Rx's <see cref="Observable.FromAsync(Func{CancellationToken, Task})"/> is simpler, but it has
/// a consequent limitation. It is simpler because it enables cancellation through unsubscription:
/// if you want to cancel the task that was started when a particular observer subscribed to the
/// observable it returns, all you need to do is unsubscribe. (To be more precise, if <c>taskSource</c>
/// is some <see cref="IObservable{T}"/> returned by this method, each time you call
/// <c>taskSource.Subscribe</c>, it will start a new task and return an <see cref="IDisposable"/>.
/// As with any Rx subscription, you can unsubscribe by calling <see cref="IDisposable.Dispose"/>
/// on that object returned by <see cref="IObservable{T}.Subscribe(IObserver{T})"/>. And in this case,
/// where the observable is a wrapper for a task returned by
/// <see cref="Observable.FromAsync(Func{CancellationToken, Task})"/>, that will also attempt
/// to cancel the task, via the <see cref="CancellationToken"/> that was passed to the factory method.)
/// But there is a fundamental limitation with this: the rules of Rx state that from the moment you
/// call <c>Dispose</c> (and before <c>Dispose</c> has returned), you cannot rely on getting any
/// further notifications. The rules permit further notifications (until such time as <c>Dispose</c>
/// returns) but they do not require them. And in practice, <c>System.Reactive</c> implements all
/// of its observables in a way that tends to shut down notifications very early on.
/// </para>
/// <para>
/// The upshot of this is that if you cancel a task by unsubscribing from the observer that was
/// going to tell you what the task did, you can't rely on receiving any notification about the
/// outcome of the task. This has two upshots. Firstly, if cancellation takes a while to process
/// there's no way to discover when it has finally completed. Secondly, if an error occurs while
/// the task is attempting to stop, you'll have no way to observe that.
/// </para>
/// <para>
/// These proplems are fundamentally unavoidable if the mechanism by which you cancel the task is
/// to unsubscribe from notifications about the outcome of the task. This method therefore takes
/// a different approach: it separates observation from cancellation. It supplies a delegate you
/// can invoke to initiate cancellation without having to unsubscribe. This enabes you to discover
/// when and how the task finally completes.
/// </para>
/// <para>
/// This method might seem more complex than it needs to be. The 'obvious' simpler method might
/// have this signature:
/// </para>
/// <code>
/// <![CDATA[
/// (IObservable<Unit> Result, Action Cancel) FromAsyncWithPostCancelNotifications(Func<CancellationToken, Task> actionAsync)
/// ]]>
/// </code>
/// <para>
/// However, the problem with that is that it's only good for one shot. By design, FromAsync invokes
/// its action callback every time you subscribe to the source it returns and this method does the same.
/// So it's no good returning a single cancellation callback. We need one each time the operation is
/// invoked. And since invocation is triggered by subscribing to the source, the observable
/// itself is going to need to return the means of cancellation.
/// </para>
/// <para>
/// So as with <see cref="Observable.FromAsync(Func{CancellationToken, Task})"/>, each subscriber to
/// an observable returned by this method will receive a single notification. That notification
/// is a tuple. The first value is another <see cref="IObservable{T}"/>, which provides the
/// outcome when the task completes (just like the observable returned by
/// <see cref="Observable.FromAsync(Func{CancellationToken, Task})"/>), but this tuple also provides
/// a callback that you can invoke to cancel the task. If you remain subscribed to the inner observable
/// it will tell you when the task eventually completes (which might take some time, because cancellation
/// is never instantaneous, and can sometimes be quite slow) and whether it did so successfully or
/// by producing an error.
/// </para>
/// <para>
/// As per https://github.com/reactiveui/ReactiveUI/issues/2153#issuecomment-1495544227 this could
/// become a feature of Rx in the future if the ReactiveUI team wants it. Since it's not in there now, we
/// need to roll our own.
/// </para>
/// </remarks>
internal static IObservable<(IObservable<Unit> Result, Action Cancel)> FromAsyncWithPostCancelNotifications(
Func<CancellationToken, Task> actionAsync)
{
return Observable.Defer(
() =>
{
var cancelThisInvocationSource = new CancellationTokenSource();
var result = Observable.FromAsync(
async cancelFromRx =>
{
var combinedCancel = CancellationTokenSource.CreateLinkedTokenSource(
cancelThisInvocationSource.Token, cancelFromRx);
await actionAsync(combinedCancel.Token);
});

return Observable.Return<(IObservable<Unit> Result, Action Cancel)>(
(result, () => cancelThisInvocationSource.Cancel()));
});
}
}
Loading