Skip to content

Commit

Permalink
Client compression (#362)
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesNK authored Jul 8, 2019
1 parent d855571 commit d6c8349
Show file tree
Hide file tree
Showing 23 changed files with 699 additions and 68 deletions.
4 changes: 2 additions & 2 deletions src/Grpc.AspNetCore.Server/Internal/GrpcProtocolHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public static bool IsGrpcContentType(string contentType)
return false;
}

public static bool IsInvalidContentType(HttpContext httpContext, [NotNullWhenTrue]out string? error)
public static bool IsInvalidContentType(HttpContext httpContext, [NotNullWhen(true)]out string? error)
{
if (httpContext.Request.ContentType == null)
{
Expand Down Expand Up @@ -157,7 +157,7 @@ public static byte[] ParseBinaryHeader(string base64)
return Convert.FromBase64String(decodable);
}

internal static bool TryDecompressMessage(string compressionEncoding, List<ICompressionProvider> compressionProviders, byte[] messageData, [NotNullWhenTrue]out byte[]? result)
internal static bool TryDecompressMessage(string compressionEncoding, List<ICompressionProvider> compressionProviders, byte[] messageData, [NotNullWhen(true)]out byte[]? result)
{
foreach (var compressionProvider in compressionProviders)
{
Expand Down
34 changes: 34 additions & 0 deletions src/Grpc.AspNetCore.Server/Internal/NotNullWhenAttribute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

namespace System.Runtime.CompilerServices
{
/// <summary>Specifies that when a method returns <see cref="ReturnValue"/>, the parameter will not be null even if the corresponding type allows it.</summary>
[AttributeUsage(AttributeTargets.Parameter, Inherited = false)]
internal sealed class NotNullWhenAttribute : Attribute
{
/// <summary>Initializes the attribute with the specified return value condition.</summary>
/// <param name="returnValue">
/// The return value condition. If the method returns this value, the associated parameter will not be null.
/// </param>
public NotNullWhenAttribute(bool returnValue) => ReturnValue = returnValue;

/// <summary>Gets the return value condition.</summary>
public bool ReturnValue { get; }
}
}
26 changes: 0 additions & 26 deletions src/Grpc.AspNetCore.Server/Internal/NotNullWhenTrueAttribute.cs

This file was deleted.

2 changes: 1 addition & 1 deletion src/Grpc.AspNetCore.Server/Internal/PipeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ private enum ReadMessageResult
Stop
}

private static bool TryReadMessage(ref ReadOnlySequence<byte> buffer, HttpContextServerCallContext context, [NotNullWhenTrue]out byte[]? message)
private static bool TryReadMessage(ref ReadOnlySequence<byte> buffer, HttpContextServerCallContext context, [NotNullWhen(true)]out byte[]? message)
{
if (!TryReadHeader(buffer, out var compressed, out var messageLength))
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using System.IO;
using System.IO.Compression;

namespace Grpc.Net.Client.Internal.Compression
{
/// <summary>
/// GZIP compression provider.
/// </summary>
internal class GzipCompressionProvider : ICompressionProvider
{
private readonly CompressionLevel _defaultCompressionLevel;

/// <summary>
/// Initializes a new instance of the <see cref="GzipCompressionProvider"/> class with the specified <see cref="CompressionLevel"/>.
/// </summary>
/// <param name="defaultCompressionLevel">The default compression level to use when compressing data.</param>
public GzipCompressionProvider(CompressionLevel defaultCompressionLevel)
{
_defaultCompressionLevel = defaultCompressionLevel;
}

/// <summary>
/// The encoding name used in the 'grpc-encoding' and 'grpc-accept-encoding' request and response headers.
/// </summary>
public string EncodingName => "gzip";

/// <summary>
/// Create a new compression stream.
/// </summary>
/// <param name="stream">The stream that compressed data is written to.</param>
/// <param name="compressionLevel">The compression level.</param>
/// <returns>A stream used to compress data.</returns>
public Stream CreateCompressionStream(Stream stream, CompressionLevel? compressionLevel)
{
return new GZipStream(stream, compressionLevel ?? _defaultCompressionLevel);
}

/// <summary>
/// Create a new decompression stream.
/// </summary>
/// <param name="stream">The stream that compressed data is copied from.</param>
/// <returns>A stream used to decompress data.</returns>
public Stream CreateDecompressionStream(Stream stream)
{
return new GZipStream(stream, CompressionMode.Decompress);
}
}
}
49 changes: 49 additions & 0 deletions src/Grpc.Net.Client/Internal/Compression/ICompressionProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#endregion

using System.IO;
using System.IO.Compression;

namespace Grpc.Net.Client.Internal.Compression
{
/// <summary>
/// Provides a specific compression implementation to compress gRPC messages.
/// </summary>
internal interface ICompressionProvider
{
/// <summary>
/// The encoding name used in the 'grpc-encoding' and 'grpc-accept-encoding' request and response headers.
/// </summary>
string EncodingName { get; }

/// <summary>
/// Create a new compression stream.
/// </summary>
/// <param name="stream">The stream that compressed data is written to.</param>
/// <param name="compressionLevel">The compression level.</param>
/// <returns>A stream used to compress data.</returns>
Stream CreateCompressionStream(Stream stream, CompressionLevel? compressionLevel);

/// <summary>
/// Create a new decompression stream.
/// </summary>
/// <param name="stream">The stream that compressed data is copied from.</param>
/// <returns>A stream used to decompress data.</returns>
Stream CreateDecompressionStream(Stream stream);
}
}
31 changes: 25 additions & 6 deletions src/Grpc.Net.Client/Internal/GrpcCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,11 @@ public async Task<TResponse> GetResponseAsync()

// Trailers are only available once the response body had been read
var responseStream = await HttpResponse.Content.ReadAsStreamAsync().ConfigureAwait(false);
var message = await responseStream.ReadSingleMessageAsync(Logger, Method.ResponseMarshaller.Deserializer, _callCts.Token).ConfigureAwait(false);
var message = await responseStream.ReadSingleMessageAsync(
Logger,
Method.ResponseMarshaller.Deserializer,
GrpcProtocolHelpers.GetGrpcEncoding(HttpResponse),
_callCts.Token).ConfigureAwait(false);
FinishResponse();

if (message == null)
Expand Down Expand Up @@ -373,7 +377,14 @@ private void SetMessageContent(TRequest request, HttpRequestMessage message)
message.Content = new PushStreamContent(
(stream) =>
{
return stream.WriteMessage<TRequest>(Logger, request, Method.RequestMarshaller.Serializer, Options.CancellationToken);
var grpcEncoding = GrpcProtocolHelpers.GetRequestEncoding(message.Headers);

return stream.WriteMessage<TRequest>(
Logger,
request,
Method.RequestMarshaller.Serializer,
grpcEncoding,
Options.CancellationToken);
},
GrpcProtocolConstants.GrpcContentTypeHeaderValue);
}
Expand Down Expand Up @@ -500,7 +511,7 @@ private HttpContentClientStreamWriter<TRequest, TResponse> CreateWriter(HttpRequ
},
GrpcProtocolConstants.GrpcContentTypeHeaderValue);

var writer = new HttpContentClientStreamWriter<TRequest, TResponse>(this, _writeStreamTcs.Task, _completeTcs);
var writer = new HttpContentClientStreamWriter<TRequest, TResponse>(this, message, _writeStreamTcs.Task, _completeTcs);
return writer;
}

Expand All @@ -513,18 +524,26 @@ private HttpRequestMessage CreateHttpRequestMessage()
// TE is required by some servers, e.g. C Core
// A missing TE header results in servers aborting the gRPC call
message.Headers.TE.Add(GrpcProtocolConstants.TEHeader);
message.Headers.Add(GrpcProtocolConstants.MessageAcceptEncodingHeader, GrpcProtocolConstants.MessageAcceptEncodingValue);

if (Options.Headers != null && Options.Headers.Count > 0)
{
foreach (var entry in Options.Headers)
{
// Deadline is set via CallOptions.Deadline
if (entry.Key == GrpcProtocolConstants.TimeoutHeader)
{
// grpc-timeout is set via CallOptions.Deadline
continue;
}

AddHeader(message.Headers, entry);
else if (entry.Key == GrpcProtocolConstants.CompressionRequestAlgorithmHeader)
{
// grpc-internal-encoding-request is used in the client to set message compression
message.Headers.Add(GrpcProtocolConstants.MessageEncodingHeader, entry.Value);
}
else
{
AddHeader(message.Headers, entry);
}
}
}

Expand Down
15 changes: 15 additions & 0 deletions src/Grpc.Net.Client/Internal/GrpcProtocolConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

#endregion

using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Http.Headers;
using System.Reflection;
using Grpc.Net.Client.Internal.Compression;

namespace Grpc.Net.Client.Internal
{
Expand All @@ -34,8 +36,19 @@ internal static class GrpcProtocolConstants
internal const string StatusTrailer = "grpc-status";
internal const string MessageTrailer = "grpc-message";

internal const string IdentityGrpcEncoding = "identity";

internal const string MessageAcceptEncodingHeader = "grpc-accept-encoding";

internal const string CompressionRequestAlgorithmHeader = "grpc-internal-encoding-request";

internal static readonly List<ICompressionProvider> CompressionProviders = new List<ICompressionProvider>
{
new GzipCompressionProvider(System.IO.Compression.CompressionLevel.Fastest)
};

internal static readonly string MessageAcceptEncodingValue;

internal static readonly ProductInfoHeaderValue UserAgentHeader;
internal static readonly TransferCodingWithQualityHeaderValue TEHeader;

Expand All @@ -60,6 +73,8 @@ static GrpcProtocolConstants()
UserAgentHeader = ProductInfoHeaderValue.Parse(userAgent);

TEHeader = new TransferCodingWithQualityHeaderValue("trailers");

MessageAcceptEncodingValue = IdentityGrpcEncoding + "," + string.Join(',', CompressionProviders.Select(p => p.EncodingName));
}
}
}
Loading

0 comments on commit d6c8349

Please sign in to comment.