Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DO NOT MERGE - An illustration of how to observe post-cancel task activity #3556

Closed

Conversation

idg10
Copy link

@idg10 idg10 commented Jun 2, 2023

This is a response to #2153

For a while now, the ReactiveUI team has asserted that the root cause for this issue (and some other related ones) is that if you ask Rx to wrap a Task as an IObservable, you don't get completion or error notifications if you unsubscribe before the task completes.

This behaviour is by design, but it is problematic in cases where you want to cancel the task, because task cancellation is triggered by unsubscription.

Our view is that if you're using Rx's task wrapping, then you can either get notifications through to completion, or you can unsubscribe early to cancel the task, but you can't do both. (Rx always shuts down notifications as the first act of unsubscription. This is permitted within the rules of IObservable, and is deeply baked into the framework. Although it is not technically illegal to implement an IObservable that continues to produce notifications up until the call to Dispose returns, you shouldn't rely on that, because such behaviour intentionally leaves implementations some wiggle room. Basically, once you've called Dispose, all best are off.)

But since we want to engage constructively with Rx users, we don't just want to say "by design" and leave it at that. So this PR is an illustrative spike showing one way to solve this that doesn't rely on post-Dispose notifications.

This is a response to reactiveui#2153

For a while now, the ReactiveUI team has asserted that the root cause for this issue (and some other related ones) is that if you ask Rx to wrap a Task<T> as an IObservable<T>, you don't get completion or error notifications if you unsubscribe before the task completes.

This behaviour is by design, but it is problematic in cases where you want to cancel the task, because task cancellation is triggered by unsubscription.

Our view is that if you're using Rx's task wrapping, then you can either get notifications through to completion, or you can unsubscribe early to cancel the task, but you can't do both. (Rx always shuts down notifications as the first act of unsubscription. This is permitted within the rules of IObservable, and is deeply baked into the framework. Although it is not technically illegal to implement an IObservable that continues to produce notifications up until the call to Dispose returns, you shouldn't rely on that, because such behaviour intentionally leaves implementations some wiggle room. Basically, once you've called Dispose, all best are off.)

But since we want to engage constructively with Rx users, we don't just want to say "by design" and leave it at that. So this PR is an illustrative spike showing one way to solve this that doesn't rely on post-Dispose notifications.
@ChrisPulman
Copy link
Member

Hi Ian,

Thank you for putting this together, I have tested the functionality and it would seem from my point of view to solve the issues at hand. I added the following to the ObservableEx class for the remaining variants of Commands:

internal static IObservable<(IObservable<Unit> Result, Action Cancel)> FromAsyncWithPostCancelNotifications<TParam>(
        Func<TParam, CancellationToken, Task> actionAsync, TParam param) => Observable.Defer(
            () =>
            {
                var cancelThisInvocationSource = new CancellationTokenSource();
                var result = Observable.FromAsync(
                    async cancelFromRx =>
                    {
                        var combinedCancel = CancellationTokenSource.CreateLinkedTokenSource(
                            cancelThisInvocationSource.Token, cancelFromRx);
                        await actionAsync(param, combinedCancel.Token);
                    });

                return Observable.Return<(IObservable<Unit> Result, Action Cancel)>(
                    (result, () => cancelThisInvocationSource.Cancel()));
            });

    internal static IObservable<(IObservable<TResult> Result, Action Cancel)> FromAsyncWithPostCancelNotifications<TResult>(
        Func<CancellationToken, Task<TResult>> actionAsync) => Observable.Defer(
            () =>
            {
                var cancelThisInvocationSource = new CancellationTokenSource();
                var result = Observable.FromAsync(
                    async cancelFromRx =>
                    {
                        var combinedCancel = CancellationTokenSource.CreateLinkedTokenSource(
                            cancelThisInvocationSource.Token, cancelFromRx);
                        return await actionAsync(combinedCancel.Token);
                    });

                return Observable.Return<(IObservable<TResult> Result, Action Cancel)>(
                    (result, () => cancelThisInvocationSource.Cancel()));
            });

    internal static IObservable<(IObservable<TResult> Result, Action Cancel)> FromAsyncWithPostCancelNotifications<TParam, TResult>(
        Func<TParam, CancellationToken, Task<TResult>> actionAsync, TParam param) => Observable.Defer(
            () =>
            {
                var cancelThisInvocationSource = new CancellationTokenSource();
                var result = Observable.FromAsync(
                    async cancelFromRx =>
                    {
                        var combinedCancel = CancellationTokenSource.CreateLinkedTokenSource(
                            cancelThisInvocationSource.Token, cancelFromRx);
                        return await actionAsync(param, combinedCancel.Token);
                    });

                return Observable.Return<(IObservable<TResult> Result, Action Cancel)>(
                    (result, () => cancelThisInvocationSource.Cancel()));
            });

I had to alter the CreateFromObservableCancellable to allow more than a Unit as a Parameter, therefore the two functions are:

internal static ReactiveCommand<TParam, TResult> CreateFromObservableCancellable<TParam, TResult>(
        Func<IObservable<(IObservable<TResult> Result, Action Cancel)>> execute,
        IObservable<bool>? canExecute = null,
        IScheduler? outputScheduler = null)

internal static ReactiveCommand<TParam, TResult> CreateFromObservableCancellable<TParam, TResult>(
        Func<TParam, IObservable<(IObservable<TResult> Result, Action Cancel)>> execute,
        IObservable<bool>? canExecute = null,
        IScheduler? outputScheduler = null)

I added these tests

[Fact]
        public async Task ReactiveCommandCreateFromTaskHandlesExecuteCancellation()
        {
            using var phaseSync = new Barrier(2);
            var statusTrail = new List<(int Position, string Status)>();
            var position = 0;
            var fixture = ReactiveCommand.CreateFromTask(
                        async cts =>
                        {
                            await AwaitTestPhaseAsync(phaseSync); // #1
                            statusTrail.Add((position++, "started command"));
                            try
                            {
                                await Task.Delay(10000, cts);
                            }
                            catch (OperationCanceledException)
                            {
                                // User Handles cancellation.
                                statusTrail.Add((position++, "starting cancelling command"));
                                await AwaitTestPhaseAsync(phaseSync); // #2

                                // dummy cleanup
                                await AwaitTestPhaseAsync(phaseSync); // #3
                                statusTrail.Add((position++, "finished cancelling command"));
                                throw;
                            }

                            return Unit.Default;
                        },
                        outputScheduler: ImmediateScheduler.Instance);

            Exception? fail = null;
            fixture.ThrownExceptions.Subscribe(ex => fail = ex);
            var latestIsExecutingValue = false;
            fixture.IsExecuting.Subscribe(isExecuting =>
            {
                statusTrail.Add((position++, $"command executing = {isExecuting}"));
                Volatile.Write(ref latestIsExecutingValue, isExecuting);
            });

            fail.Should().BeNull();
            var result = false;
            var disposable = fixture.Execute().Subscribe(_ => result = true);
            await AwaitTestPhaseAsync(phaseSync); // #1
            Volatile.Read(ref latestIsExecutingValue).Should().BeTrue();
            statusTrail.Any(x => x.Status == "started command").Should().BeTrue();
            disposable.Dispose();
            await AwaitTestPhaseAsync(phaseSync); // #2
            Volatile.Read(ref latestIsExecutingValue).Should().BeTrue();
            await AwaitTestPhaseAsync(phaseSync); // #3

            var start = Environment.TickCount;
            while (unchecked(Environment.TickCount - start) < 1000 && Volatile.Read(ref latestIsExecutingValue))
            {
                await Task.Yield();
            }

            // No result expected as cancelled
            result.Should().BeFalse();
            statusTrail.Should().Equal(
                               (0, "command executing = False"),
                               (1, "command executing = True"),
                               (2, "started command"),
                               (3, "starting cancelling command"),
                               (4, "finished cancelling command"),
                               (5, "command executing = False"));
            (fail as OperationCanceledException).Should().NotBeNull();
        }

        [Fact]
        public void ReactiveCommandCreateFromTaskHandlesTaskException() =>
            new TestScheduler().With(
                async scheduler =>
                {
                    var subj = new Subject<Unit>();
                    Exception? fail = null;
                    var fixture = ReactiveCommand.CreateFromTask(
                        async cts =>
                        {
                            await subj.Take(1);
                            throw new Exception("break execution");
                        },
                        outputScheduler: scheduler);
                    fixture.IsExecuting.ToObservableChangeSet(ImmediateScheduler.Instance).Bind(out var isExecuting).Subscribe();
                    fixture.ThrownExceptions.Subscribe(ex => fail = ex);
                    isExecuting[0].Should().BeFalse();
                    fail.Should().BeNull();
                    fixture.Execute().Subscribe();

                    scheduler.AdvanceByMs(10);
                    isExecuting[1].Should().BeTrue();
                    fail.Should().BeNull();

                    scheduler.AdvanceByMs(10);
                    subj.OnNext(Unit.Default);

                    scheduler.AdvanceByMs(10);
                    isExecuting[2].Should().BeFalse();
                    fail?.Message.Should().Be("break execution");

                    // Required for correct async / await task handling
                    await Task.Delay(0).ConfigureAwait(false);
                });

        [Fact]
        public async Task ReactiveCommandCreateFromTaskHandlesCancellation()
        {
            var phaseSync = new Barrier(2);
            var statusTrail = new List<(int Position, string Status)>();
            var position = 0;
            var fixture = ReactiveCommand.CreateFromTask(
                        async cts =>
                        {
                            await AwaitTestPhaseAsync(phaseSync); // #1
                            statusTrail.Add((position++, "started command"));
                            try
                            {
                                await Task.Delay(10000, cts);
                            }
                            catch (OperationCanceledException)
                            {
                                // User Handles cancellation.
                                statusTrail.Add((position++, "starting cancelling command"));
                                await AwaitTestPhaseAsync(phaseSync); // #2

                                // dummy cleanup
                                statusTrail.Add((position++, "finished cancelling command"));
                                await AwaitTestPhaseAsync(phaseSync); // #3
                                throw;
                            }

                            return Unit.Default;
                        },
                        outputScheduler: ImmediateScheduler.Instance);

            Exception? fail = null;
            fixture.ThrownExceptions.Subscribe(ex => fail = ex);
            var latestIsExecutingValue = false;
            fixture.IsExecuting.Subscribe(isExecuting =>
            {
                statusTrail.Add((position++, $"command executing = {isExecuting}"));
                Volatile.Write(ref latestIsExecutingValue, isExecuting);
            });

            fail.Should().BeNull();
            var result = false;
            var disposable = fixture.Execute().Subscribe(_ => result = true);
            await AwaitTestPhaseAsync(phaseSync); // #1
            Volatile.Read(ref latestIsExecutingValue).Should().BeTrue();
            statusTrail.Any(x => x.Status == "started command").Should().BeTrue();
            disposable.Dispose();
            await AwaitTestPhaseAsync(phaseSync); // #2
            Volatile.Read(ref latestIsExecutingValue).Should().BeTrue();
            await AwaitTestPhaseAsync(phaseSync); // #3
            var start = Environment.TickCount;
            while (unchecked(Environment.TickCount - start) < 1000 && Volatile.Read(ref latestIsExecutingValue))
            {
                await Task.Yield();
            }

            // No result expected as cancelled
            result.Should().BeFalse();
            statusTrail.Should().Equal(
                               (0, "command executing = False"),
                               (1, "command executing = True"),
                               (2, "started command"),
                               (3, "starting cancelling command"),
                               (4, "finished cancelling command"),
                               (5, "command executing = False"));
            (fail as OperationCanceledException).Should().NotBeNull();
        }

        [Fact]
        public async Task ReactiveCommandCreateFromTaskHandlesCompletion()
        {
            var phaseSync = new Barrier(2);
            var statusTrail = new List<(int Position, string Status)>();
            var position = 0;
            var fixture = ReactiveCommand.CreateFromTask(
                        async cts =>
                        {
                            await AwaitTestPhaseAsync(phaseSync); // #1
                            statusTrail.Add((position++, "started command"));
                            try
                            {
                                await Task.Delay(1000, cts);
                            }
                            catch (OperationCanceledException)
                            {
                                // User Handles cancellation.
                                statusTrail.Add((position++, "starting cancelling command"));

                                // dummy cleanup
                                await Task.Delay(5000, CancellationToken.None).ConfigureAwait(false);
                                statusTrail.Add((position++, "finished cancelling command"));
                                throw;
                            }

                            statusTrail.Add((position++, "finished command"));
                            await AwaitTestPhaseAsync(phaseSync); // #2
                            return Unit.Default;
                        },
                        outputScheduler: ImmediateScheduler.Instance);

            Exception? fail = null;
            fixture.ThrownExceptions.Subscribe(ex => fail = ex);
            var latestIsExecutingValue = false;
            fixture.IsExecuting.Subscribe(isExecuting =>
            {
                statusTrail.Add((position++, $"command executing = {isExecuting}"));
                Volatile.Write(ref latestIsExecutingValue, isExecuting);
            });

            fail.Should().BeNull();
            var result = false;
            fixture.Execute().Subscribe(_ => result = true);
            await AwaitTestPhaseAsync(phaseSync); // #1
            Volatile.Read(ref latestIsExecutingValue).Should().BeTrue();
            await AwaitTestPhaseAsync(phaseSync); // #2

            var start = Environment.TickCount;
            while (unchecked(Environment.TickCount - start) < 1000 && Volatile.Read(ref latestIsExecutingValue))
            {
                await Task.Yield();
            }

            result.Should().BeTrue();
            statusTrail.Should().Equal(
                               (0, "command executing = False"),
                               (1, "command executing = True"),
                               (2, "started command"),
                               (3, "finished command"),
                               (4, "command executing = False"));
            fail.Should().BeNull();

            // Check execution completed
            Volatile.Read(ref latestIsExecutingValue).Should().BeFalse();
        }

		/// <summary>
        /// Awaits the test phase asynchronous.
        /// </summary>
        /// <param name="phaseSync">The phase synchronize, add a local var phaseSync = new Barrier(2);.</param>
        /// <returns>A Task.</returns>
        private static Task AwaitTestPhaseAsync(Barrier phaseSync) =>
            Task.Run(() => phaseSync.SignalAndWait(CancellationToken.None));

Personally I am happy with this execution and the sequencing is as expected and similar to other Rx functions.
This was the execution sequences that I was trying to get a resolution to match:

  • execute Task => exception thrown in user code or task cancellation occurs => Task completes => error => finally.
    OR
  • execute Task => Task completes => produce result/s (should always return a Unit upon task completion if no value produced) => complete => finally.

The code produced in this illustration produces the sequence as expected and seems to solve the issue we have been experiencing.

I can see how this function would benefit other people in a similar situation who currently use Observable.FromAsync and also experience the same issues with cancellation, therefore I can see the benefit of having the relevant functions added to the System.Reactive library.

@glennawatson , @anaisbetts Please review and give feedback on this, thank you.

@idg10
Copy link
Author

idg10 commented Jun 7, 2023

OK, I've added a feature request in the Rx repo to track this: dotnet/reactive#1966

@idg10 idg10 changed the title DO NOTE MERGE - An illustration of how to observe post-cancel task activity DO NOT MERGE - An illustration of how to observe post-cancel task activity Jun 23, 2023
ChrisPulman added a commit that referenced this pull request Jan 1, 2024
…Task (#3704)

<!-- Please be sure to read the
[Contribute](https://github.com/reactiveui/reactiveui#contribute)
section of the README -->

**What kind of change does this PR introduce?**
<!-- Bug fix, feature, docs update, ... -->

Fix for #1245
Fix for #2153
Fix for #3450

**What is the current behavior?**
<!-- You can also link to an open issue here. -->

ReactiveCommand does not properly support Cancellation tokens properly
for CreateFromTask due to an underlying issue in System.Reactive

**What is the new behavior?**
<!-- If this is a feature change -->

Fix the issues with the base functions within ReactiveCommand due to an
issue with Observable.FromAsync from System.Reactive by using a new
ObservableMixins.FromAsyncWithAllNotifications as the new function, this
extends Observable.FromAsync handling the error bubbling as required.

ObservableMixins.FromAsyncWithAllNotifications can be used to transform
a Cancellation Task into an Observable producing the expected
cancellation, errors and results.

**What might this PR break?**

ReactiveCommand.CreateFromTask will now handle exceptions as expected,
any existing workarounds could be removed once tested with actual
implementation in end users code.

**Please check if the PR fulfills these requirements**
- [x] Tests for the changes have been added (for bug fixes / features)
- [ ] Docs have been added / updated (for bug fixes / features)

**Other information**:

Co-authored-by: @idg10 - created the base code in #3556
Copy link

This pull request has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Jan 15, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants