Skip to content

Commit

Permalink
Optimize client buffers during serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesNK committed Feb 17, 2020
1 parent d1b2b64 commit 95df486
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 71 deletions.
6 changes: 3 additions & 3 deletions perf/Grpc.AspNetCore.Microbenchmarks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@ static void Main(string[] args)
// Profiling option. This will call methods explicitly, in-process
static async System.Threading.Tasks.Task Main(string[] args)
{
var benchmark = new Client.CompressedUnaryServerCallHandlerBenchmark();
var benchmark = new Client.UnaryClientBenchmark();
benchmark.GlobalSetup();
for (var i = 0; i < 10000; i++)
{
await benchmark.CompressedSayHelloAsync();
await benchmark.SayHelloAsync();
}

System.Console.WriteLine("Press any key to start.");
System.Console.ReadKey();
for (var i = 0; i < 1; i++)
{
await benchmark.CompressedSayHelloAsync();
await benchmark.SayHelloAsync();
}

System.Console.WriteLine("Done. Press any key to exit.");
Expand Down
26 changes: 26 additions & 0 deletions src/Grpc.Net.Client/Internal/DefaultSerializationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using Grpc.Core;

Expand Down Expand Up @@ -112,6 +113,9 @@ public override IBufferWriter<byte> GetBufferWriter()

private IBufferWriter<byte> ResolveBufferWriter()
{
// TODO(JamesNK): I believe length should be known by the context before the buffer writer is
// fetched for the first time. Should be able to initialize a custom buffer writer with pooled
// array of the required size.
return _bufferWriter ??= new ArrayBufferWriter<byte>();
}

Expand All @@ -131,5 +135,27 @@ public override void Complete()
break;
}
}

public Memory<byte> GetHeader(bool isCompressed, int length)
{
// TODO(JamesNK): We can optimize header allocation when IBufferWriter is being used.
// IBufferWriter can be used to provide a buffer, either before or after message content.
var buffer = new byte[GrpcProtocolConstants.HeaderSize];

// Compression flag
buffer[0] = isCompressed ? (byte)1 : (byte)0;

// Message length
EncodeMessageLength(length, buffer.AsSpan(1, 4));

return buffer;
}

private static void EncodeMessageLength(int messageLength, Span<byte> destination)
{
Debug.Assert(destination.Length >= GrpcProtocolConstants.MessageDelimiterSize, "Buffer too small to encode message length.");

BinaryPrimitives.WriteUInt32BigEndian(destination, (uint)messageLength);
}
}
}
3 changes: 3 additions & 0 deletions src/Grpc.Net.Client/Internal/GrpcProtocolConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ internal static class GrpcProtocolConstants
// deflate is not supported. .NET's DeflateStream does not support RFC1950 - https://github.com/dotnet/corefx/issues/7570
};

internal const int MessageDelimiterSize = 4; // how many bytes it takes to encode "Message-Length"
internal const int HeaderSize = MessageDelimiterSize + 1; // message length + compression flag

internal static readonly string DefaultMessageAcceptEncodingValue;

internal static readonly string UserAgentHeader;
Expand Down
128 changes: 60 additions & 68 deletions src/Grpc.Net.Client/Internal/StreamExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ namespace Grpc.Net.Client
{
internal static partial class StreamExtensions
{
private const int MessageDelimiterSize = 4; // how many bytes it takes to encode "Message-Length"
private const int HeaderSize = MessageDelimiterSize + 1; // message length + compression flag

private static readonly Status SendingMessageExceedsLimitStatus = new Status(StatusCode.ResourceExhausted, "Sending message exceeds the maximum configured message size.");
private static readonly Status ReceivedMessageExceedsLimitStatus = new Status(StatusCode.ResourceExhausted, "Received message exceeds the maximum configured message size.");
private static readonly Status NoMessageEncodingMessageStatus = new Status(StatusCode.Internal, "Request did not include grpc-encoding value with compressed message.");
Expand All @@ -49,21 +46,21 @@ private static Status CreateUnknownMessageEncodingMessageStatus(string unsupport
return new Status(StatusCode.Unimplemented, $"Unsupported grpc-encoding value '{unsupportedEncoding}'. Supported encodings: {string.Join(", ", supportedEncodings)}");
}

private static async Task<(uint length, bool compressed)?> ReadHeaderAsync(Stream responseStream, Memory<byte> header, CancellationToken cancellationToken)
private static async Task<(int length, bool compressed)?> ReadHeaderAsync(Stream responseStream, Memory<byte> header, CancellationToken cancellationToken)
{
int read;
var received = 0;
while ((read = await responseStream.ReadAsync(header.Slice(received, header.Length - received), cancellationToken).ConfigureAwait(false)) > 0)
while ((read = await responseStream.ReadAsync(header.Slice(received, GrpcProtocolConstants.HeaderSize - received), cancellationToken).ConfigureAwait(false)) > 0)
{
received += read;

if (received == header.Length)
if (received == GrpcProtocolConstants.HeaderSize)
{
break;
}
}

if (received < header.Length)
if (received < GrpcProtocolConstants.HeaderSize)
{
if (received == 0)
{
Expand All @@ -73,10 +70,18 @@ private static Status CreateUnknownMessageEncodingMessageStatus(string unsupport
throw new InvalidDataException("Unexpected end of content while reading the message header.");
}

// Read the header first
// - 1 byte flag for compression
// - 4 bytes for the content length
var compressed = ReadCompressedFlag(header.Span[0]);
var length = BinaryPrimitives.ReadUInt32BigEndian(header.Span.Slice(1));
var length = BinaryPrimitives.ReadUInt32BigEndian(header.Span.Slice(1, 4));

return (length, compressed);
if (length > int.MaxValue)
{
throw new InvalidDataException("Message too large.");
}

return ((int)length, compressed);
}

public static async ValueTask<TResponse?> ReadMessageAsync<TResponse>(
Expand All @@ -90,17 +95,17 @@ private static Status CreateUnknownMessageEncodingMessageStatus(string unsupport
CancellationToken cancellationToken)
where TResponse : class
{
byte[]? buffer = null;

try
{
GrpcCallLog.ReadingMessage(logger);
cancellationToken.ThrowIfCancellationRequested();

// Read the header first
// - 1 byte flag for compression
// - 4 bytes for the content length
var header = new byte[HeaderSize];
// Buffer is used to read header, then message content
buffer = ArrayPool<byte>.Shared.Rent(minimumLength: 1024);

var headerDetails = await ReadHeaderAsync(responseStream, header, cancellationToken).ConfigureAwait(false);
var headerDetails = await ReadHeaderAsync(responseStream, buffer, cancellationToken).ConfigureAwait(false);

if (headerDetails == null)
{
Expand All @@ -111,17 +116,22 @@ private static Status CreateUnknownMessageEncodingMessageStatus(string unsupport
var length = headerDetails.Value.length;
var compressed = headerDetails.Value.compressed;

if (length > int.MaxValue)
if (length > 0)
{
throw new InvalidDataException("Message too large.");
}
if (length > maximumMessageSize)
{
throw new RpcException(ReceivedMessageExceedsLimitStatus);
}

if (length > maximumMessageSize)
{
throw new RpcException(ReceivedMessageExceedsLimitStatus);
}
// Replace buffer if the message doesn't fit
if (buffer.Length < length)
{
ArrayPool<byte>.Shared.Return(buffer);
buffer = ArrayPool<byte>.Shared.Rent(length);
}

var messageData = await ReadMessageContent(responseStream, length, cancellationToken).ConfigureAwait(false);
await ReadMessageContent(responseStream, buffer, length, cancellationToken).ConfigureAwait(false);
}

cancellationToken.ThrowIfCancellationRequested();

Expand All @@ -138,7 +148,7 @@ private static Status CreateUnknownMessageEncodingMessageStatus(string unsupport
}

// Performance improvement would be to decompress without converting to an intermediary byte array
if (!TryDecompressMessage(logger, grpcEncoding, compressionProviders, messageData, out var decompressedMessage))
if (!TryDecompressMessage(logger, grpcEncoding, compressionProviders, buffer, length, out var decompressedMessage))
{
var supportedEncodings = new List<string>();
supportedEncodings.Add(GrpcProtocolConstants.IdentityGrpcEncoding);
Expand All @@ -150,10 +160,10 @@ private static Status CreateUnknownMessageEncodingMessageStatus(string unsupport
}
else
{
payload = new ReadOnlySequence<byte>(messageData);
payload = new ReadOnlySequence<byte>(buffer, 0, length);
}

GrpcCallLog.DeserializingMessage(logger, messageData.Length, typeof(TResponse));
GrpcCallLog.DeserializingMessage(logger, length, typeof(TResponse));

var deserializationContext = new DefaultDeserializationContext();
deserializationContext.SetPayload(payload);
Expand All @@ -164,7 +174,7 @@ private static Status CreateUnknownMessageEncodingMessageStatus(string unsupport
{
// Check that there is no additional content in the stream for a single message
// There is no ReadByteAsync on stream. Reuse header array with ReadAsync, we don't need it anymore
if (await responseStream.ReadAsync(header).ConfigureAwait(false) > 0)
if (await responseStream.ReadAsync(buffer).ConfigureAwait(false) > 0)
{
throw new InvalidDataException("Unexpected data after finished reading message.");
}
Expand All @@ -179,43 +189,44 @@ private static Status CreateUnknownMessageEncodingMessageStatus(string unsupport
GrpcCallLog.ErrorReadingMessage(logger, ex);
throw;
}
finally
{
if (buffer != null)
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}

private static async Task<byte[]> ReadMessageContent(Stream responseStream, uint length, CancellationToken cancellationToken)
private static async Task ReadMessageContent(Stream responseStream, Memory<byte> messageData, int length, CancellationToken cancellationToken)
{
// Read message content until content length is reached
byte[] messageData;
if (length > 0)
var received = 0;
int read;
while ((read = await responseStream.ReadAsync(messageData.Slice(received, length - received), cancellationToken).ConfigureAwait(false)) > 0)
{
var received = 0;
var read = 0;
messageData = new byte[length];
while ((read = await responseStream.ReadAsync(messageData.AsMemory(received, messageData.Length - received), cancellationToken).ConfigureAwait(false)) > 0)
{
received += read;
received += read;

if (received == messageData.Length)
{
break;
}
if (received == length)
{
break;
}
}
else

if (received < length)
{
messageData = Array.Empty<byte>();
throw new InvalidDataException("Unexpected end of content while reading the message content.");
}

return messageData;
}

private static bool TryDecompressMessage(ILogger logger, string compressionEncoding, Dictionary<string, ICompressionProvider> compressionProviders, byte[] messageData, [NotNullWhen(true)]out ReadOnlySequence<byte>? result)
private static bool TryDecompressMessage(ILogger logger, string compressionEncoding, Dictionary<string, ICompressionProvider> compressionProviders, byte[] messageData, int length, [NotNullWhen(true)]out ReadOnlySequence<byte>? result)
{
if (compressionProviders.TryGetValue(compressionEncoding, out var compressionProvider))
{
GrpcCallLog.DecompressingMessage(logger, compressionProvider.EncodingName);

var output = new MemoryStream();
using (var compressionStream = compressionProvider.CreateDecompressionStream(new MemoryStream(messageData)))
using (var compressionStream = compressionProvider.CreateDecompressionStream(new MemoryStream(messageData, 0, length, writable: true, publiclyVisible: true)))
{
compressionStream.CopyTo(output);
}
Expand Down Expand Up @@ -244,6 +255,7 @@ private static bool ReadCompressedFlag(byte flag)
}
}

// TODO(JamesNK): Reuse serialization content between message writes. Improve client/duplex streaming allocations.
public static async ValueTask WriteMessageAsync<TMessage>(
this Stream stream,
ILogger logger,
Expand Down Expand Up @@ -275,8 +287,8 @@ public static async ValueTask WriteMessageAsync<TMessage>(
}

var isCompressed =
GrpcProtocolHelpers.CanWriteCompressed(callOptions.WriteOptions) &&
!string.Equals(grpcEncoding, GrpcProtocolConstants.IdentityGrpcEncoding, StringComparison.Ordinal);
GrpcProtocolHelpers.CanWriteCompressed(callOptions.WriteOptions) &&
!string.Equals(grpcEncoding, GrpcProtocolConstants.IdentityGrpcEncoding, StringComparison.Ordinal);

if (isCompressed)
{
Expand All @@ -288,7 +300,7 @@ public static async ValueTask WriteMessageAsync<TMessage>(
data);
}

await WriteHeaderAsync(stream, data.Length, isCompressed, callOptions.CancellationToken).ConfigureAwait(false);
await stream.WriteAsync(serializationContext.GetHeader(isCompressed, data.Length), callOptions.CancellationToken).ConfigureAwait(false);
await stream.WriteAsync(data, callOptions.CancellationToken).ConfigureAwait(false);
await stream.FlushAsync(callOptions.CancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -322,25 +334,5 @@ private static ReadOnlyMemory<byte> CompressMessage(ILogger logger, string compr
// Should never reach here
throw new InvalidOperationException($"Could not find compression provider for '{compressionEncoding}'.");
}

private static ValueTask WriteHeaderAsync(Stream stream, int length, bool compress, CancellationToken cancellationToken)
{
var headerData = new byte[HeaderSize];

// Compression flag
headerData[0] = compress ? (byte)1 : (byte)0;

// Message length
EncodeMessageLength(length, headerData.AsSpan(1));

return stream.WriteAsync(headerData.AsMemory(0, headerData.Length), cancellationToken);
}

private static void EncodeMessageLength(int messageLength, Span<byte> destination)
{
Debug.Assert(destination.Length >= MessageDelimiterSize, "Buffer too small to encode message length.");

BinaryPrimitives.WriteUInt32BigEndian(destination, (uint)messageLength);
}
}
}
3 changes: 3 additions & 0 deletions src/Shared/DefaultDeserializationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public void SetPayload(in ReadOnlySequence<byte>? payload)
public override byte[] PayloadAsNewBuffer()
{
Debug.Assert(_payload != null, "Payload must be set.");

// The array returned by PayloadAsNewBuffer must be the exact message size.
// There is no opportunity here to return a pooled array.
return _payload.GetValueOrDefault().ToArray();
}

Expand Down

0 comments on commit 95df486

Please sign in to comment.