Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove per connection PipeOptions #30769

Merged
merged 1 commit into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand All @@ -21,6 +20,8 @@ internal class SocketConnectionFactory : IConnectionFactory, IAsyncDisposable
private readonly SocketTransportOptions _options;
private readonly MemoryPool<byte> _memoryPool;
private readonly SocketsTrace _trace;
private readonly PipeOptions _inputOptions;
private readonly PipeOptions _outputOptions;

public SocketConnectionFactory(IOptions<SocketTransportOptions> options, ILoggerFactory loggerFactory)
{
Expand All @@ -38,6 +39,16 @@ public SocketConnectionFactory(IOptions<SocketTransportOptions> options, ILogger
_memoryPool = options.Value.MemoryPoolFactory();
var logger = loggerFactory.CreateLogger("Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Client");
_trace = new SocketsTrace(logger);

var maxReadBufferSize = _options.MaxReadBufferSize ?? 0;
var maxWriteBufferSize = _options.MaxWriteBufferSize ?? 0;

// These are the same, it's either the thread pool or inline
var applicationScheduler = _options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool;
var transportScheduler = applicationScheduler;

_inputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false);
_outputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false);
}

public async ValueTask<ConnectionContext> ConnectAsync(EndPoint endpoint, CancellationToken cancellationToken = default)
Expand All @@ -59,12 +70,11 @@ public async ValueTask<ConnectionContext> ConnectAsync(EndPoint endpoint, Cancel
var socketConnection = new SocketConnection(
socket,
_memoryPool,
PipeScheduler.ThreadPool,
_inputOptions.ReaderScheduler, // This is either threadpool or inline
_trace,
_options.MaxReadBufferSize,
_options.MaxWriteBufferSize,
_options.WaitForDataBeforeAllocatingBuffer,
_options.UnsafePreferInlineScheduling);
_inputOptions,
_outputOptions,
_options.WaitForDataBeforeAllocatingBuffer);

socketConnection.Start();
return socketConnection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ internal SocketConnection(Socket socket,
MemoryPool<byte> memoryPool,
PipeScheduler transportScheduler,
ISocketsTrace trace,
long? maxReadBufferSize = null,
long? maxWriteBufferSize = null,
bool waitForData = true,
bool useInlineSchedulers = false)
PipeOptions inputOptions,
PipeOptions outputOptions,
bool waitForData = true)
{
Debug.Assert(socket != null);
Debug.Assert(memoryPool != null);
Expand All @@ -60,23 +59,9 @@ internal SocketConnection(Socket socket,
// https://github.com/aspnet/KestrelHttpServer/issues/2573
var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline;

var applicationScheduler = PipeScheduler.ThreadPool;
if (useInlineSchedulers)
{
transportScheduler = PipeScheduler.Inline;
awaiterScheduler = PipeScheduler.Inline;
applicationScheduler = PipeScheduler.Inline;
}

_receiver = new SocketReceiver(_socket, awaiterScheduler);
_sender = new SocketSender(_socket, awaiterScheduler);

maxReadBufferSize ??= 0;
maxWriteBufferSize ??= 0;

var inputOptions = new PipeOptions(MemoryPool, applicationScheduler, transportScheduler, maxReadBufferSize.Value, maxReadBufferSize.Value / 2, useSynchronizationContext: false);
var outputOptions = new PipeOptions(MemoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize.Value, maxWriteBufferSize.Value / 2, useSynchronizationContext: false);

var pair = DuplexPipe.CreateConnectionPair(inputOptions, outputOptions);

// Set the transport and connection id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets
internal sealed class SocketConnectionListener : IConnectionListener
{
private readonly MemoryPool<byte> _memoryPool;
private readonly int _numSchedulers;
private readonly PipeScheduler[] _schedulers;
private readonly int _settingsCount;
private readonly Settings[] _settings;
private readonly ISocketsTrace _trace;
private Socket? _listenSocket;
private int _schedulerIndex;
private int _settingsIndex;
private readonly SocketTransportOptions _options;
private SafeSocketHandle? _socketHandle;

Expand All @@ -38,21 +38,43 @@ internal SocketConnectionListener(
_memoryPool = _options.MemoryPoolFactory();
var ioQueueCount = options.IOQueueCount;

var maxReadBufferSize = _options.MaxReadBufferSize ?? 0;
var maxWriteBufferSize = _options.MaxWriteBufferSize ?? 0;
var applicationScheduler = options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool;

if (ioQueueCount > 0)
{
_numSchedulers = ioQueueCount;
_schedulers = new IOQueue[_numSchedulers];
_settingsCount = ioQueueCount;
_settings = new Settings[_settingsCount];

for (var i = 0; i < _numSchedulers; i++)
for (var i = 0; i < _settingsCount; i++)
{
_schedulers[i] = new IOQueue();
var transportScheduler = options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : new IOQueue();

_settings[i] = new Settings
{
Scheduler = transportScheduler,
InputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false),
OutputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false)
};
}
}
else
{
var directScheduler = new PipeScheduler[] { PipeScheduler.ThreadPool };
_numSchedulers = directScheduler.Length;
_schedulers = directScheduler;
var transportScheduler = options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool;

var directScheduler = new Settings[]
{
new Settings
{
Scheduler = transportScheduler,
InputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false),
OutputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false)
}
};

_settingsCount = directScheduler.Length;
_settings = directScheduler;
}
}

Expand Down Expand Up @@ -127,13 +149,19 @@ void BindSocket()
acceptSocket.NoDelay = _options.NoDelay;
}

var connection = new SocketConnection(acceptSocket, _memoryPool, _schedulers[_schedulerIndex], _trace,
_options.MaxReadBufferSize, _options.MaxWriteBufferSize, _options.WaitForDataBeforeAllocatingBuffer,
_options.UnsafePreferInlineScheduling);
var setting = _settings[_settingsIndex];

var connection = new SocketConnection(acceptSocket,
_memoryPool,
setting.Scheduler,
_trace,
setting.InputOptions,
setting.OutputOptions,
waitForData: _options.WaitForDataBeforeAllocatingBuffer);

connection.Start();

_schedulerIndex = (_schedulerIndex + 1) % _numSchedulers;
_settingsIndex = (_settingsIndex + 1) % _settingsCount;

return connection;
}
Expand Down Expand Up @@ -173,5 +201,12 @@ public ValueTask DisposeAsync()
_memoryPool.Dispose();
return default;
}

private class Settings
{
public PipeScheduler Scheduler { get; init; } = default!;
public PipeOptions InputOptions { get; init; } = default!;
public PipeOptions OutputOptions { get; init; } = default!;
}
Comment on lines +205 to +210
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: init + nullable is messy. Adding a ctor + readonly properties cleans things up

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nullable is messy. I don't want to reset the PR for this 😄 . I will look at it afterwards if that's OK

}
}