diff --git a/src/Microsoft.DotNet.Interactive.CSharp/CSharpKernel.cs b/src/Microsoft.DotNet.Interactive.CSharp/CSharpKernel.cs index edbacb02e2..2924e31b54 100644 --- a/src/Microsoft.DotNet.Interactive.CSharp/CSharpKernel.cs +++ b/src/Microsoft.DotNet.Interactive.CSharp/CSharpKernel.cs @@ -254,13 +254,16 @@ await RunAsync( // Publish the compilation diagnostics. This doesn't include the exception. var kernelDiagnostics = diagnostics.Select(Diagnostic.FromCodeAnalysisDiagnostic).ToImmutableArray(); - var formattedDiagnostics = - diagnostics - .Select(d => d.ToString()) - .Select(text => new FormattedValue(PlainTextFormatter.MimeType, text)) - .ToImmutableArray(); + if (kernelDiagnostics.Length > 0) + { + var formattedDiagnostics = + diagnostics + .Select(d => d.ToString()) + .Select(text => new FormattedValue(PlainTextFormatter.MimeType, text)) + .ToImmutableArray(); - context.Publish(new DiagnosticsProduced(kernelDiagnostics, submitCode, formattedDiagnostics)); + context.Publish(new DiagnosticsProduced(kernelDiagnostics, submitCode, formattedDiagnostics)); + } // Report the compilation failure or exception if (exception != null) diff --git a/src/Microsoft.DotNet.Interactive.ExtensionLab/RecordTranscriptExtension.cs b/src/Microsoft.DotNet.Interactive.ExtensionLab/RecordTranscriptExtension.cs index 77a2122c9d..fc9008bc35 100644 --- a/src/Microsoft.DotNet.Interactive.ExtensionLab/RecordTranscriptExtension.cs +++ b/src/Microsoft.DotNet.Interactive.ExtensionLab/RecordTranscriptExtension.cs @@ -1,9 +1,11 @@ -using System; +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + using System.CommandLine; using System.CommandLine.Invocation; using System.IO; -using System.Linq; using System.Threading.Tasks; + using Microsoft.DotNet.Interactive.Server; namespace Microsoft.DotNet.Interactive.ExtensionLab diff --git a/src/Microsoft.DotNet.Interactive.Js/src/dotnet-interactive/contracts.ts b/src/Microsoft.DotNet.Interactive.Js/src/dotnet-interactive/contracts.ts index c2e74ba698..edc7175c76 100644 --- a/src/Microsoft.DotNet.Interactive.Js/src/dotnet-interactive/contracts.ts +++ b/src/Microsoft.DotNet.Interactive.Js/src/dotnet-interactive/contracts.ts @@ -6,10 +6,12 @@ // --------------------------------------------- Kernel Commands export const AddPackageType = "AddPackage"; +export const CancelType = "Cancel"; export const ChangeWorkingDirectoryType = "ChangeWorkingDirectory"; export const DisplayErrorType = "DisplayError"; export const DisplayValueType = "DisplayValue"; export const ParseNotebookType = "ParseNotebook"; +export const QuitType = "Quit"; export const RequestCompletionsType = "RequestCompletions"; export const RequestDiagnosticsType = "RequestDiagnostics"; export const RequestHoverTextType = "RequestHoverText"; @@ -20,10 +22,12 @@ export const UpdateDisplayedValueType = "UpdateDisplayedValue"; export type KernelCommandType = typeof AddPackageType + | typeof CancelType | typeof ChangeWorkingDirectoryType | typeof DisplayErrorType | typeof DisplayValueType | typeof ParseNotebookType + | typeof QuitType | typeof RequestCompletionsType | typeof RequestDiagnosticsType | typeof RequestHoverTextType @@ -40,6 +44,9 @@ export interface KernelCommand { targetKernelName?: string; } +export interface Cancel extends KernelCommand { +} + export interface ChangeWorkingDirectory extends KernelCommand { workingDirectory: string; } @@ -58,6 +65,9 @@ export interface ParseNotebook extends KernelCommand { rawData: Uint8Array; } +export interface Quit extends Cancel { +} + export interface RequestCompletions extends LanguageServiceCommand { } diff --git a/src/Microsoft.DotNet.Interactive.Tests/CancelCommandTests.cs b/src/Microsoft.DotNet.Interactive.Tests/CancelCommandTests.cs new file mode 100644 index 0000000000..dbb3968cb2 --- /dev/null +++ b/src/Microsoft.DotNet.Interactive.Tests/CancelCommandTests.cs @@ -0,0 +1,203 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Threading.Tasks; + +using FluentAssertions; +using FluentAssertions.Execution; + +using Microsoft.DotNet.Interactive.Commands; +using Microsoft.DotNet.Interactive.Events; +using Microsoft.DotNet.Interactive.Tests.Utility; + +using Xunit; +using Xunit.Abstractions; + + +namespace Microsoft.DotNet.Interactive.Tests +{ + public class CancelCommandTests : LanguageKernelTestBase + { + public CancelCommandTests(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task cancel_command_cancels_all_deferred_commands_on_composite_kernel() + { + var deferredCommandExecuted = false; + + var kernel = CreateKernel(); + + var deferred = new SubmitCode("placeholder") + { + Handler = (command, context) => + { + deferredCommandExecuted = true; + return Task.CompletedTask; + } + }; + + var cancelCommand = new Cancel(); + + kernel.DeferCommand(deferred); + + var events = kernel.KernelEvents.ToSubscribedList(); + + await kernel.SendAsync(cancelCommand); + + using var _ = new AssertionScope(); + + deferredCommandExecuted.Should().BeFalse(); + + events + .Should().ContainSingle() + .Which + .Command + .Should() + .Be(cancelCommand); + } + + [Theory] + [InlineData(Language.CSharp, Skip = "requires scheduler working")] + [InlineData(Language.FSharp, Skip = "requires scheduler working")] + [InlineData(Language.PowerShell, Skip = "requires scheduler working")] + public async Task cancel_command_cancels_all_deferred_commands_on_subkernels(Language language) + { + + var kernel = CreateKernel(language); + + var deferred = new CancellableCommand(); + + var cancelCommand = new Cancel(); + + foreach (var subkernel in kernel.ChildKernels) + { + subkernel.DeferCommand(deferred); + } + + await kernel.SendAsync(cancelCommand); + + using var _ = new AssertionScope(); + + deferred.HasRun.Should().BeFalse(); + + KernelEvents + .Should().ContainSingle() + .Which + .Command + .Should() + .Be(cancelCommand); + } + + [Theory] + [InlineData(Language.CSharp, Skip = "requires scheduler working")] + [InlineData(Language.FSharp, Skip = "requires scheduler working")] + [InlineData(Language.PowerShell, Skip = "to address later")] + public void cancel_command_cancels_the_running_command(Language language) + { + + var kernel = CreateKernel(language); + + var cancelCommand = new Cancel(); + + var submitCodeCommand = new CancellableCommand(); + + Task.WhenAll( + Task.Run(async () => + { + await kernel.SendAsync(submitCodeCommand); + }), + Task.Run(async () => + { + await Task.Delay(100); + await kernel.SendAsync(cancelCommand); + })) + .Wait(TimeSpan.FromSeconds(20)); + + + using var _ = new AssertionScope(); + + submitCodeCommand.HasRun.Should().BeTrue(); + submitCodeCommand.HasBeenCancelled.Should().BeTrue(); + + KernelEvents + .Should() + .ContainSingle(c => c.Command == submitCodeCommand) + .Which + .Exception + .Should() + .BeOfType(); + } + + [Theory] + [InlineData(Language.CSharp, Skip = "requires scheduler working")] + [InlineData(Language.FSharp, Skip = "requires scheduler working")] + [InlineData(Language.PowerShell, Skip = "to address later")] + public async Task commands_issued_after_cancel_command_are_executed(Language language) + { + + var kernel = CreateKernel(language); + + var cancelCommand = new Cancel(); + + var commandToCancel = new CancellableCommand( ); + + var commandToRun = new SubmitCode("1"); + + var _ = kernel.SendAsync(commandToCancel); + await Task.Delay(4000); + await kernel.SendAsync(cancelCommand); + await kernel.SendAsync(commandToRun); + + // using var _ = new AssertionScope(); + + commandToCancel.HasRun.Should().BeTrue(); + commandToCancel.HasBeenCancelled.Should().BeTrue(); + + KernelEvents + .Should() + .ContainSingle(c => c.Command == commandToCancel) + .Which + .Exception + .Should() + .BeOfType(); + } + + + [Theory] + [InlineData(Language.CSharp, @"while(true){ + if(Microsoft.DotNet.Interactive.KernelInvocationContext.Current.CancellationToken.IsCancellationRequested) + { + Console.WriteLine(""done c#""); + break; + } +} +", "done c#", Skip = "requires scheduler working")] + [InlineData(Language.FSharp, @" +System.Threading.Thread.Sleep(3000) +Console.WriteLine(""done c#"")", "done f#", Skip = "for the moment")] + public async Task user_code_can_react_to_cancel_command_using_cancellation_token(Language language, string code, string expectedValue) + { + var kernel = CreateKernel(language); + var cancelCommand = new Cancel(); + var submitCodeCommand = new SubmitCode(code); + + var _ = kernel.SendAsync(submitCodeCommand); + await Task.Delay(4000); + await kernel.SendAsync(cancelCommand); + + KernelEvents + .Should() + .ContainSingle() + .Which + .Value + .Should() + .Be(expectedValue); + + } + } + + +} \ No newline at end of file diff --git a/src/Microsoft.DotNet.Interactive.Tests/CompositeKernelTests.cs b/src/Microsoft.DotNet.Interactive.Tests/CompositeKernelTests.cs index 6f6a40ea70..7d31537183 100644 --- a/src/Microsoft.DotNet.Interactive.Tests/CompositeKernelTests.cs +++ b/src/Microsoft.DotNet.Interactive.Tests/CompositeKernelTests.cs @@ -8,6 +8,7 @@ using System.Linq; using System.Threading.Tasks; using FluentAssertions; +using FluentAssertions.Execution; using Microsoft.DotNet.Interactive.Commands; using Microsoft.DotNet.Interactive.CSharp; using Microsoft.DotNet.Interactive.Events; @@ -21,7 +22,7 @@ namespace Microsoft.DotNet.Interactive.Tests { public class CompositeKernelTests : IDisposable { - private readonly CompositeDisposable _disposables = new CompositeDisposable(); + private readonly CompositeDisposable _disposables = new(); public CompositeKernelTests(ITestOutputHelper output) { @@ -424,6 +425,7 @@ public async Task Deferred_commands_on_composite_kernel_are_execute_on_first_sub typeof(CompleteCodeSubmissionReceived), typeof(CommandSucceeded)); } + [Fact] public async Task Deferred_commands_on_composite_kernel_can_use_directives() diff --git a/src/Microsoft.DotNet.Interactive.Tests/KernelSchedulerTests.cs b/src/Microsoft.DotNet.Interactive.Tests/KernelSchedulerTests.cs new file mode 100644 index 0000000000..bf874247d6 --- /dev/null +++ b/src/Microsoft.DotNet.Interactive.Tests/KernelSchedulerTests.cs @@ -0,0 +1,367 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +using FluentAssertions; + +using Microsoft.DotNet.Interactive.Tests.Utility; + +using Pocket; + +using Xunit; +using Xunit.Abstractions; + +namespace Microsoft.DotNet.Interactive.Tests +{ + public class KernelSchedulerTests : IDisposable + { + private readonly CompositeDisposable _disposables = new(); + + public KernelSchedulerTests(ITestOutputHelper output) + { + DisposeAfterTest(output.SubscribeToPocketLogger()); + } + + public void Dispose() + { + try + { + _disposables?.Dispose(); + } + catch (Exception ex) + { + Logger.Log.Error(exception: ex); + } + } + + private void DisposeAfterTest(IDisposable disposable) + { + _disposables.Add(disposable); + } + + private void DisposeAfterTest(Action action) + { + _disposables.Add(action); + } + + [Fact] + public async Task scheduled_work_is_completed_in_order() + { + using var scheduler = new KernelScheduler(); + + var executionList = new List(); + + await scheduler.Schedule(1, PerformWork); + await scheduler.Schedule(2, PerformWork); + await scheduler.Schedule(3, PerformWork); + + + executionList.Should().BeEquivalentSequenceTo(1, 2, 3); + + Task PerformWork(int v) + { + executionList.Add(v); + return Task.FromResult(v); + } + } + + [Fact] + public async Task scheduled_work_does_not_execute_in_parallel() + { + using var scheduler = new KernelScheduler(); + var concurrencyCounter = 0; + var maxObservedParallelism = 0; + var tasks = new Task[3]; + + for (var i = 0; i < 3; i++) + { + var task = scheduler.Schedule(i, async v => + { + Interlocked.Increment(ref concurrencyCounter); + + await Task.Delay(100); + maxObservedParallelism = Math.Max(concurrencyCounter, maxObservedParallelism); + + Interlocked.Decrement(ref concurrencyCounter); + return v; + }); + tasks[i] = task; + } + + await Task.WhenAll(tasks); + + maxObservedParallelism.Should().Be(1); + } + + [Fact] + public async Task deferred_work_is_executed_before_new_work() + { + var executionList = new List(); + + Task PerformWork(int v) + { + executionList.Add(v); + return Task.FromResult(v); + } + + using var scheduler = new KernelScheduler(); + scheduler.RegisterDeferredOperationSource( + (v, _) => Enumerable.Repeat(v * 10, v), PerformWork); + + for (var i = 1; i <= 3; i++) + { + await scheduler.Schedule(i, PerformWork); + } + + executionList.Should().BeEquivalentSequenceTo(10, 1, 20, 20, 2, 30, 30, 30, 3); + } + + [Fact] + public void cancel_scheduler_work_prevents_any_scheduled_work_from_executing() + { + var executionList = new List(); + using var scheduler = new KernelScheduler(); + var barrier = new Barrier(2); + Task PerformWork(int v) + { + barrier.SignalAndWait(5000); + executionList.Add(v); + return Task.FromResult(v); + } + + var scheduledWork = new List + { + scheduler.Schedule(1, PerformWork), + scheduler.Schedule(2, v => + { + executionList.Add(v); + return Task.FromResult(v); + }), + scheduler.Schedule(3, v => + { + executionList.Add(v); + return Task.FromResult(v); + }) + }; + + barrier.SignalAndWait(); + scheduler.Cancel(); + Task.WhenAll(scheduledWork); + + + executionList.Should().BeEquivalentTo(1); + } + + [Fact] + public async Task cancelled_work_prevents_any_scheduled_work_from_executing() + { + var executionList = new List(); + using var scheduler = new KernelScheduler(); + var barrier = new Barrier(2); + + async Task PerformWork(int v) + { + barrier.SignalAndWait(); + await Task.Delay(3000); + executionList.Add(v); + return v; + } + + var scheduledWork = new List + { + scheduler.Schedule(1, PerformWork), + scheduler.Schedule(2, v => + { + executionList.Add(v); + return Task.FromResult(v); + }), + scheduler.Schedule(3, v => + { + executionList.Add(v); + return Task.FromResult(v); + }) + }; + + barrier.SignalAndWait(); + scheduler.Cancel(); + try + { + await Task.WhenAll(scheduledWork); + } + catch (TaskCanceledException) + { + + } + + executionList.Should().BeEmpty(); + } + + [Fact] + public void cancelling_work_throws_exception() + { + var executionList = new List(); + using var scheduler = new KernelScheduler(); + var barrier = new Barrier(2); + + async Task PerformWork(int v) + { + barrier.SignalAndWait(); + await Task.Delay(3000); + executionList.Add(v); + return v; + } + + var scheduledWork = new List + { + scheduler.Schedule(1, PerformWork), + scheduler.Schedule(2, v => + { + executionList.Add(v); + return Task.FromResult(v); + }), + scheduler.Schedule(3, v => + { + executionList.Add(v); + return Task.FromResult(v); + }) + }; + + barrier.SignalAndWait(); + scheduler.Cancel(); + var operation = new Action( () => Task.WhenAll(scheduledWork).Wait(5000)); + + operation.Should().Throw(); + } + + [Fact] + public async Task exception_in_scheduled_work_halts_execution() + { + var executionList = new List(); + using var scheduler = new KernelScheduler(); + var barrier = new Barrier(2); + + Task PerformWork(int v) + { + barrier.SignalAndWait(); + throw new InvalidOperationException("test exception"); + } + + var scheduledWork = new List + { + scheduler.Schedule(1, PerformWork), + scheduler.Schedule(2, v => + { + executionList.Add(v); + return Task.FromResult(v); + }), + scheduler.Schedule(3, v => + { + executionList.Add(v); + return Task.FromResult(v); + }) + }; + + barrier.SignalAndWait(); + try + { + await Task.WhenAll(scheduledWork); + } + catch(InvalidOperationException) + { + + } + + executionList.Should().BeEmpty(); + } + + [Fact] + public void exception_in_scheduled_work_is_propagated() + { + var executionList = new List(); + using var scheduler = new KernelScheduler(); + var barrier = new Barrier(2); + + Task PerformWork(int v) + { + barrier.SignalAndWait(); + throw new InvalidOperationException("test exception"); + } + + var scheduledWork = new List + { + scheduler.Schedule(1, PerformWork), + scheduler.Schedule(2, v => + { + executionList.Add(v); + return Task.FromResult(v); + }), + scheduler.Schedule(3, v => + { + executionList.Add(v); + return Task.FromResult(v); + }) + }; + + barrier.SignalAndWait(); + var operation = new Action(() => Task.WhenAll(scheduledWork).Wait(5000)); + + operation.Should().Throw() + .Which + .Message + .Should() + .Be("test exception"); + } + + [Fact] + public async Task awaiting_for_work_to_complete_does_not_wait_for_subsequent_work() + { + var executionList = new List(); + + using var scheduler = new KernelScheduler(); + + async Task PerformWorkAsync(int v) + { + await Task.Delay(200); + executionList.Add(v); + return v; + } + + await scheduler.Schedule(1, PerformWorkAsync); + await scheduler.Schedule(2, PerformWorkAsync); + + _ = scheduler.Schedule(3, PerformWorkAsync); + + executionList.Should().BeEquivalentSequenceTo(1, 2); + + + } + + [Fact] + public async Task deferred_work_is_done_based_on_the_scope_of_scheduled_work() + { + var executionList = new List(); + + Task PerformWork(int v) + { + executionList.Add(v); + return Task.FromResult(v); + } + + using var scheduler = new KernelScheduler(); + scheduler.RegisterDeferredOperationSource( + (v, scope) => scope == "scope2" ? Enumerable.Repeat(v * 10, v) : Enumerable.Empty(), PerformWork); + + for (var i = 1; i <= 3; i++) + { + await scheduler.Schedule(i, PerformWork, $"scope{i}"); + } + + executionList.Should().BeEquivalentSequenceTo(1, 20, 20, 2, 3); + } + } +} \ No newline at end of file diff --git a/src/Microsoft.DotNet.Interactive.Tests/LanguageKernelTestBase.cs b/src/Microsoft.DotNet.Interactive.Tests/LanguageKernelTestBase.cs index 5292ab1fc1..be8eee10a4 100644 --- a/src/Microsoft.DotNet.Interactive.Tests/LanguageKernelTestBase.cs +++ b/src/Microsoft.DotNet.Interactive.Tests/LanguageKernelTestBase.cs @@ -15,17 +15,19 @@ using Microsoft.DotNet.Interactive.Tests.Utility; using Pocket; - +using Xunit; using static Pocket.Logger; using Xunit.Abstractions; +[assembly: CollectionBehavior(DisableTestParallelization = true)] + namespace Microsoft.DotNet.Interactive.Tests { [LogTestNamesToPocketLogger] public abstract class LanguageKernelTestBase : IDisposable { - private readonly CompositeDisposable _disposables = new CompositeDisposable(); + private readonly CompositeDisposable _disposables = new(); protected LanguageKernelTestBase(ITestOutputHelper output) { @@ -40,7 +42,7 @@ public void Dispose() } catch (Exception ex) { - Log.Error(exception: ex); + Log.Error(ex); } } diff --git a/src/Microsoft.DotNet.Interactive.Tests/LanguageKernelTests.cs b/src/Microsoft.DotNet.Interactive.Tests/LanguageKernelTests.cs index 1d46d0d6dd..4c5408644c 100644 --- a/src/Microsoft.DotNet.Interactive.Tests/LanguageKernelTests.cs +++ b/src/Microsoft.DotNet.Interactive.Tests/LanguageKernelTests.cs @@ -7,13 +7,16 @@ using System.Linq; using System.Reactive.Linq; using System.Threading.Tasks; + using FluentAssertions; using FluentAssertions.Execution; using FluentAssertions.Extensions; + using Microsoft.DotNet.Interactive.Commands; using Microsoft.DotNet.Interactive.Events; using Microsoft.DotNet.Interactive.Formatting; using Microsoft.DotNet.Interactive.Tests.Utility; + using Xunit; using Xunit.Abstractions; @@ -294,9 +297,9 @@ public async Task when_code_contains_compile_time_error_diagnostics_are_produced .Which .Diagnostics .Should() - .ContainSingle(diag => - diag.LinePositionSpan == diagnosticRange && - diag.Code == code && + .ContainSingle(diag => + diag.LinePositionSpan == diagnosticRange && + diag.Code == code && diag.Message == diagnosticMessage); // The FormattedValues are populated of DiagnosticsProduced event are populated @@ -374,7 +377,7 @@ public async Task powershell_produces_diagnostics_from_parse_errors() .Should() .ContainSingle(d => d.LinePositionSpan == diagnosticRange && - d.Code == "ExpectedExpression" && + d.Code == "ExpectedExpression" && d.Message == "An expression was expected after '('."); KernelEvents @@ -1038,7 +1041,7 @@ open System .Select(e => e.Value as StandardOutputValueProduced) .SelectMany(e => e.FormattedValues.Select(v => v.Value)) .Should() - .BeEquivalentTo(new [] {"1", "2"}); + .BeEquivalentTo(new[] { "1", "2" }); } @@ -1222,7 +1225,7 @@ public async Task SetVariableAsync_declares_the_specified_variable(Language lang succeeded.Should().BeTrue(); x.Should().Be(123); } - + [Theory] [InlineData(Language.CSharp)] [InlineData(Language.FSharp)] diff --git a/src/Microsoft.DotNet.Interactive.Tests/QuitCommandTests.cs b/src/Microsoft.DotNet.Interactive.Tests/QuitCommandTests.cs new file mode 100644 index 0000000000..6d2f409237 --- /dev/null +++ b/src/Microsoft.DotNet.Interactive.Tests/QuitCommandTests.cs @@ -0,0 +1,215 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Threading.Tasks; +using FluentAssertions; +using FluentAssertions.Execution; +using Microsoft.DotNet.Interactive.Commands; +using Microsoft.DotNet.Interactive.Events; +using Microsoft.DotNet.Interactive.Tests.Utility; +using Xunit; +using Xunit.Abstractions; + +namespace Microsoft.DotNet.Interactive.Tests +{ + [Collection("Do not parallelize")] + public class QuitCommandTests : LanguageKernelTestBase + { + public QuitCommandTests(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task quit_command_fails_when_not_configured() + { + var kernel = CreateKernel(); + Quit.OnQuit(null); + var quitCommand = new Quit(); + + await kernel.SendAsync(quitCommand); + + using var _ = new AssertionScope(); + + KernelEvents + .Should().ContainSingle() + .Which + .Command + .Should() + .Be(quitCommand); + + KernelEvents + .Should().ContainSingle() + .Which + .Exception + .Should() + .BeOfType(); + } + + [Fact] + public async Task quit_command_cancels_all_deferred_commands_on_composite_kernel() + { + var deferredCommandExecuted = false; + + var quitCommandExecuted = false; + + var kernel = CreateKernel(); + + var deferred = new SubmitCode("placeholder") + { + Handler = (command, context) => + { + deferredCommandExecuted = true; + return Task.CompletedTask; + } + }; + + + Quit.OnQuit(() => { quitCommandExecuted = true; }); + + var quitCommand = new Quit(); + + kernel.DeferCommand(deferred); + + await kernel.SendAsync(quitCommand); + + using var _ = new AssertionScope(); + + deferredCommandExecuted.Should().BeFalse(); + quitCommandExecuted.Should().BeTrue(); + + KernelEvents + .Should().ContainSingle() + .Which + .Command + .Should() + .Be(quitCommand); + } + + [Theory] + [InlineData(Language.CSharp)] + [InlineData(Language.FSharp)] + [InlineData(Language.PowerShell)] + public async Task quit_command_cancels_all_deferred_commands_on_subkernels(Language language) + { + var deferredCommandExecuted = false; + + var quitCommandExecuted = false; + + var kernel = CreateKernel(language); + + var deferred = new SubmitCode("placeholder") + { + Handler = (command, context) => + { + deferredCommandExecuted = true; + return Task.CompletedTask; + } + }; + + Quit.OnQuit(() => { quitCommandExecuted = true; }); + + var quitCommand = new Quit(); + + foreach (var subkernel in kernel.ChildKernels) + { + subkernel.DeferCommand(deferred); + } + + await kernel.SendAsync(quitCommand); + + using var _ = new AssertionScope(); + + deferredCommandExecuted.Should().BeFalse(); + quitCommandExecuted.Should().BeTrue(); + + KernelEvents + .Should().ContainSingle() + .Which + .Command + .Should() + .Be(quitCommand); + } + + [Theory] + [InlineData(Language.CSharp, "System.Threading.Thread.Sleep(3000);")] + [InlineData(Language.FSharp, "System.Threading.Thread.Sleep(3000)")] + [InlineData(Language.PowerShell, "Start-Sleep -Milliseconds 3000", Skip = "to address later")] + public void Quit_command_is_handled(Language language, string code) + { + var kernel = CreateKernel(language); + + var quitCommandExecuted = false; + + Quit.OnQuit(() => { quitCommandExecuted = true; }); + + var quitCommand = new Quit(); + + var submitCodeCommand = new SubmitCode(code); + + Task.WhenAll( + Task.Run(async () => + { + await Task.Delay(20); + await kernel.SendAsync(submitCodeCommand); + }), + Task.Run(async () => + { + await Task.Delay(100); + await kernel.SendAsync(quitCommand); + })) + .Wait(TimeSpan.FromSeconds(20)); + + using var _ = new AssertionScope(); + + quitCommandExecuted.Should().BeTrue(); + + KernelEvents + .Should() + .ContainSingle() + .Which + .Command + .Should() + .Be(quitCommand); + } + + + [Theory] + [InlineData(Language.CSharp, "System.Threading.Thread.Sleep(3000);")] + [InlineData(Language.FSharp, "System.Threading.Thread.Sleep(3000)")] + [InlineData(Language.PowerShell, "Start-Sleep -Milliseconds 3000", Skip = "to address later")] + public void Quit_command_causes_the_running_command_to_fail(Language language, string code) + { + var kernel = CreateKernel(language); + + Quit.OnQuit(() => { }); + + var quitCommand = new Quit(); + + var submitCodeCommand = new SubmitCode(code); + + Task.WhenAll( + Task.Run(async () => + { + await Task.Delay(20); + await kernel.SendAsync(submitCodeCommand); + }), + Task.Run(async () => + { + await Task.Delay(100); + await kernel.SendAsync(quitCommand); + })) + .Wait(TimeSpan.FromSeconds(20)); + + using var _ = new AssertionScope(); + + KernelEvents + .Should() + .ContainSingle(c => c.Command == submitCodeCommand) + .Which + .Exception + .Should() + .BeOfType(); + } + } +} \ No newline at end of file diff --git a/src/Microsoft.DotNet.Interactive.Tests/Server/SerializationTests.Command_contract_has_not_been_broken.approved.Cancel.json b/src/Microsoft.DotNet.Interactive.Tests/Server/SerializationTests.Command_contract_has_not_been_broken.approved.Cancel.json new file mode 100644 index 0000000000..2dc8e40766 --- /dev/null +++ b/src/Microsoft.DotNet.Interactive.Tests/Server/SerializationTests.Command_contract_has_not_been_broken.approved.Cancel.json @@ -0,0 +1 @@ +{"token":"the-token","commandType":"Cancel","command":{"targetKernelName":"csharp"}} \ No newline at end of file diff --git a/src/Microsoft.DotNet.Interactive.Tests/Server/SerializationTests.Command_contract_has_not_been_broken.approved.Quit.json b/src/Microsoft.DotNet.Interactive.Tests/Server/SerializationTests.Command_contract_has_not_been_broken.approved.Quit.json new file mode 100644 index 0000000000..72391cb97a --- /dev/null +++ b/src/Microsoft.DotNet.Interactive.Tests/Server/SerializationTests.Command_contract_has_not_been_broken.approved.Quit.json @@ -0,0 +1 @@ +{"token":"the-token","commandType":"Quit","command":{"targetKernelName":"csharp"}} \ No newline at end of file diff --git a/src/Microsoft.DotNet.Interactive.Tests/Server/SerializationTests.cs b/src/Microsoft.DotNet.Interactive.Tests/Server/SerializationTests.cs index d2bd77327e..afb0a17b27 100644 --- a/src/Microsoft.DotNet.Interactive.Tests/Server/SerializationTests.cs +++ b/src/Microsoft.DotNet.Interactive.Tests/Server/SerializationTests.cs @@ -196,6 +196,10 @@ IEnumerable commands() yield return new UpdateDisplayedValue( new FormattedValue("text/html", "hi!"), "the-value-id"); + + yield return new Quit("csharp"); + + yield return new Cancel("csharp"); } } diff --git a/src/Microsoft.DotNet.Interactive.Tests/Utility/CancellableCommand.cs b/src/Microsoft.DotNet.Interactive.Tests/Utility/CancellableCommand.cs new file mode 100644 index 0000000000..1bf3fae3a0 --- /dev/null +++ b/src/Microsoft.DotNet.Interactive.Tests/Utility/CancellableCommand.cs @@ -0,0 +1,33 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Threading.Tasks; +using Microsoft.DotNet.Interactive.Commands; + +namespace Microsoft.DotNet.Interactive.Tests.Utility +{ + public class CancellableCommand : KernelCommand + { + public CancellableCommand(string targetKernelName = null, KernelCommand parent = null) : base(targetKernelName, parent) + { + } + + public override Task InvokeAsync(KernelInvocationContext context) + { + HasRun = true; + + while (!context.CancellationToken.IsCancellationRequested) + { + + } + + HasBeenCancelled = true; + + return Task.CompletedTask; + } + + public bool HasBeenCancelled { get; private set; } + + public bool HasRun { get; private set; } + } +} \ No newline at end of file diff --git a/src/Microsoft.DotNet.Interactive.Tests/Utility/TestCommand.cs b/src/Microsoft.DotNet.Interactive.Tests/Utility/TestCommand.cs new file mode 100644 index 0000000000..21e9a6b907 --- /dev/null +++ b/src/Microsoft.DotNet.Interactive.Tests/Utility/TestCommand.cs @@ -0,0 +1,24 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Threading.Tasks; +using Microsoft.DotNet.Interactive.Commands; + +namespace Microsoft.DotNet.Interactive.Tests.Utility +{ + public class TestCommand : KernelCommand + { + private readonly Func _handler; + + public TestCommand(Func handler, string targetKernelName = null, KernelCommand parent = null) : base(targetKernelName, parent) + { + _handler = handler ?? throw new ArgumentNullException(nameof(handler)); + } + + public override Task InvokeAsync(KernelInvocationContext context) + { + return _handler(this, context); + } + } +} \ No newline at end of file diff --git a/src/Microsoft.DotNet.Interactive/Commands/Cancel.cs b/src/Microsoft.DotNet.Interactive/Commands/Cancel.cs new file mode 100644 index 0000000000..7f582010d1 --- /dev/null +++ b/src/Microsoft.DotNet.Interactive/Commands/Cancel.cs @@ -0,0 +1,20 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Threading.Tasks; + +namespace Microsoft.DotNet.Interactive.Commands +{ + public class Cancel : KernelCommand + { + public Cancel(string targetKernelName = null): base(targetKernelName) + { + + } + + public override Task InvokeAsync(KernelInvocationContext context) + { + return Task.CompletedTask; + } + } +} \ No newline at end of file diff --git a/src/Microsoft.DotNet.Interactive/Commands/Quit.cs b/src/Microsoft.DotNet.Interactive/Commands/Quit.cs new file mode 100644 index 0000000000..4ac1b73a9c --- /dev/null +++ b/src/Microsoft.DotNet.Interactive/Commands/Quit.cs @@ -0,0 +1,38 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + + +using System; +using System.Text.Json.Serialization; +using System.Threading.Tasks; + +namespace Microsoft.DotNet.Interactive.Commands +{ + public class Quit : Cancel + { + private static Action _onQuit; + private static readonly Action DefaultOnQuit = () => throw new InvalidOperationException("Quit command is not configured"); + + static Quit() + { + _onQuit = DefaultOnQuit; + } + + public static void OnQuit(Action onQuit) + { + _onQuit = onQuit ?? DefaultOnQuit; + } + + [JsonConstructor] + public Quit(string targetKernelName = null): base(targetKernelName) + { + } + + public override Task InvokeAsync(KernelInvocationContext context) + { + _onQuit(); + context?.Complete(context.Command); + return Task.CompletedTask; + } + } +} \ No newline at end of file diff --git a/src/Microsoft.DotNet.Interactive/CompositeKernel.cs b/src/Microsoft.DotNet.Interactive/CompositeKernel.cs index f90d3115eb..32c72eb67d 100644 --- a/src/Microsoft.DotNet.Interactive/CompositeKernel.cs +++ b/src/Microsoft.DotNet.Interactive/CompositeKernel.cs @@ -73,6 +73,7 @@ public void Add(Kernel kernel, IReadOnlyCollection aliases = null) } kernel.ParentKernel = this; + kernel.SetScheduler(Scheduler); kernel.AddMiddleware(LoadExtensions); AddChooseKernelDirective(kernel, aliases); @@ -202,30 +203,6 @@ private Kernel GetHandlingKernel( return kernel ?? this; } - internal override async Task HandleAsync( - KernelCommand command, - KernelInvocationContext context) - { - var kernel = context.HandlingKernel; - - if (kernel is null) - { - throw new NoSuitableKernelException(command); - } - - await kernel.RunDeferredCommandsAsync(); - - if (kernel != this) - { - // route to a subkernel - await kernel.Pipeline.SendAsync(command, context); - } - else - { - await base.HandleAsync(command, context); - } - } - private protected override IEnumerable GetDirectiveParsersForCompletion( DirectiveNode directiveNode, int requestPosition) diff --git a/src/Microsoft.DotNet.Interactive/Kernel.cs b/src/Microsoft.DotNet.Interactive/Kernel.cs index cc9ab76838..3bec15a9a6 100644 --- a/src/Microsoft.DotNet.Interactive/Kernel.cs +++ b/src/Microsoft.DotNet.Interactive/Kernel.cs @@ -7,34 +7,31 @@ using System.CommandLine; using System.CommandLine.Parsing; using System.Linq; -using System.Reactive; using System.Reactive.Disposables; using System.Reactive.Subjects; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; + using Microsoft.CodeAnalysis.Text; using Microsoft.DotNet.Interactive.Commands; using Microsoft.DotNet.Interactive.Events; using Microsoft.DotNet.Interactive.Parsing; using Microsoft.DotNet.Interactive.Server; -using Microsoft.DotNet.Interactive.Utility; namespace Microsoft.DotNet.Interactive { public abstract partial class Kernel : IDisposable { - private readonly Subject _kernelEvents = new Subject(); + private readonly Subject _kernelEvents = new(); private readonly CompositeDisposable _disposables; - private readonly ConcurrentQueue _deferredCommands = new ConcurrentQueue(); - - private readonly ConcurrentQueue _commandQueue = - new ConcurrentQueue(); - private readonly Dictionary _dynamicHandlers = - new Dictionary(); + private readonly ConcurrentQueue _deferredCommands = new(); + private readonly Dictionary _dynamicHandlers = new(); private FrontendEnvironment _frontendEnvironment; private ChooseKernelDirective _chooseKernelDirective; + private KernelScheduler _commandScheduler = null; + protected Kernel(string name) { if (string.IsNullOrWhiteSpace(name)) @@ -50,6 +47,7 @@ protected Kernel(string name) Pipeline = new KernelCommandPipeline(this); + AddSetKernelMiddleware(); AddDirectiveMiddlewareAndCommonCommandHandlers(); @@ -59,6 +57,18 @@ protected Kernel(string name) )); } + internal KernelScheduler Scheduler + { + get + { + if(_commandScheduler is null) + { + SetScheduler(new ()); + } + return _commandScheduler; + } + } + internal KernelCommandPipeline Pipeline { get; } public CompositeKernel ParentKernel { get; internal set; } @@ -77,6 +87,12 @@ public void DeferCommand(KernelCommand command) } command.SetToken($"deferredCommand::{Guid.NewGuid():N}"); + + if(string.IsNullOrWhiteSpace(command.TargetKernelName)) + { + command.TargetKernelName = Name; + } + _deferredCommands.Enqueue(command); } @@ -141,15 +157,9 @@ private IReadOnlyList PreprocessCommands(KernelCommand command) { return command switch { - SubmitCode submitCode - when submitCode.LanguageNode is null => SubmissionParser.SplitSubmission(submitCode), - - RequestDiagnostics requestDiagnostics - when requestDiagnostics.LanguageNode is null => SubmissionParser.SplitSubmission(requestDiagnostics), - - LanguageServiceCommand languageServiceCommand - when languageServiceCommand.LanguageNode is null => PreprocessLanguageServiceCommand(languageServiceCommand), - + SubmitCode { LanguageNode: null } submitCode => SubmissionParser.SplitSubmission(submitCode), + RequestDiagnostics { LanguageNode: null } requestDiagnostics => SubmissionParser.SplitSubmission(requestDiagnostics), + LanguageServiceCommand { LanguageNode: null } languageServiceCommand => PreprocessLanguageServiceCommand(languageServiceCommand), _ => new[] { command } }; } @@ -231,62 +241,6 @@ public void RegisterCommandType() KernelCommandEnvelope.RegisterCommandTypeReplacingIfNecessary(); } - private class KernelOperation - { - public KernelOperation( - KernelCommand command, - TaskCompletionSource taskCompletionSource) - { - Command = command; - TaskCompletionSource = taskCompletionSource; - - AsyncContext.TryEstablish(out var id); - AsyncContextId = id; - } - - public KernelCommand Command { get; } - - public TaskCompletionSource TaskCompletionSource { get; } - - public int AsyncContextId { get; } - } - - private async Task ExecuteCommand(KernelOperation operation) - { - var context = KernelInvocationContext.Establish(operation.Command); - - // only subscribe for the root command - using var _ = - context.Command == operation.Command - ? context.KernelEvents.Subscribe(PublishEvent) - : Disposable.Empty; - - try - { - await Pipeline.SendAsync(operation.Command, context); - - if (operation.Command == context.Command) - { - await context.DisposeAsync(); - } - else - { - context.Complete(operation.Command); - } - - operation.TaskCompletionSource.SetResult(context.Result); - } - catch (Exception exception) - { - if (!context.IsComplete) - { - context.Fail(exception); - } - - operation.TaskCompletionSource.SetException(exception); - } - } - internal virtual async Task HandleAsync( KernelCommand command, KernelInvocationContext context) @@ -298,75 +252,21 @@ internal virtual async Task HandleAsync( public Task SendAsync( KernelCommand command, CancellationToken cancellationToken) - { - return SendAsync(command, cancellationToken, null); - } - - internal Task SendAsync( - KernelCommand command, - CancellationToken cancellationToken, - Action onDone) { if (command == null) { throw new ArgumentNullException(nameof(command)); } - UndeferCommands(); - - var tcs = new TaskCompletionSource(); - - var operation = new KernelOperation(command, tcs); - - _commandQueue.Enqueue(operation); - - ProcessCommandQueue(_commandQueue, cancellationToken, onDone); - - return tcs.Task; + return Scheduler.Schedule(command, OnExecuteAsync, Name); + } - private void ProcessCommandQueue( - ConcurrentQueue commandQueue, - CancellationToken cancellationToken, - Action onDone) + protected internal void CancelCommands() { - if (commandQueue.TryDequeue(out var currentOperation)) - { - Task.Run(async () => - { - AsyncContext.Id = currentOperation.AsyncContextId; - - await ExecuteCommand(currentOperation); - - ProcessCommandQueue(commandQueue, cancellationToken, onDone); - }, cancellationToken).ConfigureAwait(false); - } - else - { - onDone?.Invoke(); - } + Scheduler.Cancel(); } - - internal Task RunDeferredCommandsAsync() - { - var tcs = new TaskCompletionSource(); - UndeferCommands(); - ProcessCommandQueue( - _commandQueue, - CancellationToken.None, - () => tcs.SetResult(Unit.Default)); - return tcs.Task; - } - - private void UndeferCommands() - { - while (_deferredCommands.TryDequeue(out var initCommand)) - { - _commandQueue.Enqueue(new KernelOperation(initCommand, new TaskCompletionSource())); - } - } - - protected void PublishEvent(KernelEvent kernelEvent) + protected internal void PublishEvent(KernelEvent kernelEvent) { if (kernelEvent == null) { @@ -422,7 +322,7 @@ private Task HandleRequestCompletionsAsync( return Task.CompletedTask; } - private IReadOnlyList GetDirectiveCompletionItems( + private IEnumerable GetDirectiveCompletionItems( DirectiveNode directiveNode, int requestPosition) { @@ -534,9 +434,65 @@ protected virtual void SetHandlingKernel( protected virtual ChooseKernelDirective CreateChooseKernelDirective() { - return new ChooseKernelDirective(this); + return new(this); } internal ChooseKernelDirective ChooseKernelDirective => _chooseKernelDirective ??= CreateChooseKernelDirective(); + + internal void SetScheduler(KernelScheduler scheduler) + { + + _commandScheduler = scheduler; + + IEnumerable GetDeferredOperations(KernelCommand command, string scope) + { + if (scope != Name) + { + yield break; + } + + while (_deferredCommands.TryDequeue(out var kernelCommand)) + { + yield return kernelCommand; + } + } + + Scheduler.RegisterDeferredOperationSource(GetDeferredOperations, OnExecuteAsync); + } + + internal async Task OnExecuteAsync(KernelCommand command) + { + var context = KernelInvocationContext.Establish(command); + + // only subscribe for the root command + using var _ = context.Command == command + ? context.KernelEvents.Subscribe(PublishEvent) + : Disposable.Empty; + + try + { + await Pipeline.SendAsync(command, context); + + if (command == context.Command) + { + await context.DisposeAsync(); + } + else + { + context.Complete(command); + } + + return context.Result; + } + catch (Exception exception) + { + if (!context.IsComplete) + { + context.Fail(exception); + } + + throw; + } + } } } \ No newline at end of file diff --git a/src/Microsoft.DotNet.Interactive/KernelCommandScheduler.cs b/src/Microsoft.DotNet.Interactive/KernelCommandScheduler.cs new file mode 100644 index 0000000000..67d58ddb55 --- /dev/null +++ b/src/Microsoft.DotNet.Interactive/KernelCommandScheduler.cs @@ -0,0 +1,230 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Reactive; +using System.Reactive.Disposables; +using System.Threading; +using System.Threading.Tasks; + +using Microsoft.DotNet.Interactive.Commands; +using Microsoft.DotNet.Interactive.Utility; + +namespace Microsoft.DotNet.Interactive +{ + public class KernelCommandScheduler + { + private readonly ConcurrentQueue<(KernelCommand command, Kernel kernel)> _deferredCommands = new(); + + private readonly ConcurrentQueue _commandQueue = new(); + + + + public Task Schedule(KernelCommand command, Kernel kernel, CancellationToken cancellationToken) + { + + switch (command) + { + case Cancel _: + CancelCommands(); + break; + default: + UndeferCommandsFor(kernel); + break; + } + + var kernelCommandResultSource = new TaskCompletionSource(); + + var operation = new KernelOperation(command, kernelCommandResultSource, kernel); + + _commandQueue.Enqueue(operation); + + ProcessCommandQueue(_commandQueue, cancellationToken); + + return kernelCommandResultSource.Task; + } + + private void ProcessCommandQueue( + ConcurrentQueue commandQueue, + CancellationToken cancellationToken) + { + if (commandQueue.TryDequeue(out var currentOperation)) + { + Task.Run(async () => + { + AsyncContext.Id = currentOperation.AsyncContextId; + + await ExecuteCommand(currentOperation); + + ProcessCommandQueue(commandQueue, cancellationToken); + }, cancellationToken).ConfigureAwait(false); + } + } + + private async Task ExecuteCommand(KernelOperation operation) + { + var context = KernelInvocationContext.Establish(operation.Command); + + // only subscribe for the root command + using var _ = + context.Command == operation.Command + ? context.KernelEvents.Subscribe(operation.Kernel.PublishEvent) + : Disposable.Empty; + + try + { + await operation.Kernel.Pipeline.SendAsync(operation.Command, context); + + if (operation.Command == context.Command) + { + await context.DisposeAsync(); + } + else + { + context.Complete(operation.Command); + } + + operation.TaskCompletionSource.SetResult(context.Result); + } + catch (Exception exception) + { + if (!context.IsComplete) + { + context.Fail(exception); + } + + operation.TaskCompletionSource.SetException(exception); + } + } + private class KernelOperation + { + public KernelOperation( + KernelCommand command, + TaskCompletionSource taskCompletionSource, + Kernel kernel) + { + Command = command; + TaskCompletionSource = taskCompletionSource; + Kernel = kernel; + + AsyncContext.TryEstablish(out var id); + AsyncContextId = id; + } + + public KernelCommand Command { get; } + + public TaskCompletionSource TaskCompletionSource { get; } + public Kernel Kernel { get; } + + public int AsyncContextId { get; } + } + + public void DeferCommand(KernelCommand command, Kernel kernel) + { + _deferredCommands.Enqueue((command, kernel)); + } + + internal Task RunDeferredCommandsAsync(Kernel kernel) + { + var tcs = new TaskCompletionSource(); + UndeferCommandsFor(kernel); + ProcessCommandQueue( + _commandQueue, + CancellationToken.None); + return tcs.Task; + } + + private void UndeferCommands() + { + while (_deferredCommands.TryDequeue(out var initCommand)) + { + _commandQueue.Enqueue( + new KernelOperation( + initCommand.command, + new TaskCompletionSource(), + initCommand.kernel)); + } + } + + private void UndeferCommandsFor(Kernel kernel) + { + var commandsToKeepInDeferredList = new ConcurrentQueue<(KernelCommand command, Kernel kernel)>(); + while (_deferredCommands.TryDequeue(out var deferredCommand)) + { + if (IsInPath(kernel, deferredCommand.kernel)) + { + _commandQueue.Enqueue( + new KernelOperation( + deferredCommand.command, + new TaskCompletionSource(), + deferredCommand.kernel)); + } + else + { + commandsToKeepInDeferredList.Enqueue(deferredCommand); + } + } + + while (commandsToKeepInDeferredList.TryDequeue(out var deferredCommand)) + { + _deferredCommands.Enqueue(deferredCommand); + } + + + bool IsInPath(Kernel toTest, Kernel deferredCommandKernel) + { + while (toTest is not null) + { + if (toTest == deferredCommandKernel) + { + return true; + } + + toTest = toTest.ParentKernel; + } + return false; + } + } + + private static bool CanCancel(KernelCommand command) + { + return command switch + { + Quit _ => false, + Cancel _ => false, + _ => true + }; + } + + public void CancelCommands() + { + foreach (var kernelInvocationContext in KernelInvocationContext.ActiveContexts.Where(c => !c.IsComplete && CanCancel(c.Command))) + { + kernelInvocationContext.Cancel(); + } + + using var disposables = new CompositeDisposable(); + var inFlightOperations = _commandQueue.Where(operation => CanCancel(operation.Command)).ToList(); + foreach (var inFlightOperation in inFlightOperations) + { + KernelInvocationContext currentContext = null; + + + if (inFlightOperation is not null + ) + { + currentContext = KernelInvocationContext.Establish(inFlightOperation.Command); + disposables.Add(currentContext.KernelEvents.Subscribe(inFlightOperation.Kernel.PublishEvent)); + inFlightOperation.TaskCompletionSource.SetResult(currentContext.Result); + } + + currentContext?.Cancel(); + } + + _deferredCommands.Clear(); + _commandQueue.Clear(); + } + } +} \ No newline at end of file diff --git a/src/Microsoft.DotNet.Interactive/KernelExtensions.cs b/src/Microsoft.DotNet.Interactive/KernelExtensions.cs index f3bd306218..d5d9859ad9 100644 --- a/src/Microsoft.DotNet.Interactive/KernelExtensions.cs +++ b/src/Microsoft.DotNet.Interactive/KernelExtensions.cs @@ -22,6 +22,21 @@ namespace Microsoft.DotNet.Interactive { public static class KernelExtensions { + public static T UseQuitCommand(this T kernel, IDisposable disposeOnQuit, CancellationToken cancellationToken) where T : Kernel + { + Quit.OnQuit(() => + { + disposeOnQuit?.Dispose(); + Environment.Exit(0); + }); + + cancellationToken.Register(async () => + { + await kernel.SendAsync(new Quit()); + }); + return kernel; + } + public static Kernel FindKernel(this Kernel kernel, string name) { var root = kernel diff --git a/src/Microsoft.DotNet.Interactive/KernelInvocationContext.cs b/src/Microsoft.DotNet.Interactive/KernelInvocationContext.cs index f160e6b6d9..36ced63c24 100644 --- a/src/Microsoft.DotNet.Interactive/KernelInvocationContext.cs +++ b/src/Microsoft.DotNet.Interactive/KernelInvocationContext.cs @@ -2,7 +2,9 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Linq; using System.Reactive.Disposables; using System.Reactive.Subjects; using System.Threading; @@ -16,15 +18,17 @@ namespace Microsoft.DotNet.Interactive { public class KernelInvocationContext : IAsyncDisposable { - private static readonly AsyncLocal _current = new AsyncLocal(); + internal static ConcurrentBag ActiveContexts { get; private set; } = new(); + + private static readonly AsyncLocal _current = new(); - private readonly ReplaySubject _events = new ReplaySubject(); + private readonly ReplaySubject _events = new(); - private readonly HashSet _childCommands = new HashSet(); + private readonly HashSet _childCommands = new(); - private readonly CompositeDisposable _disposables = new CompositeDisposable(); + private readonly CompositeDisposable _disposables = new(); - private readonly List> _onCompleteActions = new List>(); + private readonly List> _onCompleteActions = new(); private readonly CancellationTokenSource _cancellationTokenSource; @@ -35,6 +39,7 @@ private KernelInvocationContext(KernelCommand command) CommandToSignalCompletion = command; Result = new KernelCommandResult(_events); + _disposables.Add(_cancellationTokenSource); _disposables.Add(ConsoleOutput.Subscribe(c => { return new CompositeDisposable @@ -70,14 +75,32 @@ public void Complete(KernelCommand command) } } + public void Cancel() + { + if (!IsComplete) + { + _cancellationTokenSource.Cancel(false); + Fail(new OperationCanceledException($"Command :{Command} cancelled.")); + } + } + public void Fail( Exception exception = null, string message = null) { - Publish(new CommandFailed(exception, Command, message)); + if (!IsComplete) + { + + Publish(new CommandFailed(exception, Command, message)); + _events.OnCompleted(); - _events.OnCompleted(); - IsComplete = true; + if (_cancellationTokenSource.IsCancellationRequested) + { + _cancellationTokenSource.Cancel(false); + } + + IsComplete = true; + } } public void OnComplete(Func onComplete) @@ -111,6 +134,8 @@ public static KernelInvocationContext Establish(KernelCommand command) if (_current.Value == null || _current.Value.IsComplete) { var context = new KernelInvocationContext(command); + + ActiveContexts.Add(context); _current.Value = context; } @@ -138,6 +163,8 @@ public ValueTask DisposeAsync() { if (_current.Value is { } active) { + ActiveContexts = + new ConcurrentBag(ActiveContexts.Except(new[] {_current.Value})); _current.Value = null; if (_onCompleteActions.Count > 0) diff --git a/src/Microsoft.DotNet.Interactive/KernelScheduler.cs b/src/Microsoft.DotNet.Interactive/KernelScheduler.cs new file mode 100644 index 0000000000..31d241b976 --- /dev/null +++ b/src/Microsoft.DotNet.Interactive/KernelScheduler.cs @@ -0,0 +1,271 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +using Pocket; + +namespace Microsoft.DotNet.Interactive +{ + + public class KernelScheduler : IDisposable + { + private CancellationTokenSource _cancellationTokenSource = new(); + private static readonly Logger Logger = new Logger("Scheduler"); + + private List _scheduledOperations = new(); + private List _deferredOperationRegistrations = new(); + + private readonly object _operationsLock = new(); + + public Task Schedule(T value, OnExecuteDelegate onExecuteAsync, string scope = "default") + { + var operation = new ScheduledOperation(value, onExecuteAsync, scope); + + + lock (_operationsLock) + { + _cancellationTokenSource.Token.Register(() => + { + if (!operation.CompletionSource.Task.IsCompleted) + { + operation.CompletionSource.SetCanceled(); + } + }); + + _scheduledOperations.Add(operation); + if (_scheduledOperations.Count == 1) + { + var previousSynchronizationContext = SynchronizationContext.Current; + var synchronizationContext = new ClockwiseSynchronizationContext(); + + SynchronizationContext.SetSynchronizationContext(synchronizationContext); + Task.Run(async () => + { + while (_scheduledOperations.Count > 0) + { + await ProcessScheduledOperations(_cancellationTokenSource.Token); + } + }).ContinueWith(_ => + { + SynchronizationContext.SetSynchronizationContext(previousSynchronizationContext); + }); + } + } + + return operation.Task; + } + + private async Task ProcessScheduledOperations(CancellationToken cancellationToken) + { + using var _ = Logger.OnEnterAndExit(); + ScheduledOperation operation; + lock (_operationsLock) + { + if (_scheduledOperations.Count > 0) + { + operation = _scheduledOperations[0]; + _scheduledOperations.RemoveAt(0); + } + else + { + return; + } + } + + try + { + if (operation is not null) + { + // get all deferred operations and pump in + foreach (var deferredOperationRegistration in _deferredOperationRegistrations) + { + foreach (var deferred in deferredOperationRegistration.GetDeferredOperations(operation.Value, + operation.Scope)) + { + var deferredOperation = new ScheduledOperation(deferred, + deferredOperationRegistration.OnExecute, operation.Scope); + + cancellationToken.Register(() => + { + if (!deferredOperation.CompletionSource.Task.IsCompleted) + { + deferredOperation.CompletionSource.SetCanceled(); + } + }); + + await DoWork(deferredOperation); + } + } + + await DoWork(operation); + } + } + catch + { + Cancel(); + throw; + } + + async Task DoWork(ScheduledOperation scheduleOperation) + { + using var _ = Logger.OnEnterAndExit("DoWork"); + if (!scheduleOperation.CompletionSource.Task.IsCanceled) + { + try + { + var operationResult = await scheduleOperation.OnExecuteAsync(scheduleOperation.Value); + scheduleOperation.CompletionSource.SetResult(operationResult); + } + catch (Exception e) + { + scheduleOperation.CompletionSource.SetException(e); + throw; + } + } + } + } + + public void RegisterDeferredOperationSource(GetDeferredOperationsDelegate getDeferredOperations, OnExecuteDelegate onExecuteAsync) + { + _deferredOperationRegistrations.Add(new DeferredOperation(onExecuteAsync, getDeferredOperations)); + } + + public void Cancel() + { + lock (_operationsLock) + { + + + if (SynchronizationContext.Current is ClockwiseSynchronizationContext synchronizationContext) + { + synchronizationContext.Cancel(); + } + + _scheduledOperations = new List(); + _deferredOperationRegistrations = new List(); + + _cancellationTokenSource.Cancel(); + _cancellationTokenSource = new CancellationTokenSource(); + } + } + + public void Dispose() + { + Cancel(); + } + + public delegate Task OnExecuteDelegate(T value); + + public delegate IEnumerable GetDeferredOperationsDelegate(T state, string scope); + + private class ScheduledOperation + { + public T Value { get; } + public OnExecuteDelegate OnExecuteAsync { get; } + public string Scope { get; } + public Task Task => CompletionSource.Task; + + + public ScheduledOperation(T value, OnExecuteDelegate onExecuteAsync, string scope) + { + Value = value; + CompletionSource = new TaskCompletionSource(); + OnExecuteAsync = onExecuteAsync; + Scope = scope; + } + + public TaskCompletionSource CompletionSource { get; } + } + + private class DeferredOperation + { + public GetDeferredOperationsDelegate GetDeferredOperations { get; } + public OnExecuteDelegate OnExecute { get; } + public DeferredOperation(OnExecuteDelegate onExecute, GetDeferredOperationsDelegate getDeferredOperations) + { + OnExecute = onExecute; + GetDeferredOperations = getDeferredOperations; + } + } + + } + + internal sealed class ClockwiseSynchronizationContext : SynchronizationContext, IDisposable + { + private static readonly Logger Logger = new Logger("SynchronizationContext"); + + private readonly BlockingCollection _queue = new(); + + public ClockwiseSynchronizationContext() + { + var thread = new Thread(Run); + + thread.Start(); + } + + public override void Post(SendOrPostCallback callback, object state) + { + if (callback == null) + { + throw new ArgumentNullException(nameof(callback)); + } + + var workItem = new WorkItem(callback, state); + + try + { + _queue.Add(workItem); + } + catch (InvalidOperationException) + { + throw new ObjectDisposedException($"The {nameof(ClockwiseSynchronizationContext)} has been disposed."); + } + } + + public override void Send(SendOrPostCallback callback, object state) => + throw new NotSupportedException($"Synchronous Send is not supported by {nameof(ClockwiseSynchronizationContext)}."); + + public void Cancel() + { + Cancelled = true; + } + + public bool Cancelled { get; private set; } + + private void Run() + { + SetSynchronizationContext(this); + + foreach (var workItem in _queue.GetConsumingEnumerable()) + { + if (!Cancelled) + { + workItem.Run(); + } + + } + } + + public void Dispose() => _queue.CompleteAdding(); + + private struct WorkItem + { + public WorkItem(SendOrPostCallback callback, object state) + { + Callback = callback; + State = state; + } + + private readonly SendOrPostCallback Callback; + + private readonly object State; + + public void Run() => Callback(State); + } + } +} \ No newline at end of file diff --git a/src/Microsoft.DotNet.Interactive/KernelSupportsNugetExtensions.cs b/src/Microsoft.DotNet.Interactive/KernelSupportsNugetExtensions.cs index 8a6559575c..b16e85eee7 100644 --- a/src/Microsoft.DotNet.Interactive/KernelSupportsNugetExtensions.cs +++ b/src/Microsoft.DotNet.Interactive/KernelSupportsNugetExtensions.cs @@ -263,8 +263,6 @@ internal static KernelCommandInvocation DoNugetRestore() }; await invocationContext.QueueAction(restore); - var kernel = invocationContext.HandlingKernel; - await kernel.RunDeferredCommandsAsync(); }; static string InstallingPackageMessage(PackageReference package) diff --git a/src/Microsoft.DotNet.Interactive/Server/KernelCommandEnvelope.cs b/src/Microsoft.DotNet.Interactive/Server/KernelCommandEnvelope.cs index c94ba2abd0..74969fa4e2 100644 --- a/src/Microsoft.DotNet.Interactive/Server/KernelCommandEnvelope.cs +++ b/src/Microsoft.DotNet.Interactive/Server/KernelCommandEnvelope.cs @@ -74,7 +74,9 @@ public static void ResetToDefaults() [nameof(RequestSignatureHelp)] = typeof(KernelCommandEnvelope), [nameof(SerializeNotebook)] = typeof(KernelCommandEnvelope), [nameof(SubmitCode)] = typeof(KernelCommandEnvelope), - [nameof(UpdateDisplayedValue)] = typeof(KernelCommandEnvelope) + [nameof(UpdateDisplayedValue)] = typeof(KernelCommandEnvelope), + [nameof(Quit)] = typeof(KernelCommandEnvelope), + [nameof(Cancel)] = typeof(KernelCommandEnvelope) }; _commandTypesByCommandTypeName = new ConcurrentDictionary(_envelopeTypesByCommandTypeName diff --git a/src/dotnet-interactive-vscode/src/interfaces/src/contracts.ts b/src/dotnet-interactive-vscode/src/interfaces/src/contracts.ts index c2e74ba698..edc7175c76 100644 --- a/src/dotnet-interactive-vscode/src/interfaces/src/contracts.ts +++ b/src/dotnet-interactive-vscode/src/interfaces/src/contracts.ts @@ -6,10 +6,12 @@ // --------------------------------------------- Kernel Commands export const AddPackageType = "AddPackage"; +export const CancelType = "Cancel"; export const ChangeWorkingDirectoryType = "ChangeWorkingDirectory"; export const DisplayErrorType = "DisplayError"; export const DisplayValueType = "DisplayValue"; export const ParseNotebookType = "ParseNotebook"; +export const QuitType = "Quit"; export const RequestCompletionsType = "RequestCompletions"; export const RequestDiagnosticsType = "RequestDiagnostics"; export const RequestHoverTextType = "RequestHoverText"; @@ -20,10 +22,12 @@ export const UpdateDisplayedValueType = "UpdateDisplayedValue"; export type KernelCommandType = typeof AddPackageType + | typeof CancelType | typeof ChangeWorkingDirectoryType | typeof DisplayErrorType | typeof DisplayValueType | typeof ParseNotebookType + | typeof QuitType | typeof RequestCompletionsType | typeof RequestDiagnosticsType | typeof RequestHoverTextType @@ -40,6 +44,9 @@ export interface KernelCommand { targetKernelName?: string; } +export interface Cancel extends KernelCommand { +} + export interface ChangeWorkingDirectory extends KernelCommand { workingDirectory: string; } @@ -58,6 +65,9 @@ export interface ParseNotebook extends KernelCommand { rawData: Uint8Array; } +export interface Quit extends Cancel { +} + export interface RequestCompletions extends LanguageServiceCommand { } diff --git a/src/dotnet-interactive.Tests/CommandLine/CommandLineParserTests.cs b/src/dotnet-interactive.Tests/CommandLine/CommandLineParserTests.cs index af5e91aa2d..3b189b7274 100644 --- a/src/dotnet-interactive.Tests/CommandLine/CommandLineParserTests.cs +++ b/src/dotnet-interactive.Tests/CommandLine/CommandLineParserTests.cs @@ -8,16 +8,17 @@ using System.IO; using System.Linq; using System.Threading.Tasks; + using FluentAssertions; using FluentAssertions.Execution; + using Microsoft.DotNet.Interactive.App.CommandLine; -using Microsoft.DotNet.Interactive.App.Commands; using Microsoft.DotNet.Interactive.Http; using Microsoft.DotNet.Interactive.Server; using Microsoft.DotNet.Interactive.Telemetry; using Microsoft.DotNet.Interactive.Tests.Utility; -using Microsoft.DotNet.Interactive.Utility; using Microsoft.Extensions.DependencyInjection; + using Xunit; using Xunit.Abstractions; @@ -86,7 +87,7 @@ public async Task It_parses_log_output_directory() var logPath = new DirectoryInfo(Path.GetTempPath()); await _parser.InvokeAsync($"jupyter --log-path {logPath} {_connectionFile}", _console); - + _startOptions .LogPath .FullName @@ -132,7 +133,7 @@ public void jupyter_command_help_shows_default_port_range() public void jupyter_install_command_parses_path_option() { Directory.CreateDirectory(_kernelSpecInstallPath.FullName); - + _parser.InvokeAsync($"jupyter install --path {_kernelSpecInstallPath}"); var installedKernels = _kernelSpecInstallPath.GetDirectories(); @@ -204,7 +205,7 @@ public void http_command__does_not_parse_http_port_range_option() public async Task jupyter_command_returns_error_if_connection_file_path_is_not_passed() { var testConsole = new TestConsole(); - + await _parser.InvokeAsync("jupyter", testConsole); testConsole.Error.ToString().Should().Contain("Required argument missing for command: jupyter"); @@ -224,7 +225,7 @@ public void jupyter_command_does_not_parse_http_port_option() [Fact] public async Task jupyter_command_enables_http_api_when_http_port_range_is_specified() { - await _parser.InvokeAsync($"jupyter --http-port-range 3000-5000 {_connectionFile}"); + await _parser.InvokeAsync($"jupyter --http-port-range 3000-5000 {_connectionFile}"); _startOptions.EnableHttpApi.Should().BeTrue(); } @@ -248,7 +249,7 @@ public void jupyter_command_parses_connection_file_path() [Fact] public async Task jupyter_command_enables_http_api_by_default() { - await _parser.InvokeAsync($"jupyter {_connectionFile}"); + await _parser.InvokeAsync($"jupyter {_connectionFile}"); _startOptions.EnableHttpApi.Should().BeTrue(); } @@ -369,7 +370,7 @@ public async Task stdio_command_parses_http_port_range_options() public async Task stdio_command_requires_api_bootstrapping_when_http_is_enabled() { await _parser.InvokeAsync("stdio --http-port-range 3000-4000"); - + var kernel = GetKernel(); kernel.FrontendEnvironment.As() @@ -418,16 +419,5 @@ public void stdio_command_honors_default_kernel_option() options.DefaultKernel.Should().Be("bsharp"); } - - [Fact] - public async Task stdio_command_extends_the_protocol_with_quit_command() - { - await _parser.InvokeAsync("stdio"); - - var envelope = KernelCommandEnvelope.Deserialize(@"{ ""commandType"": ""Quit"", ""command"" : { } }"); - - envelope.Command.Should() - .BeOfType(); - } } } diff --git a/src/dotnet-interactive.Tests/dotnet-interactive.Tests.v3.ncrunchproject b/src/dotnet-interactive.Tests/dotnet-interactive.Tests.v3.ncrunchproject index 5ef144da36..aa6b1f8b1a 100644 --- a/src/dotnet-interactive.Tests/dotnet-interactive.Tests.v3.ncrunchproject +++ b/src/dotnet-interactive.Tests/dotnet-interactive.Tests.v3.ncrunchproject @@ -1,8 +1,8 @@  - ..\dotnet-interactive-vscode\src\contracts.ts ..\Microsoft.DotNet.Interactive.Js\src\dotnet-interactive\contracts.ts + ..\dotnet-interactive-vscode\src\interfaces\src\contracts.ts diff --git a/src/dotnet-interactive/Commands/Quit.cs b/src/dotnet-interactive/Commands/Quit.cs deleted file mode 100644 index 7862881aba..0000000000 --- a/src/dotnet-interactive/Commands/Quit.cs +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright (c) .NET Foundation and contributors. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - - -using System; -using System.Threading.Tasks; -using Microsoft.DotNet.Interactive.Commands; - -namespace Microsoft.DotNet.Interactive.App.Commands -{ - public class Quit : KernelCommand - { - internal static IDisposable DisposeOnQuit { get; set; } - public Quit(string targetKernelName = null): base(targetKernelName) - { - Handler = (command, context) => - { - context.Complete(context.Command); - DisposeOnQuit?.Dispose(); - Environment.Exit(0); - return Task.CompletedTask; - }; - } - } -} \ No newline at end of file diff --git a/src/dotnet-interactive/KernelExtensions.cs b/src/dotnet-interactive/KernelExtensions.cs index c497ffb50c..9f4b7702ac 100644 --- a/src/dotnet-interactive/KernelExtensions.cs +++ b/src/dotnet-interactive/KernelExtensions.cs @@ -4,28 +4,17 @@ using System; using System.CommandLine; using System.CommandLine.Invocation; -using System.Threading; -using Microsoft.DotNet.Interactive.App.Commands; + using Microsoft.DotNet.Interactive.Formatting; -using Microsoft.DotNet.Interactive.Server; + using Recipes; + using static Microsoft.DotNet.Interactive.Formatting.PocketViewTags; namespace Microsoft.DotNet.Interactive.App { public static class KernelExtensions { - public static T UseQuitCommand(this T kernel, IDisposable disposeOnQuit, CancellationToken cancellationToken) where T : Kernel - { - Quit.DisposeOnQuit = disposeOnQuit; - KernelCommandEnvelope.RegisterCommandType(nameof(Quit)); - cancellationToken.Register(async () => - { - await kernel.SendAsync(new Quit()); - }); - return kernel; - } - public static T UseAboutMagicCommand(this T kernel) where T : Kernel { @@ -42,8 +31,8 @@ public static T UseAboutMagicCommand(this T kernel) var url = "https://github.com/dotnet/interactive"; var encodedImage = string.Empty; - var assembly = typeof(Program).Assembly; - using (var resourceStream = assembly.GetManifestResourceStream($"{typeof(Program).Namespace}.resources.logo-456x456.png")) + var assembly = typeof(KernelExtensions).Assembly; + using (var resourceStream = assembly.GetManifestResourceStream($"{typeof(KernelExtensions).Namespace}.resources.logo-456x456.png")) { if (resourceStream != null) { @@ -57,7 +46,7 @@ public static T UseAboutMagicCommand(this T kernel) PocketView html = table( tbody( tr( - td(img[src: encodedImage, width:"125em"]), + td(img[src: encodedImage, width: "125em"]), td[style: "line-height:.8em"]( p[style: "font-size:1.5em"](b(".NET Interactive")), p("© 2020 Microsoft Corporation"),