Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional HTTP/2 connections created when active streams limit is reached #38748

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
2708f8b
SocketsHttpHandler supports multiple HTTP/2 connections
alnikola Jun 23, 2020
9852b24
Connection invalidation fixed
alnikola Jun 24, 2020
d97e37e
Reduce test connection timeout
alnikola Jun 24, 2020
1f0e0f2
- Code duplication removed
alnikola Jul 3, 2020
687d7cd
Simplified connection add and invalidate logic
alnikola Jul 6, 2020
9b1fb8f
Advanced request scheduling on multiple HTTP/2 connections algorithm …
alnikola Jul 9, 2020
8fb4645
Retry message removed
alnikola Jul 9, 2020
573acf6
Various small comments resolved
alnikola Jul 9, 2020
8683215
Hardened concurrent updates to shared fields to make them safe on wea…
alnikola Jul 10, 2020
6ed481b
Merged with master
alnikola Jul 10, 2020
18e14dc
NetEventSource.IsEnabled call fixed
alnikola Jul 13, 2020
21b4208
Increasing the maximum of concurrent streams on an existing connectio…
alnikola Jul 15, 2020
e073212
- Http2Connection.AcquireStreamSlot doesn't check CreditManager.Curre…
alnikola Jul 15, 2020
3cc80f6
Increase connection timeout and reduce request count to harden and sp…
alnikola Jul 16, 2020
29e3590
Logging to debug hanging test on CI
alnikola Jul 16, 2020
adae2ed
Fix test logging
alnikola Jul 16, 2020
ef22afe
Turn VerifySendTasks into an instance method
alnikola Jul 16, 2020
e200f30
More test logging
alnikola Jul 16, 2020
90d2bce
Release stream slot before retry
alnikola Jul 16, 2020
1278481
Add ConfigureAwait(false) to all awaits in tests
alnikola Jul 16, 2020
b546ae4
More logging and timeouts
alnikola Jul 17, 2020
11a171d
Remove ReleaseStreamSlot call when no credits available
alnikola Jul 17, 2020
e36cfe0
Test logging removed
alnikola Jul 17, 2020
73c38bb
Missing ConfigureAwait(false) added
alnikola Jul 17, 2020
dce2cb3
Fix retry triggered under new connection construction lock
alnikola Jul 17, 2020
1896ef1
- Stream slot is correctly acquired after connection was created
alnikola Jul 18, 2020
2064bcf
Merge branch 'master' into alnikola/35088-more-http2-conn-after-strea…
alnikola Jul 18, 2020
2436bd4
Hanging tests fixed
alnikola Jul 19, 2020
1dac8b4
More hanging tests fixes
alnikola Jul 20, 2020
557decd
Logging
alnikola Jul 20, 2020
0607bcf
PrepareConnection makes sure the new MaxConcurrentStreams value is ap…
alnikola Jul 21, 2020
b177384
Expired connections removal in CleanCacheAndDisposeIfUnused fixed
alnikola Jul 21, 2020
7f0dcf2
Fix asserts
alnikola Jul 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,25 @@ public class Http2LoopbackConnection : GenericLoopbackConnection
private Stream _connectionStream;
private TaskCompletionSource<bool> _ignoredSettingsAckPromise;
private bool _ignoreWindowUpdates;
public static TimeSpan Timeout => Http2LoopbackServer.Timeout;
private readonly TimeSpan _timeout;
private int _lastStreamId;

private readonly byte[] _prefix;
public string PrefixString => Encoding.UTF8.GetString(_prefix, 0, _prefix.Length);
public bool IsInvalid => _connectionSocket == null;
public Stream Stream => _connectionStream;
public Task<bool> SettingAckWaiter => _ignoredSettingsAckPromise?.Task;

public Http2LoopbackConnection(Socket socket, Http2Options httpOptions)
: this(socket, httpOptions, Http2LoopbackServer.Timeout)
{
}

public Http2LoopbackConnection(Socket socket, Http2Options httpOptions, TimeSpan timeout)
{
_connectionSocket = socket;
_connectionStream = new NetworkStream(_connectionSocket, true);
_timeout = timeout;

if (httpOptions.UseSsl)
{
Expand Down Expand Up @@ -81,12 +88,12 @@ public async Task SendConnectionPrefaceAsync()
await WriteFrameAsync(emptySettings).ConfigureAwait(false);

// Receive and ACK the client settings frame.
Frame clientSettings = await ReadFrameAsync(Timeout).ConfigureAwait(false);
Frame clientSettings = await ReadFrameAsync(_timeout).ConfigureAwait(false);
clientSettings.Flags = clientSettings.Flags | FrameFlags.Ack;
await WriteFrameAsync(clientSettings).ConfigureAwait(false);

// Receive the client ACK of the server settings frame.
clientSettings = await ReadFrameAsync(Timeout).ConfigureAwait(false);
clientSettings = await ReadFrameAsync(_timeout).ConfigureAwait(false);
}

public async Task WriteFrameAsync(Frame frame)
Expand Down Expand Up @@ -225,7 +232,7 @@ public void IgnoreWindowUpdates()

public async Task ReadRstStreamAsync(int streamId)
{
Frame frame = await ReadFrameAsync(Timeout);
Frame frame = await ReadFrameAsync(_timeout);

if (frame == null)
{
Expand All @@ -248,7 +255,7 @@ public async Task WaitForClientDisconnectAsync(bool ignoreUnexpectedFrames = fal
{
IgnoreWindowUpdates();

Frame frame = await ReadFrameAsync(Timeout).ConfigureAwait(false);
Frame frame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
if (frame != null)
{
if (!ignoreUnexpectedFrames)
Expand Down Expand Up @@ -310,7 +317,7 @@ public async Task<int> ReadRequestHeaderAsync()
public async Task<HeadersFrame> ReadRequestHeaderFrameAsync()
{
// Receive HEADERS frame for request.
Frame frame = await ReadFrameAsync(Timeout).ConfigureAwait(false);
Frame frame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
if (frame == null)
{
throw new IOException("Failed to read Headers frame.");
Expand Down Expand Up @@ -476,7 +483,7 @@ public async Task<byte[]> ReadBodyAsync(bool expectEndOfStream = false)

do
{
frame = await ReadFrameAsync(Timeout).ConfigureAwait(false);
frame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
if (frame == null && expectEndOfStream)
{
break;
Expand Down Expand Up @@ -516,7 +523,7 @@ public async Task<byte[]> ReadBodyAsync(bool expectEndOfStream = false)
HttpRequestData requestData = new HttpRequestData();

// Receive HEADERS frame for request.
Frame frame = await ReadFrameAsync(Timeout).ConfigureAwait(false);
Frame frame = await ReadFrameAsync(_timeout).ConfigureAwait(false);
if (frame == null)
{
throw new IOException("Failed to read Headers frame.");
Expand Down Expand Up @@ -567,7 +574,7 @@ public async Task PingPong()
byte[] pingData = new byte[8] { 1, 2, 3, 4, 50, 60, 70, 80 };
PingFrame ping = new PingFrame(pingData, FrameFlags.None, 0);
await WriteFrameAsync(ping).ConfigureAwait(false);
PingFrame pingAck = (PingFrame)await ReadFrameAsync(Timeout).ConfigureAwait(false);
PingFrame pingAck = (PingFrame)await ReadFrameAsync(_timeout).ConfigureAwait(false);
if (pingAck == null || pingAck.Type != FrameType.Ping || !pingAck.AckFlag)
{
throw new Exception("Expected PING ACK");
Expand Down
29 changes: 22 additions & 7 deletions src/libraries/Common/tests/System/Net/Http/Http2LoopbackServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ private void RemoveInvalidConnections()
_connections.RemoveAll((c) => c.IsInvalid);
}

public async Task<Http2LoopbackConnection> AcceptConnectionAsync()
public Task<Http2LoopbackConnection> AcceptConnectionAsync()
{
return AcceptConnectionAsync(null);
}

public async Task<Http2LoopbackConnection> AcceptConnectionAsync(TimeSpan? timeout)
{
RemoveInvalidConnections();

Expand All @@ -85,7 +90,7 @@ public async Task<Http2LoopbackConnection> AcceptConnectionAsync()

Socket connectionSocket = await _listenSocket.AcceptAsync().ConfigureAwait(false);

Http2LoopbackConnection connection = new Http2LoopbackConnection(connectionSocket, _options);
Http2LoopbackConnection connection = timeout != null ? new Http2LoopbackConnection(connectionSocket, _options, timeout.Value) : new Http2LoopbackConnection(connectionSocket, _options);
_connections.Add(connection);

return connection;
Expand All @@ -96,15 +101,25 @@ public override async Task<GenericLoopbackConnection> EstablishGenericConnection
return await EstablishConnectionAsync();
}

public async Task<Http2LoopbackConnection> EstablishConnectionAsync(params SettingsEntry[] settingsEntries)
public Task<Http2LoopbackConnection> EstablishConnectionAsync(params SettingsEntry[] settingsEntries)
{
(Http2LoopbackConnection connection, _) = await EstablishConnectionGetSettingsAsync(settingsEntries).ConfigureAwait(false);
return EstablishConnectionAsync(null, null, settingsEntries);
}

public async Task<Http2LoopbackConnection> EstablishConnectionAsync(TimeSpan? timeout, TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries)
{
(Http2LoopbackConnection connection, _) = await EstablishConnectionGetSettingsAsync(timeout, ackTimeout, settingsEntries).ConfigureAwait(false);
return connection;
}

public async Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(params SettingsEntry[] settingsEntries)
public Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(params SettingsEntry[] settingsEntries)
{
return EstablishConnectionGetSettingsAsync(null, null, settingsEntries);
}

public async Task<(Http2LoopbackConnection, SettingsFrame)> EstablishConnectionGetSettingsAsync(TimeSpan? timeout, TimeSpan? ackTimeout, params SettingsEntry[] settingsEntries)
{
Http2LoopbackConnection connection = await AcceptConnectionAsync().ConfigureAwait(false);
Http2LoopbackConnection connection = await AcceptConnectionAsync(timeout).ConfigureAwait(false);

// Receive the initial client settings frame.
Frame receivedFrame = await connection.ReadFrameAsync(Timeout).ConfigureAwait(false);
Expand All @@ -129,7 +144,7 @@ public async Task<Http2LoopbackConnection> EstablishConnectionAsync(params Setti
await connection.WriteFrameAsync(settingsAck).ConfigureAwait(false);

// The client will send us a SETTINGS ACK eventually, but not necessarily right away.
await connection.ExpectSettingsAckAsync();
await connection.ExpectSettingsAckAsync((int)(ackTimeout?.TotalMilliseconds ?? 5000));

return (connection, clientSettingsFrame);
}
Expand Down
1 change: 1 addition & 0 deletions src/libraries/System.Net.Http/ref/System.Net.Http.cs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ public SocketsHttpHandler() { }
protected override void Dispose(bool disposing) { }
protected internal override System.Net.Http.HttpResponseMessage Send(System.Net.Http.HttpRequestMessage request, System.Threading.CancellationToken cancellationToken) { throw null; }
protected internal override System.Threading.Tasks.Task<System.Net.Http.HttpResponseMessage> SendAsync(System.Net.Http.HttpRequestMessage request, System.Threading.CancellationToken cancellationToken) { throw null; }
public int MaxHttp2ConnectionsPerServer { get { throw null; } set { } }
}
public partial class StreamContent : System.Net.Http.HttpContent
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,5 +131,11 @@ public TimeSpan Expect100ContinueTimeout

protected internal override Task<HttpResponseMessage> SendAsync(
HttpRequestMessage request, CancellationToken cancellationToken) => throw new PlatformNotSupportedException();

public int MaxHttp2ConnectionsPerServer
{
get => throw new PlatformNotSupportedException();
set => throw new PlatformNotSupportedException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ internal enum RequestRetryType
/// <summary>
/// The proxy failed, so the request should be retried on the next proxy.
/// </summary>
RetryOnNextProxy
RetryOnNextProxy,

/// The HTTP/2 connection reached the maximum number of streams and
/// another HTTP/2 connection must be created or found to serve the request.
RetryOnNextConnection
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,23 @@ private object SyncObject
get => this;
}

public ValueTask<int> RequestCreditAsync(int amount, CancellationToken cancellationToken)
public bool TryRequestCreditNoWait(int amount)
alnikola marked this conversation as resolved.
Show resolved Hide resolved
{
lock (SyncObject)
{
if (_disposed)
{
throw new ObjectDisposedException($"{nameof(CreditManager)}:{_owner.GetType().Name}:{_name}");
}
return TryRequestCreditNoLock(amount) > 0;
}
}

public ValueTask<int> RequestCreditAsync(int amount, CancellationToken cancellationToken)
{
lock (SyncObject)
{
// If we can satisfy the request with credit already available, do so synchronously.
if (_current > 0)
{
Debug.Assert(_waitersTail is null, "Shouldn't have waiters when credit is available");
int granted = TryRequestCreditNoLock(amount);

int granted = Math.Min(amount, _current);
if (NetEventSource.Log.IsEnabled()) _owner.Trace($"{_name}. requested={amount}, current={_current}, granted={granted}");
_current -= granted;
if (granted > 0)
{
return new ValueTask<int>(granted);
}

Expand Down Expand Up @@ -155,5 +155,27 @@ public void Dispose()
}
}
}

private int TryRequestCreditNoLock(int amount)
{
Debug.Assert(Monitor.IsEntered(SyncObject), "Shouldn't be called outside lock.");

if (_disposed)
alnikola marked this conversation as resolved.
Show resolved Hide resolved
{
throw new ObjectDisposedException($"{nameof(CreditManager)}:{_owner.GetType().Name}:{_name}");
}

if (_current > 0)
{
Debug.Assert(_waitersTail is null, "Shouldn't have waiters when credit is available");

int granted = Math.Min(amount, _current);
if (NetEventSource.Log.IsEnabled()) _owner.Trace($"{_name}. requested={amount}, current={_current}, granted={granted}");
_current -= granted;
return granted;
}
alnikola marked this conversation as resolved.
Show resolved Hide resolved

return 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ internal sealed partial class Http2Connection : HttpConnectionBase, IDisposable

private readonly Channel<WriteQueueEntry> _writeChannel;
private bool _lastPendingWriterShouldFlush;
private int _freeStreamSlots;
alnikola marked this conversation as resolved.
Show resolved Hide resolved
// _maxConcurrentStreams == int.MaxValue in the beginning because SETTINGS frame is sent asynchronously, but we want to avoid too much retries,
// thus allow requests under the most commont stream limit to proceed.
// Once SETTINGS is received, _freeStreamSlots will be set to the actual value and requests above the limit will be retried.
// It takes an effect only if multiple HTTP/2 connections is enabled on the pool.
private const int InitialSlotsLimit = 100;

// This means that the pool has disposed us, but there may still be
// requests in flight that will continue to be processed.
Expand Down Expand Up @@ -115,11 +121,44 @@ public Http2Connection(HttpConnectionPool pool, Stream stream)
_maxConcurrentStreams = int.MaxValue;
_pendingWindowUpdate = 0;

if (_pool.EnableMultipleHttp2Connections)
{
_freeStreamSlots = InitialSlotsLimit;
}

if (NetEventSource.Log.IsEnabled()) TraceConnection(stream);
}

private object SyncObject => _httpStreams;

public bool AcquireStreamSlot()
{
int currentSlots = Volatile.Read(ref _freeStreamSlots);

while (true)
{
if (currentSlots == 0)
{
return false;
}

int reducedSlots = currentSlots - 1;
int oldFreeStreamSlots = Interlocked.CompareExchange(ref _freeStreamSlots, reducedSlots, currentSlots);
if (oldFreeStreamSlots == currentSlots)
{
// Slot was successfully acquired.
return true;
}
currentSlots = oldFreeStreamSlots;
// Other thread already updated _freeStreamSlots, so let's try acquiring a free slot again.
}
}

private void ReleaseStreamSlot()
{
Interlocked.Increment(ref _freeStreamSlots);
}

public async ValueTask SetupAsync()
{
_outgoingBuffer.EnsureAvailableSpace(s_http2ConnectionPreface.Length +
Expand Down Expand Up @@ -598,6 +637,18 @@ private void ChangeMaxConcurrentStreams(uint newValue)
int delta = effectiveValue - _maxConcurrentStreams;
_maxConcurrentStreams = effectiveValue;

// On the initial settings frame we have to set the value received from the server
// because delta calculation doesn't make sense in this case.
if (_expectingSettingsAck)
{
Volatile.Write(ref _freeStreamSlots, effectiveValue);
_pool.StreamSlotAvailable(effectiveValue);
}
else
{
Interlocked.Add(ref _freeStreamSlots, delta);
_pool.StreamSlotAvailable(delta);
}
_concurrentStreams.AdjustCredit(delta);
}

Expand Down Expand Up @@ -1203,7 +1254,17 @@ private async ValueTask<Http2Stream> SendHeadersAsync(HttpRequestMessage request
// in order to avoid consuming resources in potentially many requests waiting for access.
try
{
await _concurrentStreams.RequestCreditAsync(1, cancellationToken).ConfigureAwait(false);
if (_pool.EnableMultipleHttp2Connections)
{
if (!_concurrentStreams.TryRequestCreditNoWait(1))
{
throw new HttpRequestException(null, null, RequestRetryType.RetryOnNextConnection);
}
}
else
{
await _concurrentStreams.RequestCreditAsync(1, cancellationToken).ConfigureAwait(false);
}
}
catch (ObjectDisposedException)
{
Expand Down Expand Up @@ -1314,7 +1375,9 @@ await PerformWriteAsync(totalSize, (thisRef: this, http2Stream, headerBytes, end
}
catch
{
ReleaseStreamSlot();
_concurrentStreams.AdjustCredit(1);
_pool.StreamSlotAvailable(1);
throw;
}
finally
Expand Down Expand Up @@ -1804,7 +1867,9 @@ private void RemoveStream(Http2Stream http2Stream)
}
}

ReleaseStreamSlot();
_concurrentStreams.AdjustCredit(1);
_pool.StreamSlotAvailable(1);
}

public sealed override string ToString() => $"{nameof(Http2Connection)}({_pool})"; // Description for diagnostic purposes
Expand Down
Loading