Skip to content

Commit

Permalink
Remove per connection PipeOptions (#30769)
Browse files Browse the repository at this point in the history
- This changes the socket transport to remove the per connection allocation of PipeOptions.
  • Loading branch information
davidfowl authored Mar 9, 2021
1 parent 6b5f156 commit 7e33542
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand All @@ -21,6 +20,8 @@ internal class SocketConnectionFactory : IConnectionFactory, IAsyncDisposable
private readonly SocketTransportOptions _options;
private readonly MemoryPool<byte> _memoryPool;
private readonly SocketsTrace _trace;
private readonly PipeOptions _inputOptions;
private readonly PipeOptions _outputOptions;

public SocketConnectionFactory(IOptions<SocketTransportOptions> options, ILoggerFactory loggerFactory)
{
Expand All @@ -38,6 +39,16 @@ public SocketConnectionFactory(IOptions<SocketTransportOptions> options, ILogger
_memoryPool = options.Value.MemoryPoolFactory();
var logger = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Client");
_trace = new SocketsTrace(logger);

var maxReadBufferSize = _options.MaxReadBufferSize ?? 0;
var maxWriteBufferSize = _options.MaxWriteBufferSize ?? 0;

// These are the same, it's either the thread pool or inline
var applicationScheduler = _options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool;
var transportScheduler = applicationScheduler;

_inputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false);
_outputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false);
}

public async ValueTask<ConnectionContext> ConnectAsync(EndPoint endpoint, CancellationToken cancellationToken = default)
Expand All @@ -59,12 +70,11 @@ public async ValueTask<ConnectionContext> ConnectAsync(EndPoint endpoint, Cancel
var socketConnection = new SocketConnection(
socket,
_memoryPool,
PipeScheduler.ThreadPool,
_inputOptions.ReaderScheduler, // This is either threadpool or inline
_trace,
_options.MaxReadBufferSize,
_options.MaxWriteBufferSize,
_options.WaitForDataBeforeAllocatingBuffer,
_options.UnsafePreferInlineScheduling);
_inputOptions,
_outputOptions,
_options.WaitForDataBeforeAllocatingBuffer);

socketConnection.Start();
return socketConnection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ internal SocketConnection(Socket socket,
MemoryPool<byte> memoryPool,
PipeScheduler transportScheduler,
ISocketsTrace trace,
long? maxReadBufferSize = null,
long? maxWriteBufferSize = null,
bool waitForData = true,
bool useInlineSchedulers = false)
PipeOptions inputOptions,
PipeOptions outputOptions,
bool waitForData = true)
{
Debug.Assert(socket != null);
Debug.Assert(memoryPool != null);
Expand All @@ -60,23 +59,9 @@ internal SocketConnection(Socket socket,
// https://github.com/aspnet/KestrelHttpServer/issues/2573
var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline;

var applicationScheduler = PipeScheduler.ThreadPool;
if (useInlineSchedulers)
{
transportScheduler = PipeScheduler.Inline;
awaiterScheduler = PipeScheduler.Inline;
applicationScheduler = PipeScheduler.Inline;
}

_receiver = new SocketReceiver(_socket, awaiterScheduler);
_sender = new SocketSender(_socket, awaiterScheduler);

maxReadBufferSize ??= 0;
maxWriteBufferSize ??= 0;

var inputOptions = new PipeOptions(MemoryPool, applicationScheduler, transportScheduler, maxReadBufferSize.Value, maxReadBufferSize.Value / 2, useSynchronizationContext: false);
var outputOptions = new PipeOptions(MemoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize.Value, maxWriteBufferSize.Value / 2, useSynchronizationContext: false);

var pair = DuplexPipe.CreateConnectionPair(inputOptions, outputOptions);

// Set the transport and connection id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
internal sealed class SocketConnectionListener : IConnectionListener
{
private readonly MemoryPool<byte> _memoryPool;
private readonly int _numSchedulers;
private readonly PipeScheduler[] _schedulers;
private readonly int _settingsCount;
private readonly Settings[] _settings;
private readonly ISocketsTrace _trace;
private Socket? _listenSocket;
private int _schedulerIndex;
private int _settingsIndex;
private readonly SocketTransportOptions _options;
private SafeSocketHandle? _socketHandle;

Expand All @@ -38,21 +38,43 @@ internal SocketConnectionListener(
_memoryPool = _options.MemoryPoolFactory();
var ioQueueCount = options.IOQueueCount;

var maxReadBufferSize = _options.MaxReadBufferSize ?? 0;
var maxWriteBufferSize = _options.MaxWriteBufferSize ?? 0;
var applicationScheduler = options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool;

if (ioQueueCount > 0)
{
_numSchedulers = ioQueueCount;
_schedulers = new IOQueue[_numSchedulers];
_settingsCount = ioQueueCount;
_settings = new Settings[_settingsCount];

for (var i = 0; i < _numSchedulers; i++)
for (var i = 0; i < _settingsCount; i++)
{
_schedulers[i] = new IOQueue();
var transportScheduler = options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : new IOQueue();

_settings[i] = new Settings
{
Scheduler = transportScheduler,
InputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false),
OutputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false)
};
}
}
else
{
var directScheduler = new PipeScheduler[] { PipeScheduler.ThreadPool };
_numSchedulers = directScheduler.Length;
_schedulers = directScheduler;
var transportScheduler = options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool;

var directScheduler = new Settings[]
{
new Settings
{
Scheduler = transportScheduler,
InputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false),
OutputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false)
}
};

_settingsCount = directScheduler.Length;
_settings = directScheduler;
}
}

Expand Down Expand Up @@ -127,13 +149,19 @@ void BindSocket()
acceptSocket.NoDelay = _options.NoDelay;
}

var connection = new SocketConnection(acceptSocket, _memoryPool, _schedulers[_schedulerIndex], _trace,
_options.MaxReadBufferSize, _options.MaxWriteBufferSize, _options.WaitForDataBeforeAllocatingBuffer,
_options.UnsafePreferInlineScheduling);
var setting = _settings[_settingsIndex];

var connection = new SocketConnection(acceptSocket,
_memoryPool,
setting.Scheduler,
_trace,
setting.InputOptions,
setting.OutputOptions,
waitForData: _options.WaitForDataBeforeAllocatingBuffer);

connection.Start();

_schedulerIndex = (_schedulerIndex + 1) % _numSchedulers;
_settingsIndex = (_settingsIndex + 1) % _settingsCount;

return connection;
}
Expand Down Expand Up @@ -173,5 +201,12 @@ public ValueTask DisposeAsync()
_memoryPool.Dispose();
return default;
}

private class Settings
{
public PipeScheduler Scheduler { get; init; } = default!;
public PipeOptions InputOptions { get; init; } = default!;
public PipeOptions OutputOptions { get; init; } = default!;
}
}
}

0 comments on commit 7e33542

Please sign in to comment.