Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call Complete on SubchannelCallTracker after HttpContent has been disposed #2139

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions src/Grpc.Net.Client/Balancer/Internal/BalancerHttpHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,18 @@ protected override async Task<HttpResponseMessage> SendAsync(
{
var responseMessage = await responseMessageTask.ConfigureAwait(false);

// TODO(JamesNK): This doesn't take into account long running streams.
// If there is response content then we need to wait until it is read to the end
// or the request is disposed.
result.SubchannelCallTracker?.Complete(new CompletionContext
if (result.SubchannelCallTracker is not null)
{
Address = address
});
if (responseMessage.Content is not null)
{
responseMessage.Content = new HttpContentWrapper(responseMessage.Content,
() => result.SubchannelCallTracker.Complete(new CompletionContext { Address = address }));
}
else
{
result.SubchannelCallTracker.Complete(new CompletionContext { Address = address });
}
}

return responseMessage;
}
Expand Down
71 changes: 71 additions & 0 deletions src/Grpc.Net.Client/Balancer/Internal/HttpContentWrapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using System.Net;

namespace Grpc.Net.Client.Balancer.Internal;

internal sealed class HttpContentWrapper : HttpContent
{
private readonly HttpContent _inner;
private readonly Action _disposeAction;
private bool _disposed;

public HttpContentWrapper(HttpContent inner, Action disposeAction)
{
_inner = inner;
_disposeAction = disposeAction;

foreach (var kvp in inner.Headers)
{
Headers.TryAddWithoutValidation(kvp.Key, kvp.Value.ToArray());
}
}

#if NET5_0_OR_GREATER

protected override void SerializeToStream(Stream stream, TransportContext? context, CancellationToken cancellationToken)
{
using var content = _inner.ReadAsStream(cancellationToken);
content.CopyTo(stream);
}

protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context, CancellationToken cancellationToken)
{
var content = await _inner.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
await using (content.ConfigureAwait(false))
{
await content.CopyToAsync(stream, cancellationToken).ConfigureAwait(false);
}
}

#endif

protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context)
{
var content = await _inner.ReadAsStreamAsync().ConfigureAwait(false);
#if NET5_0_OR_GREATER
await using (content.ConfigureAwait(false))
#else
using (content)
#endif
{
await content.CopyToAsync(stream).ConfigureAwait(false);
}
}

protected override bool TryComputeLength(out long length)
{
length = 0;
return false;
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);

if (disposing && !_disposed)
{
_disposeAction();
_inner.Dispose();
_disposed = true;
}
}
}