diff --git a/MLS.Agent/Program.cs b/MLS.Agent/Program.cs index 6d2762587..18a84c559 100644 --- a/MLS.Agent/Program.cs +++ b/MLS.Agent/Program.cs @@ -49,8 +49,11 @@ public static X509Certificate2 ParseKey(string base64EncodedKey) typeof(Shell).Assembly }; - private static void StartLogging(CompositeDisposable disposables, StartupOptions options) + private static IDisposable StartAppInsightsLogging(StartupOptions options) { + + var disposables = new CompositeDisposable(); + if (options.Production) { var applicationVersion = VersionSensor.Version().AssemblyInformationalVersion; @@ -66,6 +69,24 @@ private static void StartLogging(CompositeDisposable disposables, StartupOptions })); } + if (options.ApplicationInsightsKey != null) + { + var telemetryClient = new TelemetryClient(new TelemetryConfiguration(options.ApplicationInsightsKey)) + { + InstrumentationKey = options.ApplicationInsightsKey + }; + disposables.Add(telemetryClient.SubscribeToPocketLogger(_assembliesEmittingPocketLoggerLogs)); + } + + Log.Event("AgentStarting"); + + return disposables; + } + + internal static IDisposable StartToolLogging(StartupOptions options) + { + var disposables = new CompositeDisposable(); + if (options.LogPath != null) { var log = new SerilogLoggerConfiguration() @@ -94,22 +115,16 @@ private static void StartLogging(CompositeDisposable disposables, StartupOptions args.SetObserved(); }; - if (options.ApplicationInsightsKey != null) - { - var telemetryClient = new TelemetryClient(new TelemetryConfiguration(options.ApplicationInsightsKey)) - { - InstrumentationKey = options.ApplicationInsightsKey - }; - disposables.Add(telemetryClient.SubscribeToPocketLogger(_assembliesEmittingPocketLoggerLogs)); - } - - Log.Event("AgentStarting"); + return disposables; } public static IWebHost ConstructWebHost(StartupOptions options) { - var disposables = new CompositeDisposable(); - StartLogging(disposables, options); + var disposables = new CompositeDisposable + { + StartAppInsightsLogging(options), + StartToolLogging(options) + }; if (options.Key is null) { @@ -120,7 +135,6 @@ public static IWebHost ConstructWebHost(StartupOptions options) Log.Trace("Received Key: {key}", options.Key); } - var webHost = new WebHostBuilder() .UseKestrel() .UseContentRoot(Path.GetDirectoryName(typeof(Program).Assembly.Location)) diff --git a/MLS.Blazor/App.g.cs b/MLS.Blazor/App.g.cs new file mode 100644 index 000000000..e69de29bb diff --git a/MLS.Blazor/Pages/Index.g.cs b/MLS.Blazor/Pages/Index.g.cs new file mode 100644 index 000000000..e69de29bb diff --git a/MLS.Blazor/Pages/_Imports.g.cs b/MLS.Blazor/Pages/_Imports.g.cs new file mode 100644 index 000000000..e69de29bb diff --git a/MLS.Blazor/_Imports.g.cs b/MLS.Blazor/_Imports.g.cs new file mode 100644 index 000000000..e69de29bb diff --git a/Microsoft.DotNet.Interactive.Jupyter.Tests/CompleteRequestHandlerTests.cs b/Microsoft.DotNet.Interactive.Jupyter.Tests/CompleteRequestHandlerTests.cs index 642a7d949..c4df564c1 100644 --- a/Microsoft.DotNet.Interactive.Jupyter.Tests/CompleteRequestHandlerTests.cs +++ b/Microsoft.DotNet.Interactive.Jupyter.Tests/CompleteRequestHandlerTests.cs @@ -5,7 +5,9 @@ using System.Threading.Tasks; using Clockwise; using FluentAssertions; +using FluentAssertions.Extensions; using Microsoft.DotNet.Interactive.Jupyter.Protocol; +using Recipes; using Xunit; using Xunit.Abstractions; @@ -22,9 +24,12 @@ public async Task send_completeReply_on_CompleteRequest() { var scheduler = CreateScheduler(); var request = Message.Create(new CompleteRequest("System.Console.", 15), null); - await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus)); - await _kernelStatus.Idle(); + var context = new JupyterRequestContext(_serverChannel, _ioPubChannel, request); + + await scheduler.Schedule(context); + + await context.Done().Timeout(5.Seconds()); _serverRecordingSocket.DecodedMessages .Should() diff --git a/Microsoft.DotNet.Interactive.Jupyter.Tests/ExecuteRequestHandlerTests.cs b/Microsoft.DotNet.Interactive.Jupyter.Tests/ExecuteRequestHandlerTests.cs index 66ae00c46..a37f1460f 100644 --- a/Microsoft.DotNet.Interactive.Jupyter.Tests/ExecuteRequestHandlerTests.cs +++ b/Microsoft.DotNet.Interactive.Jupyter.Tests/ExecuteRequestHandlerTests.cs @@ -22,9 +22,9 @@ public async Task sends_ExecuteInput_when_ExecuteRequest_is_handled() { var scheduler = CreateScheduler(); var request = Message.Create(new ExecuteRequest("var a =12;"), null); - await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus)); + await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request)); - await _kernelStatus.Idle(); + await _kernel.Idle(); _serverRecordingSocket.DecodedMessages .Should().Contain(message => @@ -40,9 +40,9 @@ public async Task sends_ExecuteReply_message_on_when_code_submission_is_handled( { var scheduler = CreateScheduler(); var request = Message.Create(new ExecuteRequest("var a =12;"), null); - await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus)); + await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request)); - await _kernelStatus.Idle(); + await _kernel.Idle(); _serverRecordingSocket.DecodedMessages .Should().Contain(message => @@ -54,9 +54,9 @@ public async Task sends_ExecuteReply_with_error_message_on_when_code_submission_ { var scheduler = CreateScheduler(); var request = Message.Create(new ExecuteRequest("asdes"), null); - await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus)); + await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request)); - await _kernelStatus.Idle(); + await _kernel.Idle(); _serverRecordingSocket.DecodedMessages .Should() @@ -74,9 +74,9 @@ public async Task sends_DisplayData_message_on_ValueProduced() { var scheduler = CreateScheduler(); var request = Message.Create(new ExecuteRequest("Console.WriteLine(2+2);"), null); - await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus)); + await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request)); - await _kernelStatus.Idle(); + await _kernel.Idle(); _serverRecordingSocket.DecodedMessages .Should().Contain(message => @@ -92,9 +92,9 @@ public async Task sends_ExecuteReply_message_on_ReturnValueProduced() { var scheduler = CreateScheduler(); var request = Message.Create(new ExecuteRequest("2+2"), null); - await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus)); + await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request)); - await _kernelStatus.Idle(); + await _kernel.Idle(); _serverRecordingSocket.DecodedMessages .Should().Contain(message => @@ -110,9 +110,9 @@ public async Task sends_ExecuteReply_message_when_submission_contains_only_a_dir { var scheduler = CreateScheduler(); var request = Message.Create(new ExecuteRequest("%%csharp"), null); - await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus)); + await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request)); - await _kernelStatus.Idle(); + await _kernel.Idle(); _serverRecordingSocket.DecodedMessages .Should().Contain(message => diff --git a/Microsoft.DotNet.Interactive.Jupyter.Tests/InterruptRequestHandlerTests.cs b/Microsoft.DotNet.Interactive.Jupyter.Tests/InterruptRequestHandlerTests.cs index cf4d94358..d06362ece 100644 --- a/Microsoft.DotNet.Interactive.Jupyter.Tests/InterruptRequestHandlerTests.cs +++ b/Microsoft.DotNet.Interactive.Jupyter.Tests/InterruptRequestHandlerTests.cs @@ -1,7 +1,6 @@ // Copyright (c) .NET Foundation and contributors. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. -using System; using Clockwise; using FluentAssertions; using System.Linq; @@ -23,9 +22,9 @@ public async Task sends_InterruptReply() { var scheduler = CreateScheduler(); var request = Message.Create(new InterruptRequest(), null); - await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus)); + await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request)); - await _kernelStatus.Idle(); + await _kernel.Idle(); _serverRecordingSocket.DecodedMessages .SingleOrDefault(message => diff --git a/Microsoft.DotNet.Interactive.Jupyter.Tests/IsCompleteRequestHandlerTests.cs b/Microsoft.DotNet.Interactive.Jupyter.Tests/IsCompleteRequestHandlerTests.cs index 3b852ba2b..58a2c37a3 100644 --- a/Microsoft.DotNet.Interactive.Jupyter.Tests/IsCompleteRequestHandlerTests.cs +++ b/Microsoft.DotNet.Interactive.Jupyter.Tests/IsCompleteRequestHandlerTests.cs @@ -26,8 +26,8 @@ public async Task sends_isCompleteReply_with_complete_if_the_code_is_a_complete_ var scheduler = CreateScheduler(); var request = Message.Create(new IsCompleteRequest("var a = 12;"), null); - await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus)); - await _kernelStatus.Idle(); + await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request)); + await _kernel.Idle(); Logger.Log.Info("DecodedMessages: {messages}", _serverRecordingSocket.DecodedMessages); @@ -47,8 +47,8 @@ public async Task sends_isCompleteReply_with_incomplete_and_indent_if_the_code_i { var scheduler = CreateScheduler(); var request = Message.Create(new IsCompleteRequest("var a = 12"), null); - await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request, _kernelStatus)); - await _kernelStatus.Idle(); + await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request)); + await _kernel.Idle(); _serverRecordingSocket.DecodedMessages.SingleOrDefault(message => message.Contains(MessageTypeValues.IsCompleteReply)) .Should() diff --git a/Microsoft.DotNet.Interactive.Jupyter.Tests/JupyterRequestHandlerTestBase{T}.cs b/Microsoft.DotNet.Interactive.Jupyter.Tests/JupyterRequestHandlerTestBase{T}.cs index 0847cf16f..cd2b42c0b 100644 --- a/Microsoft.DotNet.Interactive.Jupyter.Tests/JupyterRequestHandlerTestBase{T}.cs +++ b/Microsoft.DotNet.Interactive.Jupyter.Tests/JupyterRequestHandlerTestBase{T}.cs @@ -13,35 +13,36 @@ namespace Microsoft.DotNet.Interactive.Jupyter.Tests public abstract class JupyterRequestHandlerTestBase : IDisposable where T : JupyterMessageContent { - private readonly CompositeDisposable _disposables =new CompositeDisposable(); + private readonly CompositeDisposable _disposables = new CompositeDisposable(); protected readonly MessageSender _ioPubChannel; protected readonly MessageSender _serverChannel; protected readonly RecordingSocket _serverRecordingSocket; protected readonly RecordingSocket _ioRecordingSocket; - protected readonly KernelStatus _kernelStatus; + protected readonly IKernel _kernel; protected JupyterRequestHandlerTestBase(ITestOutputHelper output) { _disposables.Add(output.SubscribeToPocketLogger()); + _kernel = new CompositeKernel + { + new CSharpKernel() + }.UseDefaultMagicCommands(); + + _disposables.Add(_kernel.LogEventsToPocketLogger()); + var signatureValidator = new SignatureValidator("key", "HMACSHA256"); _serverRecordingSocket = new RecordingSocket(); _serverChannel = new MessageSender(_serverRecordingSocket, signatureValidator); _ioRecordingSocket = new RecordingSocket(); _ioPubChannel = new MessageSender(_ioRecordingSocket, signatureValidator); - _kernelStatus = new KernelStatus( - Header.Create(typeof(T), "test"), - _serverChannel); } public void Dispose() => _disposables.Dispose(); protected ICommandScheduler CreateScheduler() { - var handler = new JupyterRequestContextHandler(new CompositeKernel - { - new CSharpKernel() - }.UseDefaultMagicCommands()); + var handler = new JupyterRequestContextHandler(_kernel); return CommandScheduler.Create(handler.Handle).Trace(); } diff --git a/Microsoft.DotNet.Interactive.Jupyter.Tests/KernelStatusTests.cs b/Microsoft.DotNet.Interactive.Jupyter.Tests/KernelStatusTests.cs deleted file mode 100644 index 4a03ead77..000000000 --- a/Microsoft.DotNet.Interactive.Jupyter.Tests/KernelStatusTests.cs +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright (c) .NET Foundation and contributors. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -using System.Linq; -using System.Threading.Tasks; -using FluentAssertions; -using FluentAssertions.Extensions; -using Microsoft.DotNet.Interactive.Jupyter.Protocol; -using MLS.Agent.Tools.Tests; -using Newtonsoft.Json.Linq; -using Recipes; -using Xunit; - -namespace Microsoft.DotNet.Interactive.Jupyter.Tests -{ - public class KernelStatusTests - { - private readonly KernelStatus _kernelStatus; - private readonly RecordingSocket _recordingSocket = new RecordingSocket(); - - public KernelStatusTests() - { - _kernelStatus = new KernelStatus( - Header.Create( - typeof(ExecuteRequest), "test"), - new MessageSender(_recordingSocket, new SignatureValidator("key", "HMACSHA256") - )); - } - - [Fact] - public void When_idle_then_awaiting_idle_returns_immediately() - { - var task = _kernelStatus.Idle(); - - task.IsCompleted.Should().BeTrue(); - } - - [Fact] - public async Task When_not_idle_then_idle_returns_after_SetAsIdle_is_called() - { - _kernelStatus.SetAsBusy(); - - var task = _kernelStatus.Idle(); - - task.IsCompleted.Should().BeFalse(); - - await Task.WhenAll( - Task.Run(() => _kernelStatus.SetAsIdle()), - task).Timeout(3.Seconds()); - } - - [Fact] - public void Status_message_is_only_sent_on_state_change() - { - _kernelStatus.SetAsBusy(); - _kernelStatus.SetAsBusy(); - _kernelStatus.SetAsIdle(); - _kernelStatus.SetAsIdle(); - _kernelStatus.SetAsBusy(); - - _recordingSocket - .DecodedMessages - .Where(m => m.StartsWith("{")) - .Select(JObject.Parse) - .SelectMany(jobj => jobj.Properties() - .Where(p => p.Name == "execution_state") - .Select(p => p.Value.Value())) - .Should() - .BeEquivalentSequenceTo("idle", "busy", "idle", "busy"); - } - } -} \ No newline at end of file diff --git a/Microsoft.DotNet.Interactive.Jupyter/CompleteRequestHandler.cs b/Microsoft.DotNet.Interactive.Jupyter/CompleteRequestHandler.cs index b1d463205..b2f5463e0 100644 --- a/Microsoft.DotNet.Interactive.Jupyter/CompleteRequestHandler.cs +++ b/Microsoft.DotNet.Interactive.Jupyter/CompleteRequestHandler.cs @@ -2,7 +2,6 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; -using System.Collections.Concurrent; using System.Linq; using System.Reactive.Concurrency; using System.Text.RegularExpressions; @@ -27,43 +26,39 @@ public async Task Handle(JupyterRequestContext context) { var completeRequest = GetJupyterRequest(context); - context.KernelStatus.SetAsBusy(); - var command = new RequestCompletion(completeRequest.Code, completeRequest.CursorPosition); - var openRequest = new InflightRequest(context, completeRequest, 0); - - InFlightRequests[command] = openRequest; - - await Kernel.SendAsync(command); + await SendTheThingAndWaitForTheStuff(context, command); } - protected override void OnKernelEvent(IKernelEvent @event) + protected override void OnKernelEventReceived( + IKernelEvent @event, + JupyterRequestContext context) { switch (@event) { case CompletionRequestCompleted completionRequestCompleted: - OnCompletionRequestCompleted(completionRequestCompleted, InFlightRequests); - break; - case CompletionRequestReceived _: + OnCompletionRequestCompleted( + completionRequestCompleted, + context.Request, + context.ServerChannel); break; } } - private static void OnCompletionRequestCompleted(CompletionRequestCompleted completionRequestCompleted, ConcurrentDictionary openRequests) + protected override void OnKernelEvent(IKernelEvent @event) { - openRequests.TryGetValue(completionRequestCompleted.Command, out var openRequest); - if (openRequest == null) - { - return; - } + } + + private static void OnCompletionRequestCompleted(CompletionRequestCompleted completionRequestCompleted, Message request, IMessageSender serverChannel) + { + var command = completionRequestCompleted.Command as RequestCompletion; - var pos = ComputeReplacementStartPosition(openRequest.Request.Code, openRequest.Request.CursorPosition); - var reply = new CompleteReply(pos, openRequest.Request.CursorPosition, matches: completionRequestCompleted.CompletionList.Select(e => e.InsertText).ToList()); + var pos = ComputeReplacementStartPosition(command.Code, command.CursorPosition); + var reply = new CompleteReply(pos, command.CursorPosition, matches: completionRequestCompleted.CompletionList.Select(e => e.InsertText).ToList()); - var completeReply = Message.CreateResponse(reply, openRequest.Context.Request); - openRequest.Context.ServerChannel.Send(completeReply); - openRequest.Context.KernelStatus.SetAsIdle(); + var completeReply = Message.CreateResponse(reply, request); + serverChannel.Send(completeReply); } private static int ComputeReplacementStartPosition(string code, int cursorPosition) diff --git a/Microsoft.DotNet.Interactive.Jupyter/ExecuteRequestHandler.cs b/Microsoft.DotNet.Interactive.Jupyter/ExecuteRequestHandler.cs index 426b02b06..73301982d 100644 --- a/Microsoft.DotNet.Interactive.Jupyter/ExecuteRequestHandler.cs +++ b/Microsoft.DotNet.Interactive.Jupyter/ExecuteRequestHandler.cs @@ -5,7 +5,6 @@ using System.Collections.Generic; using System.Linq; using System.Reactive.Concurrency; -using System.Reactive.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.DotNet.Interactive.Commands; @@ -28,115 +27,106 @@ public async Task Handle(JupyterRequestContext context) { var executeRequest = GetJupyterRequest(context); - context.KernelStatus.SetAsBusy(); - var executionCount = executeRequest.Silent ? _executionCount : Interlocked.Increment(ref _executionCount); + _executionCount = executeRequest.Silent ? _executionCount : Interlocked.Increment(ref _executionCount); - var executeInputPayload = new ExecuteInput(executeRequest.Code, executionCount); + var executeInputPayload = new ExecuteInput(executeRequest.Code, _executionCount); var executeReply = Message.Create(executeInputPayload, context.Request.Header); context.ServerChannel.Send(executeReply); var command = new SubmitCode(executeRequest.Code); - var openRequest = new InflightRequest(context, executeRequest, executionCount); - - InFlightRequests[command] = openRequest; - - try - { - await Kernel.SendAsync(command); - } - catch (Exception e) - { - OnCommandFailed(new CommandFailed(e, command)); - } + await SendTheThingAndWaitForTheStuff(context, command); } - private static Dictionary CreateTransient(string displayId) - { - var transient = new Dictionary { { "display_id", displayId ?? Guid.NewGuid().ToString() } }; - return transient; - } - - protected override void OnKernelEvent(IKernelEvent @event) + protected override void OnKernelEventReceived( + IKernelEvent @event, + JupyterRequestContext context) { switch (@event) { case ValueProducedEventBase valueProduced: - OnValueProduced(valueProduced); + OnValueProduced(valueProduced, context.Request, context.IoPubChannel); break; case CommandHandled commandHandled: - OnCommandHandled(commandHandled); + OnCommandHandled(commandHandled, context.Request, context.IoPubChannel); break; case CommandFailed commandFailed: - OnCommandFailed(commandFailed); - break; - case CodeSubmissionReceived _: - case IncompleteCodeSubmissionReceived _: - break; - case CompleteCodeSubmissionReceived _: + OnCommandFailed(commandFailed, context.Request, context.ServerChannel, context.IoPubChannel); break; } } - private void OnCommandFailed(CommandFailed commandFailed) + private static Dictionary CreateTransient(string displayId) { - if (!InFlightRequests.TryRemove(commandFailed.GetRootCommand(), out var openRequest)) - { - return; - } + var transient = new Dictionary { { "display_id", displayId ?? Guid.NewGuid().ToString() } }; + return transient; + } - var errorContent = new Error( + protected override void OnKernelEvent(IKernelEvent @event) + { + } + + private void OnCommandFailed( + CommandFailed commandFailed, + Message request, + IMessageSender serverChannel, + IMessageSender ioPubChannel) + { + var errorContent = new Error ( eName: "Unhandled Exception", eValue: commandFailed.Message ); - if (!openRequest.Request.Silent) + var isSilent = ((ExecuteRequest)request.Content).Silent; + + if (!isSilent) { // send on io var error = Message.Create( errorContent, - openRequest.Context.Request.Header); - openRequest.Context.IoPubChannel.Send(error); + request.Header); + + ioPubChannel.Send(error); // send on stderr var stdErr = new StdErrStream(errorContent.EValue); var stream = Message.Create( stdErr, - openRequest.Context.Request.Header); - openRequest.Context.IoPubChannel.Send(stream); + request.Header); + + ioPubChannel.Send(stream); } // reply Error - var executeReplyPayload = new ExecuteReplyError(errorContent, executionCount: openRequest.ExecutionCount); + var executeReplyPayload = new ExecuteReplyError(errorContent, executionCount: _executionCount); // send to server var executeReply = Message.CreateResponse( executeReplyPayload, - openRequest.Context.Request); + request); - openRequest.Context.ServerChannel.Send(executeReply); - openRequest.Context.KernelStatus.SetAsIdle(); + serverChannel.Send(executeReply); } - private void SendDisplayData(DisplayData displayData, InflightRequest openRequest) + private void SendDisplayData(DisplayData displayData, Message request, IMessageSender ioPubChannel) { - if (!openRequest.Request.Silent) + var isSilent = ((ExecuteRequest) request.Content).Silent; + + if (!isSilent) { // send on io var executeResultMessage = Message.Create( displayData, - openRequest.Context.Request.Header); - openRequest.Context.IoPubChannel.Send(executeResultMessage); + request.Header); + ioPubChannel.Send(executeResultMessage); } } - private void OnValueProduced(ValueProducedEventBase valueProduced) + private void OnValueProduced( + ValueProducedEventBase valueProduced, + Message request, + IMessageSender ioPubChannel) { - if (!InFlightRequests.TryGetValue(valueProduced.GetRootCommand(), out var openRequest)) - { - return; - } - var transient = CreateTransient(valueProduced.ValueId); var formattedValues = valueProduced @@ -158,7 +148,7 @@ private void OnValueProduced(ValueProducedEventBase valueProduced) break; case ReturnValueProduced _: executeResultData = new ExecuteResult( - openRequest.ExecutionCount, + _executionCount, transient: transient, data: formattedValues); break; @@ -171,7 +161,7 @@ private void OnValueProduced(ValueProducedEventBase valueProduced) throw new ArgumentException("Unsupported event type", nameof(valueProduced)); } - SendDisplayData(executeResultData, openRequest); + SendDisplayData(executeResultData, request, ioPubChannel); } private static void CreateDefaultFormattedValueIfEmpty(Dictionary formattedValues, object value) @@ -184,23 +174,22 @@ private static void CreateDefaultFormattedValueIfEmpty(Dictionary _done = new TaskCompletionSource(); + + public JupyterRequestContext(IMessageSender serverChannel, IMessageSender ioPubChannel, Message request) + { + ServerChannel = serverChannel ?? throw new ArgumentNullException(nameof(serverChannel)); + IoPubChannel = ioPubChannel ?? throw new ArgumentNullException(nameof(ioPubChannel)); + Request = request ?? throw new ArgumentNullException(nameof(request)); + } + public IMessageSender ServerChannel { get; } + public IMessageSender IoPubChannel { get; } + public Message Request { get; } public T GetRequestContent() where T : JupyterMessageContent @@ -17,15 +30,8 @@ public T GetRequestContent() where T : JupyterMessageContent return Request?.Content as T; } - public IKernelStatus KernelStatus { get; } + public void Complete() => _done.SetResult(Unit.Default); - public JupyterRequestContext(IMessageSender serverChannel, IMessageSender ioPubChannel, Message request, - IKernelStatus kernelStatus) - { - ServerChannel = serverChannel ?? throw new ArgumentNullException(nameof(serverChannel)); - IoPubChannel = ioPubChannel ?? throw new ArgumentNullException(nameof(ioPubChannel)); - Request = request ?? throw new ArgumentNullException(nameof(request)); - KernelStatus = kernelStatus; - } + public Task Done() => _done.Task; } } \ No newline at end of file diff --git a/Microsoft.DotNet.Interactive.Jupyter/JupyterRequestContextHandler.cs b/Microsoft.DotNet.Interactive.Jupyter/JupyterRequestContextHandler.cs index aa5fcf9c5..c3ca49235 100644 --- a/Microsoft.DotNet.Interactive.Jupyter/JupyterRequestContextHandler.cs +++ b/Microsoft.DotNet.Interactive.Jupyter/JupyterRequestContextHandler.cs @@ -17,8 +17,7 @@ public class JupyterRequestContextHandler : ICommandHandler { @@ -53,6 +52,8 @@ public async Task Handle( break; } + delivery.Command.Complete(); + return delivery.Complete(); } } diff --git a/Microsoft.DotNet.Interactive.Jupyter/KernelStatus.cs b/Microsoft.DotNet.Interactive.Jupyter/KernelStatus.cs deleted file mode 100644 index 99691003c..000000000 --- a/Microsoft.DotNet.Interactive.Jupyter/KernelStatus.cs +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright (c) .NET Foundation and contributors. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. - -using System; -using System.Reactive.Linq; -using System.Reactive.Subjects; -using System.Threading.Tasks; -using Microsoft.DotNet.Interactive.Jupyter.Protocol; - -namespace Microsoft.DotNet.Interactive.Jupyter -{ - public class KernelStatus : IKernelStatus - { - private readonly Header _requestHeader; - private readonly MessageSender _messageSender; - private readonly BehaviorSubject _idleState = new BehaviorSubject(true); - - public KernelStatus(Header requestHeader, MessageSender messageSender) - { - _requestHeader = requestHeader; - _messageSender = messageSender; - - _idleState - .DistinctUntilChanged() - .Subscribe(value => SetStatus(value ? StatusValues.Idle : StatusValues.Busy)); - } - - public void SetAsBusy() => _idleState.OnNext(false); - - public void SetAsIdle() => _idleState.OnNext(true); - - public async Task Idle() => await _idleState.FirstAsync(value => value); - - private void SetStatus(string status) - { - var content = new Status(status); - - var statusMessage = Message.Create(content, _requestHeader); - - _messageSender.Send(statusMessage); - } - } -} \ No newline at end of file diff --git a/Microsoft.DotNet.Interactive.Jupyter/RequestHandlerBase.cs b/Microsoft.DotNet.Interactive.Jupyter/RequestHandlerBase.cs index 19b10e76c..170e23b96 100644 --- a/Microsoft.DotNet.Interactive.Jupyter/RequestHandlerBase.cs +++ b/Microsoft.DotNet.Interactive.Jupyter/RequestHandlerBase.cs @@ -6,6 +6,8 @@ using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Threading; +using System.Threading.Tasks; using Microsoft.DotNet.Interactive.Commands; using Microsoft.DotNet.Interactive.Events; using Microsoft.DotNet.Interactive.Jupyter.Protocol; @@ -24,11 +26,29 @@ protected RequestHandlerBase(IKernel kernel, IScheduler scheduler) Kernel = kernel ?? throw new ArgumentNullException(nameof(kernel)); KernelEvents = Kernel.KernelEvents.ObserveOn(scheduler ?? throw new ArgumentNullException(nameof(scheduler))); + + // FIX: (RequestHandlerBase) do we care about this? _disposables.Add(KernelEvents.Subscribe(OnKernelEvent)); } protected abstract void OnKernelEvent(IKernelEvent @event); + protected async Task SendTheThingAndWaitForTheStuff( + JupyterRequestContext context, + IKernelCommand command) + { + var sub = Kernel.KernelEvents.Subscribe(e => OnKernelEventReceived(e, context)); + + await ((KernelBase) Kernel).SendAsync( + command, + CancellationToken.None, + onDone: () => sub.Dispose()); + } + + protected abstract void OnKernelEventReceived( + IKernelEvent @event, + JupyterRequestContext context); + protected static T GetJupyterRequest(JupyterRequestContext context) { var request = context.GetRequestContent() ?? diff --git a/Microsoft.DotNet.Interactive.Jupyter/Shell.cs b/Microsoft.DotNet.Interactive.Jupyter/Shell.cs index 71e7c0af1..c40d3f4ad 100644 --- a/Microsoft.DotNet.Interactive.Jupyter/Shell.cs +++ b/Microsoft.DotNet.Interactive.Jupyter/Shell.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using System.Reactive.Linq; using System.Reactive.Disposables; using System.Threading; using System.Threading.Tasks; @@ -11,11 +12,13 @@ using NetMQ.Sockets; using Pocket; using Recipes; +using static Pocket.Logger; namespace Microsoft.DotNet.Interactive.Jupyter { public class Shell : IHostedService { + private readonly IKernel _kernel; private readonly ICommandScheduler _scheduler; private readonly RouterSocket _shell; private readonly PublisherSocket _ioPubSocket; @@ -31,6 +34,7 @@ public class Shell : IHostedService private readonly RouterSocket _control; public Shell( + IKernel kernel, ICommandScheduler scheduler, ConnectionInformation connectionInformation) { @@ -39,6 +43,7 @@ public Shell( throw new ArgumentNullException(nameof(connectionInformation)); } + _kernel = kernel ?? throw new ArgumentNullException(nameof(kernel)); _scheduler = scheduler ?? throw new ArgumentNullException(nameof(scheduler)); _shellAddress = $"{connectionInformation.Transport}://{connectionInformation.IP}:{connectionInformation.ShellPort}"; @@ -71,8 +76,8 @@ public async Task StartAsync(CancellationToken cancellationToken) _ioPubSocket.Bind(_ioPubAddress); _stdIn.Bind(_stdInAddress); _control.Bind(_controlAddress); - - using (var activity = Logger.Log.OnEnterAndExit()) + + using (var activity = Log.OnEnterAndExit()) { while (!cancellationToken.IsCancellationRequested) { @@ -80,34 +85,40 @@ public async Task StartAsync(CancellationToken cancellationToken) activity.Info("Received: {message}", message.ToJson()); - var status = new KernelStatus(message.Header, new MessageSender(_ioPubSocket, _signatureValidator)); + SetBusy(); switch (message.Header.MessageType) { case MessageTypeValues.KernelInfoRequest: - status.SetAsBusy(); HandleKernelInfoRequest(message); - status.SetAsIdle(); + SetIdle(); break; case MessageTypeValues.KernelShutdownRequest: - status.SetAsBusy(); - status.SetAsIdle(); + SetIdle(); break; default: var context = new JupyterRequestContext( _shellSender, _ioPubSender, - message, - new KernelStatus(message.Header, _shellSender)); + message); await _scheduler.Schedule(context); + await context.Done(); + + SetIdle(); + break; } + + void SetBusy() => _shellSender.Send(Message.Create(new Status(StatusValues.Busy), message.Header)); + + void SetIdle() => _shellSender.Send(Message.Create(new Status(StatusValues.Busy), message.Header)); } } + } public Task StopAsync(CancellationToken cancellationToken) @@ -122,7 +133,6 @@ private void HandleKernelInfoRequest(Message request) var replyMessage = Message.CreateResponse(kernelInfoReply, request); - _shellSender.Send(replyMessage); } } diff --git a/Microsoft.DotNet.Interactive/CompositeKernel.cs b/Microsoft.DotNet.Interactive/CompositeKernel.cs index 93296f713..9feadc710 100644 --- a/Microsoft.DotNet.Interactive/CompositeKernel.cs +++ b/Microsoft.DotNet.Interactive/CompositeKernel.cs @@ -42,7 +42,10 @@ public void Add(IKernel kernel) AddDirective(chooseKernelCommand); - AddDisposable(kernel.KernelEvents.Subscribe(PublishEvent)); + AddDisposable(kernel.KernelEvents.Subscribe(e => + { + PublishEvent(e); + })); } protected override void SetHandlingKernel( diff --git a/Microsoft.DotNet.Interactive/Events/KernelBusy.cs b/Microsoft.DotNet.Interactive/Events/KernelBusy.cs new file mode 100644 index 000000000..ec0b80060 --- /dev/null +++ b/Microsoft.DotNet.Interactive/Events/KernelBusy.cs @@ -0,0 +1,8 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. +namespace Microsoft.DotNet.Interactive.Events +{ + public class KernelBusy : KernelEventBase + { + } +} \ No newline at end of file diff --git a/Microsoft.DotNet.Interactive/Events/KernelIdle.cs b/Microsoft.DotNet.Interactive/Events/KernelIdle.cs new file mode 100644 index 000000000..a229cf446 --- /dev/null +++ b/Microsoft.DotNet.Interactive/Events/KernelIdle.cs @@ -0,0 +1,8 @@ +// Copyright (c) .NET Foundation and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. +namespace Microsoft.DotNet.Interactive.Events +{ + public class KernelIdle : KernelEventBase + { + } +} \ No newline at end of file diff --git a/Microsoft.DotNet.Interactive/IKernel.cs b/Microsoft.DotNet.Interactive/IKernel.cs index 8cbf1fa6d..385665a9d 100644 --- a/Microsoft.DotNet.Interactive/IKernel.cs +++ b/Microsoft.DotNet.Interactive/IKernel.cs @@ -17,7 +17,9 @@ public interface IKernel : IDisposable IObservable KernelEvents { get; } - Task SendAsync(IKernelCommand command, CancellationToken cancellationToken); + Task SendAsync( + IKernelCommand command, + CancellationToken cancellationToken); IReadOnlyCollection Directives { get; } } diff --git a/Microsoft.DotNet.Interactive/KernelBase.cs b/Microsoft.DotNet.Interactive/KernelBase.cs index da00195fc..6b2b23abe 100644 --- a/Microsoft.DotNet.Interactive/KernelBase.cs +++ b/Microsoft.DotNet.Interactive/KernelBase.cs @@ -9,6 +9,7 @@ using System.CommandLine.Invocation; using System.Linq; using System.Reactive.Disposables; +using System.Reactive.Linq; using System.Reactive.Subjects; using System.Threading; using System.Threading.Tasks; @@ -17,12 +18,24 @@ namespace Microsoft.DotNet.Interactive { + public class KernelIdleState + { + private readonly BehaviorSubject _idleState = new BehaviorSubject(true); + + public IObservable IdleState => _idleState.DistinctUntilChanged(); + + public void SetAsBusy() => _idleState.OnNext(false); + + public void SetAsIdle() => _idleState.OnNext(true); + } + public abstract class KernelBase : IKernel { private readonly Subject _channel = new Subject(); private readonly CompositeDisposable _disposables; private readonly List _directiveCommands = new List(); private Parser _directiveParser; + private readonly KernelIdleState _idleState =new KernelIdleState(); protected KernelBase() { @@ -33,6 +46,23 @@ protected KernelBase() AddSetKernelMiddleware(); AddDirectiveMiddlewareAndCommonCommandHandlers(); + + _disposables.Add(_idleState.IdleState.Subscribe(idle => + { + if (idle) + { + PublishEvent(new KernelIdle()); + } + else + { + PublishEvent(new KernelBusy()); + } + })); + } + + public void WhenIdle(Func p) + { + throw new NotImplementedException(); } public KernelCommandPipeline Pipeline { get; } @@ -310,9 +340,19 @@ internal virtual async Task HandleInternalAsync( private readonly ConcurrentQueue _commandQueue = new ConcurrentQueue(); + + public Task SendAsync( IKernelCommand command, CancellationToken cancellationToken) + { + return SendAsync(command, cancellationToken, null); + } + + public Task SendAsync( + IKernelCommand command, + CancellationToken cancellationToken, + Action onDone) { if (command == null) { @@ -325,15 +365,29 @@ public Task SendAsync( _commandQueue.Enqueue(operation); - Task.Run(async () => + DoTheThing(_commandQueue); + + return tcs.Task; + + void DoTheThing(ConcurrentQueue commandQueue) { - if (_commandQueue.TryDequeue(out var currentOperation)) + if (commandQueue.TryDequeue(out var currentOperation)) { - await ExecuteCommand(currentOperation); - } - }, cancellationToken).ConfigureAwait(false); + _idleState.SetAsBusy(); + + Task.Run(async () => + { + await ExecuteCommand(currentOperation); - return tcs.Task; + DoTheThing(commandQueue); + }, cancellationToken).ConfigureAwait(false); + } + else + { + _idleState.SetAsIdle(); + onDone?.Invoke(); + } + } } protected void PublishEvent(IKernelEvent kernelEvent) diff --git a/Microsoft.DotNet.Interactive/KernelExtensions.cs b/Microsoft.DotNet.Interactive/KernelExtensions.cs index cec2d3598..ff6230435 100644 --- a/Microsoft.DotNet.Interactive/KernelExtensions.cs +++ b/Microsoft.DotNet.Interactive/KernelExtensions.cs @@ -6,15 +6,31 @@ using System.CommandLine.Builder; using System.CommandLine.Invocation; using System.IO; +using System.Reactive.Disposables; +using System.Reactive.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.DotNet.Interactive.Commands; +using Microsoft.DotNet.Interactive.Events; +using Microsoft.DotNet.Interactive.Extensions; using Pocket; namespace Microsoft.DotNet.Interactive { public static class KernelExtensions { + public static async Task Idle(this IKernel kernel) + { + var busyStream = kernel.KernelEvents.OfType(); + var idleStream = kernel.KernelEvents.OfType(); + // this is a stream that produces idle events only if there is a busy event first, makes sense? + + var pattern = busyStream.And(idleStream); + var stream = Observable.When(pattern.Then((b, i) => i)); + + await stream.FirstAsync(); + } + public static Task SendAsync( this IKernel kernel, IKernelCommand command) @@ -48,11 +64,31 @@ public static T UseExtendDirective(this T kernel) } public static T LogEventsToPocketLogger(this T kernel) - where T : KernelBase + where T : IKernel { - kernel.KernelEvents - .Subscribe(e => - Logger.Log.Info("KernelEvent: {event}", e)); + var disposables = new CompositeDisposable(); + + disposables.Add( + kernel.KernelEvents + .Subscribe( + e => + { + Logger.Log.Info("{kernel}: {event}", + kernel.Name, + e); + })); + + kernel.VisitSubkernels(k => + { + disposables.Add( + k.KernelEvents.Subscribe( + e => + { + Logger.Log.Info("{kernel}: {event}", + k.Name, + e); + })); + }); return kernel; } diff --git a/WorkspaceServer.Tests/Kernel/CompositeKernelTests.cs b/WorkspaceServer.Tests/Kernel/CompositeKernelTests.cs index 5c9081ca4..c129c7c10 100644 --- a/WorkspaceServer.Tests/Kernel/CompositeKernelTests.cs +++ b/WorkspaceServer.Tests/Kernel/CompositeKernelTests.cs @@ -8,7 +8,9 @@ using System.Threading.Tasks; using Microsoft.DotNet.Interactive; using Microsoft.DotNet.Interactive.Commands; +using Microsoft.DotNet.Interactive.Events; using Microsoft.DotNet.Interactive.Tests; +using MLS.Agent.Tools.Tests; using Pocket; using WorkspaceServer.Kernel; using Xunit; @@ -195,5 +197,30 @@ await kernel.SendAsync( .Should() .Be("hello!"); } + + [Fact] + public async Task Events_published_by_child_kernel_are_visible_in_parent_kernel() + { + var subKernel = new CSharpKernel(); + + var compositeKernel = new CompositeKernel + { + subKernel + }; + + var events = compositeKernel.KernelEvents.ToSubscribedList(); + + await subKernel.SendAsync(new SubmitCode("var x = 1;")); + + events + .Select(e => e.GetType()) + .Should() + .ContainInOrder( + typeof(KernelBusy), + typeof(CodeSubmissionReceived), + typeof(CompleteCodeSubmissionReceived), + typeof(CommandHandled), + typeof(KernelIdle)); + } } } \ No newline at end of file diff --git a/WorkspaceServer.Tests/Kernel/KernelInvocationContextTests.cs b/WorkspaceServer.Tests/Kernel/KernelInvocationContextTests.cs index ea6fdf367..e67f62c28 100644 --- a/WorkspaceServer.Tests/Kernel/KernelInvocationContextTests.cs +++ b/WorkspaceServer.Tests/Kernel/KernelInvocationContextTests.cs @@ -4,10 +4,14 @@ using System; using FluentAssertions; using System.Linq; +using System.Reactive.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.DotNet.Interactive; using Microsoft.DotNet.Interactive.Commands; +using Microsoft.DotNet.Interactive.Events; +using MLS.Agent.Tools.Tests; +using WorkspaceServer.Kernel; using Xunit; namespace WorkspaceServer.Tests.Kernel @@ -46,5 +50,32 @@ await Task.Run(() => .And .NotBeNull(); } + + [Fact] + public async Task When_a_command_spawns_another_command_then_parent_context_is_not_complete_until_child_context_is_complete() + { + var kernel = new CompositeKernel + { + new CSharpKernel().UseKernelHelpers() + }; + + kernel.Pipeline.AddMiddleware(async (command, context, next) => + { + context.Publish(new DisplayedValueProduced("1", command)); + + await next(command, context); + + context.Publish(new DisplayedValueProduced("3", command)); + }); + + var result = await kernel.SendAsync(new SubmitCode("display(2);")); + + var events = result.KernelEvents.ToEnumerable(); + + events.OfType() + .Select(v => v.Value) + .Should() + .BeEquivalentSequenceTo(1, 2, 3); + } } } \ No newline at end of file