diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Client/SocketConnectionFactory.cs b/src/Servers/Kestrel/Transport.Sockets/src/Client/SocketConnectionFactory.cs index d587b531b396..c0c6772be049 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/Client/SocketConnectionFactory.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/Client/SocketConnectionFactory.cs @@ -9,7 +9,6 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; -using Microsoft.AspNetCore.Http.Features; using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -21,6 +20,8 @@ internal class SocketConnectionFactory : IConnectionFactory, IAsyncDisposable private readonly SocketTransportOptions _options; private readonly MemoryPool _memoryPool; private readonly SocketsTrace _trace; + private readonly PipeOptions _inputOptions; + private readonly PipeOptions _outputOptions; public SocketConnectionFactory(IOptions options, ILoggerFactory loggerFactory) { @@ -38,6 +39,16 @@ public SocketConnectionFactory(IOptions options, ILogger _memoryPool = options.Value.MemoryPoolFactory(); var logger = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Client"); _trace = new SocketsTrace(logger); + + var maxReadBufferSize = _options.MaxReadBufferSize ?? 0; + var maxWriteBufferSize = _options.MaxWriteBufferSize ?? 0; + + // These are the same, it's either the thread pool or inline + var applicationScheduler = _options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool; + var transportScheduler = applicationScheduler; + + _inputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false); + _outputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false); } public async ValueTask ConnectAsync(EndPoint endpoint, CancellationToken cancellationToken = default) @@ -59,12 +70,11 @@ public async ValueTask ConnectAsync(EndPoint endpoint, Cancel var socketConnection = new SocketConnection( socket, _memoryPool, - PipeScheduler.ThreadPool, + _inputOptions.ReaderScheduler, // This is either threadpool or inline _trace, - _options.MaxReadBufferSize, - _options.MaxWriteBufferSize, - _options.WaitForDataBeforeAllocatingBuffer, - _options.UnsafePreferInlineScheduling); + _inputOptions, + _outputOptions, + _options.WaitForDataBeforeAllocatingBuffer); socketConnection.Start(); return socketConnection; diff --git a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs index 1656e5b2b44a..cf81d90e372b 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs @@ -36,10 +36,9 @@ internal SocketConnection(Socket socket, MemoryPool memoryPool, PipeScheduler transportScheduler, ISocketsTrace trace, - long? maxReadBufferSize = null, - long? maxWriteBufferSize = null, - bool waitForData = true, - bool useInlineSchedulers = false) + PipeOptions inputOptions, + PipeOptions outputOptions, + bool waitForData = true) { Debug.Assert(socket != null); Debug.Assert(memoryPool != null); @@ -60,23 +59,9 @@ internal SocketConnection(Socket socket, // https://github.com/aspnet/KestrelHttpServer/issues/2573 var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline; - var applicationScheduler = PipeScheduler.ThreadPool; - if (useInlineSchedulers) - { - transportScheduler = PipeScheduler.Inline; - awaiterScheduler = PipeScheduler.Inline; - applicationScheduler = PipeScheduler.Inline; - } - _receiver = new SocketReceiver(_socket, awaiterScheduler); _sender = new SocketSender(_socket, awaiterScheduler); - maxReadBufferSize ??= 0; - maxWriteBufferSize ??= 0; - - var inputOptions = new PipeOptions(MemoryPool, applicationScheduler, transportScheduler, maxReadBufferSize.Value, maxReadBufferSize.Value / 2, useSynchronizationContext: false); - var outputOptions = new PipeOptions(MemoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize.Value, maxWriteBufferSize.Value / 2, useSynchronizationContext: false); - var pair = DuplexPipe.CreateConnectionPair(inputOptions, outputOptions); // Set the transport and connection id diff --git a/src/Servers/Kestrel/Transport.Sockets/src/SocketConnectionListener.cs b/src/Servers/Kestrel/Transport.Sockets/src/SocketConnectionListener.cs index eb34b4062b35..0571ad4c8765 100644 --- a/src/Servers/Kestrel/Transport.Sockets/src/SocketConnectionListener.cs +++ b/src/Servers/Kestrel/Transport.Sockets/src/SocketConnectionListener.cs @@ -17,11 +17,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets internal sealed class SocketConnectionListener : IConnectionListener { private readonly MemoryPool _memoryPool; - private readonly int _numSchedulers; - private readonly PipeScheduler[] _schedulers; + private readonly int _settingsCount; + private readonly Settings[] _settings; private readonly ISocketsTrace _trace; private Socket? _listenSocket; - private int _schedulerIndex; + private int _settingsIndex; private readonly SocketTransportOptions _options; private SafeSocketHandle? _socketHandle; @@ -38,21 +38,43 @@ internal SocketConnectionListener( _memoryPool = _options.MemoryPoolFactory(); var ioQueueCount = options.IOQueueCount; + var maxReadBufferSize = _options.MaxReadBufferSize ?? 0; + var maxWriteBufferSize = _options.MaxWriteBufferSize ?? 0; + var applicationScheduler = options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool; + if (ioQueueCount > 0) { - _numSchedulers = ioQueueCount; - _schedulers = new IOQueue[_numSchedulers]; + _settingsCount = ioQueueCount; + _settings = new Settings[_settingsCount]; - for (var i = 0; i < _numSchedulers; i++) + for (var i = 0; i < _settingsCount; i++) { - _schedulers[i] = new IOQueue(); + var transportScheduler = options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : new IOQueue(); + + _settings[i] = new Settings + { + Scheduler = transportScheduler, + InputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false), + OutputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false) + }; } } else { - var directScheduler = new PipeScheduler[] { PipeScheduler.ThreadPool }; - _numSchedulers = directScheduler.Length; - _schedulers = directScheduler; + var transportScheduler = options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool; + + var directScheduler = new Settings[] + { + new Settings + { + Scheduler = transportScheduler, + InputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false), + OutputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false) + } + }; + + _settingsCount = directScheduler.Length; + _settings = directScheduler; } } @@ -127,13 +149,19 @@ void BindSocket() acceptSocket.NoDelay = _options.NoDelay; } - var connection = new SocketConnection(acceptSocket, _memoryPool, _schedulers[_schedulerIndex], _trace, - _options.MaxReadBufferSize, _options.MaxWriteBufferSize, _options.WaitForDataBeforeAllocatingBuffer, - _options.UnsafePreferInlineScheduling); + var setting = _settings[_settingsIndex]; + + var connection = new SocketConnection(acceptSocket, + _memoryPool, + setting.Scheduler, + _trace, + setting.InputOptions, + setting.OutputOptions, + waitForData: _options.WaitForDataBeforeAllocatingBuffer); connection.Start(); - _schedulerIndex = (_schedulerIndex + 1) % _numSchedulers; + _settingsIndex = (_settingsIndex + 1) % _settingsCount; return connection; } @@ -173,5 +201,12 @@ public ValueTask DisposeAsync() _memoryPool.Dispose(); return default; } + + private class Settings + { + public PipeScheduler Scheduler { get; init; } = default!; + public PipeOptions InputOptions { get; init; } = default!; + public PipeOptions OutputOptions { get; init; } = default!; + } } }