Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a thread pool config var on Windows for choosing the number of IOCPs #105145

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,43 +10,85 @@ namespace System.Threading
{
internal sealed partial class PortableThreadPool
{
// Continuations of IO completions are dispatched to the ThreadPool from IO completion poller threads. This avoids
// continuations blocking/stalling the IO completion poller threads. Setting UnsafeInlineIOCompletionCallbacks allows
// continuations to run directly on the IO completion poller thread, but is inherently unsafe due to the potential for
// those threads to become stalled due to blocking. Sometimes, setting this config value may yield better latency. The
// config value is named for consistency with SocketAsyncEngine.Unix.cs.
private static readonly bool UnsafeInlineIOCompletionCallbacks =
Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS") == "1";
private readonly nint[] _ioPorts = new nint[IOCompletionPortCount];
private uint _ioPortSelectorForRegister = unchecked((uint)-1);
private uint _ioPortSelectorForQueue = unchecked((uint)-1);
private IOCompletionPoller[]? _ioCompletionPollers;

private static readonly int IOCompletionPollerCount = GetIOCompletionPollerCount();
// Determines how many IO completion ports the thread pool uses. The value is read from the
// "System.Threading.ThreadPool.IOCompletionPortCount" app context setting or the
// "DOTNET_ThreadPool_IOCompletionPortCount" environment variable. A value of zero selects the default of one port, and
// larger values are capped at 1024.
private static short DetermineIOCompletionPortCount()
{
    const short DefaultIOPortCount = 1;
    const short MaxIOPortCount = 1 << 10;

    short configuredCount =
        AppContextConfigHelper.GetInt16Config(
            "System.Threading.ThreadPool.IOCompletionPortCount",
            "DOTNET_ThreadPool_IOCompletionPortCount",
            DefaultIOPortCount,
            allowNegative: false);

    // Zero is treated the same as "not configured"
    if (configuredCount == 0)
    {
        return DefaultIOPortCount;
    }

    return Math.Min(configuredCount, MaxIOPortCount);
}

// Computes the number of IO completion poller threads. The count comes from, in order of precedence: the
// DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT environment variable, the processor count when IO completion callbacks are
// inlined, or the processor count divided by the ProcessorsPerIOPollerThread setting. When more than one IO completion
// port is configured, the result is then adjusted to be a nonzero multiple of IOCompletionPortCount.
private static int GetIOCompletionPollerCount()
private static int DetermineIOCompletionPollerCount()
{
// Named for consistency with SocketAsyncEngine.Unix.cs, this environment variable is checked to override the exact
// number of IO completion poller threads to use. See the comment in SocketAsyncEngine.Unix.cs about its potential
// uses. For this implementation, the ProcessorsPerIOPollerThread config option below may be preferable as it may be
// less machine-specific.
if (uint.TryParse(Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT"), out uint count))
int ioPollerCount;
// A parsed value of zero is not used as an override; the defaults below apply instead
if (uint.TryParse(Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT"), out uint count) &&
count != 0)
{
return Math.Min((int)count, MaxPossibleThreadCount);
ioPollerCount = (int)Math.Min(count, (uint)MaxPossibleThreadCount);
}

if (UnsafeInlineIOCompletionCallbacks)
else if (UnsafeInlineIOCompletionCallbacks)
{
// In this mode, default to ProcessorCount pollers to ensure that all processors can be utilized if more work
// happens on the poller threads
return Environment.ProcessorCount;
ioPollerCount = Environment.ProcessorCount;
}
else
{
int processorsPerPoller =
AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.ProcessorsPerIOPollerThread", 12, false);
// ProcessorCount divided by processorsPerPoller, rounded up
ioPollerCount = (Environment.ProcessorCount - 1) / processorsPerPoller + 1;
}

int processorsPerPoller =
AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.ProcessorsPerIOPollerThread", 12, false);
return (Environment.ProcessorCount - 1) / processorsPerPoller + 1;
// With a single port there is nothing to distribute, so the count computed above is used as is
if (IOCompletionPortCount == 1)
{
return ioPollerCount;
}

// Use at least one IO poller per port
if (ioPollerCount <= IOCompletionPortCount)
{
return IOCompletionPortCount;
}

// Use the same number of IO pollers per port, align up if necessary to make it even
int rem = ioPollerCount % IOCompletionPortCount;
if (rem != 0)
{
// Round up to the next multiple of IOCompletionPortCount
ioPollerCount += IOCompletionPortCount - rem;
}

return ioPollerCount;
}

// Creates all of the IO completion ports and stores them in _ioPorts. Each port is created with a concurrent-thread
// count equal to its even share of the IO completion pollers.
private void InitializeIOOnWindows()
{
    // The poller count is expected to be a whole multiple of the port count
    Debug.Assert(IOCompletionPollerCount % IOCompletionPortCount == 0);

    int pollersPerPort = IOCompletionPollerCount / IOCompletionPortCount;
    int portIndex = 0;
    while (portIndex < IOCompletionPortCount)
    {
        _ioPorts[portIndex] = CreateIOCompletionPort(pollersPerPort);
        portIndex++;
    }
}

private static nint CreateIOCompletionPort()
private static nint CreateIOCompletionPort(int numConcurrentThreads)
{
nint port =
Interop.Kernel32.CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, UIntPtr.Zero, IOCompletionPollerCount);
Interop.Kernel32.CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, UIntPtr.Zero, numConcurrentThreads);
if (port == 0)
{
int hr = Marshal.GetHRForLastWin32Error();
Expand All @@ -58,26 +100,32 @@ private static nint CreateIOCompletionPort()

// Associates the given handle with one of the thread pool's IO completion ports. The IO completion pollers are set up
// first if they have not been created yet. Throws through ThrowHelper.ThrowApplicationException, using the HRESULT of
// the last Win32 error, if the association fails.
public void RegisterForIOCompletionNotifications(nint handle)
{
Debug.Assert(_ioPort != 0);
Debug.Assert(_ioPorts != null);

if (_ioCompletionPollers == null)
{
EnsureIOCompletionPollers();
}

nint port = Interop.Kernel32.CreateIoCompletionPort(handle, _ioPort, UIntPtr.Zero, 0);
// With multiple ports, successive registrations rotate through the ports. The selector starts at uint.MaxValue, so
// the first increment wraps to 0 and selects the first port.
uint selectedPortIndex =
IOCompletionPortCount == 1
? 0
: Interlocked.Increment(ref _ioPortSelectorForRegister) % (uint)IOCompletionPortCount;
nint selectedPort = _ioPorts[selectedPortIndex];
Debug.Assert(selectedPort != 0);
nint port = Interop.Kernel32.CreateIoCompletionPort(handle, selectedPort, UIntPtr.Zero, 0);
if (port == 0)
{
ThrowHelper.ThrowApplicationException(Marshal.GetHRForLastWin32Error());
}

Debug.Assert(port == _ioPort);
// On success, the returned port is expected to be the port that was passed in
Debug.Assert(port == selectedPort);
}

public unsafe void QueueNativeOverlapped(NativeOverlapped* nativeOverlapped)
{
Debug.Assert(nativeOverlapped != null);
Debug.Assert(_ioPort != 0);
Debug.Assert(_ioPorts != null);

if (_ioCompletionPollers == null)
{
Expand All @@ -89,7 +137,13 @@ public unsafe void QueueNativeOverlapped(NativeOverlapped* nativeOverlapped)
NativeRuntimeEventSource.Log.ThreadPoolIOEnqueue(nativeOverlapped);
}

if (!Interop.Kernel32.PostQueuedCompletionStatus(_ioPort, 0, UIntPtr.Zero, (IntPtr)nativeOverlapped))
uint selectedPortIndex =
IOCompletionPortCount == 1
? 0
: Interlocked.Increment(ref _ioPortSelectorForQueue) % (uint)IOCompletionPortCount;
nint selectedPort = _ioPorts[selectedPortIndex];
Debug.Assert(selectedPort != 0);
if (!Interop.Kernel32.PostQueuedCompletionStatus(selectedPort, 0, UIntPtr.Zero, (IntPtr)nativeOverlapped))
{
ThrowHelper.ThrowApplicationException(Marshal.GetHRForLastWin32Error());
}
Expand All @@ -109,7 +163,7 @@ private void EnsureIOCompletionPollers()
IOCompletionPoller[] pollers = new IOCompletionPoller[IOCompletionPollerCount];
for (int i = 0; i < IOCompletionPollerCount; ++i)
{
pollers[i] = new IOCompletionPoller(_ioPort);
pollers[i] = new IOCompletionPoller(_ioPorts[i % IOCompletionPortCount]);
}

_ioCompletionPollers = pollers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ internal sealed partial class PortableThreadPool
private static readonly short ForcedMaxWorkerThreads =
AppContextConfigHelper.GetInt16Config("System.Threading.ThreadPool.MaxThreads", 0, false);

#if TARGET_WINDOWS
// Continuations of IO completions are dispatched to the ThreadPool from IO completion poller threads. This avoids
// continuations blocking/stalling the IO completion poller threads. Setting UnsafeInlineIOCompletionCallbacks allows
// continuations to run directly on the IO completion poller thread, but is inherently unsafe due to the potential for
// those threads to become stalled due to blocking. Sometimes, setting this config value may yield better latency. The
// config value is named for consistency with SocketAsyncEngine.Unix.cs.
private static readonly bool UnsafeInlineIOCompletionCallbacks =
Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS") == "1";

private static readonly short IOCompletionPortCount = DetermineIOCompletionPortCount();
private static readonly int IOCompletionPollerCount = DetermineIOCompletionPollerCount();
#endif
Comment on lines +44 to +55
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are now used by the constructor, which runs while ThreadPoolInstance is being initialized, so they need to be initialized before ThreadPoolInstance in order to have the correct values. I moved them here to ensure that.


private static readonly int ThreadPoolThreadTimeoutMs = DetermineThreadPoolThreadTimeoutMs();

private static int DetermineThreadPoolThreadTimeoutMs()
Expand Down Expand Up @@ -107,11 +120,6 @@ private struct CacheLineSeparated
private long _memoryUsageBytes;
private long _memoryLimitBytes;

#if TARGET_WINDOWS
private readonly nint _ioPort;
private IOCompletionPoller[]? _ioCompletionPollers;
#endif

private readonly LowLevelLock _threadAdjustmentLock = new LowLevelLock();

private CacheLineSeparated _separated; // SOS's ThreadPool command depends on this name
Expand Down Expand Up @@ -149,7 +157,7 @@ private PortableThreadPool()
_separated.counts.NumThreadsGoal = _minThreads;

#if TARGET_WINDOWS
_ioPort = CreateIOCompletionPort();
InitializeIOOnWindows();
#endif
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
<PropertyGroup>
<IncludeRemoteExecutor>true</IncludeRemoteExecutor>
<TargetFramework>$(NetCoreAppCurrent)</TargetFramework>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<TestRuntime>true</TestRuntime>
</PropertyGroup>
<PropertyGroup Condition="'$(TargetOS)' == 'browser'">
Expand Down
100 changes: 100 additions & 0 deletions src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1334,6 +1334,106 @@ public static void PrioritizationExperimentConfigVarTest()
}).Dispose();
}

// Test arguments for IOCompletionPortCountConfigVarTest: one single-element row for each value from 0 through 8
public static IEnumerable<object[]> IOCompletionPortCountConfigVarTest_Args =
    Enumerable.Range(0, 9).Select(portCount => new object[] { portCount });

// Just verifies that some basic IO operations work with different IOCP counts. A value of 0 for
// ioCompletionPortCount means that the config var is left unset so that the default applies.
[ConditionalTheory(nameof(IsThreadingAndRemoteExecutorSupported), nameof(UsePortableThreadPool))]
[MemberData(nameof(IOCompletionPortCountConfigVarTest_Args))]
[PlatformSpecific(TestPlatforms.Windows)]
public static void IOCompletionPortCountConfigVarTest(int ioCompletionPortCount)
{
    // Avoid contaminating the main process' environment
    RemoteExecutor.Invoke(ioCompletionPortCountStr =>
    {
        int ioCompletionPortCount = int.Parse(ioCompletionPortCountStr);

        const int PretendProcessorCount = 80;

        // The actual test process below will inherit the config vars
        Environment.SetEnvironmentVariable("DOTNET_PROCESSOR_COUNT", PretendProcessorCount.ToString());
        Environment.SetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT", "7");
        if (ioCompletionPortCount != 0)
        {
            Environment.SetEnvironmentVariable(
                "DOTNET_ThreadPool_IOCompletionPortCount",
                ioCompletionPortCount.ToString());
        }

        RemoteExecutor.Invoke(() =>
        {
            RunQueueNativeOverlappedTest();
            RunAsyncIOTest().Wait();

            static unsafe void RunQueueNativeOverlappedTest()
            {
                var done = new AutoResetEvent(false);
                for (int i = 0; i < PretendProcessorCount; i++)
                {
                    // Queue a NativeOverlapped, wait for the callback to run
                    var overlapped = new Overlapped();
                    NativeOverlapped* nativeOverlapped = overlapped.Pack((_, _, _) => done.Set(), null);
                    try
                    {
                        ThreadPool.UnsafeQueueNativeOverlapped(nativeOverlapped);
                        done.CheckedWait();
                    }
                    finally
                    {
                        if (nativeOverlapped != null)
                        {
                            Overlapped.Free(nativeOverlapped);
                        }
                    }
                }
            }

            static async Task RunAsyncIOTest()
            {
                var done = new AutoResetEvent(false);

                // The listener binds to port 0 so that the system picks a free port; a fixed port number would make
                // the test fail whenever that port is unavailable on the machine. The receiver stores the assigned
                // port here before signaling that the listener has started, and the sender reads it only after
                // waiting for that signal.
                int listenerPort = 0;

                // Receiver
                var t = ThreadTestHelpers.CreateGuardedThread(
                    out Action checkForThreadErrors,
                    out Action waitForThread,
                    async () =>
                    {
                        using var listener = new TcpListener(IPAddress.Loopback, 0);
                        var receiveBuffer = new byte[1];
                        listener.Start();
                        listenerPort = ((IPEndPoint)listener.LocalEndpoint).Port;
                        done.Set(); // indicate listener started
                        while (true)
                        {
                            // Accept a connection, receive a byte
                            using var socket = await listener.AcceptSocketAsync();
                            int bytesRead =
                                await socket.ReceiveAsync(new ArraySegment<byte>(receiveBuffer), SocketFlags.None);
                            Assert.Equal(1, bytesRead);
                            done.Set(); // indicate byte received
                        }
                    });
                t.IsBackground = true;
                t.Start();
                done.CheckedWait(); // wait for listener to start

                // Sender
                var sendBuffer = new byte[1];
                for (int i = 0; i < PretendProcessorCount / 2; i++)
                {
                    // Connect, send a byte
                    using var client = new TcpClient();
                    await client.ConnectAsync(IPAddress.Loopback, listenerPort);
                    int bytesSent =
                        await client.Client.SendAsync(new ArraySegment<byte>(sendBuffer), SocketFlags.None);
                    Assert.Equal(1, bytesSent);
                    done.CheckedWait(); // wait for the byte to be received
                }
            }
        }).Dispose();
    }, ioCompletionPortCount.ToString()).Dispose();
}

// True only when both PlatformDetection.IsThreadingSupported and RemoteExecutor.IsSupported are true
public static bool IsThreadingAndRemoteExecutorSupported
{
    get { return PlatformDetection.IsThreadingSupported && RemoteExecutor.IsSupported; }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<IncludeRemoteExecutor>true</IncludeRemoteExecutor>
<!-- This test project is Windows only as it uses the Windows Threadpool -->
<TargetFramework>$(NetCoreAppCurrent)-windows</TargetFramework>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<TestRuntime>true</TestRuntime>
</PropertyGroup>
<ItemGroup>
Expand Down
Loading