From defd8ada3acab026649d827290a363be02fa86c0 Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 5 Oct 2023 12:03:25 -0700 Subject: [PATCH 01/21] Schedule only when the queue is not empty --- .../System/Net/Sockets/SocketAsyncEngine.Unix.cs | 15 +++++++++------ .../src/System/Threading/ThreadPoolWorkQueue.cs | 15 +++++++++------ 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index aafcf158518a9c..9a4a0fa4c903d0 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -230,12 +230,15 @@ void IThreadPoolWorkItem.Execute() int startTimeMs = Environment.TickCount; - // An event was successfully dequeued, and there may be more events to process. Schedule a work item to parallelize - // processing of events, before processing more events. Following this, it is the responsibility of the new work - // item and the epoll thread to schedule more work items as necessary. The parallelization may be necessary here if - // the user callback as part of handling the event blocks for some reason that may have a dependency on other queued - // socket events. - ScheduleToProcessEvents(); + if (!eventQueue.IsEmpty) + { + // An event was successfully dequeued, and there may be more events to process. Schedule a work item to parallelize + // processing of events, before processing more events. Following this, it is the responsibility of the new work + // item and the epoll thread to schedule more work items as necessary. The parallelization may be necessary here if + // the user callback as part of handling the event blocks for some reason that may have a dependency on other queued + // socket events. 
+ ScheduleToProcessEvents(); + } while (true) { diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index cc7c76922d9964..531b56141b082b 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -1110,12 +1110,15 @@ void IThreadPoolWorkItem.Execute() return; } - // An work item was successfully dequeued, and there may be more work items to process. Schedule a work item to - // parallelize processing of work items, before processing more work items. Following this, it is the responsibility - // of the new work item and the poller thread to schedule more work items as necessary. The parallelization may be - // necessary here if the user callback as part of handling the work item blocks for some reason that may have a - // dependency on other queued work items. - ScheduleForProcessing(); + if (!_workItems.IsEmpty) + { + // An work item was successfully dequeued, and there may be more work items to process. Schedule a work item to + // parallelize processing of work items, before processing more work items. Following this, it is the responsibility + // of the new work item and the poller thread to schedule more work items as necessary. The parallelization may be + // necessary here if the user callback as part of handling the work item blocks for some reason that may have a + // dependency on other queued work items. 
+ ScheduleForProcessing(); + } ThreadPoolWorkQueueThreadLocals tl = ThreadPoolWorkQueueThreadLocals.threadLocals!; Debug.Assert(tl != null); From f1a7cfd8bd95d2c5963acea4966989cddc9dc2a8 Mon Sep 17 00:00:00 2001 From: unknown Date: Tue, 10 Oct 2023 22:23:36 -0700 Subject: [PATCH 02/21] Don't parallelize dispatching unnecessarily --- .../System/Threading/ThreadPoolWorkQueue.cs | 131 ++++++++++-------- 1 file changed, 72 insertions(+), 59 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index 531b56141b082b..46b991fabbb03d 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -638,8 +638,18 @@ internal static bool LocalFindAndPop(object callback) public object? Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal) { + object? workItem; + if (_nextWorkItemToProcess != null) + { + workItem = Interlocked.Exchange(ref _nextWorkItemToProcess, null); + if (workItem != null) + { + return workItem; + } + } + // Check for local work items - object? workItem = tl.workStealingQueue.LocalPop(); + workItem = tl.workStealingQueue.LocalPop(); if (workItem != null) { return workItem; @@ -763,6 +773,37 @@ public long GlobalCount // Dispatch (if YieldFromDispatchLoop is true), or performing periodic activities public const uint DispatchQuantumMs = 30; + private static object? _nextWorkItemToProcess; + + private static bool TryDequeue(out object? workItem, ThreadPoolWorkQueue workQueue, ThreadPoolWorkQueueThreadLocals tl) + { + bool missedSteal = false; + workItem = workQueue.Dequeue(tl, ref missedSteal); + + if (workItem == null) + { + if (s_assignableWorkItemQueueCount > 0) + { + workQueue.UnassignWorkItemQueue(tl); + } + + // + // No work. 
+ // If we missed a steal, though, there may be more work in the queue. + // Instead of looping around and trying again, we'll just request another thread. Hopefully the thread + // that owns the contended work-stealing queue will pick up its own workitems in the meantime, + // which will be more efficient than this thread doing it anyway. + // + if (missedSteal) + { + workQueue.EnsureThreadRequested(); + } + + return false; + } + return true; + } + /// /// Dispatches work items to this thread. /// @@ -784,13 +825,23 @@ internal static bool Dispatch() workQueue.MarkThreadRequestSatisfied(); object? workItem = null; + if (_nextWorkItemToProcess != null) + { + workItem = Interlocked.Exchange(ref _nextWorkItemToProcess, null); + if (workItem == null) + { + // go through queues and try to dequeue + TryDequeue(out workItem, workQueue, tl); + } + } + { // Alternate between checking for high-prioriy and normal-priority work first, that way both sets of work // items get a chance to run in situations where worker threads are starved and work items that run also // take over the thread, sustaining starvation. For example, when worker threads are continually starved, // high-priority work items may always be queued and normal-priority work items may not get a chance to run. 
bool dispatchNormalPriorityWorkFirst = workQueue._dispatchNormalPriorityWorkFirst; - if (dispatchNormalPriorityWorkFirst && !tl.workStealingQueue.CanSteal) + if (workItem == null && dispatchNormalPriorityWorkFirst && !tl.workStealingQueue.CanSteal) { workQueue._dispatchNormalPriorityWorkFirst = !dispatchNormalPriorityWorkFirst; ConcurrentQueue queue = @@ -801,41 +852,24 @@ internal static bool Dispatch() } } - if (workItem == null) + if (workItem == null && !TryDequeue(out workItem, workQueue, tl)) { - bool missedSteal = false; - workItem = workQueue.Dequeue(tl, ref missedSteal); - - if (workItem == null) - { - if (s_assignableWorkItemQueueCount > 0) - { - workQueue.UnassignWorkItemQueue(tl); - } - - // - // No work. - // If we missed a steal, though, there may be more work in the queue. - // Instead of looping around and trying again, we'll just request another thread. Hopefully the thread - // that owns the contended work-stealing queue will pick up its own workitems in the meantime, - // which will be more efficient than this thread doing it anyway. - // - if (missedSteal) - { - workQueue.EnsureThreadRequested(); - } - - // Tell the VM we're returning normally, not because Hill Climbing asked us to return. - return true; - } + // Tell the VM we're returning normally, not because Hill Climbing asked us to return. + return true; } - // A work item was successfully dequeued, and there may be more work items to process. Request a thread to - // parallelize processing of work items, before processing more work items. Following this, it is the - // responsibility of the new thread and other enqueuers to request more threads as necessary. The - // parallelization may be necessary here for correctness (aside from perf) if the work item blocks for some - // reason that may have a dependency on other queued work items. - workQueue.EnsureThreadRequested(); + object? 
secondWorkItem = null; + if (TryDequeue(out secondWorkItem, workQueue, tl)) + { + _nextWorkItemToProcess = secondWorkItem; + + // A work item was successfully dequeued, and there may be more work items to process. Request a thread to + // parallelize processing of work items, before processing more work items. Following this, it is the + // responsibility of the new thread and other enqueuers to request more threads as necessary. The + // parallelization may be necessary here for correctness (aside from perf) if the work item blocks for some + // reason that may have a dependency on other queued work items. + workQueue.EnsureThreadRequested(); + } // After this point, this method is no longer responsible for ensuring thread requests except for missed steals } @@ -860,34 +894,13 @@ internal static bool Dispatch() // while (true) { - if (workItem == null) + if (workItem == null && !TryDequeue(out workItem, workQueue, tl)) { - bool missedSteal = false; - workItem = workQueue.Dequeue(tl, ref missedSteal); - - if (workItem == null) - { - if (s_assignableWorkItemQueueCount > 0) - { - workQueue.UnassignWorkItemQueue(tl); - } - - // - // No work. - // If we missed a steal, though, there may be more work in the queue. - // Instead of looping around and trying again, we'll just request another thread. Hopefully the thread - // that owns the contended work-stealing queue will pick up its own workitems in the meantime, - // which will be more efficient than this thread doing it anyway. - // - if (missedSteal) - { - workQueue.EnsureThreadRequested(); - } - - return true; - } + // Tell the VM we're returning normally, not because Hill Climbing asked us to return. 
+ return true; } + Debug.Assert(workItem != null); if (workQueue._loggingEnabled && FrameworkEventSource.Log.IsEnabled()) { FrameworkEventSource.Log.ThreadPoolDequeueWorkObject(workItem); From 24b4e4d132fdcb6788442045bdb55bd3baa85dad Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 11 Oct 2023 15:45:24 -0700 Subject: [PATCH 03/21] Update refactor --- .../System/Threading/ThreadPoolWorkQueue.cs | 135 ++++++++++-------- 1 file changed, 73 insertions(+), 62 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index 46b991fabbb03d..20bccac2b9d4b7 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -639,6 +639,14 @@ internal static bool LocalFindAndPop(object callback) public object? Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal) { object? workItem; + + // Check for local work items + workItem = tl.workStealingQueue.LocalPop(); + if (workItem != null) + { + return workItem; + } + if (_nextWorkItemToProcess != null) { workItem = Interlocked.Exchange(ref _nextWorkItemToProcess, null); @@ -648,13 +656,6 @@ internal static bool LocalFindAndPop(object callback) } } - // Check for local work items - workItem = tl.workStealingQueue.LocalPop(); - if (workItem != null) - { - return workItem; - } - // Check for high-priority work items if (tl.isProcessingHighPriorityWorkItems) { @@ -775,33 +776,29 @@ public long GlobalCount private static object? _nextWorkItemToProcess; - private static bool TryDequeue(out object? workItem, ThreadPoolWorkQueue workQueue, ThreadPoolWorkQueueThreadLocals tl) + private static void TryDequeue(out object? 
workItem, out bool missedSteal, ThreadPoolWorkQueue workQueue, ThreadPoolWorkQueueThreadLocals tl) { - bool missedSteal = false; - workItem = workQueue.Dequeue(tl, ref missedSteal); - - if (workItem == null) - { - if (s_assignableWorkItemQueueCount > 0) - { - workQueue.UnassignWorkItemQueue(tl); - } - - // - // No work. - // If we missed a steal, though, there may be more work in the queue. - // Instead of looping around and trying again, we'll just request another thread. Hopefully the thread - // that owns the contended work-stealing queue will pick up its own workitems in the meantime, - // which will be more efficient than this thread doing it anyway. - // - if (missedSteal) + // Alternate between checking for high-prioriy and normal-priority work first, that way both sets of work + // items get a chance to run in situations where worker threads are starved and work items that run also + // take over the thread, sustaining starvation. For example, when worker threads are continually starved, + // high-priority work items may always be queued and normal-priority work items may not get a chance to run. + bool dispatchNormalPriorityWorkFirst = workQueue._dispatchNormalPriorityWorkFirst; + if (dispatchNormalPriorityWorkFirst && !tl.workStealingQueue.CanSteal) + { + workQueue._dispatchNormalPriorityWorkFirst = !dispatchNormalPriorityWorkFirst; + ConcurrentQueue queue = + s_assignableWorkItemQueueCount > 0 ? 
tl.assignedGlobalWorkItemQueue : workQueue.workItems; + if (!queue.TryDequeue(out workItem) && s_assignableWorkItemQueueCount > 0) { - workQueue.EnsureThreadRequested(); + workQueue.workItems.TryDequeue(out workItem); } + } - return false; + missedSteal = false; + if (workItem == null) + { + workItem = workQueue.Dequeue(tl, ref missedSteal); } - return true; } /// @@ -824,55 +821,48 @@ internal static bool Dispatch() // Before dequeuing the first work item, acknowledge that the thread request has been satisfied workQueue.MarkThreadRequestSatisfied(); + bool anyMissedSteal = false; object? workItem = null; if (_nextWorkItemToProcess != null) { - workItem = Interlocked.Exchange(ref _nextWorkItemToProcess, null); + TryDequeue(out workItem, out anyMissedSteal, workQueue, tl); + if (workItem == null) { - // go through queues and try to dequeue - TryDequeue(out workItem, workQueue, tl); + workItem = Interlocked.Exchange(ref _nextWorkItemToProcess, null); } } { - // Alternate between checking for high-prioriy and normal-priority work first, that way both sets of work - // items get a chance to run in situations where worker threads are starved and work items that run also - // take over the thread, sustaining starvation. For example, when worker threads are continually starved, - // high-priority work items may always be queued and normal-priority work items may not get a chance to run. - bool dispatchNormalPriorityWorkFirst = workQueue._dispatchNormalPriorityWorkFirst; - if (workItem == null && dispatchNormalPriorityWorkFirst && !tl.workStealingQueue.CanSteal) + object? secondWorkItem = null; + TryDequeue(secondWorkItem, out bool missedSteal, workQueue, tl); + + anyMissedSteal |= missedSteal; + if (secondWorkItem == null) { - workQueue._dispatchNormalPriorityWorkFirst = !dispatchNormalPriorityWorkFirst; - ConcurrentQueue queue = - s_assignableWorkItemQueueCount > 0 ? 
tl.assignedGlobalWorkItemQueue : workQueue.workItems; - if (!queue.TryDequeue(out workItem) && s_assignableWorkItemQueueCount > 0) + // This block should be left out right? + /* + if (s_assignableWorkItemQueueCount > 0) { - workQueue.workItems.TryDequeue(out workItem); + workQueue.UnassignWorkItemQueue(tl); } - } - - if (workItem == null && !TryDequeue(out workItem, workQueue, tl)) - { // Tell the VM we're returning normally, not because Hill Climbing asked us to return. return true; + */ } - - object? secondWorkItem = null; - if (TryDequeue(out secondWorkItem, workQueue, tl)) + else { - _nextWorkItemToProcess = secondWorkItem; + // _nextWorkItemToProcess = secondWorkItem; + tl.workStealingQueue.LocalPush(secondWorkItem); + } - // A work item was successfully dequeued, and there may be more work items to process. Request a thread to - // parallelize processing of work items, before processing more work items. Following this, it is the - // responsibility of the new thread and other enqueuers to request more threads as necessary. The - // parallelization may be necessary here for correctness (aside from perf) if the work item blocks for some - // reason that may have a dependency on other queued work items. + if (secondWorkItem != null || anyMissedSteal) + { + // Ensure a thread is requested and continue to process work items. workQueue.EnsureThreadRequested(); } - - // After this point, this method is no longer responsible for ensuring thread requests except for missed steals } + // After this point, this method is no longer responsible for ensuring thread requests except for missed steals // Has the desire for logging changed since the last time we entered? workQueue.RefreshLoggingEnabled(); @@ -894,13 +884,34 @@ internal static bool Dispatch() // while (true) { - if (workItem == null && !TryDequeue(out workItem, workQueue, tl)) + if (workItem == null) { - // Tell the VM we're returning normally, not because Hill Climbing asked us to return. 
- return true; + bool missedSteal = false; + workItem = workQueue.Dequeue(tl, ref missedSteal); + + if (workItem == null) + { + if (s_assignableWorkItemQueueCount > 0) + { + workQueue.UnassignWorkItemQueue(tl); + } + + // + // No work. + // If we missed a steal, though, there may be more work in the queue. + // Instead of looping around and trying again, we'll just request another thread. Hopefully the thread + // that owns the contended work-stealing queue will pick up its own workitems in the meantime, + // which will be more efficient than this thread doing it anyway. + // + if (missedSteal) + { + workQueue.EnsureThreadRequested(); + } + + return true; + } } - Debug.Assert(workItem != null); if (workQueue._loggingEnabled && FrameworkEventSource.Log.IsEnabled()) { FrameworkEventSource.Log.ThreadPoolDequeueWorkObject(workItem); From cab32364b449620275065ffcafd39742138ac1a0 Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 11 Oct 2023 16:12:02 -0700 Subject: [PATCH 04/21] Fix build --- .../System/Threading/ThreadPoolWorkQueue.cs | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index 20bccac2b9d4b7..93d3a9dc550237 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -778,6 +778,7 @@ public long GlobalCount private static void TryDequeue(out object? workItem, out bool missedSteal, ThreadPoolWorkQueue workQueue, ThreadPoolWorkQueueThreadLocals tl) { + workItem = null; // Alternate between checking for high-prioriy and normal-priority work first, that way both sets of work // items get a chance to run in situations where worker threads are starved and work items that run also // take over the thread, sustaining starvation. 
For example, when worker threads are continually starved, @@ -795,10 +796,7 @@ private static void TryDequeue(out object? workItem, out bool missedSteal, Threa } missedSteal = false; - if (workItem == null) - { - workItem = workQueue.Dequeue(tl, ref missedSteal); - } + workItem ??= workQueue.Dequeue(tl, ref missedSteal); } /// @@ -827,15 +825,12 @@ internal static bool Dispatch() { TryDequeue(out workItem, out anyMissedSteal, workQueue, tl); - if (workItem == null) - { - workItem = Interlocked.Exchange(ref _nextWorkItemToProcess, null); - } + workItem ??= Interlocked.Exchange(ref _nextWorkItemToProcess, null); } { object? secondWorkItem = null; - TryDequeue(secondWorkItem, out bool missedSteal, workQueue, tl); + TryDequeue(out secondWorkItem, out bool missedSteal, workQueue, tl); anyMissedSteal |= missedSteal; if (secondWorkItem == null) @@ -846,6 +841,17 @@ internal static bool Dispatch() { workQueue.UnassignWorkItemQueue(tl); } + // + // No work. + // If we missed a steal, though, there may be more work in the queue. + // Instead of looping around and trying again, we'll just request another thread. Hopefully the thread + // that owns the contended work-stealing queue will pick up its own workitems in the meantime, + // which will be more efficient than this thread doing it anyway. + // + if (missedSteal) + { + workQueue.EnsureThreadRequested(); + } // Tell the VM we're returning normally, not because Hill Climbing asked us to return. return true; */ @@ -861,8 +867,9 @@ internal static bool Dispatch() // Ensure a thread is requested and continue to process work items. workQueue.EnsureThreadRequested(); } + + // After this point, this method is no longer responsible for ensuring thread requests except for missed steals } - // After this point, this method is no longer responsible for ensuring thread requests except for missed steals // Has the desire for logging changed since the last time we entered? 
workQueue.RefreshLoggingEnabled(); From 512969238a4daaef8007f34012d31a782fa6e91d Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 12 Oct 2023 23:03:33 -0700 Subject: [PATCH 05/21] Update Dispatch --- .../src/System/Threading/ThreadPoolWorkQueue.cs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index 93d3a9dc550237..043dcd89aeedd6 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -823,9 +823,12 @@ internal static bool Dispatch() object? workItem = null; if (_nextWorkItemToProcess != null) { - TryDequeue(out workItem, out anyMissedSteal, workQueue, tl); + workItem = Interlocked.Exchange(ref _nextWorkItemToProcess, null); - workItem ??= Interlocked.Exchange(ref _nextWorkItemToProcess, null); + if (workItem == null) + { + TryDequeue(out workItem, out anyMissedSteal, workQueue, tl); + } } { From 7678ab2dfb86837923cad94f79f0aaaf8b424be7 Mon Sep 17 00:00:00 2001 From: unknown Date: Fri, 13 Oct 2023 12:04:12 -0700 Subject: [PATCH 06/21] Update Dequeue and Dispatch --- .../System/Threading/ThreadPoolWorkQueue.cs | 48 +++++++++---------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index 043dcd89aeedd6..550dbb72bef2c6 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -638,10 +638,8 @@ internal static bool LocalFindAndPop(object callback) public object? Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal) { - object? 
workItem; - // Check for local work items - workItem = tl.workStealingQueue.LocalPop(); + object? workItem = tl.workStealingQueue.LocalPop(); if (workItem != null) { return workItem; @@ -776,9 +774,9 @@ public long GlobalCount private static object? _nextWorkItemToProcess; - private static void TryDequeue(out object? workItem, out bool missedSteal, ThreadPoolWorkQueue workQueue, ThreadPoolWorkQueueThreadLocals tl) + private static object? DequeueWithPriorityAlternation(ThreadPoolWorkQueue workQueue, ThreadPoolWorkQueueThreadLocals tl, out bool missedSteal) { - workItem = null; + object? workItem = null; // Alternate between checking for high-prioriy and normal-priority work first, that way both sets of work // items get a chance to run in situations where worker threads are starved and work items that run also // take over the thread, sustaining starvation. For example, when worker threads are continually starved, @@ -797,6 +795,8 @@ private static void TryDequeue(out object? workItem, out bool missedSteal, Threa missedSteal = false; workItem ??= workQueue.Dequeue(tl, ref missedSteal); + + return workItem; } /// @@ -819,31 +819,22 @@ internal static bool Dispatch() // Before dequeuing the first work item, acknowledge that the thread request has been satisfied workQueue.MarkThreadRequestSatisfied(); - bool anyMissedSteal = false; object? workItem = null; if (_nextWorkItemToProcess != null) { workItem = Interlocked.Exchange(ref _nextWorkItemToProcess, null); - - if (workItem == null) - { - TryDequeue(out workItem, out anyMissedSteal, workQueue, tl); - } } + if (workItem == null) { - object? secondWorkItem = null; - TryDequeue(out secondWorkItem, out bool missedSteal, workQueue, tl); - - anyMissedSteal |= missedSteal; - if (secondWorkItem == null) + workItem = DequeueWithPriorityAlternation(workQueue, tl, out bool missedSteal); + if (workItem == null) { - // This block should be left out right? 
- /* if (s_assignableWorkItemQueueCount > 0) { workQueue.UnassignWorkItemQueue(tl); } + // // No work. // If we missed a steal, though, there may be more work in the queue. @@ -855,19 +846,26 @@ internal static bool Dispatch() { workQueue.EnsureThreadRequested(); } - // Tell the VM we're returning normally, not because Hill Climbing asked us to return. + return true; - */ } - else + } + + { + object? secondWorkItem = DequeueWithPriorityAlternation(workQueue, tl, out bool missedSteal); + if (secondWorkItem != null && Interlocked.CompareExchange(ref _nextWorkItemToProcess, secondWorkItem, null) != null) { - // _nextWorkItemToProcess = secondWorkItem; tl.workStealingQueue.LocalPush(secondWorkItem); } - - if (secondWorkItem != null || anyMissedSteal) + if (secondWorkItem != null || missedSteal) { - // Ensure a thread is requested and continue to process work items. + // TODO: Update comment below + + // A work item was successfully dequeued, and there may be more work items to process. Request a thread to + // parallelize processing of work items, before processing more work items. Following this, it is the + // responsibility of the new thread and other enqueuers to request more threads as necessary. The + // parallelization may be necessary here for correctness (aside from perf) if the work item blocks for some + // reason that may have a dependency on other queued work items. 
workQueue.EnsureThreadRequested(); } From fb44db5567bf6e80a2e11cfd2d212a0cd3a563da Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 5 Oct 2023 12:03:25 -0700 Subject: [PATCH 07/21] Use a 3-stage scheme for parallelizing processing of IO events --- .../Net/Sockets/SocketAsyncEngine.Unix.cs | 102 ++++++++++-------- 1 file changed, 55 insertions(+), 47 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 9a4a0fa4c903d0..edd419445a9e22 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -87,12 +87,14 @@ private static SocketAsyncEngine[] CreateEngines() // private readonly ConcurrentQueue _eventQueue = new ConcurrentQueue(); - // - // This field is set to 1 to indicate that a thread pool work item is scheduled to process events in _eventQueue. It is - // set to 0 when the scheduled work item starts running, to indicate that a thread pool work item to process events is - // not scheduled. Changes are protected by atomic operations as appropriate. - // - private int _eventQueueProcessingRequested; + private enum EventQueueProcessingStage + { + NotScheduled, + Determining, + Scheduled + } + + private int _eventQueueProcessingStage; // // Registers the Socket with a SocketAsyncEngine, and returns the associated engine. @@ -190,9 +192,12 @@ private void EventLoop() // The native shim is responsible for ensuring this condition. 
Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}"); - if (handler.HandleSocketEvents(numEvents)) + if (handler.HandleSocketEvents(numEvents) && + Interlocked.Exchange( + ref _eventQueueProcessingStage, + (int)EventQueueProcessingStage.Scheduled) == (int)EventQueueProcessingStage.NotScheduled) { - ScheduleToProcessEvents(); + ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); } } } @@ -202,45 +207,60 @@ private void EventLoop() } } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void ScheduleToProcessEvents() + private void UpdateEventQueueProcessingStage(bool isEventQueueEmpty) { - // Schedule a thread pool work item to process events. Only one work item is scheduled at any given time to avoid - // over-parallelization. When the work item begins running, this field is reset to 0, allowing for another work item - // to be scheduled for parallelizing processing of events. - if (Interlocked.CompareExchange(ref _eventQueueProcessingRequested, 1, 0) == 0) + if (!isEventQueueEmpty) { - ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + _eventQueueProcessingStage = (int)EventQueueProcessingStage.Scheduled; } + else + { + int stageBeforeUpdate = + Interlocked.CompareExchange( + ref _eventQueueProcessingStage, + (int)EventQueueProcessingStage.NotScheduled, + (int)EventQueueProcessingStage.Determining); + Debug.Assert(stageBeforeUpdate != (int)EventQueueProcessingStage.NotScheduled); + if (stageBeforeUpdate == (int)EventQueueProcessingStage.Determining) + { + return; + } + } + + ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); } void IThreadPoolWorkItem.Execute() { - // Indicate that a work item is no longer scheduled to process events. The change needs to be visible to enqueuer - // threads (only for EventLoop() currently) before an event is attempted to be dequeued. 
In particular, if an - // enqueuer queues an event and does not schedule a work item because it is already scheduled, and this thread is - // the last thread processing events, it must see the event queued by the enqueuer. - Interlocked.Exchange(ref _eventQueueProcessingRequested, 0); - ConcurrentQueue eventQueue = _eventQueue; - if (!eventQueue.TryDequeue(out SocketIOEvent ev)) + SocketIOEvent ev; + while (true) { - return; - } + Debug.Assert(_eventQueueProcessingStage == (int)EventQueueProcessingStage.Scheduled); + _eventQueueProcessingStage = (int)EventQueueProcessingStage.Determining; + Interlocked.MemoryBarrier(); - int startTimeMs = Environment.TickCount; + if (eventQueue.TryDequeue(out ev)) + { + break; + } - if (!eventQueue.IsEmpty) - { - // An event was successfully dequeued, and there may be more events to process. Schedule a work item to parallelize - // processing of events, before processing more events. Following this, it is the responsibility of the new work - // item and the epoll thread to schedule more work items as necessary. The parallelization may be necessary here if - // the user callback as part of handling the event blocks for some reason that may have a dependency on other queued - // socket events. 
- ScheduleToProcessEvents(); + int stageBeforeUpdate = + Interlocked.CompareExchange( + ref _eventQueueProcessingStage, + (int)EventQueueProcessingStage.NotScheduled, + (int)EventQueueProcessingStage.Determining); + Debug.Assert(stageBeforeUpdate != (int)EventQueueProcessingStage.NotScheduled); + if (stageBeforeUpdate == (int)EventQueueProcessingStage.Determining) + { + return; + } } - while (true) + UpdateEventQueueProcessingStage(eventQueue.IsEmpty); + + int startTimeMs = Environment.TickCount; + do { ev.Context.HandleEvents(ev.Events); @@ -256,19 +276,7 @@ void IThreadPoolWorkItem.Execute() // using Stopwatch instead (like 1 ms, 5 ms, etc.), from quick tests they appeared to have a slightly greater // impact on throughput compared to the threshold chosen below, though it is slight enough that it may not // matter much. Higher thresholds didn't seem to have any noticeable effect. - if (Environment.TickCount - startTimeMs >= 15) - { - break; - } - - if (!eventQueue.TryDequeue(out ev)) - { - return; - } - } - - // The queue was not observed to be empty, schedule another work item before yielding the thread - ScheduleToProcessEvents(); + } while (Environment.TickCount - startTimeMs < 15 && eventQueue.TryDequeue(out ev)); } private void FreeNativeResources() From 65bbc5517e95b92cfbf6699fc5d7ca4bf7eef1ad Mon Sep 17 00:00:00 2001 From: Eduardo Velarde Date: Thu, 2 Nov 2023 13:06:50 -0700 Subject: [PATCH 08/21] Remove Memory Barrier --- .../src/System/Net/Sockets/SocketAsyncEngine.Unix.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index edd419445a9e22..353576c0a3999a 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -238,7 +238,6 @@ void IThreadPoolWorkItem.Execute() 
{ Debug.Assert(_eventQueueProcessingStage == (int)EventQueueProcessingStage.Scheduled); _eventQueueProcessingStage = (int)EventQueueProcessingStage.Determining; - Interlocked.MemoryBarrier(); if (eventQueue.TryDequeue(out ev)) { From e980fc1db6f9e9b9a9a6f59604ea9d2670f4e156 Mon Sep 17 00:00:00 2001 From: Eduardo Velarde Date: Thu, 2 Nov 2023 15:05:52 -0700 Subject: [PATCH 09/21] Add 3 stage to ThreadPoolTypedWorkItemQueue --- .../System/Threading/ThreadPoolWorkQueue.cs | 81 +++++++++++++------ 1 file changed, 55 insertions(+), 26 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index 550dbb72bef2c6..0e1993cc315e33 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -1102,7 +1102,14 @@ internal sealed class ThreadPoolTypedWorkItemQueue : IThreadPoolWo where T : struct where TCallback : struct, IThreadPoolTypedWorkItemQueueCallback { - private int _isScheduledForProcessing; + private enum QueueProcessingStage + { + NotScheduled, + Determining, + Scheduled + } + + private int _queueProcessingStage; private readonly ConcurrentQueue _workItems = new ConcurrentQueue(); public int Count => _workItems.Count; @@ -1114,44 +1121,66 @@ public void Enqueue(T workItem) } public void BatchEnqueue(T workItem) => _workItems.Enqueue(workItem); - public void CompleteBatchEnqueue() => ScheduleForProcessing(); - - private void ScheduleForProcessing() + public void CompleteBatchEnqueue() { - // Only one thread is requested at a time to avoid over-parallelization. Currently where this type is used, queued - // work is expected to be processed at high priority. The implementation could be modified to support different - // priorities if necessary. 
- if (Interlocked.CompareExchange(ref _isScheduledForProcessing, 1, 0) == 0) + if (Interlocked.Exchange( + ref _queueProcessingStage, + (int)QueueProcessingStage.Scheduled) == (int)QueueProcessingStage.NotScheduled) { ThreadPool.UnsafeQueueHighPriorityWorkItemInternal(this); } } - void IThreadPoolWorkItem.Execute() + private void UpdateQueueProcessingStage(bool isQueueEmpty) { - Debug.Assert(_isScheduledForProcessing != 0); - - // This change needs to be visible to other threads that may enqueue work items before a work item is attempted to - // be dequeued by the current thread. In particular, if an enqueuer queues a work item and does not schedule for - // processing, and the current thread is the last thread processing work items, the current thread must see the work - // item queued by the enqueuer. - _isScheduledForProcessing = 0; - Interlocked.MemoryBarrier(); - if (!_workItems.TryDequeue(out T workItem)) + if (!isQueueEmpty) { - return; + _queueProcessingStage = (int)QueueProcessingStage.Scheduled; + } + else + { + int stageBeforeUpdate = + Interlocked.CompareExchange( + ref _queueProcessingStage, + (int)QueueProcessingStage.NotScheduled, + (int)QueueProcessingStage.Determining); + Debug.Assert(stageBeforeUpdate != (int)QueueProcessingStage.NotScheduled); + if (stageBeforeUpdate == (int)QueueProcessingStage.Determining) + { + return; + } } - if (!_workItems.IsEmpty) + ThreadPool.UnsafeQueueHighPriorityWorkItemInternal(this); + } + + void IThreadPoolWorkItem.Execute() + { + T workItem; + while (true) { - // An work item was successfully dequeued, and there may be more work items to process. Schedule a work item to - // parallelize processing of work items, before processing more work items. Following this, it is the responsibility - // of the new work item and the poller thread to schedule more work items as necessary. 
The parallelization may be - // necessary here if the user callback as part of handling the work item blocks for some reason that may have a - // dependency on other queued work items. - ScheduleForProcessing(); + Debug.Assert(_queueProcessingStage == (int)QueueProcessingStage.Scheduled); + _queueProcessingStage = (int)QueueProcessingStage.Determining; + + if (_workItems.TryDequeue(out workItem)) + { + break; + } + + int stageBeforeUpdate = + Interlocked.CompareExchange( + ref _queueProcessingStage, + (int)QueueProcessingStage.NotScheduled, + (int)QueueProcessingStage.Determining); + Debug.Assert(stageBeforeUpdate != (int)QueueProcessingStage.NotScheduled); + if (stageBeforeUpdate == (int)QueueProcessingStage.Determining) + { + return; + } } + UpdateQueueProcessingStage(_workItems.IsEmpty); + ThreadPoolWorkQueueThreadLocals tl = ThreadPoolWorkQueueThreadLocals.threadLocals!; Debug.Assert(tl != null); Thread currentThread = tl.currentThread; From 003a8322e379cec36f3e79163630c4966dd69af4 Mon Sep 17 00:00:00 2001 From: Eduardo Velarde Date: Thu, 2 Nov 2023 21:48:56 -0700 Subject: [PATCH 10/21] Add 3-stage scheme to Thread pool dispatcher() --- .../System/Threading/ThreadPoolWorkQueue.cs | 70 +++++++++++++------ 1 file changed, 49 insertions(+), 21 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index 0e1993cc315e33..866937397f9cc0 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -404,12 +404,19 @@ public int Count private readonly int[] _assignedWorkItemQueueThreadCounts = s_assignableWorkItemQueueCount > 0 ? 
new int[s_assignableWorkItemQueueCount] : Array.Empty(); + private enum QueueProcessingStage + { + NotScheduled, + Determining, + Scheduled + } + [StructLayout(LayoutKind.Sequential)] private struct CacheLineSeparated { private readonly Internal.PaddingFor32 pad1; - public int hasOutstandingThreadRequest; + public int queueProcessingStage; private readonly Internal.PaddingFor32 pad2; } @@ -573,24 +580,14 @@ public void RefreshLoggingEnabledFull() [MethodImpl(MethodImplOptions.AggressiveInlining)] internal void EnsureThreadRequested() { - // Only one thread is requested at a time to avoid over-parallelization - if (Interlocked.CompareExchange(ref _separated.hasOutstandingThreadRequest, 1, 0) == 0) + if (Interlocked.Exchange( + ref _separated.queueProcessingStage, + (int)QueueProcessingStage.Scheduled) == (int)QueueProcessingStage.NotScheduled) { ThreadPool.RequestWorkerThread(); } } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal void MarkThreadRequestSatisfied() - { - // The change needs to be visible to other threads that may request a worker thread before a work item is attempted - // to be dequeued by the current thread. In particular, if an enqueuer queues a work item and does not request a - // thread because it sees that a thread request is already outstanding, and the current thread is the last thread - // processing work items, the current thread must see the work item queued by the enqueuer. - _separated.hasOutstandingThreadRequest = 0; - Interlocked.MemoryBarrier(); - } - public void Enqueue(object callback, bool forceGlobal) { Debug.Assert((callback is IThreadPoolWorkItem) ^ (callback is Task)); @@ -816,20 +813,32 @@ internal static bool Dispatch() workQueue.AssignWorkItemQueue(tl); } - // Before dequeuing the first work item, acknowledge that the thread request has been satisfied - workQueue.MarkThreadRequestSatisfied(); - object? 
workItem = null; if (_nextWorkItemToProcess != null) { workItem = Interlocked.Exchange(ref _nextWorkItemToProcess, null); } - if (workItem == null) + // Try to dequeue a work item, clean up and return if no item was found + while (workItem == null) { - workItem = DequeueWithPriorityAlternation(workQueue, tl, out bool missedSteal); - if (workItem == null) + Debug.Assert(workQueue._separated.queueProcessingStage == (int)QueueProcessingStage.Scheduled); + workQueue._separated.queueProcessingStage = (int)QueueProcessingStage.Determining; + + if ((workItem = DequeueWithPriorityAlternation(workQueue, tl, out bool missedSteal)) != null) { + break; + } + + int stageBeforeUpdate = + Interlocked.CompareExchange( + ref workQueue._separated.queueProcessingStage, + (int)QueueProcessingStage.NotScheduled, + (int)QueueProcessingStage.Determining); + Debug.Assert(stageBeforeUpdate != (int)QueueProcessingStage.NotScheduled); + if (stageBeforeUpdate == (int)QueueProcessingStage.Determining) + { + // no work items found, no work items enqueued so far if (s_assignableWorkItemQueueCount > 0) { workQueue.UnassignWorkItemQueue(tl); @@ -866,7 +875,26 @@ internal static bool Dispatch() // responsibility of the new thread and other enqueuers to request more threads as necessary. The // parallelization may be necessary here for correctness (aside from perf) if the work item blocks for some // reason that may have a dependency on other queued work items. 
- workQueue.EnsureThreadRequested(); + + workQueue._separated.queueProcessingStage = (int)QueueProcessingStage.Scheduled; + // I thought we might want to add a MemoryBarrier here but not sure about it + ThreadPool.RequestWorkerThread(); + } + else + { + // the state should be Determining if no more work items were enqueued + // however, another thread might have done so and changed the state to Scheduled + int stageBeforeUpdate = + Interlocked.CompareExchange( + ref workQueue._separated.queueProcessingStage, + (int)QueueProcessingStage.NotScheduled, + (int)QueueProcessingStage.Determining); + Debug.Assert(stageBeforeUpdate != (int)QueueProcessingStage.NotScheduled); + if (stageBeforeUpdate == (int)QueueProcessingStage.Scheduled) + { + // Request another worker thread given that there is more work + ThreadPool.RequestWorkerThread(); + } } // After this point, this method is no longer responsible for ensuring thread requests except for missed steals From 46b8077ce90e35a40d2f9ce569c53a64b919babf Mon Sep 17 00:00:00 2001 From: Eduardo Velarde Date: Fri, 3 Nov 2023 15:15:27 -0700 Subject: [PATCH 11/21] Revert "Remove Memory Barrier" This reverts commit 52d488a9dd7a555deddf4b51c1043f53b1400a62. 
--- .../src/System/Net/Sockets/SocketAsyncEngine.Unix.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 353576c0a3999a..edd419445a9e22 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -238,6 +238,7 @@ void IThreadPoolWorkItem.Execute() { Debug.Assert(_eventQueueProcessingStage == (int)EventQueueProcessingStage.Scheduled); _eventQueueProcessingStage = (int)EventQueueProcessingStage.Determining; + Interlocked.MemoryBarrier(); if (eventQueue.TryDequeue(out ev)) { From 853fd6baa579f55f092ba272a1a4bf6391bb67da Mon Sep 17 00:00:00 2001 From: Eduardo Velarde Date: Fri, 3 Nov 2023 16:30:44 -0700 Subject: [PATCH 12/21] Added MemoryBarrier calls --- .../src/System/Threading/ThreadPoolWorkQueue.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index 866937397f9cc0..6da06ef0b2a872 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -824,6 +824,7 @@ internal static bool Dispatch() { Debug.Assert(workQueue._separated.queueProcessingStage == (int)QueueProcessingStage.Scheduled); workQueue._separated.queueProcessingStage = (int)QueueProcessingStage.Determining; + Interlocked.MemoryBarrier(); if ((workItem = DequeueWithPriorityAlternation(workQueue, tl, out bool missedSteal)) != null) { @@ -1189,6 +1190,7 @@ void IThreadPoolWorkItem.Execute() { Debug.Assert(_queueProcessingStage == (int)QueueProcessingStage.Scheduled); _queueProcessingStage = (int)QueueProcessingStage.Determining; 
+ Interlocked.MemoryBarrier(); if (_workItems.TryDequeue(out workItem)) { From f574ef26959d46b5f09e555fffc1c301cc2f10ad Mon Sep 17 00:00:00 2001 From: Eduardo Velarde Date: Mon, 1 Apr 2024 19:12:46 -0700 Subject: [PATCH 13/21] Add comments --- .../Net/Sockets/SocketAsyncEngine.Unix.cs | 16 ++++++++ .../System/Threading/ThreadPoolWorkQueue.cs | 38 +++++++++++++++---- 2 files changed, 47 insertions(+), 7 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index edd419445a9e22..f070f6c549d31c 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -87,6 +87,15 @@ private static SocketAsyncEngine[] CreateEngines() // private readonly ConcurrentQueue _eventQueue = new ConcurrentQueue(); + // The scheme works as following: + // From NotScheduled, the only transition is to Scheduled when new events are enqueued and a work item is enqueued to process them + // From Scheduled, the only transition is to Determining right before trying to dequeue an event + // From Determining, it can go to either NotScheduled (when no events are present in the queue (the previous work item processed all of them) + // or Scheduled if the queue is still not empty (let the current work item handle parallelization as convinient) + // + // The goal is to avoid enqueueing more work items than necessary, while still ensuring that all events are processed + // Another work item isn't enqueued to the thread pool hastily while the state is Determining, + // instead the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time private enum EventQueueProcessingStage { NotScheduled, @@ -192,6 +201,8 @@ private void EventLoop() // The native shim is responsible for ensuring this condition. 
Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}"); + // Only enqueue a work item if the stage is NotScheduled + // Otherwise there must be a work item already queued or executing, let it handle parallelization if (handler.HandleSocketEvents(numEvents) && Interlocked.Exchange( ref _eventQueueProcessingStage, @@ -211,10 +222,13 @@ private void UpdateEventQueueProcessingStage(bool isEventQueueEmpty) { if (!isEventQueueEmpty) { + // There are more events to process, set stage to Scheduled and enqueue a work item _eventQueueProcessingStage = (int)EventQueueProcessingStage.Scheduled; } else { + // The stage before update would naturally be Determining, in that case there is no more work to do + // However the event enqueuer may have set it to Scheduled if more events arrived so enqueue another work item to handle them int stageBeforeUpdate = Interlocked.CompareExchange( ref _eventQueueProcessingStage, @@ -245,6 +259,8 @@ void IThreadPoolWorkItem.Execute() break; } + // The stage before update would naturally be Determining, in that case there is no more work to do + // However the event enqueuer may have set it to Scheduled if more events were enqueued so let the work item try to dequeue again int stageBeforeUpdate = Interlocked.CompareExchange( ref _eventQueueProcessingStage, diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index 6da06ef0b2a872..4879f3f805f434 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -404,6 +404,15 @@ public int Count private readonly int[] _assignedWorkItemQueueThreadCounts = s_assignableWorkItemQueueCount > 0 ? 
new int[s_assignableWorkItemQueueCount] : Array.Empty(); + // The scheme works as following: + // From NotScheduled, the only transition is to Scheduled when new items are enqueued and a thread is requested to process them + // From Scheduled, the only transition is to Determining right before trying to dequeue an item + // From Determining, it can go to either NotScheduled when no items are present in the queue (the previous thread processed all of them) + // or Scheduled if the queue is still not empty (let the current thread handle parallelization as convinient) + // + // The goal is to avoid requesting more threads than necessary, while still ensuring that all items are processed + // Another thread isn't requested hastily while the state is Determining, + // instead the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time private enum QueueProcessingStage { NotScheduled, @@ -580,6 +589,8 @@ public void RefreshLoggingEnabledFull() [MethodImpl(MethodImplOptions.AggressiveInlining)] internal void EnsureThreadRequested() { + // Only request a thread if the stage is NotScheduled + // Otherwise let the current requested thread handle parallelization if (Interlocked.Exchange( ref _separated.queueProcessingStage, (int)QueueProcessingStage.Scheduled) == (int)QueueProcessingStage.NotScheduled) @@ -831,6 +842,8 @@ internal static bool Dispatch() break; } + // The stage before update would naturally be Determining, in that case there is no more work to do + // However it may be Scheduled if more items were enqueued int stageBeforeUpdate = Interlocked.CompareExchange( ref workQueue._separated.queueProcessingStage, @@ -869,22 +882,18 @@ internal static bool Dispatch() } if (secondWorkItem != null || missedSteal) { - // TODO: Update comment below - // A work item was successfully dequeued, and there may be more work items to process. 
Request a thread to // parallelize processing of work items, before processing more work items. Following this, it is the // responsibility of the new thread and other enqueuers to request more threads as necessary. The // parallelization may be necessary here for correctness (aside from perf) if the work item blocks for some // reason that may have a dependency on other queued work items. - workQueue._separated.queueProcessingStage = (int)QueueProcessingStage.Scheduled; - // I thought we might want to add a MemoryBarrier here but not sure about it ThreadPool.RequestWorkerThread(); } else { - // the state should be Determining if no more work items were enqueued - // however, another thread might have done so and changed the state to Scheduled + // The stage before update would naturally be Determining, in that case there is no more work to do + // However the enqueuer may have set it to Scheduled if more items were enqueued so request another thread int stageBeforeUpdate = Interlocked.CompareExchange( ref workQueue._separated.queueProcessingStage, @@ -893,7 +902,6 @@ internal static bool Dispatch() Debug.Assert(stageBeforeUpdate != (int)QueueProcessingStage.NotScheduled); if (stageBeforeUpdate == (int)QueueProcessingStage.Scheduled) { - // Request another worker thread given that there is more work ThreadPool.RequestWorkerThread(); } } @@ -1131,6 +1139,15 @@ internal sealed class ThreadPoolTypedWorkItemQueue : IThreadPoolWo where T : struct where TCallback : struct, IThreadPoolTypedWorkItemQueueCallback { + // The scheme works as following: + // From NotScheduled, the only transition is to Scheduled when new items are enqueued and a TP work item is enqueued to process them + // From Scheduled, the only transition is to Determining right before trying to dequeue an item + // From Determining, it can go to either NotScheduled when no items are present in the queue (the previous TP work item processed all of them) + // or Scheduled if the queue is still not empty 
(let the current TP work item handle parallelization as convinient) + // + // The goal is to avoid enqueueing more TP work items than necessary, while still ensuring that all items are processed + // Another TP work item isn't enqueued to the thread pool hastily while the state is Determining, + // instead the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time private enum QueueProcessingStage { NotScheduled, @@ -1152,6 +1169,8 @@ public void Enqueue(T workItem) public void BatchEnqueue(T workItem) => _workItems.Enqueue(workItem); public void CompleteBatchEnqueue() { + // Only enqueue a work item if the stage is NotScheduled + // Otherwise there must be a work item already queued or executing, let it handle parallelization if (Interlocked.Exchange( ref _queueProcessingStage, (int)QueueProcessingStage.Scheduled) == (int)QueueProcessingStage.NotScheduled) @@ -1164,10 +1183,13 @@ private void UpdateQueueProcessingStage(bool isQueueEmpty) { if (!isQueueEmpty) { + // There are more items to process, set stage to Scheduled and enqueue a TP work item _queueProcessingStage = (int)QueueProcessingStage.Scheduled; } else { + // The stage before update would naturally be Determining, in that case there is no more work to do + // However the enqueuer may have set it to Scheduled if more items arrived so enqueue another work item to handle them int stageBeforeUpdate = Interlocked.CompareExchange( ref _queueProcessingStage, @@ -1197,6 +1219,8 @@ void IThreadPoolWorkItem.Execute() break; } + // The stage before update would naturally be Determining, in that case there is no more work to do + // However the enqueuer may have set it to Scheduled if more items were enqueued so let the TP work item try to dequeue again int stageBeforeUpdate = Interlocked.CompareExchange( ref _queueProcessingStage, From a95d1d56912e6bbe3b09789d5eb5d472e0ced907 Mon Sep 17 00:00:00 2001 From: Koundinya Veluri Date: Tue, 21 May 2024 10:33:39 -0700 
Subject: [PATCH 14/21] Adjust a couple of things in thread pool dispatcher --- .../System/Threading/ThreadPoolWorkQueue.cs | 100 ++++++++++++------ 1 file changed, 68 insertions(+), 32 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index 4879f3f805f434..e78074072a460f 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -785,6 +785,7 @@ public long GlobalCount private static object? DequeueWithPriorityAlternation(ThreadPoolWorkQueue workQueue, ThreadPoolWorkQueueThreadLocals tl, out bool missedSteal) { object? workItem = null; + // Alternate between checking for high-prioriy and normal-priority work first, that way both sets of work // items get a chance to run in situations where worker threads are starved and work items that run also // take over the thread, sustaining starvation. For example, when worker threads are continually starved, @@ -824,40 +825,26 @@ internal static bool Dispatch() workQueue.AssignWorkItemQueue(tl); } + // The change needs to be visible to other threads that may request a worker thread before a work item is attempted + // to be dequeued by the current thread. In particular, if an enqueuer queues a work item and does not request a + // thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing + // work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of + // Scheduled, and try to dequeue again or request another thread. + Debug.Assert(workQueue._separated.queueProcessingStage == (int)QueueProcessingStage.Scheduled); + workQueue._separated.queueProcessingStage = (int)QueueProcessingStage.Determining; + Interlocked.MemoryBarrier(); + object? 
workItem = null; if (_nextWorkItemToProcess != null) { workItem = Interlocked.Exchange(ref _nextWorkItemToProcess, null); } - // Try to dequeue a work item, clean up and return if no item was found - while (workItem == null) + if (workItem == null) { - Debug.Assert(workQueue._separated.queueProcessingStage == (int)QueueProcessingStage.Scheduled); - workQueue._separated.queueProcessingStage = (int)QueueProcessingStage.Determining; - Interlocked.MemoryBarrier(); - - if ((workItem = DequeueWithPriorityAlternation(workQueue, tl, out bool missedSteal)) != null) - { - break; - } - - // The stage before update would naturally be Determining, in that case there is no more work to do - // However it may be Scheduled if more items were enqueued - int stageBeforeUpdate = - Interlocked.CompareExchange( - ref workQueue._separated.queueProcessingStage, - (int)QueueProcessingStage.NotScheduled, - (int)QueueProcessingStage.Determining); - Debug.Assert(stageBeforeUpdate != (int)QueueProcessingStage.NotScheduled); - if (stageBeforeUpdate == (int)QueueProcessingStage.Determining) + // Try to dequeue a work item, clean up and return if no item was found + while ((workItem = DequeueWithPriorityAlternation(workQueue, tl, out bool missedSteal)) == null) { - // no work items found, no work items enqueued so far - if (s_assignableWorkItemQueueCount > 0) - { - workQueue.UnassignWorkItemQueue(tl); - } - // // No work. // If we missed a steal, though, there may be more work in the queue. 
@@ -867,19 +854,61 @@ internal static bool Dispatch() // if (missedSteal) { - workQueue.EnsureThreadRequested(); + if (s_assignableWorkItemQueueCount > 0) + { + workQueue.UnassignWorkItemQueue(tl); + } + + Debug.Assert(workQueue._separated.queueProcessingStage != (int)QueueProcessingStage.NotScheduled); + workQueue._separated.queueProcessingStage = (int)QueueProcessingStage.Scheduled; + ThreadPool.RequestWorkerThread(); + return true; } - return true; + // The stage before update would naturally be Determining, in that case there is no more work to do + // However it may be Scheduled if more items were enqueued + int stageBeforeUpdate = + Interlocked.CompareExchange( + ref workQueue._separated.queueProcessingStage, + (int)QueueProcessingStage.NotScheduled, + (int)QueueProcessingStage.Determining); + Debug.Assert(stageBeforeUpdate != (int)QueueProcessingStage.NotScheduled); + if (stageBeforeUpdate == (int)QueueProcessingStage.Determining) + { + if (s_assignableWorkItemQueueCount > 0) + { + workQueue.UnassignWorkItemQueue(tl); + } + + return true; + } + + // A work item was enqueued after the stage was set to Determining earlier, and a thread was not requested + // by the enqueuer. Set the stage back to Determining and try to dequeue a work item again. + // + // See the first similarly used memory barrier in the method for why it's necessary. + workQueue._separated.queueProcessingStage = (int)QueueProcessingStage.Determining; + Interlocked.MemoryBarrier(); } } { + // A work item may have been enqueued after the stage was set to Determining earlier, so the stage may be + // Scheduled here, and the enqueued work item may have already been dequeued above or by a different thread. Now + // that we're about to try dequeuing a second work item, set the stage back to Determining first so that we'll + // be able to detect if an enqueue races with the dequeue below. + // + // See the first similarly used memory barrier in the method for why it's necessary. 
+ workQueue._separated.queueProcessingStage = (int)QueueProcessingStage.Determining; + Interlocked.MemoryBarrier(); + object? secondWorkItem = DequeueWithPriorityAlternation(workQueue, tl, out bool missedSteal); - if (secondWorkItem != null && Interlocked.CompareExchange(ref _nextWorkItemToProcess, secondWorkItem, null) != null) + if (secondWorkItem != null) { - tl.workStealingQueue.LocalPush(secondWorkItem); + Debug.Assert(_nextWorkItemToProcess == null); + _nextWorkItemToProcess = secondWorkItem; } + if (secondWorkItem != null || missedSteal) { // A work item was successfully dequeued, and there may be more work items to process. Request a thread to @@ -887,6 +916,7 @@ internal static bool Dispatch() // responsibility of the new thread and other enqueuers to request more threads as necessary. The // parallelization may be necessary here for correctness (aside from perf) if the work item blocks for some // reason that may have a dependency on other queued work items. + Debug.Assert(workQueue._separated.queueProcessingStage != (int)QueueProcessingStage.NotScheduled); workQueue._separated.queueProcessingStage = (int)QueueProcessingStage.Scheduled; ThreadPool.RequestWorkerThread(); } @@ -902,13 +932,19 @@ internal static bool Dispatch() Debug.Assert(stageBeforeUpdate != (int)QueueProcessingStage.NotScheduled); if (stageBeforeUpdate == (int)QueueProcessingStage.Scheduled) { + // A work item was enqueued after the stage was set to Determining earlier, and a thread was not + // requested by the enqueuer, so request a thread now. An alternate is to retry dequeuing, as requesting + // a thread can be more expensive, but retrying multiple times (though unlikely) can delay the + // processing of the first work item that was already dequeued. 
ThreadPool.RequestWorkerThread(); } } - - // After this point, this method is no longer responsible for ensuring thread requests except for missed steals } + // + // After this point, this method is no longer responsible for ensuring thread requests except for missed steals + // + // Has the desire for logging changed since the last time we entered? workQueue.RefreshLoggingEnabled(); From 64b08e747db2372190c57a0e0c13152122d96b57 Mon Sep 17 00:00:00 2001 From: Eduardo Manuel Velarde Polar Date: Mon, 3 Jun 2024 22:49:58 -0700 Subject: [PATCH 15/21] PR comments --- .../Net/Sockets/SocketAsyncEngine.Unix.cs | 10 ++++--- .../System/Threading/ThreadPoolWorkQueue.cs | 26 +++++++++++-------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index f070f6c549d31c..133ffeb9270b7b 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -227,8 +227,9 @@ private void UpdateEventQueueProcessingStage(bool isEventQueueEmpty) } else { - // The stage before update would naturally be Determining, in that case there is no more work to do - // However the event enqueuer may have set it to Scheduled if more events arrived so enqueue another work item to handle them + // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining + // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer + // would not have scheduled a work item to process the work, so try to dequeue a work item again. 
int stageBeforeUpdate = Interlocked.CompareExchange( ref _eventQueueProcessingStage, @@ -259,8 +260,9 @@ void IThreadPoolWorkItem.Execute() break; } - // The stage before update would naturally be Determining, in that case there is no more work to do - // However the event enqueuer may have set it to Scheduled if more events were enqueued so let the work item try to dequeue again + // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining + // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer + // would not have scheduled a work item to process the work, so try to dequeue a work item again. int stageBeforeUpdate = Interlocked.CompareExchange( ref _eventQueueProcessingStage, diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index e78074072a460f..7e44b1e5734fac 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -404,6 +404,8 @@ public int Count private readonly int[] _assignedWorkItemQueueThreadCounts = s_assignableWorkItemQueueCount > 0 ? new int[s_assignableWorkItemQueueCount] : Array.Empty(); + private object? _nextWorkItemToProcess; + // The scheme works as following: // From NotScheduled, the only transition is to Scheduled when new items are enqueued and a thread is requested to process them // From Scheduled, the only transition is to Determining right before trying to dequeue an item @@ -780,8 +782,6 @@ public long GlobalCount // Dispatch (if YieldFromDispatchLoop is true), or performing periodic activities public const uint DispatchQuantumMs = 30; - private static object? _nextWorkItemToProcess; - private static object? 
DequeueWithPriorityAlternation(ThreadPoolWorkQueue workQueue, ThreadPoolWorkQueueThreadLocals tl, out bool missedSteal) { object? workItem = null; @@ -865,8 +865,9 @@ internal static bool Dispatch() return true; } - // The stage before update would naturally be Determining, in that case there is no more work to do - // However it may be Scheduled if more items were enqueued + // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining + // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer + // would not have scheduled a work item to process the work, so try to dequeue a work item again. int stageBeforeUpdate = Interlocked.CompareExchange( ref workQueue._separated.queueProcessingStage, @@ -922,8 +923,9 @@ internal static bool Dispatch() } else { - // The stage before update would naturally be Determining, in that case there is no more work to do - // However the enqueuer may have set it to Scheduled if more items were enqueued so request another thread + // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining + // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer + // would not have scheduled a work item to process the work, so try to dequeue a work item again. 
int stageBeforeUpdate = Interlocked.CompareExchange( ref workQueue._separated.queueProcessingStage, @@ -1206,7 +1208,7 @@ public void Enqueue(T workItem) public void CompleteBatchEnqueue() { // Only enqueue a work item if the stage is NotScheduled - // Otherwise there must be a work item already queued or executing, let it handle parallelization + //Otherwise there must be a work item already queued or another thread already handling parallelization if (Interlocked.Exchange( ref _queueProcessingStage, (int)QueueProcessingStage.Scheduled) == (int)QueueProcessingStage.NotScheduled) @@ -1224,8 +1226,9 @@ private void UpdateQueueProcessingStage(bool isQueueEmpty) } else { - // The stage before update would naturally be Determining, in that case there is no more work to do - // However the enqueuer may have set it to Scheduled if more items arrived so enqueue another work item to handle them + // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining + // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer + // would not have scheduled a work item to process the work, so try to dequeue a work item again. int stageBeforeUpdate = Interlocked.CompareExchange( ref _queueProcessingStage, @@ -1255,8 +1258,9 @@ void IThreadPoolWorkItem.Execute() break; } - // The stage before update would naturally be Determining, in that case there is no more work to do - // However the enqueuer may have set it to Scheduled if more items were enqueued so let the TP work item try to dequeue again + // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining + // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer + // would not have scheduled a work item to process the work, so try to dequeue a work item again. 
int stageBeforeUpdate = Interlocked.CompareExchange( ref _queueProcessingStage, From bf41a72201d8408e36ebab0e3fde85c18b9e0e3f Mon Sep 17 00:00:00 2001 From: Eduardo Manuel Velarde Polar Date: Mon, 3 Jun 2024 23:32:04 -0700 Subject: [PATCH 16/21] Add periods --- .../Net/Sockets/SocketAsyncEngine.Unix.cs | 18 +++++------ .../System/Threading/ThreadPoolWorkQueue.cs | 30 +++++++++---------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 133ffeb9270b7b..02a9f3dfb080e9 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -88,14 +88,14 @@ private static SocketAsyncEngine[] CreateEngines() private readonly ConcurrentQueue _eventQueue = new ConcurrentQueue(); // The scheme works as following: - // From NotScheduled, the only transition is to Scheduled when new events are enqueued and a work item is enqueued to process them - // From Scheduled, the only transition is to Determining right before trying to dequeue an event - // From Determining, it can go to either NotScheduled (when no events are present in the queue (the previous work item processed all of them) - // or Scheduled if the queue is still not empty (let the current work item handle parallelization as convinient) + // From NotScheduled, the only transition is to Scheduled when new events are enqueued and a work item is enqueued to process them. + // From Scheduled, the only transition is to Determining right before trying to dequeue an event. + // From Determining, it can go to either NotScheduled when no events are present in the queue (the previous work item processed all of them) + // or Scheduled if the queue is still not empty (let the current work item handle parallelization as convinient). 
// - // The goal is to avoid enqueueing more work items than necessary, while still ensuring that all events are processed + // The goal is to avoid enqueueing more work items than necessary, while still ensuring that all events are processed. // Another work item isn't enqueued to the thread pool hastily while the state is Determining, - // instead the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time + // instead the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time. private enum EventQueueProcessingStage { NotScheduled, @@ -201,8 +201,8 @@ private void EventLoop() // The native shim is responsible for ensuring this condition. Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}"); - // Only enqueue a work item if the stage is NotScheduled - // Otherwise there must be a work item already queued or executing, let it handle parallelization + // Only enqueue a work item if the stage is NotScheduled. + // Otherwise there must be a work item already queued or another thread already handling parallelization. if (handler.HandleSocketEvents(numEvents) && Interlocked.Exchange( ref _eventQueueProcessingStage, @@ -222,7 +222,7 @@ private void UpdateEventQueueProcessingStage(bool isEventQueueEmpty) { if (!isEventQueueEmpty) { - // There are more events to process, set stage to Scheduled and enqueue a work item + // There are more events to process, set stage to Scheduled and enqueue a work item. 
_eventQueueProcessingStage = (int)EventQueueProcessingStage.Scheduled; } else diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index 7e44b1e5734fac..09b3ab1366a276 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -407,14 +407,14 @@ public int Count private object? _nextWorkItemToProcess; // The scheme works as following: - // From NotScheduled, the only transition is to Scheduled when new items are enqueued and a thread is requested to process them - // From Scheduled, the only transition is to Determining right before trying to dequeue an item + // From NotScheduled, the only transition is to Scheduled when new items are enqueued and a thread is requested to process them. + // From Scheduled, the only transition is to Determining right before trying to dequeue an item. // From Determining, it can go to either NotScheduled when no items are present in the queue (the previous thread processed all of them) - // or Scheduled if the queue is still not empty (let the current thread handle parallelization as convinient) + // or Scheduled if the queue is still not empty (let the current thread handle parallelization as convinient). // - // The goal is to avoid requesting more threads than necessary, while still ensuring that all items are processed + // The goal is to avoid requesting more threads than necessary, while still ensuring that all items are processed. // Another thread isn't requested hastily while the state is Determining, - // instead the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time + // instead the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time. 
private enum QueueProcessingStage { NotScheduled, @@ -591,8 +591,8 @@ public void RefreshLoggingEnabledFull() [MethodImpl(MethodImplOptions.AggressiveInlining)] internal void EnsureThreadRequested() { - // Only request a thread if the stage is NotScheduled - // Otherwise let the current requested thread handle parallelization + // Only request a thread if the stage is NotScheduled. + // Otherwise let the current requested thread handle parallelization. if (Interlocked.Exchange( ref _separated.queueProcessingStage, (int)QueueProcessingStage.Scheduled) == (int)QueueProcessingStage.NotScheduled) @@ -1178,14 +1178,14 @@ internal sealed class ThreadPoolTypedWorkItemQueue : IThreadPoolWo where TCallback : struct, IThreadPoolTypedWorkItemQueueCallback { // The scheme works as following: - // From NotScheduled, the only transition is to Scheduled when new items are enqueued and a TP work item is enqueued to process them - // From Scheduled, the only transition is to Determining right before trying to dequeue an item + // From NotScheduled, the only transition is to Scheduled when new items are enqueued and a TP work item is enqueued to process them. + // From Scheduled, the only transition is to Determining right before trying to dequeue an item. // From Determining, it can go to either NotScheduled when no items are present in the queue (the previous TP work item processed all of them) - // or Scheduled if the queue is still not empty (let the current TP work item handle parallelization as convinient) + // or Scheduled if the queue is still not empty (let the current TP work item handle parallelization as convinient). // - // The goal is to avoid enqueueing more TP work items than necessary, while still ensuring that all items are processed + // The goal is to avoid enqueueing more TP work items than necessary, while still ensuring that all items are processed. 
// Another TP work item isn't enqueued to the thread pool hastily while the state is Determining, - // instead the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time + // instead the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time. private enum QueueProcessingStage { NotScheduled, @@ -1207,8 +1207,8 @@ public void Enqueue(T workItem) public void BatchEnqueue(T workItem) => _workItems.Enqueue(workItem); public void CompleteBatchEnqueue() { - // Only enqueue a work item if the stage is NotScheduled - //Otherwise there must be a work item already queued or another thread already handling parallelization + // Only enqueue a work item if the stage is NotScheduled. + //Otherwise there must be a work item already queued or another thread already handling parallelization. if (Interlocked.Exchange( ref _queueProcessingStage, (int)QueueProcessingStage.Scheduled) == (int)QueueProcessingStage.NotScheduled) @@ -1221,7 +1221,7 @@ private void UpdateQueueProcessingStage(bool isQueueEmpty) { if (!isQueueEmpty) { - // There are more items to process, set stage to Scheduled and enqueue a TP work item + // There are more items to process, set stage to Scheduled and enqueue a TP work item. 
_queueProcessingStage = (int)QueueProcessingStage.Scheduled; } else From 48c79129e791cc90bfa08f2c6bc337f5e4826998 Mon Sep 17 00:00:00 2001 From: Eduardo Manuel Velarde Polar Date: Mon, 3 Jun 2024 23:42:11 -0700 Subject: [PATCH 17/21] Fix error CS0120 --- .../src/System/Threading/ThreadPoolWorkQueue.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index 09b3ab1366a276..2e2cdba1f1aaa2 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -404,7 +404,7 @@ public int Count private readonly int[] _assignedWorkItemQueueThreadCounts = s_assignableWorkItemQueueCount > 0 ? new int[s_assignableWorkItemQueueCount] : Array.Empty(); - private object? _nextWorkItemToProcess; + private static object? _nextWorkItemToProcess; // The scheme works as following: // From NotScheduled, the only transition is to Scheduled when new items are enqueued and a thread is requested to process them. 
From 153f51cb580b1783f37b0750d5b900c7a1b11b4f Mon Sep 17 00:00:00 2001 From: Eduardo Manuel Velarde Polar Date: Tue, 4 Jun 2024 13:15:50 -0700 Subject: [PATCH 18/21] PR comments --- .../Net/Sockets/SocketAsyncEngine.Unix.cs | 12 ++++----- .../System/Threading/ThreadPoolWorkQueue.cs | 26 +++++++++---------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index 02a9f3dfb080e9..d02be0e8aa656b 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -87,11 +87,11 @@ private static SocketAsyncEngine[] CreateEngines() // private readonly ConcurrentQueue _eventQueue = new ConcurrentQueue(); - // The scheme works as following: - // From NotScheduled, the only transition is to Scheduled when new events are enqueued and a work item is enqueued to process them. - // From Scheduled, the only transition is to Determining right before trying to dequeue an event. - // From Determining, it can go to either NotScheduled when no events are present in the queue (the previous work item processed all of them) - // or Scheduled if the queue is still not empty (let the current work item handle parallelization as convinient). + // The scheme works as follows: + // - From NotScheduled, the only transition is to Scheduled when new events are enqueued and a work item is enqueued to process them. + // - From Scheduled, the only transition is to Determining right before trying to dequeue an event. + // - From Determining, it can go to either NotScheduled when no events are present in the queue (the previous work item processed all of them) + // or Scheduled if the queue is still not empty (let the current work item handle parallelization as convenient). 
// // The goal is to avoid enqueueing more work items than necessary, while still ensuring that all events are processed. // Another work item isn't enqueued to the thread pool hastily while the state is Determining, @@ -229,7 +229,7 @@ private void UpdateEventQueueProcessingStage(bool isEventQueueEmpty) { // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer - // would not have scheduled a work item to process the work, so try to dequeue a work item again. + // would not have scheduled a work item to process the work, so schedule one now. int stageBeforeUpdate = Interlocked.CompareExchange( ref _eventQueueProcessingStage, diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index 2e2cdba1f1aaa2..0f48d6d1c8b68e 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -406,11 +406,11 @@ public int Count private static object? _nextWorkItemToProcess; - // The scheme works as following: - // From NotScheduled, the only transition is to Scheduled when new items are enqueued and a thread is requested to process them. - // From Scheduled, the only transition is to Determining right before trying to dequeue an item. - // From Determining, it can go to either NotScheduled when no items are present in the queue (the previous thread processed all of them) - // or Scheduled if the queue is still not empty (let the current thread handle parallelization as convinient). + // The scheme works as follows: + // - From NotScheduled, the only transition is to Scheduled when new items are enqueued and a thread is requested to process them. 
+ // - From Scheduled, the only transition is to Determining right before trying to dequeue an item. + // - From Determining, it can go to either NotScheduled when no items are present in the queue (the previous thread processed all of them) + // or Scheduled if the queue is still not empty (let the current thread handle parallelization as convenient). // // The goal is to avoid requesting more threads than necessary, while still ensuring that all items are processed. // Another thread isn't requested hastily while the state is Determining, @@ -925,7 +925,7 @@ internal static bool Dispatch() { // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer - // would not have scheduled a work item to process the work, so try to dequeue a work item again. + // would not have requested a thread, so request one now. int stageBeforeUpdate = Interlocked.CompareExchange( ref workQueue._separated.queueProcessingStage, @@ -1177,11 +1177,11 @@ internal sealed class ThreadPoolTypedWorkItemQueue : IThreadPoolWo where T : struct where TCallback : struct, IThreadPoolTypedWorkItemQueueCallback { - // The scheme works as following: - // From NotScheduled, the only transition is to Scheduled when new items are enqueued and a TP work item is enqueued to process them. - // From Scheduled, the only transition is to Determining right before trying to dequeue an item. - // From Determining, it can go to either NotScheduled when no items are present in the queue (the previous TP work item processed all of them) - // or Scheduled if the queue is still not empty (let the current TP work item handle parallelization as convinient). + // The scheme works as follows: + // - From NotScheduled, the only transition is to Scheduled when new items are enqueued and a TP work item is enqueued to process them. 
+ // - From Scheduled, the only transition is to Determining right before trying to dequeue an item. + // - From Determining, it can go to either NotScheduled when no items are present in the queue (the previous TP work item processed all of them) + // or Scheduled if the queue is still not empty (let the current TP work item handle parallelization as convenient). // // The goal is to avoid enqueueing more TP work items than necessary, while still ensuring that all items are processed. // Another TP work item isn't enqueued to the thread pool hastily while the state is Determining, @@ -1208,7 +1208,7 @@ public void Enqueue(T workItem) public void CompleteBatchEnqueue() { // Only enqueue a work item if the stage is NotScheduled. - //Otherwise there must be a work item already queued or another thread already handling parallelization. + // Otherwise there must be a work item already queued or another thread already handling parallelization. if (Interlocked.Exchange( ref _queueProcessingStage, (int)QueueProcessingStage.Scheduled) == (int)QueueProcessingStage.NotScheduled) @@ -1228,7 +1228,7 @@ private void UpdateQueueProcessingStage(bool isQueueEmpty) { // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer - // would not have scheduled a work item to process the work, so try to dequeue a work item again. + // would not have scheduled a work item to process the work, so schedule one now. 
int stageBeforeUpdate = Interlocked.CompareExchange( ref _queueProcessingStage, From 737396ed31bb642af8a1f68da00863eea42bdd23 Mon Sep 17 00:00:00 2001 From: Eduardo Manuel Velarde Polar Date: Tue, 4 Jun 2024 15:18:30 -0700 Subject: [PATCH 19/21] Make _nextWorkItemToProcess non static --- .../src/System/Threading/ThreadPoolWorkQueue.cs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index 0f48d6d1c8b68e..7cfdbc413d11e7 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -404,7 +404,7 @@ public int Count private readonly int[] _assignedWorkItemQueueThreadCounts = s_assignableWorkItemQueueCount > 0 ? new int[s_assignableWorkItemQueueCount] : Array.Empty(); - private static object? _nextWorkItemToProcess; + private object? _nextWorkItemToProcess; // The scheme works as follows: // - From NotScheduled, the only transition is to Scheduled when new items are enqueued and a thread is requested to process them. @@ -648,6 +648,8 @@ internal static bool LocalFindAndPop(object callback) public object? Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal) { + ThreadPoolWorkQueue workQueue = ThreadPool.s_workQueue; + // Check for local work items object? 
workItem = tl.workStealingQueue.LocalPop(); if (workItem != null) @@ -655,9 +657,9 @@ internal static bool LocalFindAndPop(object callback) return workItem; } - if (_nextWorkItemToProcess != null) + if (workQueue._nextWorkItemToProcess != null) { - workItem = Interlocked.Exchange(ref _nextWorkItemToProcess, null); + workItem = Interlocked.Exchange(ref workQueue._nextWorkItemToProcess, null); if (workItem != null) { return workItem; @@ -835,9 +837,9 @@ internal static bool Dispatch() Interlocked.MemoryBarrier(); object? workItem = null; - if (_nextWorkItemToProcess != null) + if (workQueue._nextWorkItemToProcess != null) { - workItem = Interlocked.Exchange(ref _nextWorkItemToProcess, null); + workItem = Interlocked.Exchange(ref workQueue._nextWorkItemToProcess, null); } if (workItem == null) @@ -906,8 +908,8 @@ internal static bool Dispatch() object? secondWorkItem = DequeueWithPriorityAlternation(workQueue, tl, out bool missedSteal); if (secondWorkItem != null) { - Debug.Assert(_nextWorkItemToProcess == null); - _nextWorkItemToProcess = secondWorkItem; + Debug.Assert(workQueue._nextWorkItemToProcess == null); + workQueue._nextWorkItemToProcess = secondWorkItem; } if (secondWorkItem != null || missedSteal) From 68eaa53930239c341c1ba0ecdd85f9f7c7d1617c Mon Sep 17 00:00:00 2001 From: Eduardo Manuel Velarde Polar Date: Tue, 4 Jun 2024 16:32:52 -0700 Subject: [PATCH 20/21] Add MemoryBarrier comments --- .../src/System/Net/Sockets/SocketAsyncEngine.Unix.cs | 6 ++++++ .../src/System/Threading/ThreadPoolWorkQueue.cs | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs index d02be0e8aa656b..f14c6753e93d78 100644 --- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs +++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs @@ -252,6 +252,12 @@ 
void IThreadPoolWorkItem.Execute() while (true) { Debug.Assert(_eventQueueProcessingStage == (int)EventQueueProcessingStage.Scheduled); + + // The change needs to be visible to other threads that may request a worker thread before a work item is attempted + // to be dequeued by the current thread. In particular, if an enqueuer queues a work item and does not request a + // thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing + // work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of + // Scheduled, and try to dequeue again or request another thread. _eventQueueProcessingStage = (int)EventQueueProcessingStage.Determining; Interlocked.MemoryBarrier(); diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index 7cfdbc413d11e7..f7fb03e90637fb 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -1252,6 +1252,12 @@ void IThreadPoolWorkItem.Execute() while (true) { Debug.Assert(_queueProcessingStage == (int)QueueProcessingStage.Scheduled); + + // The change needs to be visible to other threads that may request a worker thread before a work item is attempted + // to be dequeued by the current thread. In particular, if an enqueuer queues a work item and does not request a + // thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing + // work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of + // Scheduled, and try to dequeue again or request another thread. 
_queueProcessingStage = (int)QueueProcessingStage.Determining; Interlocked.MemoryBarrier(); From 887e187418c43841d31e841abc01052b7e14ea3e Mon Sep 17 00:00:00 2001 From: Eduardo Manuel Velarde Polar Date: Wed, 5 Jun 2024 13:44:32 -0700 Subject: [PATCH 21/21] PR comment --- .../src/System/Threading/ThreadPoolWorkQueue.cs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index f7fb03e90637fb..659d2da9800783 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -648,8 +648,6 @@ internal static bool LocalFindAndPop(object callback) public object? Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal) { - ThreadPoolWorkQueue workQueue = ThreadPool.s_workQueue; - // Check for local work items object? workItem = tl.workStealingQueue.LocalPop(); if (workItem != null) @@ -657,9 +655,9 @@ internal static bool LocalFindAndPop(object callback) return workItem; } - if (workQueue._nextWorkItemToProcess != null) + if (_nextWorkItemToProcess != null) { - workItem = Interlocked.Exchange(ref workQueue._nextWorkItemToProcess, null); + workItem = Interlocked.Exchange(ref _nextWorkItemToProcess, null); if (workItem != null) { return workItem;