Skip to content

Commit

Permalink
[retry only] Additional HTTP/2 connections created when active stream…
Browse files Browse the repository at this point in the history
…s limit is reached (#39439)

HTTP/2 standard commands clients to not open more than one HTTP/2 connection to the same server. At the same time, server has right to limit the maximum number of active streams per that HTTP/2 connection. These two directives combined impose limit on the number of requests concurrently send to the server. This limitation is justified in client to server scenarios, but become a bottleneck in server to server cases like gRPC. This PR introduces a new SocketsHttpHandler API enabling establishing additional HTTP/2 connections to the same server when the maximum stream limit is reached on the existing ones.

**Note**. This algorithm version uses only retries to make request choose another  connection when all stream slots are occupied. It does not implement stream credit management in `HttpConnectionPool` and therefore exhibit a sub-optimal request scheduling behavior in "request burst" and "infinite requests" scenarios.

Fixes #35088
  • Loading branch information
alnikola authored Jul 28, 2020
1 parent 1d05f27 commit 43670d5
Show file tree
Hide file tree
Showing 11 changed files with 563 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,25 @@ public class Http2LoopbackConnection : GenericLoopbackConnection
private Stream _connectionStream;
private TaskCompletionSource<bool> _ignoredSettingsAckPromise;
private bool _ignoreWindowUpdates;
public static TimeSpan Timeout => Http2LoopbackServer.Timeout;
private readonly TimeSpan _timeout;
private int _lastStreamId;

private readonly byte[] _prefix;
public string PrefixString => Encoding.UTF8.GetString(_prefix, 0, _prefix.Length);
public bool IsInvalid => _connectionSocket == null;
public Stream Stream => _connectionStream;
public Task<bool> SettingAckWaiter => _ignoredSettingsAckPromise?.Task;

public Http2LoopbackConnection(Socket socket, Http2Options httpOptions)
: this(socket, httpOptions, Http2LoopbackServer.Timeout)
{
}

public Http2LoopbackConnection(Socket socket, Http2Options httpOptions, TimeSpan timeout)
{
_connectionSocket = socket;
_connectionStream = new NetworkStream(_connectionSocket, true);
_timeout = timeout;

if (httpOptions.UseSsl)
{
Expand Down Expand Up @@ -81,12 +88,12 @@ public async Task SendConnectionPrefaceAsync()
await WriteFrameAsync(emptySettings).ConfigureAwait(false);

// Receive and ACK the client settings frame.
Frame clientSettings = await ReadFrameAsync(Timeout).ConfigureAwait(false);
Frame clientSettings = await ReadFrameAsync(_timeout).ConfigureAwait(false);
clientSettings.Flags = clientSettings.Flags | FrameFlags.Ack;
await WriteFrameAsync(clientSettings).ConfigureAwait(false);

// Receive the client ACK of the server settings frame.
clientSettings = await ReadFrameAsync(Timeout).ConfigureAwait(false);
clientSettings = await ReadFrameAsync(_timeout).ConfigureAwait(false);
}

public async Task WriteFrameAsync(Frame frame)
Expand Down Expand Up @@ -225,7 +232,7 @@ public void IgnoreWindowUpdates()

public async Task ReadRstStreamAsync(int streamId)
{
Frame frame = await ReadFrameAsync(Timeout);
Frame frame = await ReadFrameAsync(_timeout);

if (frame == null)
{
Expand All @@ -248,7 +255,7 @@ public async Task WaitForClientDisconnectAsync(bool ignoreUnexpectedFrames = fal
{
IgnoreWindowUpdates();

Frame frame = await ReadFrameAsync(Timeout).ConfigureAwait(false);
Frame frame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
if (frame != null)
{
if (!ignoreUnexpectedFrames)
Expand Down Expand Up @@ -310,7 +317,7 @@ public async Task<int> ReadRequestHeaderAsync()
public async Task<HeadersFrame> ReadRequestHeaderFrameAsync()
{
// Receive HEADERS frame for request.
Frame frame = await ReadFrameAsync(Timeout).ConfigureAwait(false);
Frame frame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
if (frame == null)
{
throw new IOException("Failed to read Headers frame.");
Expand Down Expand Up @@ -476,7 +483,7 @@ public async Task<byte[]> ReadBodyAsync(bool expectEndOfStream = false)

do
{
frame = await ReadFrameAsync(Timeout).ConfigureAwait(false);
frame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
if (frame == null && expectEndOfStream)
{
break;
Expand Down Expand Up @@ -516,7 +523,7 @@ public async Task<byte[]> ReadBodyAsync(bool expectEndOfStream = false)
HttpRequestData requestData = new HttpRequestData();

// Receive HEADERS frame for request.
Frame frame = await ReadFrameAsync(Timeout).ConfigureAwait(false);
Frame frame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
if (frame == null)
{
throw new IOException("Failed to read Headers frame.");
Expand Down Expand Up @@ -567,7 +574,7 @@ public async Task PingPong()
byte[] pingData = new byte[8] { 1, 2, 3, 4, 50, 60, 70, 80 };
PingFrame ping = new PingFrame(pingData, FrameFlags.None, 0);
await WriteFrameAsync(ping).ConfigureAwait(false);
PingFrame pingAck = (PingFrame)await ReadFrameAsync(Timeout).ConfigureAwait(false);
PingFrame pingAck = (PingFrame)await ReadFrameAsync(_timeout).ConfigureAwait(false);
if (pingAck == null || pingAck.Type != FrameType.Ping || !pingAck.AckFlag)
{
throw new Exception("Expected PING ACK");
Expand Down
29 changes: 22 additions & 7 deletions src/libraries/Common/tests/System/Net/Http/Http2LoopbackServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ private void RemoveInvalidConnections()
_connections.RemoveAll((c) => c.IsInvalid);
}

public async Task<Http2LoopbackConnection> AcceptConnectionAsync()
public Task<Http2LoopbackConnection> AcceptConnectionAsync()
{
return AcceptConnectionAsync(null);
}

public async Task<Http2LoopbackConnection> AcceptConnectionAsync(TimeSpan? timeout)
{
RemoveInvalidConnections();

Expand All @@ -85,7 +90,7 @@ public async Task<Http2LoopbackConnection> AcceptConnectionAsync()

Socket connectionSocket = await _listenSocket.AcceptAsync().ConfigureAwait(false);

Http2LoopbackConnection connection = new Http2LoopbackConnection(connectionSocket, _options);
Http2LoopbackConnection connection = timeout != null ? new Http2LoopbackConnection(connectionSocket, _options, timeout.Value) : new Http2LoopbackConnection(connectionSocket, _options);
_connections.Add(connection);

return connection;
Expand All @@ -96,15 +101,25 @@ public override async Task<GenericLoopbackConnection> EstablishGenericConnection
return await EstablishConnectionAsync();
}

public async Task<Http2LoopbackConnection> EstablishConnectionAsync(params SettingsEntry[] settingsEntries)
public Task<Http2LoopbackConnection> EstablishConnectionAsync(params SettingsEntry[] settingsEntries)
{
(Http2LoopbackConnection connection, _) = await EstablishConnectionGetSettingsAsync(settingsEntries).ConfigureAwait(false);
return EstablishConnectionAsync(null, null, settingsEntries);
}

public async Task<Http2LoopbackConnection> EstablishConnectionAsync(TimeSpan? timeout, TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries)
{
(Http2LoopbackConnection connection, _) = await EstablishConnectionGetSettingsAsync(timeout, ackTimeout, settingsEntries).ConfigureAwait(false);
return connection;
}

public async Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(params SettingsEntry[] settingsEntries)
public Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(params SettingsEntry[] settingsEntries)
{
return EstablishConnectionGetSettingsAsync(null, null, settingsEntries);
}

public async Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(TimeSpan? timeout, TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries)
{
Http2LoopbackConnection connection = await AcceptConnectionAsync().ConfigureAwait(false);
Http2LoopbackConnection connection = await AcceptConnectionAsync(timeout).ConfigureAwait(false);

// Receive the initial client settings frame.
Frame receivedFrame = await connection.ReadFrameAsync(Timeout).ConfigureAwait(false);
Expand All @@ -129,7 +144,7 @@ public async Task<Http2LoopbackConnection> EstablishConnectionAsync(params Setti
await connection.WriteFrameAsync(settingsAck).ConfigureAwait(false);

// The client will send us a SETTINGS ACK eventually, but not necessarily right away.
await connection.ExpectSettingsAckAsync();
await connection.ExpectSettingsAckAsync((int) (ackTimeout?.TotalMilliseconds ?? 5000));

return (connection, clientSettingsFrame);
}
Expand Down
1 change: 1 addition & 0 deletions src/libraries/System.Net.Http/ref/System.Net.Http.cs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ public SocketsHttpHandler() { }
protected override void Dispose(bool disposing) { }
protected internal override System.Net.Http.HttpResponseMessage Send(System.Net.Http.HttpRequestMessage request, System.Threading.CancellationToken cancellationToken) { throw null; }
protected internal override System.Threading.Tasks.Task<System.Net.Http.HttpResponseMessage> SendAsync(System.Net.Http.HttpRequestMessage request, System.Threading.CancellationToken cancellationToken) { throw null; }
public bool EnableMultipleHttp2Connections { get { throw null; } set { } }
}
public partial class StreamContent : System.Net.Http.HttpContent
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,5 +131,11 @@ public TimeSpan Expect100ContinueTimeout

protected internal override Task<HttpResponseMessage> SendAsync(
HttpRequestMessage request, CancellationToken cancellationToken) => throw new PlatformNotSupportedException();

public bool EnableMultipleHttp2Connections
{
get => throw new PlatformNotSupportedException();
set => throw new PlatformNotSupportedException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ internal enum RequestRetryType
/// <summary>
/// The proxy failed, so the request should be retried on the next proxy.
/// </summary>
RetryOnNextProxy
RetryOnNextProxy,

/// The HTTP/2 connection reached the maximum number of streams and
/// another HTTP/2 connection must be created or found to serve the request.
RetryOnNextConnection
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,32 @@ public CreditManager(IHttpTrace owner, string name, int initialCredit)
_current = initialCredit;
}

public bool IsCreditAvailable => Volatile.Read(ref _current) > 0;

private object SyncObject
{
// Generally locking on "this" is considered poor form, but this type is internal,
// and it's unnecessary overhead to allocate another object just for this purpose.
get => this;
}

public ValueTask<int> RequestCreditAsync(int amount, CancellationToken cancellationToken)
public bool TryRequestCreditNoWait(int amount)
{
lock (SyncObject)
{
if (_disposed)
{
throw new ObjectDisposedException($"{nameof(CreditManager)}:{_owner.GetType().Name}:{_name}");
}
return TryRequestCreditNoLock(amount) > 0;
}
}

public ValueTask<int> RequestCreditAsync(int amount, CancellationToken cancellationToken)
{
lock (SyncObject)
{
// If we can satisfy the request with credit already available, do so synchronously.
if (_current > 0)
{
Debug.Assert(_waitersTail is null, "Shouldn't have waiters when credit is available");
int granted = TryRequestCreditNoLock(amount);

int granted = Math.Min(amount, _current);
if (NetEventSource.Log.IsEnabled()) _owner.Trace($"{_name}. requested={amount}, current={_current}, granted={granted}");
_current -= granted;
if (granted > 0)
{
return new ValueTask<int>(granted);
}

Expand Down Expand Up @@ -155,5 +157,26 @@ public void Dispose()
}
}
}

private int TryRequestCreditNoLock(int amount)
{
Debug.Assert(Monitor.IsEntered(SyncObject), "Shouldn't be called outside lock.");

if (_disposed)
{
throw new ObjectDisposedException($"{nameof(CreditManager)}:{_owner.GetType().Name}:{_name}");
}

if (_current > 0)
{
Debug.Assert(_waitersTail is null, "Shouldn't have waiters when credit is available");

int granted = Math.Min(amount, _current);
if (NetEventSource.Log.IsEnabled()) _owner.Trace($"{_name}. requested={amount}, current={_current}, granted={granted}");
_current -= granted;
return granted;
}
return 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,15 @@ public Http2Connection(HttpConnectionPool pool, Stream stream)
_initialWindowSize = DefaultInitialWindowSize;
_maxConcurrentStreams = int.MaxValue;
_pendingWindowUpdate = 0;
_idleSinceTickCount = Environment.TickCount64;

if (NetEventSource.Log.IsEnabled()) TraceConnection(stream);
}

private object SyncObject => _httpStreams;

public bool CanAddNewStream => _concurrentStreams.IsCreditAvailable;

public async ValueTask SetupAsync()
{
_outgoingBuffer.EnsureAvailableSpace(s_http2ConnectionPreface.Length +
Expand Down Expand Up @@ -1203,7 +1206,17 @@ private async ValueTask<Http2Stream> SendHeadersAsync(HttpRequestMessage request
// in order to avoid consuming resources in potentially many requests waiting for access.
try
{
await _concurrentStreams.RequestCreditAsync(1, cancellationToken).ConfigureAwait(false);
if (_pool.EnableMultipleHttp2Connections)
{
if (!_concurrentStreams.TryRequestCreditNoWait(1))
{
throw new HttpRequestException(null, null, RequestRetryType.RetryOnNextConnection);
}
}
else
{
await _concurrentStreams.RequestCreditAsync(1, cancellationToken).ConfigureAwait(false);
}
}
catch (ObjectDisposedException)
{
Expand Down
Loading

0 comments on commit 43670d5

Please sign in to comment.