Commit

* use struct wrapper for better perf
* check the most common case first
* don't access static variable in a loop
* use Span instead of raw pointers
* change the heuristic, single epoll thread is not always enough
* simplify the heuristic and add a comment
* apply the naming suggestions
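Two of the bullets above describe event-loop micro-optimizations that show up later in the diff. A minimal sketch (illustrative only, not code from this commit; the class, method, and field values are made up) of "don't access static variable in a loop": hoist a static readonly field into a local once, so the loop body compares against a register instead of re-loading the static on every iteration.

    using System;

    class HoistStaticSketch
    {
        // stand-in for SocketAsyncEngine.ShutdownHandle
        private static readonly IntPtr ShutdownHandle = (IntPtr)(-1);

        static int CountShutdownEvents(IntPtr[] handles)
        {
            IntPtr shutdownHandle = ShutdownHandle; // read the static once, outside the loop
            int count = 0;
            foreach (IntPtr handle in handles)
            {
                if (handle == shutdownHandle) // cheap local compare in the hot loop
                {
                    count++;
                }
            }
            return count;
        }
    }

The EventLoop hunks below do exactly this: ShutdownHandle is hoisted into the local shutdownHandle, and the _handleToContextMap and _eventQueue fields into locals, before the event loop runs.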
@@ -2,13 +2,11 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.

 using System;
 using System.Collections.Concurrent;
 using System.Diagnostics;
 using System.Runtime.CompilerServices;
 using System.Runtime.InteropServices;
 using System.Threading;
 using System.Threading.Tasks;

 namespace System.Net.Sockets
 {
@@ -56,22 +54,40 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error)

         private static readonly object s_lock = new object();

-        // In debug builds, force there to be 2 engines. In release builds, use half the number of processors when
-        // there are at least 6. The lower bound is to avoid using multiple engines on systems which aren't servers.
-#pragma warning disable CA1802 // const works for debug, but needs to be static readonly for release
-        private static readonly int s_engineCount =
-#if DEBUG
-            2;
-#else
-            Environment.ProcessorCount >= 6 ? Environment.ProcessorCount / 2 : 1;
-#endif
-#pragma warning restore CA1802
+        private static readonly int s_maxEngineCount = GetEngineCount();
+
+        private static int GetEngineCount()
+        {
+            // The responsibility of SocketAsyncEngine is to get notifications from epoll|kqueue
+            // and schedule corresponding work items to ThreadPool (socket reads and writes).
+            //
+            // Using TechEmpower benchmarks that generate a LOT of SMALL socket reads and writes under a VERY HIGH load
+            // we have observed that a single engine is capable of keeping busy up to thirty x64 and eight ARM64 CPU Cores.
+            //
+            // The vast majority of real-life scenarios is never going to generate such a huge load (hundreds of thousands of requests per second)
+            // and having a single producer should be almost always enough.
+            //
+            // We want to be sure that we can handle extreme loads and that's why we have decided to use these values.
+            //
+            // It's impossible to predict all possible scenarios so we have added a possibility to configure this value using environment variables.
+            if (uint.TryParse(Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT"), out uint count))
+            {
+                return (int)count;
+            }
+
+            Architecture architecture = RuntimeInformation.ProcessArchitecture;
+            int coresPerEngine = architecture == Architecture.Arm64 || architecture == Architecture.Arm
+                ? 8
+                : 30;
+
+            return Math.Max(1, (int)Math.Round(Environment.ProcessorCount / (double)coresPerEngine));
+        }

         //
         // The current engines. We replace an engine when it runs out of "handle" values.
         // Must be accessed under s_lock.
         //
-        private static readonly SocketAsyncEngine?[] s_currentEngines = new SocketAsyncEngine?[s_engineCount];
+        private static readonly SocketAsyncEngine?[] s_currentEngines = new SocketAsyncEngine?[s_maxEngineCount];
         private static int s_allocateFromEngine = 0;

         private readonly IntPtr _port;
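The heuristic above is small enough to check by hand. A standalone sketch (the constants are copied from the diff; the helper shape and sample values are illustrative) showing what it returns for a few machine configurations:

    using System;
    using System.Runtime.InteropServices;

    class EngineCountSketch
    {
        static int GetEngineCount(int processorCount, Architecture architecture, string? envOverride)
        {
            // DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT wins when it is set
            if (uint.TryParse(envOverride, out uint count))
            {
                return (int)count;
            }

            int coresPerEngine = architecture == Architecture.Arm64 || architecture == Architecture.Arm
                ? 8
                : 30;

            // at least one engine, otherwise roughly one per coresPerEngine cores
            return Math.Max(1, (int)Math.Round(processorCount / (double)coresPerEngine));
        }

        static void Main()
        {
            Console.WriteLine(GetEngineCount(16, Architecture.X64, null));    // 1
            Console.WriteLine(GetEngineCount(45, Architecture.X64, null));    // 2 (45/30 = 1.5 rounds up to 2)
            Console.WriteLine(GetEngineCount(64, Architecture.Arm64, null));  // 8 (64/8)
            Console.WriteLine(GetEngineCount(4, Architecture.X64, "6"));      // 6 (override wins)
        }
    }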
@@ -106,7 +122,7 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error)
         //
         private static readonly IntPtr MaxHandles = IntPtr.Size == 4 ? (IntPtr)int.MaxValue : (IntPtr)long.MaxValue;
 #endif
-        private static readonly IntPtr MinHandlesForAdditionalEngine = s_engineCount == 1 ? MaxHandles : (IntPtr)32;
+        private static readonly IntPtr MinHandlesForAdditionalEngine = s_maxEngineCount == 1 ? MaxHandles : (IntPtr)32;

         //
         // Sentinel handle value to identify events from the "shutdown pipe," used to signal an event loop to stop
@@ -129,7 +145,7 @@ public bool TryRegister(SafeSocketHandle socket, out Interop.Error error)
         //
         // Maps handle values to SocketAsyncContext instances.
         //
-        private readonly ConcurrentDictionary<IntPtr, SocketAsyncContext> _handleToContextMap = new ConcurrentDictionary<IntPtr, SocketAsyncContext>();
+        private readonly ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper> _handleToContextMap = new ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper>();

         //
         // Queue of events generated by EventLoop() that would be processed by the thread pool
@@ -197,7 +213,7 @@ private static void AllocateToken(SocketAsyncContext context, out SocketAsyncEng
             // Round-robin to the next engine once we have sufficient sockets on this one.
             if (!engine.HasLowNumberOfSockets)
             {
-                s_allocateFromEngine = (s_allocateFromEngine + 1) % s_engineCount;
+                s_allocateFromEngine = (s_allocateFromEngine + 1) % s_maxEngineCount;
             }
         }
     }
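The modulo round-robin above is the whole allocation policy: stay on the current engine while it still has few sockets, then advance to the next one. A tiny sketch of that policy in isolation (all names here are illustrative, not from the runtime):

    class RoundRobinSketch
    {
        private static int s_next;

        // Returns the engine index to allocate from, advancing only when the
        // current engine already has enough sockets (mirrors HasLowNumberOfSockets).
        static int PickEngine(int engineCount, bool currentHasLowNumberOfSockets)
        {
            int picked = s_next;
            if (!currentHasLowNumberOfSockets)
            {
                s_next = (s_next + 1) % engineCount; // wrap around past the last engine
            }
            return picked;
        }
    }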
@@ -208,7 +224,8 @@ private IntPtr AllocateHandle(SocketAsyncContext context)
             Debug.Assert(!IsFull, "Expected !IsFull");

             IntPtr handle = _nextHandle;
-            _handleToContextMap.TryAdd(handle, context);
+            Debug.Assert(handle != ShutdownHandle, "ShutdownHandle must not be added to the dictionary");
+            _handleToContextMap.TryAdd(handle, new SocketAsyncContextWrapper(context));

             _nextHandle = IntPtr.Add(_nextHandle, 1);
             _outstandingHandles = IntPtr.Add(_outstandingHandles, 1);
@@ -318,8 +335,10 @@ private void EventLoop()
             {
                 bool shutdown = false;
                 Interop.Sys.SocketEvent* buffer = _buffer;
-                ConcurrentDictionary<IntPtr, SocketAsyncContext> handleToContextMap = _handleToContextMap;
+                ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper> handleToContextMap = _handleToContextMap;
                 ConcurrentQueue<SocketIOEvent> eventQueue = _eventQueue;
+                IntPtr shutdownHandle = ShutdownHandle;
+                SocketAsyncContext? context = null;
                 while (!shutdown)
                 {
                     int numEvents = EventBufferCount;
@@ -333,38 +352,36 @@ private void EventLoop()
                     Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}");

                     bool enqueuedEvent = false;
-                    for (int i = 0; i < numEvents; i++)
+                    foreach (var socketEvent in new ReadOnlySpan<Interop.Sys.SocketEvent>(buffer, numEvents))
                     {
-                        IntPtr handle = buffer[i].Data;
-                        if (handle == ShutdownHandle)
-                        {
-                            shutdown = true;
-                        }
-                        else
-                        {
-                            Debug.Assert(handle.ToInt64() < MaxHandles.ToInt64(), $"Unexpected values: handle={handle}, MaxHandles={MaxHandles}");
-                            handleToContextMap.TryGetValue(handle, out SocketAsyncContext? context);
-                            if (context != null)
+                        IntPtr handle = socketEvent.Data;
+
+                        if (handleToContextMap.TryGetValue(handle, out SocketAsyncContextWrapper contextWrapper) && (context = contextWrapper.Context) != null)
+                        {
+                            Interop.Sys.SocketEvents events = context.HandleSyncEventsSpeculatively(socketEvent.Events);
+                            if (events != Interop.Sys.SocketEvents.None)
                             {
-                                Interop.Sys.SocketEvents events = buffer[i].Events;
-                                events = context.HandleSyncEventsSpeculatively(events);
-                                if (events != Interop.Sys.SocketEvents.None)
-                                {
-                                    var ev = new SocketIOEvent(context, events);
-                                    eventQueue.Enqueue(ev);
-                                    enqueuedEvent = true;
-
-                                    // This is necessary when the JIT generates unoptimized code (debug builds, live debugging,
-                                    // quick JIT, etc.) to ensure that the context does not remain referenced by this method, as
-                                    // such code may keep the stack location live for longer than necessary
-                                    ev = default;
-                                }
-
-                                // This is necessary when the JIT generates unoptimized code (debug builds, live debugging,
-                                // quick JIT, etc.) to ensure that the context does not remain referenced by this method, as
-                                // such code may keep the stack location live for longer than necessary
-                                context = null;
+                                var ev = new SocketIOEvent(context, events);
+                                eventQueue.Enqueue(ev);
+                                enqueuedEvent = true;
+
+                                // This is necessary when the JIT generates unoptimized code (debug builds, live debugging,
+                                // quick JIT, etc.) to ensure that the context does not remain referenced by this method, as
+                                // such code may keep the stack location live for longer than necessary
+                                ev = default;
                             }
+
+                            // This is necessary when the JIT generates unoptimized code (debug builds, live debugging,
+                            // quick JIT, etc.) to ensure that the context does not remain referenced by this method, as
+                            // such code may keep the stack location live for longer than necessary
+                            context = null;
+                            contextWrapper = default;
+                        }
+                        else if (handle == shutdownHandle)
+                        {
+                            shutdown = true;
                         }
                     }
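The "use Span instead of raw pointers" bullet lands in the hunk above: the loop now wraps the unmanaged event buffer in a ReadOnlySpan once and iterates with foreach, instead of indexing buffer[i] inside the body. A self-contained sketch of the same pattern over a native allocation (requires AllowUnsafeBlocks; the int payload is a stand-in for Interop.Sys.SocketEvent):

    using System;
    using System.Runtime.InteropServices;

    unsafe class SpanOverNativeBuffer
    {
        static void Main()
        {
            int count = 4;
            int* buffer = (int*)Marshal.AllocHGlobal(sizeof(int) * count);
            for (int i = 0; i < count; i++) buffer[i] = i * i;

            // one span construction, then foreach over it: no per-element
            // pointer arithmetic in the loop body
            long sum = 0;
            foreach (int value in new ReadOnlySpan<int>(buffer, count))
            {
                sum += value;
            }
            Console.WriteLine(sum); // 0 + 1 + 4 + 9 = 14

            Marshal.FreeHGlobal((IntPtr)buffer);
        }
    }

The reordering in the same hunk reflects "check the most common case first": a real socket event is far more common than the shutdown sentinel, so the dictionary lookup is attempted first and the shutdown comparison moves into the else branch.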
@@ -488,6 +505,18 @@ private bool TryRegister(SafeSocketHandle socket, IntPtr handle, out Interop.Err
             return error == Interop.Error.SUCCESS;
         }

+        // struct wrapper is used in order to improve the performance of the epoll thread hot path by up to 3% of some TechEmpower benchmarks
+        // the goal is to have a dedicated generic instantiation and using:
+        // System.Collections.Concurrent.ConcurrentDictionary`2[System.IntPtr,System.Net.Sockets.SocketAsyncContextWrapper]::TryGetValueInternal(!0,int32,!1&)
+        // instead of:
+        // System.Collections.Concurrent.ConcurrentDictionary`2[System.IntPtr,System.__Canon]::TryGetValueInternal(!0,int32,!1&)
+        private readonly struct SocketAsyncContextWrapper
+        {
+            public SocketAsyncContextWrapper(SocketAsyncContext context) => Context = context;
+
+            internal SocketAsyncContext Context { get; }
+        }
+
         private readonly struct SocketIOEvent
         {
             public SocketAsyncContext Context { get; }
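The comment above is about shared generics: for reference type arguments the runtime shares one generic instantiation (System.__Canon), so a ConcurrentDictionary keyed to a reference type value runs TryGetValue through shared code, while a struct type argument forces the dedicated instantiation named in the comment. A sketch of the two shapes side by side (Payload and Wrapper are illustrative names, not runtime types):

    using System;
    using System.Collections.Concurrent;

    sealed class Payload { }

    // struct wrapper around the reference type: gets its own generic instantiation
    readonly struct Wrapper
    {
        public Wrapper(Payload value) => Value = value;
        public Payload Value { get; }
    }

    class SharedGenericsSketch
    {
        static void Main()
        {
            // value type argument: dedicated ConcurrentDictionary<IntPtr, Wrapper> code
            var dedicated = new ConcurrentDictionary<IntPtr, Wrapper>();
            dedicated[(IntPtr)1] = new Wrapper(new Payload());

            if (dedicated.TryGetValue((IntPtr)1, out Wrapper w) && w.Value != null)
            {
                Console.WriteLine("hit");
            }

            // reference type argument: shares ConcurrentDictionary<IntPtr, __Canon> code
            var shared = new ConcurrentDictionary<IntPtr, Payload>();
            shared[(IntPtr)1] = new Payload();
        }
    }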
Review comment on the Math.Round line in GetEngineCount:

@adamsitnik did you mean to use Round or Ceil here? Using Round, it takes 45 cores to move to 2 epoll threads. Using Ceil, it takes 31.
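The arithmetic behind the reviewer's numbers, using the 30-cores-per-engine constant from the diff (a quick check, not part of the commit):

    using System;

    class RoundVsCeil
    {
        static void Main()
        {
            Console.WriteLine(Math.Max(1, (int)Math.Round(44 / 30.0)));    // 1
            Console.WriteLine(Math.Max(1, (int)Math.Round(45 / 30.0)));    // 2 -> with Round, 45 cores reach 2 engines
            Console.WriteLine(Math.Max(1, (int)Math.Ceiling(30 / 30.0)));  // 1
            Console.WriteLine(Math.Max(1, (int)Math.Ceiling(31 / 30.0)));  // 2 -> with Ceiling, 31 cores reach 2 engines
        }
    }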