Skip to content

Commit

Permalink
Merge pull request #1 from colombod/logging-improvements
Browse files Browse the repository at this point in the history
Api reshape
  • Loading branch information
jonsequitur authored Sep 13, 2019
2 parents 96eaf94 + 08ddc8d commit 8a43b3e
Show file tree
Hide file tree
Showing 15 changed files with 85 additions and 104 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) .NET Foundation and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Threading.Tasks;
using Clockwise;
using FluentAssertions;
Expand All @@ -24,7 +23,6 @@ public async Task send_completeReply_on_CompleteRequest()
{
var scheduler = CreateScheduler();
var request = Message.Create(new CompleteRequest("System.Console.", 15), null);

var context = new JupyterRequestContext(_serverChannel, _ioPubChannel, request);

await scheduler.Schedule(context);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// Copyright (c) .NET Foundation and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Threading.Tasks;
using Clockwise;
using FluentAssertions;
using FluentAssertions.Extensions;
using Microsoft.DotNet.Interactive.Jupyter.Protocol;
using Recipes;
using Xunit;
using Xunit.Abstractions;

Expand All @@ -22,9 +23,10 @@ public async Task sends_ExecuteInput_when_ExecuteRequest_is_handled()
{
var scheduler = CreateScheduler();
var request = Message.Create(new ExecuteRequest("var a =12;"), null);
await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request));
var context = new JupyterRequestContext(_serverChannel, _ioPubChannel, request);
await scheduler.Schedule(context);

await _kernel.Idle();
await context.Done().Timeout(5.Seconds());

_serverRecordingSocket.DecodedMessages
.Should().Contain(message =>
Expand All @@ -40,9 +42,10 @@ public async Task sends_ExecuteReply_message_on_when_code_submission_is_handled(
{
var scheduler = CreateScheduler();
var request = Message.Create(new ExecuteRequest("var a =12;"), null);
await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request));
var context = new JupyterRequestContext(_serverChannel, _ioPubChannel, request);
await scheduler.Schedule(context);

await _kernel.Idle();
await context.Done().Timeout(5.Seconds());

_serverRecordingSocket.DecodedMessages
.Should().Contain(message =>
Expand All @@ -54,9 +57,10 @@ public async Task sends_ExecuteReply_with_error_message_on_when_code_submission_
{
var scheduler = CreateScheduler();
var request = Message.Create(new ExecuteRequest("asdes"), null);
await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request));
var context = new JupyterRequestContext(_serverChannel, _ioPubChannel, request);
await scheduler.Schedule(context);

await _kernel.Idle();
await context.Done().Timeout(5.Seconds());

_serverRecordingSocket.DecodedMessages
.Should()
Expand All @@ -74,9 +78,10 @@ public async Task sends_DisplayData_message_on_ValueProduced()
{
var scheduler = CreateScheduler();
var request = Message.Create(new ExecuteRequest("Console.WriteLine(2+2);"), null);
await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request));
var context = new JupyterRequestContext(_serverChannel, _ioPubChannel, request);
await scheduler.Schedule(context);

await _kernel.Idle();
await context.Done().Timeout(10.Seconds());

_serverRecordingSocket.DecodedMessages
.Should().Contain(message =>
Expand All @@ -92,9 +97,10 @@ public async Task sends_ExecuteReply_message_on_ReturnValueProduced()
{
var scheduler = CreateScheduler();
var request = Message.Create(new ExecuteRequest("2+2"), null);
await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request));
var context = new JupyterRequestContext(_serverChannel, _ioPubChannel, request);
await scheduler.Schedule(context);

await _kernel.Idle();
await context.Done().Timeout(5.Seconds());

_serverRecordingSocket.DecodedMessages
.Should().Contain(message =>
Expand All @@ -110,9 +116,10 @@ public async Task sends_ExecuteReply_message_when_submission_contains_only_a_dir
{
var scheduler = CreateScheduler();
var request = Message.Create(new ExecuteRequest("%%csharp"), null);
await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request));
var context = new JupyterRequestContext(_serverChannel, _ioPubChannel, request);
await scheduler.Schedule(context);

await _kernel.Idle();
await context.Done().Timeout(5.Seconds());

_serverRecordingSocket.DecodedMessages
.Should().Contain(message =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
using FluentAssertions;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions.Extensions;
using Microsoft.DotNet.Interactive.Jupyter.Protocol;
using Recipes;
using Xunit;
using Xunit.Abstractions;

Expand All @@ -22,9 +24,11 @@ public async Task sends_InterruptReply()
{
var scheduler = CreateScheduler();
var request = Message.Create(new InterruptRequest(), null);
await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request));
var context = new JupyterRequestContext(_serverChannel, _ioPubChannel, request);

await _kernel.Idle();
await scheduler.Schedule(context);

await context.Done().Timeout(5.Seconds());

_serverRecordingSocket.DecodedMessages
.SingleOrDefault(message =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright (c) .NET Foundation and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using Clockwise;
using FluentAssertions;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions.Extensions;
using Microsoft.DotNet.Interactive.Jupyter.Protocol;
using Pocket;
using Recipes;
Expand All @@ -25,9 +25,10 @@ public async Task sends_isCompleteReply_with_complete_if_the_code_is_a_complete_
{
var scheduler = CreateScheduler();
var request = Message.Create(new IsCompleteRequest("var a = 12;"), null);
var context = new JupyterRequestContext(_serverChannel, _ioPubChannel, request);

await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request));
await _kernel.Idle();
await scheduler.Schedule(context);
await context.Done().Timeout(5.Seconds());

Logger.Log.Info("DecodedMessages: {messages}", _serverRecordingSocket.DecodedMessages);

Expand All @@ -47,8 +48,11 @@ public async Task sends_isCompleteReply_with_incomplete_and_indent_if_the_code_i
{
var scheduler = CreateScheduler();
var request = Message.Create(new IsCompleteRequest("var a = 12"), null);
await scheduler.Schedule(new JupyterRequestContext(_serverChannel, _ioPubChannel, request));
await _kernel.Idle();
var context = new JupyterRequestContext(_serverChannel, _ioPubChannel, request);

await scheduler.Schedule(context);
await context.Done().Timeout(5.Seconds());

_serverRecordingSocket.DecodedMessages.SingleOrDefault(message =>
message.Contains(MessageTypeValues.IsCompleteReply))
.Should()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ protected override void OnKernelEventReceived(
}
}

protected override void OnKernelEvent(IKernelEvent @event)
{
}

private static void OnCompletionRequestCompleted(CompletionRequestCompleted completionRequestCompleted, Message request, IMessageSender serverChannel)
{
var command = completionRequestCompleted.Command as RequestCompletion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected override void OnKernelEventReceived(
OnValueProduced(valueProduced, context.Request, context.IoPubChannel);
break;
case CommandHandled commandHandled:
OnCommandHandled(commandHandled, context.Request, context.IoPubChannel);
OnCommandHandled(commandHandled, context.Request, context.ServerChannel);
break;
case CommandFailed commandFailed:
OnCommandFailed(commandFailed, context.Request, context.ServerChannel, context.IoPubChannel);
Expand All @@ -62,10 +62,6 @@ private static Dictionary<string, object> CreateTransient(string displayId)
return transient;
}

protected override void OnKernelEvent(IKernelEvent @event)
{
}

private void OnCommandFailed(
CommandFailed commandFailed,
Message request,
Expand Down
37 changes: 16 additions & 21 deletions Microsoft.DotNet.Interactive.Jupyter/InterruptRequestHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,39 @@ public InterruptRequestHandler(IKernel kernel, IScheduler scheduler = null)
: base(kernel, scheduler ?? CurrentThreadScheduler.Instance)
{
}

protected override void OnKernelEvent(IKernelEvent @event)

protected override void OnKernelEventReceived(
IKernelEvent @event,
JupyterRequestContext context)
{
switch (@event)
{
case CurrentCommandCancelled kernelInterrupted:
OnExecutionInterrupted(kernelInterrupted);
OnExecutionInterrupted(kernelInterrupted, context.Request, context.ServerChannel);
break;
}
}

private void OnExecutionInterrupted(CurrentCommandCancelled currentCommandCancelled)
private void OnExecutionInterrupted(CurrentCommandCancelled currentCommandCancelled, Message request, IMessageSender serverChannel)
{
if (InFlightRequests.TryRemove(currentCommandCancelled.Command, out var openRequest))
{
// reply
var interruptReplyPayload = new InterruptReply();

// send to server
var interruptReply = Message.CreateResponse(
interruptReplyPayload,
openRequest.Context.Request);
// reply
var interruptReplyPayload = new InterruptReply();

// send to server
var interruptReply = Message.CreateResponse(
interruptReplyPayload,
request);

serverChannel.Send(interruptReply);

openRequest.Context.ServerChannel.Send(interruptReply);
}
}

public async Task Handle(JupyterRequestContext context)
{
var interruptRequest = GetJupyterRequest(context);

var command = new CancelCurrentCommand();

var openRequest = new InflightRequest(context, interruptRequest, 0);

InFlightRequests[command] = openRequest;

await Kernel.SendAsync(command);
await SendTheThingAndWaitForTheStuff(context, command);
}
}
}
21 changes: 9 additions & 12 deletions Microsoft.DotNet.Interactive.Jupyter/IsCompleteRequestHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.


using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Threading.Tasks;
using Microsoft.DotNet.Interactive.Commands;
Expand All @@ -26,6 +24,7 @@ public async Task Handle(JupyterRequestContext context)

await SendTheThingAndWaitForTheStuff(context, command);
}


protected override void OnKernelEventReceived(
IKernelEvent @event,
Expand All @@ -34,20 +33,18 @@ protected override void OnKernelEventReceived(
switch (@event)
{
case CompleteCodeSubmissionReceived completeCodeSubmissionReceived:
OnKernelEvent(completeCodeSubmissionReceived, true);
Reply( true, context.Request, context.ServerChannel);
break;
case IncompleteCodeSubmissionReceived incompleteCodeSubmissionReceived:
OnKernelEvent(incompleteCodeSubmissionReceived, false);
Reply( false, context.Request, context.ServerChannel);
break;
}
}

protected override void OnKernelEvent(IKernelEvent @event){}

private void OnKernelEvent(IKernelEvent @event, bool isComplete)
private void Reply(bool isComplete, Message request, IMessageSender serverChannel)
{
if (InFlightRequests.TryRemove(@event.Command, out var openRequest))
{


var status = isComplete ? "complete" : "incomplete";
var indent = isComplete ? string.Empty : "*";
// reply
Expand All @@ -56,10 +53,10 @@ private void OnKernelEvent(IKernelEvent @event, bool isComplete)
// send to server
var executeReply = Message.CreateResponse(
isCompleteReplyPayload,
openRequest.Context.Request);
request);

openRequest.Context.ServerChannel.Send(executeReply);
}
serverChannel.Send(executeReply);

}
}
}
31 changes: 4 additions & 27 deletions Microsoft.DotNet.Interactive.Jupyter/RequestHandlerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
Expand All @@ -17,7 +16,7 @@ namespace Microsoft.DotNet.Interactive.Jupyter
public abstract class RequestHandlerBase<T> : IDisposable
where T : JupyterMessageContent
{

private readonly CompositeDisposable _disposables = new CompositeDisposable();
protected IObservable<IKernelEvent> KernelEvents { get; }

Expand All @@ -27,19 +26,15 @@ protected RequestHandlerBase(IKernel kernel, IScheduler scheduler)

KernelEvents = Kernel.KernelEvents.ObserveOn(scheduler ?? throw new ArgumentNullException(nameof(scheduler)));

// FIX: (RequestHandlerBase) do we care about this?
_disposables.Add(KernelEvents.Subscribe(OnKernelEvent));
}

protected abstract void OnKernelEvent(IKernelEvent @event);

protected async Task SendTheThingAndWaitForTheStuff(
JupyterRequestContext context,
JupyterRequestContext context,
IKernelCommand command)
{
var sub = Kernel.KernelEvents.Subscribe(e => OnKernelEventReceived(e, context));

await ((KernelBase) Kernel).SendAsync(
await ((KernelBase)Kernel).SendAsync(
command,
CancellationToken.None,
onDone: () => sub.Dispose());
Expand All @@ -57,29 +52,11 @@ protected static T GetJupyterRequest(JupyterRequestContext context)
return request;
}

protected IKernel Kernel { get; }

protected ConcurrentDictionary<IKernelCommand, InflightRequest> InFlightRequests { get; } = new ConcurrentDictionary<IKernelCommand, InflightRequest>();
protected IKernel Kernel { get; }

public void Dispose()
{
_disposables.Dispose();
}

protected class InflightRequest
{
public JupyterRequestContext Context { get; }

public T Request { get; }

public int ExecutionCount { get; }

public InflightRequest(JupyterRequestContext context, T request, int executionCount)
{
Context = context;
Request = request;
ExecutionCount = executionCount;
}
}
}
}
Loading

0 comments on commit 8a43b3e

Please sign in to comment.