Skip to content

Commit

Permalink
HTTP adjustments to StreamsAvailable callback in options
Browse files Browse the repository at this point in the history
  • Loading branch information
ManickaP committed May 16, 2024
1 parent 17df37d commit 71869db
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,30 +112,23 @@ public static async ValueTask<SslStream> EstablishSslConnectionAsync(SslClientAu
[SupportedOSPlatform("windows")]
[SupportedOSPlatform("linux")]
[SupportedOSPlatform("macos")]
public static async ValueTask<QuicConnection> ConnectQuicAsync(HttpRequestMessage request, DnsEndPoint endPoint, TimeSpan idleTimeout, SslClientAuthenticationOptions clientAuthenticationOptions, CancellationToken cancellationToken)
public static async ValueTask<QuicConnection> ConnectQuicAsync(HttpRequestMessage request, DnsEndPoint endPoint, TimeSpan idleTimeout, SslClientAuthenticationOptions clientAuthenticationOptions, QuicConnectionStreamsAvailableCallback streamsAvailableCallback, CancellationToken cancellationToken)
{
clientAuthenticationOptions = SetUpRemoteCertificateValidationCallback(clientAuthenticationOptions, request);

try
{
QuicConnection quicConnection = await QuicConnection.ConnectAsync(new QuicClientConnectionOptions()
return await QuicConnection.ConnectAsync(new QuicClientConnectionOptions()
{
MaxInboundBidirectionalStreams = 0, // Client doesn't support inbound streams: https://www.rfc-editor.org/rfc/rfc9114.html#name-bidirectional-streams. An extension might change this.
MaxInboundUnidirectionalStreams = 5, // Minimum is 3: https://www.rfc-editor.org/rfc/rfc9114.html#unidirectional-streams (1x control stream + 2x QPACK). Set to 100 if/when support for PUSH streams is added.
IdleTimeout = idleTimeout,
DefaultStreamErrorCode = (long)Http3ErrorCode.RequestCancelled,
DefaultCloseErrorCode = (long)Http3ErrorCode.NoError,
RemoteEndPoint = endPoint,
ClientAuthenticationOptions = clientAuthenticationOptions
ClientAuthenticationOptions = clientAuthenticationOptions,
StreamsAvailableCallback = streamsAvailableCallback,
}, cancellationToken).ConfigureAwait(false);

if (quicConnection.NegotiatedApplicationProtocol != SslApplicationProtocol.Http3)
{
await quicConnection.DisposeAsync().ConfigureAwait(false);
throw new HttpRequestException(HttpRequestError.ConnectionError, "QUIC connected but no HTTP/3 indicated via ALPN.", null, RequestRetryType.RetryOnConnectionFailure);
}

return quicConnection;
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,17 +251,23 @@ private async Task InjectNewHttp3ConnectionAsync(RequestQueue<Http3Connection>.Q
waiter.ConnectionCancellationTokenSource = cts;
try
{
QuicConnection quicConnection = await ConnectHelper.ConnectQuicAsync(queueItem.Request, new DnsEndPoint(authority.IdnHost, authority.Port), _poolManager.Settings._pooledConnectionIdleTimeout, _sslOptionsHttp3!, cts.Token).ConfigureAwait(false);

// if the authority was sent as an option through alt-svc then include alt-used header
connection = new Http3Connection(this, authority, quicConnection, includeAltUsedHeader: _http3Authority == authority);
connection = new Http3Connection(this, authority, includeAltUsedHeader: _http3Authority == authority);

QuicConnection quicConnection = await ConnectHelper.ConnectQuicAsync(queueItem.Request, new DnsEndPoint(authority.IdnHost, authority.Port), _poolManager.Settings._pooledConnectionIdleTimeout, _sslOptionsHttp3!, connection.StreamsAvailableCallback, cts.Token).ConfigureAwait(false);
if (quicConnection.NegotiatedApplicationProtocol != SslApplicationProtocol.Http3)
{
await quicConnection.DisposeAsync().ConfigureAwait(false);
throw new HttpRequestException(HttpRequestError.ConnectionError, "QUIC connected but no HTTP/3 indicated via ALPN.", null, RequestRetryType.RetryOnConnectionFailure);
}
connection.InitQuicConnection(quicConnection);
}
catch (Exception e)
{
connectionException = e is OperationCanceledException oce && oce.CancellationToken == cts.Token && !waiter.CancelledByOriginatingRequestCompletion ?
CreateConnectTimeoutException(oce) :
e;
connection = null;
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ internal sealed partial class Http2Connection : HttpConnectionBase

private TaskCompletionSourceWithCancellation<bool>? _initialSettingsReceived;

private readonly HttpConnectionPool _pool;
private readonly Stream _stream;

// NOTE: These are mutable structs; do not make these readonly.
Expand Down Expand Up @@ -132,7 +131,6 @@ internal enum KeepAliveState
public Http2Connection(HttpConnectionPool pool, Stream stream, IPEndPoint? remoteEndPoint)
: base(pool, remoteEndPoint)
{
_pool = pool;
_stream = stream;

_incomingBuffer = new ArrayBuffer(initialSize: 0, usePool: true);
Expand Down Expand Up @@ -1793,18 +1791,6 @@ private bool ForceSendConnectionWindowUpdate()
return true;
}

public override long GetIdleTicks(long nowTicks)
{
// The pool is holding the lock as part of its scavenging logic.
// We must not lock on Http2Connection.SyncObj here as that could lead to lock ordering problems.
Debug.Assert(_pool.HasSyncObjLock);

// There is a race condition here where the connection pool may see this connection as idle right before
// we start processing a new request and start its disposal. This is okay as we will either
// return false from TryReserveStream, or process pending requests before tearing down the transport.
return _streamsInUse == 0 ? base.GetIdleTicks(nowTicks) : 0;
}

/// <summary>Abort all streams and cause further processing to fail.</summary>
/// <param name="abortException">Exception causing Abort to be called.</param>
private void Abort(Exception abortException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ namespace System.Net.Http
[SupportedOSPlatform("macos")]
internal sealed class Http3Connection : HttpConnectionBase
{
private readonly HttpConnectionPool _pool;
private readonly HttpAuthority _authority;
private readonly byte[]? _altUsedEncodedHeader;
private QuicConnection? _connection;
Expand All @@ -33,7 +32,7 @@ internal sealed class Http3Connection : HttpConnectionBase

// Our control stream.
private QuicStream? _clientControl;
private Task _sendSettingsTask;
private Task? _sendSettingsTask;

// Server-advertised SETTINGS_MAX_FIELD_SECTION_SIZE
// https://www.rfc-editor.org/rfc/rfc9114.html#section-7.2.4.1-2.2.1
Expand All @@ -54,9 +53,8 @@ internal sealed class Http3Connection : HttpConnectionBase
public Exception? AbortException => Volatile.Read(ref _abortException);
private object SyncObj => _activeRequests;

private int _reservedStreams;
private int _availableRequestStreamsCount;
private TaskCompletionSource<bool>? _availableStreamsWaiter;
private bool _streamsAvailableRegistered;

/// <summary>
/// If true, we've received GOAWAY, are aborting due to a connection-level error, or are disposing due to pool limits.
Expand All @@ -70,15 +68,10 @@ private bool ShuttingDown
}
}


public int AvailableRequestStreamsCount => _connection?.AvailableBidirectionalStreamsCount ?? 0;

public Http3Connection(HttpConnectionPool pool, HttpAuthority authority, QuicConnection connection, bool includeAltUsedHeader)
: base(pool, connection.RemoteEndPoint)
public Http3Connection(HttpConnectionPool pool, HttpAuthority authority, bool includeAltUsedHeader)
: base(pool)
{
_pool = pool;
_authority = authority;
_connection = connection;

if (includeAltUsedHeader)
{
Expand All @@ -94,6 +87,13 @@ public Http3Connection(HttpConnectionPool pool, HttpAuthority authority, QuicCon
// Use this as an initial value before we receive the SETTINGS frame.
_maxHeaderListSize = maxHeaderListSize;
}
}

public void InitQuicConnection(QuicConnection connection)
{
MarkConnectionAsEstablished(connection.RemoteEndPoint);

_connection = connection;

// Errors are observed via Abort().
_sendSettingsTask = SendSettingsAsync();
Expand Down Expand Up @@ -161,7 +161,7 @@ private void CheckForShutdown()
if (_clientControl != null)
{
await _sendSettingsTask.ConfigureAwait(false);
await _sendSettingsTask!.ConfigureAwait(false);
await _clientControl.DisposeAsync().ConfigureAwait(false);
_clientControl = null;
}
Expand All @@ -176,11 +176,18 @@ public bool TryReserveStream()
{
lock (SyncObj)
{
if (_reservedStreams >= AvailableRequestStreamsCount)
Debug.Assert(_availableRequestStreamsCount >= 0);

if (_availableRequestStreamsCount == 0)
{
return false;
}
++_reservedStreams;

if (_activeRequests.Count == 0)
{
MarkConnectionAsNotIdle();
}
--_availableRequestStreamsCount;
return true;
}
}
Expand All @@ -189,38 +196,42 @@ public void ReleaseStream()
{
lock (SyncObj)
{
Debug.Assert(_reservedStreams > 0);
--_reservedStreams;
Debug.Assert(_availableRequestStreamsCount >= 0);

++_availableRequestStreamsCount;
}
}

public void StreamsAvailableCallback(QuicConnection sender, int bidirectionalStreamsCountIncrement, int _)
{
Debug.Assert(_connection is null || sender == _connection);

lock (SyncObj)
{
Debug.Assert(_availableRequestStreamsCount >= 0);

_availableRequestStreamsCount += bidirectionalStreamsCountIncrement;
_availableStreamsWaiter?.SetResult(!ShuttingDown);
_availableStreamsWaiter = null;
}
}

public Task<bool> WaitForAvailableStreamsAsync()
{
lock (SyncObj)
{
Debug.Assert(_availableRequestStreamsCount >= 0);

if (ShuttingDown)
{
return Task.FromResult(false);
}
if (_reservedStreams < AvailableRequestStreamsCount)
if (_availableRequestStreamsCount > 0)
{
return Task.FromResult(true);
}

Debug.Assert(_availableStreamsWaiter is null);
_availableStreamsWaiter = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
if (!_streamsAvailableRegistered)
{
_connection!.StreamsAvailable += (_, _) =>
{
lock (SyncObj)
{
_availableStreamsWaiter?.SetResult(!ShuttingDown);
_availableStreamsWaiter = null;
}
};
_streamsAvailableRegistered = true;
}
return _availableStreamsWaiter.Task;
}
}
Expand All @@ -243,11 +254,6 @@ public async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, lon
requestStream = new Http3RequestStream(request, this, quicStream);
lock (SyncObj)
{
if (_activeRequests.Count == 0)
{
MarkConnectionAsNotIdle();
}

_activeRequests.Add(quicStream, requestStream);
}
}
Expand Down Expand Up @@ -427,18 +433,6 @@ public void RemoveStream(QuicStream stream)
}
}

public override long GetIdleTicks(long nowTicks)
{
// The pool is holding the lock as part of its scavenging logic.
// We must not lock on Http3Connection.SyncObj here as that could lead to lock ordering problems.
Debug.Assert(_pool.HasSyncObjLock);

// There is a race condition here where the connection pool may see this connection as idle right before
// we start processing a new request and start its disposal. This is okay as we will either
// return false from TryReserveStream, or process pending requests before tearing down the transport.
return _activeRequests.Count == 0 && _reservedStreams == 0 ? base.GetIdleTicks(nowTicks) : 0;
}

public override void Trace(string message, [CallerMemberName] string? memberName = null) =>
Trace(0, message, memberName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ internal sealed partial class HttpConnection : HttpConnectionBase
private static readonly ulong s_http10Bytes = BitConverter.ToUInt64("HTTP/1.0"u8);
private static readonly ulong s_http11Bytes = BitConverter.ToUInt64("HTTP/1.1"u8);

private readonly HttpConnectionPool _pool;
internal readonly Stream _stream;
private readonly TransportContext? _transportContext;

Expand Down Expand Up @@ -77,10 +76,8 @@ public HttpConnection(
IPEndPoint? remoteEndPoint)
: base(pool, remoteEndPoint)
{
Debug.Assert(pool != null);
Debug.Assert(stream != null);

_pool = pool;
_stream = stream;

_transportContext = transportContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@ namespace System.Net.Http
{
internal abstract class HttpConnectionBase : IDisposable, IHttpTrace
{
protected readonly HttpConnectionPool _pool;

private static long s_connectionCounter = -1;

// May be null if none of the counters were enabled when the connection was established.
private readonly ConnectionMetrics? _connectionMetrics;
private ConnectionMetrics? _connectionMetrics;

// Indicates whether we've counted this connection as established, so that we can
// avoid decrementing the counter once it's closed in case telemetry was enabled in between.
private readonly bool _httpTelemetryMarkedConnectionAsOpened;
private bool _httpTelemetryMarkedConnectionAsOpened;

private readonly long _creationTickCount = Environment.TickCount64;
private long _idleSinceTickCount;
private long? _idleSinceTickCount;

/// <summary>Cached string for the last Date header received on this connection.</summary>
private string? _lastDateHeaderValue;
Expand All @@ -35,13 +37,23 @@ internal abstract class HttpConnectionBase : IDisposable, IHttpTrace

public long Id { get; } = Interlocked.Increment(ref s_connectionCounter);

public HttpConnectionBase(HttpConnectionPool pool, IPEndPoint? remoteEndPoint)
public HttpConnectionBase(HttpConnectionPool pool)
{
Debug.Assert(this is HttpConnection or Http2Connection or Http3Connection);
Debug.Assert(pool.Settings._metrics is not null);
Debug.Assert(pool != null);
_pool = pool;
}
public HttpConnectionBase(HttpConnectionPool pool, IPEndPoint? remoteEndPoint)
: this(pool)
{
MarkConnectionAsEstablished(remoteEndPoint);
}

SocketsHttpHandlerMetrics metrics = pool.Settings._metrics;
protected void MarkConnectionAsEstablished(IPEndPoint? remoteEndPoint)
{
Debug.Assert(_pool.Settings._metrics is not null);

SocketsHttpHandlerMetrics metrics = _pool.Settings._metrics;
if (metrics.OpenConnections.Enabled || metrics.ConnectionDuration.Enabled)
{
// While requests may report HTTP/1.0 as the protocol, we treat all HTTP/1.X connections as HTTP/1.1.
Expand All @@ -53,9 +65,9 @@ public HttpConnectionBase(HttpConnectionPool pool, IPEndPoint? remoteEndPoint)
_connectionMetrics = new ConnectionMetrics(
metrics,
protocol,
pool.IsSecure ? "https" : "http",
pool.OriginAuthority.HostValue,
pool.IsDefaultPort ? null : pool.OriginAuthority.Port,
_pool.IsSecure ? "https" : "http",
_pool.OriginAuthority.HostValue,
_pool.IsDefaultPort ? null : _pool.OriginAuthority.Port,
remoteEndPoint?.Address?.ToString());

_connectionMetrics.ConnectionEstablished();
Expand All @@ -67,9 +79,9 @@ public HttpConnectionBase(HttpConnectionPool pool, IPEndPoint? remoteEndPoint)
{
_httpTelemetryMarkedConnectionAsOpened = true;

string scheme = pool.IsSecure ? "https" : "http";
string host = pool.OriginAuthority.HostValue;
int port = pool.OriginAuthority.Port;
string scheme = _pool.IsSecure ? "https" : "http";
string host = _pool.OriginAuthority.HostValue;
int port = _pool.OriginAuthority.Port;

if (this is HttpConnection) HttpTelemetry.Log.Http11ConnectionEstablished(Id, scheme, host, port, remoteEndPoint);
else if (this is Http2Connection) HttpTelemetry.Log.Http20ConnectionEstablished(Id, scheme, host, port, remoteEndPoint);
Expand Down Expand Up @@ -101,6 +113,7 @@ public void MarkConnectionAsIdle()

public void MarkConnectionAsNotIdle()
{
_idleSinceTickCount = null;
_connectionMetrics?.IdleStateChanged(idle: false);
}

Expand Down Expand Up @@ -146,7 +159,7 @@ protected void TraceConnection(Stream stream)

public long GetLifetimeTicks(long nowTicks) => nowTicks - _creationTickCount;

public virtual long GetIdleTicks(long nowTicks) => nowTicks - _idleSinceTickCount;
public long GetIdleTicks(long nowTicks) => _idleSinceTickCount is long idleSinceTickCount ? nowTicks - idleSinceTickCount : 0;

/// <summary>Check whether a connection is still usable, or should be scavenged.</summary>
/// <returns>True if connection can be used.</returns>
Expand Down

0 comments on commit 71869db

Please sign in to comment.