Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix verbose RPC message tracing #398

Merged
merged 1 commit into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/StreamJsonRpc/HeaderDelimitedMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ namespace StreamJsonRpc
using System.Threading.Tasks;
using Microsoft;
using Nerdbank.Streams;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using StreamJsonRpc.Protocol;
using StreamJsonRpc.Reflection;

/// <summary>
/// Adds headers before each text message transmitted over a stream.
Expand Down Expand Up @@ -209,6 +208,15 @@ unsafe int WriteHeaderText(string value, Span<byte> memory)
{
this.Formatter.Serialize(this.contentSequenceBuilder, content);
ReadOnlySequence<byte> contentSequence = this.contentSequenceBuilder.AsReadOnlySequence;

// Some formatters (e.g. MessagePackFormatter) needs the encoded form in order to produce JSON for tracing.
// Other formatters (e.g. JsonMessageFormatter) would prefer to do its own tracing while it still has a JToken.
// We only help the formatters that need the byte-encoded form here. The rest can do it themselves.
if (this.Formatter is IJsonRpcFormatterTracingCallbacks tracer)
{
tracer.OnSerializationComplete(content, this.contentSequenceBuilder);
}

Memory<byte> headerMemory = this.Writer.GetMemory(1024);
int bytesWritten = 0;

Expand Down
3 changes: 3 additions & 0 deletions src/StreamJsonRpc/IJsonRpcMessageFormatter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

namespace StreamJsonRpc
{
using System;
using System.Buffers;
using StreamJsonRpc.Protocol;
using StreamJsonRpc.Reflection;

/// <summary>
/// An interface that offers <see cref="JsonRpcMessage"/> serialization to and from a sequence of bytes.
Expand All @@ -30,6 +32,7 @@ public interface IJsonRpcMessageFormatter
/// </summary>
/// <param name="message">The message to be traced.</param>
/// <returns>Any object whose <see cref="object.ToString()"/> method will produce a human-readable JSON string, suitable for tracing.</returns>
[Obsolete("Tracing is now done via the " + nameof(IJsonRpcTracingCallbacks) + " and " + nameof(IJsonRpcFormatterTracingCallbacks) + " interfaces. Formatters may throw NotSupportedException from this method.")]
object GetJsonText(JsonRpcMessage message);
}
}
29 changes: 20 additions & 9 deletions src/StreamJsonRpc/JsonMessageFormatter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,15 +234,13 @@ JsonRpc IJsonRpcInstanceContainer.Rpc
{
set
{
Requires.NotNull(value, nameof(value));
Verify.Operation(this.rpc == null, Resources.FormatterConfigurationLockedAfterJsonRpcAssigned);
this.rpc = value;

if (value != null)
{
this.formatterProgressTracker = new MessageFormatterProgressTracker(value, this);
this.enumerableTracker = new MessageFormatterEnumerableTracker(value, this);
this.duplexPipeTracker = new MessageFormatterDuplexPipeTracker(value, this) { MultiplexingStream = this.multiplexingStream };
}
this.formatterProgressTracker = new MessageFormatterProgressTracker(value, this);
this.enumerableTracker = new MessageFormatterEnumerableTracker(value, this);
this.duplexPipeTracker = new MessageFormatterDuplexPipeTracker(value, this) { MultiplexingStream = this.multiplexingStream };
}
}

Expand Down Expand Up @@ -300,7 +298,12 @@ public JsonRpcMessage Deserialize(ReadOnlySequence<byte> contentBuffer, Encoding
Requires.NotNull(encoding, nameof(encoding));

JToken json = this.ReadJToken(contentBuffer, encoding);
return this.Deserialize(json);
JsonRpcMessage message = this.Deserialize(json);

IJsonRpcTracingCallbacks? tracingCallbacks = this.rpc;
tracingCallbacks?.OnMessageDeserialized(message, json);

return message;
}

/// <inheritdoc/>
Expand All @@ -313,7 +316,12 @@ public async ValueTask<JsonRpcMessage> DeserializeAsync(PipeReader reader, Encod
{
this.ConfigureJsonTextReader(jsonReader);
JToken json = await JToken.ReadFromAsync(jsonReader, cancellationToken).ConfigureAwait(false);
return this.Deserialize(json);
JsonRpcMessage message = this.Deserialize(json);

IJsonRpcTracingCallbacks? tracingCallbacks = this.rpc;
tracingCallbacks?.OnMessageDeserialized(message, json);

return message;
}
}

Expand All @@ -325,6 +333,9 @@ public void Serialize(IBufferWriter<byte> contentBuffer, JsonRpcMessage message)
{
JToken json = this.Serialize(message);

IJsonRpcTracingCallbacks? tracingCallbacks = this.rpc;
tracingCallbacks?.OnMessageSerialized(message, json);

this.WriteJToken(contentBuffer, json);
}

Expand Down Expand Up @@ -407,7 +418,7 @@ public JToken Serialize(JsonRpcMessage message)
/// mutates the <see cref="JsonRpcMessage"/> itself by tokenizing arguments as if we were going to transmit them
/// which BREAKS argument parsing for incoming named argument messages such as $/cancelRequest.
/// </devremarks>
public object GetJsonText(JsonRpcMessage message) => JToken.FromObject(message);
public object GetJsonText(JsonRpcMessage message) => throw new NotSupportedException();

/// <inheritdoc/>
public void Dispose()
Expand Down
70 changes: 28 additions & 42 deletions src/StreamJsonRpc/JsonRpc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace StreamJsonRpc
{
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Collections.ObjectModel;
Expand All @@ -24,7 +25,7 @@ namespace StreamJsonRpc
/// <summary>
/// Manages a JSON-RPC connection with another entity over a <see cref="Stream"/>.
/// </summary>
public class JsonRpc : IDisposableObservable, IJsonRpcFormatterCallbacks
public class JsonRpc : IDisposableObservable, IJsonRpcFormatterCallbacks, IJsonRpcTracingCallbacks
{
private const string ImpliedMethodNameAsyncSuffix = "Async";
private const string CancelRequestSpecialMethod = "$/cancelRequest";
Expand Down Expand Up @@ -1111,6 +1112,32 @@ public Task NotifyWithParameterObjectAsync(string targetName, object? argument =
return this.InvokeCoreAsync<object>(RequestId.NotSpecified, targetName, argumentToPass, CancellationToken.None, isParameterObject: true);
}

void IJsonRpcTracingCallbacks.OnMessageSerialized(JsonRpcMessage message, object encodedMessage)
{
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Information))
{
this.TraceSource.TraceData(TraceEventType.Information, (int)TraceEvents.MessageSent, message);
}

if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Verbose))
{
this.TraceSource.TraceEvent(TraceEventType.Verbose, (int)TraceEvents.MessageSent, "Sent: {0}", encodedMessage);
}
}

void IJsonRpcTracingCallbacks.OnMessageDeserialized(JsonRpcMessage message, object encodedMessage)
{
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Information))
{
this.TraceSource.TraceData(TraceEventType.Information, (int)TraceEvents.MessageReceived, message);
}

if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Verbose))
{
this.TraceSource.TraceEvent(TraceEventType.Verbose, (int)TraceEvents.MessageReceived, "Received: {0}", encodedMessage);
}
}

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
Expand Down Expand Up @@ -2124,8 +2151,6 @@ private async Task ReadAndHandleRequestsAsync()
this.OnJsonRpcDisconnected(new JsonRpcDisconnectedEventArgs(Resources.ReachedEndOfStream, DisconnectedReason.RemotePartyTerminated));
return;
}

this.TraceMessageReceived(protocolMessage);
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -2384,49 +2409,10 @@ private void TraceLocalMethodAdded(string rpcMethodName, MethodSignatureAndTarge
}
}

private void TraceMessageSent(JsonRpcMessage message)
{
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Information))
{
this.TraceSource.TraceData(TraceEventType.Information, (int)TraceEvents.MessageSent, message);
}

if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Verbose))
{
this.TraceSource.TraceEvent(TraceEventType.Verbose, (int)TraceEvents.MessageSent, "Sent: {0}", this.GetMessageJson(message));
}
}

private void TraceMessageReceived(JsonRpcMessage message)
{
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Information))
{
this.TraceSource.TraceData(TraceEventType.Information, (int)TraceEvents.MessageReceived, message);
}

if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Verbose))
{
this.TraceSource.TraceEvent(TraceEventType.Verbose, (int)TraceEvents.MessageReceived, "Received: {0}", this.GetMessageJson(message));
}
}

private object GetMessageJson(JsonRpcMessage message)
{
try
{
return this.MessageHandler.Formatter.GetJsonText(message);
}
catch (Exception ex)
{
return $"<JSON representation not available: {ex.Message}>";
}
}

private async ValueTask TransmitAsync(JsonRpcMessage message, CancellationToken cancellationToken)
{
try
{
this.TraceMessageSent(message);
bool etwEnabled = JsonRpcEventSource.Instance.IsEnabled(System.Diagnostics.Tracing.EventLevel.Informational, System.Diagnostics.Tracing.EventKeywords.None);
if (etwEnabled)
{
Expand Down
37 changes: 23 additions & 14 deletions src/StreamJsonRpc/LengthHeaderMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,15 @@

namespace StreamJsonRpc
{
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft;
using Nerdbank.Streams;
using StreamJsonRpc.Protocol;
using StreamJsonRpc.Reflection;

/// <summary>
/// A minimal header for each message that simply declares content length.
Expand All @@ -30,9 +27,9 @@ public class LengthHeaderMessageHandler : PipeMessageHandler
private readonly IJsonRpcMessageFormatter formatter;

/// <summary>
/// A wrapper to use for the <see cref="PipeMessageHandler.Writer"/> when we need to count bytes written.
/// The <see cref="IBufferWriter{T}"/> sent to the <see cref="formatter"/> to write the message.
/// </summary>
private PrefixingBufferWriter<byte>? prefixingWriter;
private readonly Sequence<byte> contentSequenceBuilder = new Sequence<byte>(ArrayPool<byte>.Shared);

/// <summary>
/// Initializes a new instance of the <see cref="LengthHeaderMessageHandler"/> class.
Expand Down Expand Up @@ -94,17 +91,29 @@ protected override void Write(JsonRpcMessage content, CancellationToken cancella
{
Assumes.NotNull(this.Writer);

if (this.prefixingWriter == null)
try
{
this.prefixingWriter = new PrefixingBufferWriter<byte>(this.Writer, sizeof(int));
}
// Write out the actual message content.
this.formatter.Serialize(this.contentSequenceBuilder, content);
ReadOnlySequence<byte> contentSequence = this.contentSequenceBuilder.AsReadOnlySequence;

// Write out the actual message content, counting all written bytes.
this.formatter.Serialize(this.prefixingWriter, content);
// Some formatters (e.g. MessagePackFormatter) needs the encoded form in order to produce JSON for tracing.
// Other formatters (e.g. JsonMessageFormatter) would prefer to do its own tracing while it still has a JToken.
// We only help the formatters that need the byte-encoded form here. The rest can do it themselves.
if (this.Formatter is IJsonRpcFormatterTracingCallbacks tracer)
{
tracer.OnSerializationComplete(content, contentSequence);
}

// Now go back and fill in the header with the actual content length.
Utilities.Write(this.prefixingWriter.Prefix.Span, checked((int)this.prefixingWriter.Length));
this.prefixingWriter.Commit();
// Now go back and fill in the header with the actual content length.
Utilities.Write(this.Writer.GetSpan(sizeof(int)), checked((int)contentSequence.Length));
this.Writer.Advance(sizeof(int));
contentSequence.CopyTo(this.Writer);
}
finally
{
this.contentSequenceBuilder.Reset();
}
}
}
}
34 changes: 31 additions & 3 deletions src/StreamJsonRpc/MessagePackFormatter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace StreamJsonRpc
/// The README on that project site describes use cases and its performance compared to alternative
/// .NET MessagePack implementations and this one appears to be the best by far.
/// </remarks>
public class MessagePackFormatter : IJsonRpcMessageFormatter, IJsonRpcInstanceContainer, IJsonRpcFormatterState, IDisposable
public class MessagePackFormatter : IJsonRpcMessageFormatter, IJsonRpcInstanceContainer, IJsonRpcFormatterState, IJsonRpcFormatterTracingCallbacks, IDisposable
{
/// <summary>
/// The constant "jsonrpc".
Expand Down Expand Up @@ -217,7 +217,15 @@ public void SetMessagePackSerializerOptions(MessagePackSerializerOptions options
}

/// <inheritdoc/>
public JsonRpcMessage Deserialize(ReadOnlySequence<byte> contentBuffer) => MessagePackSerializer.Deserialize<JsonRpcMessage>(contentBuffer, this.messageSerializationOptions);
public JsonRpcMessage Deserialize(ReadOnlySequence<byte> contentBuffer)
{
JsonRpcMessage message = MessagePackSerializer.Deserialize<JsonRpcMessage>(contentBuffer, this.messageSerializationOptions);

IJsonRpcTracingCallbacks? tracingCallbacks = this.rpc;
tracingCallbacks?.OnMessageDeserialized(message, new ToStringHelper(contentBuffer, this.messageSerializationOptions));

return message;
}

/// <inheritdoc/>
public void Serialize(IBufferWriter<byte> contentBuffer, JsonRpcMessage message)
Expand All @@ -235,7 +243,13 @@ public void Serialize(IBufferWriter<byte> contentBuffer, JsonRpcMessage message)
}

/// <inheritdoc/>
public object GetJsonText(JsonRpcMessage message) => message is IJsonRpcMessagePackRetention retainedMsgPack ? MessagePackSerializer.ConvertToJson(retainedMsgPack.OriginalMessagePack, this.messageSerializationOptions) : MessagePackSerializer.SerializeToJson(message, this.messageSerializationOptions);
public object GetJsonText(JsonRpcMessage message) => message is IJsonRpcMessagePackRetention retainedMsgPack ? MessagePackSerializer.ConvertToJson(retainedMsgPack.OriginalMessagePack, this.messageSerializationOptions) : throw new NotSupportedException();

void IJsonRpcFormatterTracingCallbacks.OnSerializationComplete(JsonRpcMessage message, ReadOnlySequence<byte> encodedMessage)
{
IJsonRpcTracingCallbacks? tracingCallbacks = this.rpc;
tracingCallbacks?.OnMessageSerialized(message, new ToStringHelper(encodedMessage, this.messageSerializationOptions));
}

/// <inheritdoc/>
public void Dispose()
Expand Down Expand Up @@ -424,6 +438,20 @@ internal void WriteRaw(ref MessagePackWriter writer)
}
}

private class ToStringHelper
{
private readonly ReadOnlySequence<byte> encodedMessage;
private readonly MessagePackSerializerOptions options;

internal ToStringHelper(ReadOnlySequence<byte> encodedMessage, MessagePackSerializerOptions options)
{
this.encodedMessage = encodedMessage;
this.options = options;
}

public override string ToString() => MessagePackSerializer.ConvertToJson(this.encodedMessage, this.options);
}

private class RequestIdFormatter : IMessagePackFormatter<RequestId>
{
internal static readonly RequestIdFormatter Instance = new RequestIdFormatter();
Expand Down
21 changes: 21 additions & 0 deletions src/StreamJsonRpc/Reflection/IJsonRpcFormatterTracingCallbacks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace StreamJsonRpc.Reflection
{
using System.Buffers;
using StreamJsonRpc.Protocol;

/// <summary>
/// Optionally implemented by a <see cref="IJsonRpcMessageFormatter"/> when it needs the fully serialized sequence in order to trace the JSON representation of the message.
/// </summary>
public interface IJsonRpcFormatterTracingCallbacks
{
/// <summary>
/// Invites the formatter to call <see cref="IJsonRpcTracingCallbacks.OnMessageSerialized(JsonRpcMessage, object)"/> with the JSON representation of the message just serialized..
/// </summary>
/// <param name="message">The message that was just serialized.</param>
/// <param name="encodedMessage">The encoded copy of the message, as it recently came from the <see cref="IJsonRpcMessageFormatter.Serialize(IBufferWriter{byte}, JsonRpcMessage)"/> method.</param>
void OnSerializationComplete(JsonRpcMessage message, ReadOnlySequence<byte> encodedMessage);
}
}
Loading