Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pool SocketSenders #30771

Merged
merged 8 commits into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Servers/Kestrel/Kestrel.slnf
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
"src\\Servers\\Kestrel\\test\\Sockets.BindTests\\Sockets.BindTests.csproj",
"src\\Servers\\Kestrel\\test\\Sockets.FunctionalTests\\Sockets.FunctionalTests.csproj",
"src\\Servers\\Kestrel\\tools\\CodeGenerator\\CodeGenerator.csproj",
"src\\ObjectPool\\src\\Microsoft.Extensions.ObjectPool.csproj"
"src\\ObjectPool\\src\\Microsoft.Extensions.ObjectPool.csproj",
"src\\Testing\\src\\Microsoft.AspNetCore.Testing.csproj"
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ internal class SocketConnectionFactory : IConnectionFactory, IAsyncDisposable
private readonly SocketsTrace _trace;
private readonly PipeOptions _inputOptions;
private readonly PipeOptions _outputOptions;
private readonly SocketSenderPool _socketSenderPool;

public SocketConnectionFactory(IOptions<SocketTransportOptions> options, ILoggerFactory loggerFactory)
{
Expand All @@ -46,9 +47,12 @@ public SocketConnectionFactory(IOptions<SocketTransportOptions> options, ILogger
// These are the same, it's either the thread pool or inline
var applicationScheduler = _options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool;
var transportScheduler = applicationScheduler;
// https://github.com/aspnet/KestrelHttpServer/issues/2573
var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline;
halter73 marked this conversation as resolved.
Show resolved Hide resolved

_inputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false);
_outputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false);
_socketSenderPool = new SocketSenderPool(awaiterScheduler);
}

public async ValueTask<ConnectionContext> ConnectAsync(EndPoint endpoint, CancellationToken cancellationToken = default)
Expand All @@ -72,6 +76,7 @@ public async ValueTask<ConnectionContext> ConnectAsync(EndPoint endpoint, Cancel
_memoryPool,
_inputOptions.ReaderScheduler, // This is either threadpool or inline
_trace,
_socketSenderPool,
_inputOptions,
_outputOptions,
_options.WaitForDataBeforeAllocatingBuffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
{
internal sealed class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion
internal class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion
{
private static readonly Action _callbackCompleted = () => { };

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ internal sealed class SocketConnection : TransportConnection
private readonly Socket _socket;
private readonly ISocketsTrace _trace;
private readonly SocketReceiver _receiver;
private readonly SocketSender _sender;
private SocketSender? _sender;
private readonly SocketSenderPool _socketSenderPool;
private readonly IDuplexPipe _originalTransport;
private readonly CancellationTokenSource _connectionClosedTokenSource = new CancellationTokenSource();

Expand All @@ -36,6 +37,7 @@ internal SocketConnection(Socket socket,
MemoryPool<byte> memoryPool,
PipeScheduler transportScheduler,
ISocketsTrace trace,
SocketSenderPool socketSenderPool,
PipeOptions inputOptions,
PipeOptions outputOptions,
bool waitForData = true)
Expand All @@ -48,6 +50,7 @@ internal SocketConnection(Socket socket,
MemoryPool = memoryPool;
_trace = trace;
_waitForData = waitForData;
_socketSenderPool = socketSenderPool;

LocalEndPoint = _socket.LocalEndPoint;
RemoteEndPoint = _socket.RemoteEndPoint;
Expand All @@ -59,8 +62,7 @@ internal SocketConnection(Socket socket,
// https://github.com/aspnet/KestrelHttpServer/issues/2573
var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline;

_receiver = new SocketReceiver(_socket, awaiterScheduler);
_sender = new SocketSender(_socket, awaiterScheduler);
_receiver = new SocketReceiver(awaiterScheduler);

var pair = DuplexPipe.CreateConnectionPair(inputOptions, outputOptions);

Expand Down Expand Up @@ -93,7 +95,7 @@ private async Task StartAsync()
await sendTask;

_receiver.Dispose();
_sender.Dispose();
_sender?.Dispose();
}
catch (Exception ex)
{
Expand Down Expand Up @@ -183,13 +185,13 @@ private async Task ProcessReceives()
if (_waitForData)
{
// Wait for data before allocating a buffer.
await _receiver.WaitForDataAsync();
await _receiver.WaitForDataAsync(_socket);
}

// Ensure we have some reasonable amount of buffer space
var buffer = input.GetMemory(MinAllocBufferSize);

var bytesReceived = await _receiver.ReceiveAsync(buffer);
var bytesReceived = await _receiver.ReceiveAsync(_socket, buffer);

if (bytesReceived == 0)
{
Expand Down Expand Up @@ -282,7 +284,12 @@ private async Task ProcessSends()
var isCompleted = result.IsCompleted;
if (!buffer.IsEmpty)
{
await _sender.SendAsync(buffer);
_sender = _socketSenderPool.Rent();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the perf impact of renting and returning senders to the pool?

Copy link
Member Author

@davidfowl davidfowl Mar 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my tests, I haven't seen any impact on throughput. I ran plaintext though. I'm not sure of a case where this would show up being a real problem. The contention should be low as the queues are sharded.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused why we need to rent and return for each send? Can't we rent once at the beginning of ProcessSends and return after (and calling Reset accordingly).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entire purpose of the change is to avoid that. Today we have one per connection, this reduces it from one per connection to one per send operation, hence why we go from 5000 in the test below to 8.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. So this reduces allocations with a pool as well as reducing the amount of time an object is used for.

If we have a limit of 1024, couldn't that be worse though when you get a sudden increase in traffic and then reduces? Is that something to be concerned about here?

Copy link
Member Author

@davidfowl davidfowl Mar 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Realistically for that to happen, you would need to cause the TCP buffers in the kernel to fill up per connection by sending massive payloads and having the client not read them fast enough. The only way the number of concurrent operations exceeds the number of executing sends is when the sends themselves go async and this is very rare.

That said, the SocketSender is about 300 bytes a pop and we default to a maximum of 16 IOQueues. If by some glitch we end up with 1024 entries per IOQueue then we'd have 4MB of unfreeable memory.

await _sender.SendAsync(_socket, buffer);
// We don't return to the pool if there was an exception, and
// we keep the _sender assigned so that we can dispose it in StartAsync.
_socketSenderPool.Return(_sender);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if SendAsync errors? Should the return be in a try/finally?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it errors, it's disposed (further up the stack where exceptions are caught). No, it shouldn't be in a try/finally; we don't need to return things to the pool in the error case.

_sender = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not make the _sender field a local var?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I can dispose the sender in the error case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need some comments explaining why. Is there a test for it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't mock the networking stack part of the sockets transport. There's just no need to reuse things in the face of exception, but I'll add a comment.

  • Another successful send will replenish the pool
  • The currently rented SocketAsyncEventArgs will be disposed (higher up the stack) (SocketConnection.Start)
  • The SocketAsyncEvents has a finalizer in the worst case scenario where this code becomes buggy in the future 😄

I don't see the value in making sure this gets disposed by trying to mock the SocketSender. We don't have tests making sure that the Socket itself is disposed.

}

output.AdvanceTo(end);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,34 @@

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
{
internal sealed class SocketReceiver : SocketSenderReceiverBase
internal sealed class SocketReceiver : SocketAwaitableEventArgs
{
public SocketReceiver(Socket socket, PipeScheduler scheduler) : base(socket, scheduler)
public SocketReceiver(PipeScheduler ioScheduler) : base(ioScheduler)
{
}

public SocketAwaitableEventArgs WaitForDataAsync()
public SocketAwaitableEventArgs WaitForDataAsync(Socket socket)
{
_awaitableEventArgs.SetBuffer(Memory<byte>.Empty);
SetBuffer(Memory<byte>.Empty);

if (!_socket.ReceiveAsync(_awaitableEventArgs))
if (!socket.ReceiveAsync(this))
{
_awaitableEventArgs.Complete();
Complete();
}

return _awaitableEventArgs;
return this;
}

public SocketAwaitableEventArgs ReceiveAsync(Memory<byte> buffer)
public SocketAwaitableEventArgs ReceiveAsync(Socket socket, Memory<byte> buffer)
{
_awaitableEventArgs.SetBuffer(buffer);
SetBuffer(buffer);

if (!_socket.ReceiveAsync(_awaitableEventArgs))
if (!socket.ReceiveAsync(this))
{
_awaitableEventArgs.Complete();
Complete();
}

return _awaitableEventArgs;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,55 +11,61 @@

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
{
internal sealed class SocketSender : SocketSenderReceiverBase
internal sealed class SocketSender : SocketAwaitableEventArgs
{
private List<ArraySegment<byte>>? _bufferList;

public SocketSender(Socket socket, PipeScheduler scheduler) : base(socket, scheduler)
public SocketSender(PipeScheduler scheduler) : base(scheduler)
{
}

public SocketAwaitableEventArgs SendAsync(in ReadOnlySequence<byte> buffers)
public SocketAwaitableEventArgs SendAsync(Socket socket, in ReadOnlySequence<byte> buffers)
{
if (buffers.IsSingleSegment)
{
return SendAsync(buffers.First);
return SendAsync(socket, buffers.First);
}

if (!_awaitableEventArgs.MemoryBuffer.Equals(Memory<byte>.Empty))
{
_awaitableEventArgs.SetBuffer(null, 0, 0);
}

_awaitableEventArgs.BufferList = GetBufferList(buffers);
SetBufferList(buffers);

if (!_socket.SendAsync(_awaitableEventArgs))
if (!socket.SendAsync(this))
{
_awaitableEventArgs.Complete();
Complete();
}

return _awaitableEventArgs;
return this;
}

private SocketAwaitableEventArgs SendAsync(ReadOnlyMemory<byte> memory)
public void Reset()
{
// The BufferList getter is much less expensive then the setter.
if (_awaitableEventArgs.BufferList != null)
// We clear the buffer and buffer list before we put it back into the pool
// it's a small performance hit but it removes the confusion when looking at dumps to see this still
// holds onto the buffer when it's back in the pool
if (BufferList != null)
{
BufferList = null;

_bufferList?.Clear();
}
else
{
_awaitableEventArgs.BufferList = null;
SetBuffer(null, 0, 0);
}
}

_awaitableEventArgs.SetBuffer(MemoryMarshal.AsMemory(memory));
private SocketAwaitableEventArgs SendAsync(Socket socket, ReadOnlyMemory<byte> memory)
{
SetBuffer(MemoryMarshal.AsMemory(memory));

if (!_socket.SendAsync(_awaitableEventArgs))
if (!socket.SendAsync(this))
{
_awaitableEventArgs.Complete();
Complete();
}

return _awaitableEventArgs;
return this;
}

private List<ArraySegment<byte>> GetBufferList(in ReadOnlySequence<byte> buffer)
private void SetBufferList(in ReadOnlySequence<byte> buffer)
{
Debug.Assert(!buffer.IsEmpty);
Debug.Assert(!buffer.IsSingleSegment);
Expand All @@ -68,18 +74,14 @@ private List<ArraySegment<byte>> GetBufferList(in ReadOnlySequence<byte> buffer)
{
_bufferList = new List<ArraySegment<byte>>();
}
else
{
// Buffers are pooled, so it's OK to root them until the next multi-buffer write.
_bufferList.Clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How much memory is expected to be used in the bufferlist?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you asking how big the list will get?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we aren't clearing here, how much idle memory will be in the bufferlist?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We clear in Return.

}

foreach (var b in buffer)
{
_bufferList.Add(b.GetArray());
}

return _bufferList;
// The act of setting this list, sets the buffers in the internal buffer list
BufferList = _bufferList;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
using System.Collections.Concurrent;
using System.IO.Pipelines;
using System.Threading;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
{
internal class SocketSenderPool : IDisposable
{
private const int MaxQueueSize = 1024; // REVIEW: Is this good enough?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you look to see how many were created with 5000 websockets?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, 8 on my machine (8 cores).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1024 seems high to me. Is this the maximum number of sends occurring at the same time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1024 seems high to me. Is this the maximum number of sends occurring at the same time?

What number do you like? A multiple of the number of cores?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this is a maximum; it doesn't mean we start with this many or that we'll have this many. It means we won't go past this many.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to be strict. I'll pick whatever number y'all want, though, as long as we don't introduce locking into this code path.

Copy link
Member

@halter73 halter73 Mar 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can be strict without locking. You can use Interlocked.Increment/Decrement. ConcurrentQueue.Enqueue is already doing its own Interlocked.Increment at a minimum.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I've complicated the code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, there's no way that I can think of to make this accurate without locking. Review the code and let me know.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private const int MaxQueueSize = 1024; // REVIEW: Is this good enough?
// Similar to https://github.com/dotnet/aspnetcore/blob/90c45d94ac5f2127063e841964f87188fbda05bc/src/Servers/Kestrel/Transport.Libuv/src/Internal/WriteReqPool.cs#L10.
private const int MaxQueueSize = 1024;


private readonly ConcurrentQueue<SocketSender> _queue = new();
private int _count;
private readonly PipeScheduler _scheduler;
private bool _disposed;

public SocketSenderPool(PipeScheduler scheduler)
{
_scheduler = scheduler;
}

public SocketSender Rent()
{
if (_queue.TryDequeue(out var sender))
{
Interlocked.Decrement(ref _count);
return sender;
}
return new SocketSender(_scheduler);
}

public void Return(SocketSender sender)
{
// This counting isn't accurate, but it's good enough for what we need to avoid using _queue.Count which could be expensive
if (_disposed || Interlocked.Increment(ref _count) > MaxQueueSize)
{
Interlocked.Decrement(ref _count);
sender.Dispose();
return;
}

sender.Reset();
_queue.Enqueue(sender);
}

public void Dispose()
{
if (!_disposed)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is thread safety an issue when disposing? What happens if there is a sender that hasn't been returned to the queue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not a concern. It'll probably leak. We have that same issue with the memory pool.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, in the face of ungraceful shutdown, bad things can happen with both the memory pool and now this pool.

{
_disposed = true;
while (_queue.TryDequeue(out var sender))
{
sender.Dispose();
}
}
}
}
}

This file was deleted.

Loading