Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PipeReader/PipeWriter support for RPC method parameters #344

Merged
merged 1 commit into from
Sep 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions src/StreamJsonRpc.Tests/DuplexPipeMarshalingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,49 @@ await this.clientRpc.InvokeWithParameterObjectAsync(
Assert.Equal<byte>(MemoryBuffer.Take(bytesToReceive), buffer.Take(bytesToReceive));
}

[Fact]
public async Task ClientCanSendPipeReaderToServer()
{
var pipe = new Pipe();
await pipe.Writer.WriteAsync(MemoryBuffer, this.TimeoutToken);
pipe.Writer.Complete();

int bytesReceived = await this.clientRpc.InvokeWithCancellationAsync<int>(
nameof(Server.AcceptPipeReader),
new object[] { ExpectedFileName, pipe.Reader },
this.TimeoutToken);

Assert.Equal(MemoryBuffer.Length, bytesReceived);
}

[Fact]
public async Task ClientCanSendPipeWriterToServer()
{
var pipe = new Pipe();

int bytesToReceive = MemoryBuffer.Length - 1;
await this.clientRpc.InvokeWithCancellationAsync(
nameof(Server.AcceptPipeWriter),
new object[] { pipe.Writer, bytesToReceive },
this.TimeoutToken);

// Read all that the server wanted us to know, and verify it.
// TODO: update this when we can detect that the server has finished transmission.
byte[] buffer = new byte[bytesToReceive + 1];
int receivedBytes = 0;
while (receivedBytes < bytesToReceive)
{
ReadResult readResult = await pipe.Reader.ReadAsync(this.TimeoutToken);
foreach (ReadOnlyMemory<byte> segment in readResult.Buffer)
{
segment.CopyTo(buffer.AsMemory(receivedBytes));
receivedBytes += segment.Length;
}
}

Assert.Equal<byte>(MemoryBuffer.Take(bytesToReceive), buffer.Take(bytesToReceive));
}

[Theory]
[InlineData(true)]
[InlineData(false)]
Expand Down Expand Up @@ -568,6 +611,35 @@ public async Task AcceptWritablePipe(IDuplexPipe content, int lengthToWrite, Can
content.Output.Complete();
}

public async Task<long> AcceptPipeReader(string fileName, PipeReader reader, CancellationToken cancellationToken)
{
Assert.Equal(ExpectedFileName, fileName);
var ms = new MemoryStream();
using (Stream contentStream = reader.AsStream())
{
await contentStream.CopyToAsync(ms, 4096, cancellationToken);
Assert.Equal<byte>(MemoryBuffer, ms.ToArray());
}

return ms.Length;
}

public async Task AcceptPipeWriter(PipeWriter writer, int lengthToWrite, CancellationToken cancellationToken)
{
const int ChunkSize = 5;
int writtenBytes = 0;
while (writtenBytes < lengthToWrite)
{
// Write in small chunks to verify that it needn't be written all at once.
int bytesToWrite = Math.Min(lengthToWrite - writtenBytes, ChunkSize);
await writer.WriteAsync(MemoryBuffer.AsMemory(writtenBytes, bytesToWrite), cancellationToken);
await writer.FlushAsync(cancellationToken);
writtenBytes += bytesToWrite;
}

writer.Complete();
}

public async Task<long> AcceptReadableStream(string fileName, Stream content, CancellationToken cancellationToken)
{
Assert.Equal(ExpectedFileName, fileName);
Expand Down
2 changes: 1 addition & 1 deletion src/StreamJsonRpc.Tests/StreamJsonRpc.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<PackageReference Include="Microsoft.AspNetCore.TestHost" Version="2.2.0" />
<PackageReference Include="Microsoft.AspNetCore" Version="2.2.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />
<PackageReference Include="Microsoft.VisualStudio.Validation" Version="15.3.58" />
<PackageReference Include="Microsoft.VisualStudio.Validation" Version="15.5.31" />
<PackageReference Include="System.IO.Pipes" Version="4.3.0" />
<PackageReference Include="System.ValueTuple" Version="4.5.0" />
<PackageReference Include="xunit.combinatorial" Version="1.2.7" />
Expand Down
46 changes: 46 additions & 0 deletions src/StreamJsonRpc/JsonMessageFormatter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ public JsonMessageFormatter(Encoding encoding)
new JsonProgressServerConverter(this),
new JsonProgressClientConverter(this),
new DuplexPipeConverter(this),
new PipeReaderConverter(this),
new PipeWriterConverter(this),
new StreamConverter(this),
},
};
Expand Down Expand Up @@ -746,6 +748,50 @@ public override void WriteJson(JsonWriter writer, IDuplexPipe value, JsonSeriali
}
}

private class PipeReaderConverter : JsonConverter<PipeReader>
{
private readonly JsonMessageFormatter jsonMessageFormatter;

public PipeReaderConverter(JsonMessageFormatter jsonMessageFormatter)
{
this.jsonMessageFormatter = jsonMessageFormatter ?? throw new ArgumentNullException(nameof(jsonMessageFormatter));
}

public override PipeReader ReadJson(JsonReader reader, Type objectType, PipeReader existingValue, bool hasExistingValue, JsonSerializer serializer)
{
int? tokenId = JToken.Load(reader).Value<int?>();
return this.jsonMessageFormatter.duplexPipeTracker.GetPipeReader(tokenId);
}

public override void WriteJson(JsonWriter writer, PipeReader value, JsonSerializer serializer)
{
var token = this.jsonMessageFormatter.duplexPipeTracker.GetToken(value);
writer.WriteValue(token);
}
}

private class PipeWriterConverter : JsonConverter<PipeWriter>
{
private readonly JsonMessageFormatter jsonMessageFormatter;

public PipeWriterConverter(JsonMessageFormatter jsonMessageFormatter)
{
this.jsonMessageFormatter = jsonMessageFormatter ?? throw new ArgumentNullException(nameof(jsonMessageFormatter));
}

public override PipeWriter ReadJson(JsonReader reader, Type objectType, PipeWriter existingValue, bool hasExistingValue, JsonSerializer serializer)
{
int? tokenId = JToken.Load(reader).Value<int?>();
return this.jsonMessageFormatter.duplexPipeTracker.GetPipeWriter(tokenId);
}

public override void WriteJson(JsonWriter writer, PipeWriter value, JsonSerializer serializer)
{
var token = this.jsonMessageFormatter.duplexPipeTracker.GetToken(value);
writer.WriteValue(token);
}
}

private class StreamConverter : JsonConverter<Stream>
{
private readonly JsonMessageFormatter jsonMessageFormatter;
Expand Down
4 changes: 4 additions & 0 deletions src/StreamJsonRpc/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ StreamJsonRpc.JsonRpc.TraceEvents.ProgressNotificationError = 16 -> StreamJsonRp
StreamJsonRpc.Reflection.MessageFormatterDuplexPipeTracker
StreamJsonRpc.Reflection.MessageFormatterDuplexPipeTracker.Dispose() -> void
StreamJsonRpc.Reflection.MessageFormatterDuplexPipeTracker.GetPipe(int? token) -> System.IO.Pipelines.IDuplexPipe
StreamJsonRpc.Reflection.MessageFormatterDuplexPipeTracker.GetPipeReader(int? token) -> System.IO.Pipelines.PipeReader
StreamJsonRpc.Reflection.MessageFormatterDuplexPipeTracker.GetPipeWriter(int? token) -> System.IO.Pipelines.PipeWriter
StreamJsonRpc.Reflection.MessageFormatterDuplexPipeTracker.GetToken(System.IO.Pipelines.IDuplexPipe duplexPipe) -> int?
StreamJsonRpc.Reflection.MessageFormatterDuplexPipeTracker.GetToken(System.IO.Pipelines.PipeReader reader) -> int?
StreamJsonRpc.Reflection.MessageFormatterDuplexPipeTracker.GetToken(System.IO.Pipelines.PipeWriter writer) -> int?
StreamJsonRpc.Reflection.MessageFormatterDuplexPipeTracker.MessageFormatterDuplexPipeTracker() -> void
StreamJsonRpc.Reflection.MessageFormatterDuplexPipeTracker.MultiplexingStream.get -> Nerdbank.Streams.MultiplexingStream
StreamJsonRpc.Reflection.MessageFormatterDuplexPipeTracker.MultiplexingStream.set -> void
Expand Down
62 changes: 62 additions & 0 deletions src/StreamJsonRpc/Reflection/MessageFormatterDuplexPipeTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,30 @@ public MessageFormatterDuplexPipeTracker()
return channel.Id;
}

/// <summary>
/// Creates a token to represent a <see cref="PipeReader"/> as it is transmitted from the client to an RPC server as a method argument.
/// </summary>
/// <param name="reader">The client pipe that is to be shared with the RPC server. May be null.</param>
/// <returns>The token to use as the RPC method argument; or <c>null</c> if <paramref name="reader"/> was <c>null</c>.</returns>
/// <remarks>
/// This method should only be called while serializing requests that include an ID (i.e. requests for which we expect a response).
/// When the response is received, a call should always be made to <see cref="OnResponseReceived(long, bool)"/>.
/// </remarks>
/// <exception cref="NotSupportedException">Thrown if no <see cref="MultiplexingStream"/> was provided to the constructor.</exception>
public int? GetToken(PipeReader reader) => this.GetToken(reader != null ? new DuplexPipe(reader) : null);

/// <summary>
/// Creates a token to represent a <see cref="PipeWriter"/> as it is transmitted from the client to an RPC server as a method argument.
/// </summary>
/// <param name="writer">The client pipe that is to be shared with the RPC server. May be null.</param>
/// <returns>The token to use as the RPC method argument; or <c>null</c> if <paramref name="writer"/> was <c>null</c>.</returns>
/// <remarks>
/// This method should only be called while serializing requests that include an ID (i.e. requests for which we expect a response).
/// When the response is received, a call should always be made to <see cref="OnResponseReceived(long, bool)"/>.
/// </remarks>
/// <exception cref="NotSupportedException">Thrown if no <see cref="MultiplexingStream"/> was provided to the constructor.</exception>
public int? GetToken(PipeWriter writer) => this.GetToken(writer != null ? new DuplexPipe(writer) : null);

/// <summary>
/// Creates an <see cref="IDuplexPipe"/> from a given token as it is received at the RPC server as a method argument.
/// </summary>
Expand Down Expand Up @@ -164,6 +188,44 @@ public IDuplexPipe GetPipe(int? token)
return channel;
}

/// <summary>
/// Creates a <see cref="PipeReader"/> from a given token as it is received at the RPC server as a method argument.
/// </summary>
/// <param name="token">The method argument, which was originally obtained by the client using the <see cref="GetToken(IDuplexPipe)"/> method.</param>
/// <returns>The <see cref="PipeReader"/> from the token; or <c>null</c> if <paramref name="token"/> was <c>null</c>.</returns>
/// <exception cref="InvalidOperationException">Thrown if the token does not match up with an out of band channel offered by the client.</exception>
/// <exception cref="NotSupportedException">Thrown if no <see cref="MultiplexingStream"/> was provided to the constructor.</exception>
public PipeReader GetPipeReader(int? token)
{
IDuplexPipe duplexPipe = this.GetPipe(token);
if (duplexPipe != null)
AArnott marked this conversation as resolved.
Show resolved Hide resolved
{
duplexPipe.Output.Complete();
return duplexPipe.Input;
}

return null;
}

/// <summary>
/// Creates a <see cref="PipeWriter"/> from a given token as it is received at the RPC server as a method argument.
/// </summary>
/// <param name="token">The method argument, which was originally obtained by the client using the <see cref="GetToken(IDuplexPipe)"/> method.</param>
/// <returns>The <see cref="PipeWriter"/> from the token; or <c>null</c> if <paramref name="token"/> was <c>null</c>.</returns>
/// <exception cref="InvalidOperationException">Thrown if the token does not match up with an out of band channel offered by the client.</exception>
/// <exception cref="NotSupportedException">Thrown if no <see cref="MultiplexingStream"/> was provided to the constructor.</exception>
public PipeWriter GetPipeWriter(int? token)
{
IDuplexPipe duplexPipe = this.GetPipe(token);
if (duplexPipe != null)
{
duplexPipe.Input.Complete();
return duplexPipe.Output;
}

return null;
}

/// <summary>
/// Notifies this tracker when a response to any request is sent
/// so that appropriate channel and state cleanup can take place.
Expand Down
2 changes: 1 addition & 1 deletion src/StreamJsonRpc/StreamJsonRpc.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<!-- <PackageReference Include="Microsoft.CodeAnalysis.FxCopAnalyzers" Version="2.9.4" PrivateAssets="all" /> -->
<PackageReference Include="Microsoft.VisualStudio.Threading.Analyzers" Version="16.0.102" PrivateAssets="all" />
<PackageReference Include="Microsoft.VisualStudio.Threading" Version="16.0.102" />
<PackageReference Include="Nerdbank.Streams" Version="2.3.32" />
<PackageReference Include="Nerdbank.Streams" Version="2.4.12-alpha" />
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
<PackageReference Include="Roslyn.Diagnostics.Analyzers" Version="2.9.3" PrivateAssets="all" />
<PackageReference Include="System.Collections.Immutable" Version="1.5.0" />
Expand Down