diff --git a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs
index aafcf158518a9c..f14c6753e93d78 100644
--- a/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs
+++ b/src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs
@@ -87,12 +87,23 @@ private static SocketAsyncEngine[] CreateEngines()
         //
         private readonly ConcurrentQueue<SocketIOEvent> _eventQueue = new ConcurrentQueue<SocketIOEvent>();

+        // The scheme works as follows:
+        // - From NotScheduled, the only transition is to Scheduled when new events are enqueued and a work item is enqueued to process them.
+        // - From Scheduled, the only transition is to Determining right before trying to dequeue an event.
+        // - From Determining, it can go to either NotScheduled when no events are present in the queue (the previous work item processed all of them)
+        //   or Scheduled if the queue is still not empty (let the current work item handle parallelization as convenient).
         //
-        // This field is set to 1 to indicate that a thread pool work item is scheduled to process events in _eventQueue. It is
-        // set to 0 when the scheduled work item starts running, to indicate that a thread pool work item to process events is
-        // not scheduled. Changes are protected by atomic operations as appropriate.
-        //
-        private int _eventQueueProcessingRequested;
+        // The goal is to avoid enqueueing more work items than necessary, while still ensuring that all events are processed.
+        // Another work item isn't enqueued to the thread pool hastily while the state is Determining;
+        // instead, the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time.
+        private enum EventQueueProcessingStage
+        {
+            NotScheduled,
+            Determining,
+            Scheduled
+        }
+
+        private int _eventQueueProcessingStage;

         //
         // Registers the Socket with a SocketAsyncEngine, and returns the associated engine.
@@ -190,9 +201,14 @@ private void EventLoop()
                         // The native shim is responsible for ensuring this condition.
                         Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}");

-                        if (handler.HandleSocketEvents(numEvents))
+                        // Only enqueue a work item if the stage is NotScheduled.
+                        // Otherwise there must be a work item already queued or another thread already handling parallelization.
+                        if (handler.HandleSocketEvents(numEvents) &&
+                            Interlocked.Exchange(
+                                ref _eventQueueProcessingStage,
+                                (int)EventQueueProcessingStage.Scheduled) == (int)EventQueueProcessingStage.NotScheduled)
                         {
-                            ScheduleToProcessEvents();
+                            ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
                         }
                     }
                 }
@@ -202,42 +218,73 @@ private void EventLoop()
             }
         }

-        [MethodImpl(MethodImplOptions.AggressiveInlining)]
-        private void ScheduleToProcessEvents()
+        private void UpdateEventQueueProcessingStage(bool isEventQueueEmpty)
         {
-            // Schedule a thread pool work item to process events. Only one work item is scheduled at any given time to avoid
-            // over-parallelization. When the work item begins running, this field is reset to 0, allowing for another work item
-            // to be scheduled for parallelizing processing of events.
-            if (Interlocked.CompareExchange(ref _eventQueueProcessingRequested, 1, 0) == 0)
+            if (!isEventQueueEmpty)
             {
-                ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
+                // There are more events to process, set stage to Scheduled and enqueue a work item.
+                _eventQueueProcessingStage = (int)EventQueueProcessingStage.Scheduled;
+            }
+            else
+            {
+                // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
+                // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
+                // would not have scheduled a work item to process the work, so schedule one now.
+                int stageBeforeUpdate =
+                    Interlocked.CompareExchange(
+                        ref _eventQueueProcessingStage,
+                        (int)EventQueueProcessingStage.NotScheduled,
+                        (int)EventQueueProcessingStage.Determining);
+                Debug.Assert(stageBeforeUpdate != (int)EventQueueProcessingStage.NotScheduled);
+                if (stageBeforeUpdate == (int)EventQueueProcessingStage.Determining)
+                {
+                    return;
+                }
             }
+
+            ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
         }

         void IThreadPoolWorkItem.Execute()
         {
-            // Indicate that a work item is no longer scheduled to process events. The change needs to be visible to enqueuer
-            // threads (only for EventLoop() currently) before an event is attempted to be dequeued. In particular, if an
-            // enqueuer queues an event and does not schedule a work item because it is already scheduled, and this thread is
-            // the last thread processing events, it must see the event queued by the enqueuer.
-            Interlocked.Exchange(ref _eventQueueProcessingRequested, 0);
-
             ConcurrentQueue<SocketIOEvent> eventQueue = _eventQueue;
-            if (!eventQueue.TryDequeue(out SocketIOEvent ev))
+            SocketIOEvent ev;
+            while (true)
             {
-                return;
-            }
+                Debug.Assert(_eventQueueProcessingStage == (int)EventQueueProcessingStage.Scheduled);

-            int startTimeMs = Environment.TickCount;
+                // The change needs to be visible to other threads that may request a worker thread before a work item is attempted
+                // to be dequeued by the current thread. In particular, if an enqueuer queues a work item and does not request a
+                // thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing
+                // work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of
+                // Scheduled, and try to dequeue again or request another thread.
+                _eventQueueProcessingStage = (int)EventQueueProcessingStage.Determining;
+                Interlocked.MemoryBarrier();
+
+                if (eventQueue.TryDequeue(out ev))
+                {
+                    break;
+                }

-            // An event was successfully dequeued, and there may be more events to process. Schedule a work item to parallelize
-            // processing of events, before processing more events. Following this, it is the responsibility of the new work
-            // item and the epoll thread to schedule more work items as necessary. The parallelization may be necessary here if
-            // the user callback as part of handling the event blocks for some reason that may have a dependency on other queued
-            // socket events.
-            ScheduleToProcessEvents();
+                // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
+                // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
+                // would not have scheduled a work item to process the work, so try to dequeue a work item again.
+                int stageBeforeUpdate =
+                    Interlocked.CompareExchange(
+                        ref _eventQueueProcessingStage,
+                        (int)EventQueueProcessingStage.NotScheduled,
+                        (int)EventQueueProcessingStage.Determining);
+                Debug.Assert(stageBeforeUpdate != (int)EventQueueProcessingStage.NotScheduled);
+                if (stageBeforeUpdate == (int)EventQueueProcessingStage.Determining)
+                {
+                    return;
+                }
            }

-            while (true)
+            UpdateEventQueueProcessingStage(eventQueue.IsEmpty);
+
+            int startTimeMs = Environment.TickCount;
+            do
             {
                 ev.Context.HandleEvents(ev.Events);
@@ -253,19 +300,7 @@ void IThreadPoolWorkItem.Execute()
                 // using Stopwatch instead (like 1 ms, 5 ms, etc.), from quick tests they appeared to have a slightly greater
                 // impact on throughput compared to the threshold chosen below, though it is slight enough that it may not
                 // matter much. Higher thresholds didn't seem to have any noticeable effect.
-                if (Environment.TickCount - startTimeMs >= 15)
-                {
-                    break;
-                }
-
-                if (!eventQueue.TryDequeue(out ev))
-                {
-                    return;
-                }
-            }
-
-            // The queue was not observed to be empty, schedule another work item before yielding the thread
-            ScheduleToProcessEvents();
+            } while (Environment.TickCount - startTimeMs < 15 && eventQueue.TryDequeue(out ev));
         }

         private void FreeNativeResources()
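The same NotScheduled/Determining/Scheduled scheme is instantiated three times in this change: the socket event queue above, and the thread pool's global queue and typed work item queue below. The following is a minimal, self-contained sketch of the pattern on its own, assuming a plain ConcurrentQueue<int> and an Action<int> callback; the StagedQueueProcessor name and its members are illustrative only and do not appear in the change, and the 15 ms batching loop used by the engine is omitted:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// A minimal, self-contained sketch of the three-stage scheme; the type and member
// names are hypothetical and the 15 ms batching loop used by the engine is omitted.
internal sealed class StagedQueueProcessor
{
    private enum Stage { NotScheduled, Determining, Scheduled }

    private readonly ConcurrentQueue<int> _queue = new();
    private readonly Action<int> _onItem;
    private int _stage; // starts at (int)Stage.NotScheduled

    public StagedQueueProcessor(Action<int> onItem) => _onItem = onItem;

    public void Enqueue(int item)
    {
        _queue.Enqueue(item);

        // Unconditionally move to Scheduled; only the enqueuer that observed
        // NotScheduled schedules a worker, so at most one worker is ever pending.
        if (Interlocked.Exchange(ref _stage, (int)Stage.Scheduled) == (int)Stage.NotScheduled)
        {
            ThreadPool.UnsafeQueueUserWorkItem(static s => s.Process(), this, preferLocal: false);
        }
    }

    private void Process()
    {
        int item;
        while (true)
        {
            // Publish Determining before dequeuing: a racing enqueuer either makes its
            // item visible to the TryDequeue below, or flips the stage back to Scheduled,
            // which the CompareExchange below then detects.
            _stage = (int)Stage.Determining;
            Interlocked.MemoryBarrier();

            if (_queue.TryDequeue(out item))
            {
                break;
            }

            // Queue looked empty: retire (Determining -> NotScheduled) unless an enqueuer
            // raced the stage back to Scheduled, in which case try to dequeue again.
            if (Interlocked.CompareExchange(
                    ref _stage, (int)Stage.NotScheduled, (int)Stage.Determining) == (int)Stage.Determining)
            {
                return;
            }
        }

        // An item is in hand; settle parallelization before running the callback, as
        // UpdateEventQueueProcessingStage does above.
        if (!_queue.IsEmpty)
        {
            _stage = (int)Stage.Scheduled;
            ThreadPool.UnsafeQueueUserWorkItem(static s => s.Process(), this, preferLocal: false);
        }
        else if (Interlocked.CompareExchange(
                     ref _stage, (int)Stage.NotScheduled, (int)Stage.Determining) == (int)Stage.Scheduled)
        {
            // An enqueue raced us and did not schedule a worker, so schedule one now.
            ThreadPool.UnsafeQueueUserWorkItem(static s => s.Process(), this, preferLocal: false);
        }

        _onItem(item);
    }
}
```

The invariant to check the sketch against: every enqueue either makes its item visible to a worker that has already published Determining, or flips the stage to Scheduled, in which case either the enqueuer (on seeing NotScheduled) or the worker's failed CompareExchange keeps a worker scheduled.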
diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs
index cc7c76922d9964..659d2da9800783 100644
--- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs
+++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs
@@ -404,12 +404,30 @@ public int Count
         private readonly int[] _assignedWorkItemQueueThreadCounts =
             s_assignableWorkItemQueueCount > 0 ? new int[s_assignableWorkItemQueueCount] : Array.Empty<int>();

+        private object? _nextWorkItemToProcess;
+
+        // The scheme works as follows:
+        // - From NotScheduled, the only transition is to Scheduled when new items are enqueued and a thread is requested to process them.
+        // - From Scheduled, the only transition is to Determining right before trying to dequeue an item.
+        // - From Determining, it can go to either NotScheduled when no items are present in the queue (the previous thread processed all of them)
+        //   or Scheduled if the queue is still not empty (let the current thread handle parallelization as convenient).
+        //
+        // The goal is to avoid requesting more threads than necessary, while still ensuring that all items are processed.
+        // Another thread isn't requested hastily while the state is Determining;
+        // instead, the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time.
+        private enum QueueProcessingStage
+        {
+            NotScheduled,
+            Determining,
+            Scheduled
+        }
+
         [StructLayout(LayoutKind.Sequential)]
         private struct CacheLineSeparated
         {
             private readonly Internal.PaddingFor32 pad1;

-            public int hasOutstandingThreadRequest;
+            public int queueProcessingStage;

             private readonly Internal.PaddingFor32 pad2;
         }
@@ -573,24 +591,16 @@ public void RefreshLoggingEnabledFull()
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         internal void EnsureThreadRequested()
         {
-            // Only one thread is requested at a time to avoid over-parallelization
-            if (Interlocked.CompareExchange(ref _separated.hasOutstandingThreadRequest, 1, 0) == 0)
+            // Only request a thread if the stage is NotScheduled.
+            // Otherwise let the current requested thread handle parallelization.
+            if (Interlocked.Exchange(
+                ref _separated.queueProcessingStage,
+                (int)QueueProcessingStage.Scheduled) == (int)QueueProcessingStage.NotScheduled)
             {
                 ThreadPool.RequestWorkerThread();
             }
         }

-        [MethodImpl(MethodImplOptions.AggressiveInlining)]
-        internal void MarkThreadRequestSatisfied()
-        {
-            // The change needs to be visible to other threads that may request a worker thread before a work item is attempted
-            // to be dequeued by the current thread. In particular, if an enqueuer queues a work item and does not request a
-            // thread because it sees that a thread request is already outstanding, and the current thread is the last thread
-            // processing work items, the current thread must see the work item queued by the enqueuer.
-            _separated.hasOutstandingThreadRequest = 0;
-            Interlocked.MemoryBarrier();
-        }
-
         public void Enqueue(object callback, bool forceGlobal)
         {
             Debug.Assert((callback is IThreadPoolWorkItem) ^ (callback is Task));
@@ -645,6 +655,15 @@ internal static bool LocalFindAndPop(object callback)
                 return workItem;
             }

+            if (_nextWorkItemToProcess != null)
+            {
+                workItem = Interlocked.Exchange(ref _nextWorkItemToProcess, null);
+                if (workItem != null)
+                {
+                    return workItem;
+                }
+            }
+
             // Check for high-priority work items
             if (tl.isProcessingHighPriorityWorkItems)
             {
@@ -763,6 +782,32 @@ public long GlobalCount
         // Dispatch (if YieldFromDispatchLoop is true), or performing periodic activities
         public const uint DispatchQuantumMs = 30;

+        private static object? DequeueWithPriorityAlternation(ThreadPoolWorkQueue workQueue, ThreadPoolWorkQueueThreadLocals tl, out bool missedSteal)
+        {
+            object? workItem = null;
+
+            // Alternate between checking for high-priority and normal-priority work first, that way both sets of work
+            // items get a chance to run in situations where worker threads are starved and work items that run also
+            // take over the thread, sustaining starvation. For example, when worker threads are continually starved,
+            // high-priority work items may always be queued and normal-priority work items may not get a chance to run.
+            bool dispatchNormalPriorityWorkFirst = workQueue._dispatchNormalPriorityWorkFirst;
+            if (dispatchNormalPriorityWorkFirst && !tl.workStealingQueue.CanSteal)
+            {
+                workQueue._dispatchNormalPriorityWorkFirst = !dispatchNormalPriorityWorkFirst;
+                ConcurrentQueue<object> queue =
+                    s_assignableWorkItemQueueCount > 0 ? tl.assignedGlobalWorkItemQueue : workQueue.workItems;
+                if (!queue.TryDequeue(out workItem) && s_assignableWorkItemQueueCount > 0)
+                {
+                    workQueue.workItems.TryDequeue(out workItem);
+                }
+            }
+
+            missedSteal = false;
+            workItem ??= workQueue.Dequeue(tl, ref missedSteal);
+
+            return workItem;
+        }
+
         /// <summary>
         /// Dispatches work items to this thread.
         /// </summary>
@@ -780,66 +825,128 @@ internal static bool Dispatch()
                 workQueue.AssignWorkItemQueue(tl);
             }

-            // Before dequeuing the first work item, acknowledge that the thread request has been satisfied
-            workQueue.MarkThreadRequestSatisfied();
+            // The change needs to be visible to other threads that may request a worker thread before a work item is attempted
+            // to be dequeued by the current thread. In particular, if an enqueuer queues a work item and does not request a
+            // thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing
+            // work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of
+            // Scheduled, and try to dequeue again or request another thread.
+            Debug.Assert(workQueue._separated.queueProcessingStage == (int)QueueProcessingStage.Scheduled);
+            workQueue._separated.queueProcessingStage = (int)QueueProcessingStage.Determining;
+            Interlocked.MemoryBarrier();

             object? workItem = null;
+            if (workQueue._nextWorkItemToProcess != null)
             {
-                // Alternate between checking for high-priority and normal-priority work first, that way both sets of work
-                // items get a chance to run in situations where worker threads are starved and work items that run also
-                // take over the thread, sustaining starvation. For example, when worker threads are continually starved,
-                // high-priority work items may always be queued and normal-priority work items may not get a chance to run.
-                bool dispatchNormalPriorityWorkFirst = workQueue._dispatchNormalPriorityWorkFirst;
-                if (dispatchNormalPriorityWorkFirst && !tl.workStealingQueue.CanSteal)
-                {
-                    workQueue._dispatchNormalPriorityWorkFirst = !dispatchNormalPriorityWorkFirst;
-                    ConcurrentQueue<object> queue =
-                        s_assignableWorkItemQueueCount > 0 ? tl.assignedGlobalWorkItemQueue : workQueue.workItems;
-                    if (!queue.TryDequeue(out workItem) && s_assignableWorkItemQueueCount > 0)
-                    {
-                        workQueue.workItems.TryDequeue(out workItem);
-                    }
-                }
+                workItem = Interlocked.Exchange(ref workQueue._nextWorkItemToProcess, null);
+            }

-            if (workItem == null)
+            if (workItem == null)
+            {
+                // Try to dequeue a work item, clean up and return if no item was found
+                while ((workItem = DequeueWithPriorityAlternation(workQueue, tl, out bool missedSteal)) == null)
                 {
-                    bool missedSteal = false;
-                    workItem = workQueue.Dequeue(tl, ref missedSteal);
-
-                    if (workItem == null)
+                    //
+                    // No work.
+                    // If we missed a steal, though, there may be more work in the queue.
+                    // Instead of looping around and trying again, we'll just request another thread. Hopefully the thread
+                    // that owns the contended work-stealing queue will pick up its own workitems in the meantime,
+                    // which will be more efficient than this thread doing it anyway.
+                    //
+                    if (missedSteal)
                     {
                         if (s_assignableWorkItemQueueCount > 0)
                         {
                             workQueue.UnassignWorkItemQueue(tl);
                         }

-                        //
-                        // No work.
-                        // If we missed a steal, though, there may be more work in the queue.
-                        // Instead of looping around and trying again, we'll just request another thread. Hopefully the thread
-                        // that owns the contended work-stealing queue will pick up its own workitems in the meantime,
-                        // which will be more efficient than this thread doing it anyway.
-                        //
-                        if (missedSteal)
+                        Debug.Assert(workQueue._separated.queueProcessingStage != (int)QueueProcessingStage.NotScheduled);
+                        workQueue._separated.queueProcessingStage = (int)QueueProcessingStage.Scheduled;
+                        ThreadPool.RequestWorkerThread();
+                        return true;
+                    }
+
+                    // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
+                    // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
+                    // would not have scheduled a work item to process the work, so try to dequeue a work item again.
+                    int stageBeforeUpdate =
+                        Interlocked.CompareExchange(
+                            ref workQueue._separated.queueProcessingStage,
+                            (int)QueueProcessingStage.NotScheduled,
+                            (int)QueueProcessingStage.Determining);
+                    Debug.Assert(stageBeforeUpdate != (int)QueueProcessingStage.NotScheduled);
+                    if (stageBeforeUpdate == (int)QueueProcessingStage.Determining)
+                    {
+                        if (s_assignableWorkItemQueueCount > 0)
                         {
-                            workQueue.EnsureThreadRequested();
+                            workQueue.UnassignWorkItemQueue(tl);
                         }

-                        // Tell the VM we're returning normally, not because Hill Climbing asked us to return.
                         return true;
                     }
+
+                    // A work item was enqueued after the stage was set to Determining earlier, and a thread was not requested
+                    // by the enqueuer. Set the stage back to Determining and try to dequeue a work item again.
+                    //
+                    // See the first similarly used memory barrier in the method for why it's necessary.
+                    workQueue._separated.queueProcessingStage = (int)QueueProcessingStage.Determining;
+                    Interlocked.MemoryBarrier();
                 }
+            }
+
+            {
+                // A work item may have been enqueued after the stage was set to Determining earlier, so the stage may be
+                // Scheduled here, and the enqueued work item may have already been dequeued above or by a different thread. Now
+                // that we're about to try dequeuing a second work item, set the stage back to Determining first so that we'll
+                // be able to detect if an enqueue races with the dequeue below.
+                //
+                // See the first similarly used memory barrier in the method for why it's necessary.
+                workQueue._separated.queueProcessingStage = (int)QueueProcessingStage.Determining;
+                Interlocked.MemoryBarrier();

-            // A work item was successfully dequeued, and there may be more work items to process. Request a thread to
-            // parallelize processing of work items, before processing more work items. Following this, it is the
-            // responsibility of the new thread and other enqueuers to request more threads as necessary. The
-            // parallelization may be necessary here for correctness (aside from perf) if the work item blocks for some
-            // reason that may have a dependency on other queued work items.
-            workQueue.EnsureThreadRequested();
+                object? secondWorkItem = DequeueWithPriorityAlternation(workQueue, tl, out bool missedSteal);
+                if (secondWorkItem != null)
+                {
+                    Debug.Assert(workQueue._nextWorkItemToProcess == null);
+                    workQueue._nextWorkItemToProcess = secondWorkItem;
+                }

-            // After this point, this method is no longer responsible for ensuring thread requests except for missed steals
+                if (secondWorkItem != null || missedSteal)
+                {
+                    // A work item was successfully dequeued, and there may be more work items to process. Request a thread to
+                    // parallelize processing of work items, before processing more work items. Following this, it is the
+                    // responsibility of the new thread and other enqueuers to request more threads as necessary. The
+                    // parallelization may be necessary here for correctness (aside from perf) if the work item blocks for some
+                    // reason that may have a dependency on other queued work items.
+                    Debug.Assert(workQueue._separated.queueProcessingStage != (int)QueueProcessingStage.NotScheduled);
+                    workQueue._separated.queueProcessingStage = (int)QueueProcessingStage.Scheduled;
+                    ThreadPool.RequestWorkerThread();
+                }
+                else
+                {
+                    // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
+                    // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
+                    // would not have requested a thread, so request one now.
+                    int stageBeforeUpdate =
+                        Interlocked.CompareExchange(
+                            ref workQueue._separated.queueProcessingStage,
+                            (int)QueueProcessingStage.NotScheduled,
+                            (int)QueueProcessingStage.Determining);
+                    Debug.Assert(stageBeforeUpdate != (int)QueueProcessingStage.NotScheduled);
+                    if (stageBeforeUpdate == (int)QueueProcessingStage.Scheduled)
+                    {
+                        // A work item was enqueued after the stage was set to Determining earlier, and a thread was not
+                        // requested by the enqueuer, so request a thread now. An alternative is to retry dequeuing, as requesting
+                        // a thread can be more expensive, but retrying multiple times (though unlikely) can delay the
+                        // processing of the first work item that was already dequeued.
+                        ThreadPool.RequestWorkerThread();
+                    }
+                }
             }

+            //
+            // After this point, this method is no longer responsible for ensuring thread requests except for missed steals
+            //
+
             // Has the desire for logging changed since the last time we entered?
             workQueue.RefreshLoggingEnabled();
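In all of these instances, the plain store of Determining followed by Interlocked.MemoryBarrier() is load-bearing: without the store-load fence, the dequeue could be satisfied before the Determining store becomes visible, so an enqueuer could observe a stale Scheduled stage (and not request a thread) while the worker misses the item and retires, stranding the item. Below is a small stress harness for the sketch shown earlier — hypothetical, with illustrative names, not part of this change — that exercises exactly this property; commenting out the Interlocked.MemoryBarrier() line in the sketch can make it hang intermittently with items stranded in the queue:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

internal static class StagedQueueStress
{
    private static void Main()
    {
        const int Producers = 4;
        const int ItemsPerProducer = 250_000;

        int processed = 0;
        var processor = new StagedQueueProcessor(_ => Interlocked.Increment(ref processed));

        // Hammer the enqueue path from several threads at once.
        Parallel.For(0, Producers, _ =>
        {
            for (int i = 0; i < ItemsPerProducer; i++)
            {
                processor.Enqueue(i);
            }
        });

        // Spin until the queue drains; a real test would use a CountdownEvent instead.
        while (Volatile.Read(ref processed) < Producers * ItemsPerProducer)
        {
            Thread.Sleep(1);
        }

        Console.WriteLine($"processed {processed} items"); // expect 1000000
    }
}
```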
@@ -1070,7 +1177,23 @@ internal sealed class ThreadPoolTypedWorkItemQueue<T, TCallback> : IThreadPoolWorkItem
         where T : struct
         where TCallback : struct, IThreadPoolTypedWorkItemQueueCallback<T>
     {
-        private int _isScheduledForProcessing;
+        // The scheme works as follows:
+        // - From NotScheduled, the only transition is to Scheduled when new items are enqueued and a TP work item is enqueued to process them.
+        // - From Scheduled, the only transition is to Determining right before trying to dequeue an item.
+        // - From Determining, it can go to either NotScheduled when no items are present in the queue (the previous TP work item processed all of them)
+        //   or Scheduled if the queue is still not empty (let the current TP work item handle parallelization as convenient).
+        //
+        // The goal is to avoid enqueueing more TP work items than necessary, while still ensuring that all items are processed.
+        // Another TP work item isn't enqueued to the thread pool hastily while the state is Determining;
+        // instead, the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time.
+        private enum QueueProcessingStage
+        {
+            NotScheduled,
+            Determining,
+            Scheduled
+        }
+
+        private int _queueProcessingStage;

         private readonly ConcurrentQueue<T> _workItems = new ConcurrentQueue<T>();
         public int Count => _workItems.Count;
@@ -1082,40 +1205,81 @@ public void Enqueue(T workItem)
         }

         public void BatchEnqueue(T workItem) => _workItems.Enqueue(workItem);
-        public void CompleteBatchEnqueue() => ScheduleForProcessing();
-
-        private void ScheduleForProcessing()
+        public void CompleteBatchEnqueue()
         {
-            // Only one thread is requested at a time to avoid over-parallelization. Currently where this type is used, queued
-            // work is expected to be processed at high priority. The implementation could be modified to support different
-            // priorities if necessary.
-            if (Interlocked.CompareExchange(ref _isScheduledForProcessing, 1, 0) == 0)
+            // Only enqueue a work item if the stage is NotScheduled.
+            // Otherwise there must be a work item already queued or another thread already handling parallelization.
+            if (Interlocked.Exchange(
+                ref _queueProcessingStage,
+                (int)QueueProcessingStage.Scheduled) == (int)QueueProcessingStage.NotScheduled)
             {
                 ThreadPool.UnsafeQueueHighPriorityWorkItemInternal(this);
             }
         }

-        void IThreadPoolWorkItem.Execute()
+        private void UpdateQueueProcessingStage(bool isQueueEmpty)
         {
-            Debug.Assert(_isScheduledForProcessing != 0);
+            if (!isQueueEmpty)
+            {
+                // There are more items to process, set stage to Scheduled and enqueue a TP work item.
+                _queueProcessingStage = (int)QueueProcessingStage.Scheduled;
+            }
+            else
+            {
+                // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
+                // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
+                // would not have scheduled a work item to process the work, so schedule one now.
+                int stageBeforeUpdate =
+                    Interlocked.CompareExchange(
+                        ref _queueProcessingStage,
+                        (int)QueueProcessingStage.NotScheduled,
+                        (int)QueueProcessingStage.Determining);
+                Debug.Assert(stageBeforeUpdate != (int)QueueProcessingStage.NotScheduled);
+                if (stageBeforeUpdate == (int)QueueProcessingStage.Determining)
+                {
+                    return;
+                }
+            }

-            // This change needs to be visible to other threads that may enqueue work items before a work item is attempted to
-            // be dequeued by the current thread. In particular, if an enqueuer queues a work item and does not schedule for
-            // processing, and the current thread is the last thread processing work items, the current thread must see the work
-            // item queued by the enqueuer.
-            _isScheduledForProcessing = 0;
-            Interlocked.MemoryBarrier();
-            if (!_workItems.TryDequeue(out T workItem))
+            ThreadPool.UnsafeQueueHighPriorityWorkItemInternal(this);
+        }
+
+        void IThreadPoolWorkItem.Execute()
+        {
+            T workItem;
+            while (true)
             {
-                return;
+                Debug.Assert(_queueProcessingStage == (int)QueueProcessingStage.Scheduled);
+
+                // The change needs to be visible to other threads that may request a worker thread before a work item is attempted
+                // to be dequeued by the current thread. In particular, if an enqueuer queues a work item and does not request a
+                // thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing
+                // work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of
+                // Scheduled, and try to dequeue again or request another thread.
+                _queueProcessingStage = (int)QueueProcessingStage.Determining;
+                Interlocked.MemoryBarrier();
+
+                if (_workItems.TryDequeue(out workItem))
+                {
+                    break;
+                }
+
+                // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
+                // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
+                // would not have scheduled a work item to process the work, so try to dequeue a work item again.
+                int stageBeforeUpdate =
+                    Interlocked.CompareExchange(
+                        ref _queueProcessingStage,
+                        (int)QueueProcessingStage.NotScheduled,
+                        (int)QueueProcessingStage.Determining);
+                Debug.Assert(stageBeforeUpdate != (int)QueueProcessingStage.NotScheduled);
+                if (stageBeforeUpdate == (int)QueueProcessingStage.Determining)
+                {
+                    return;
+                }
             }

-            // A work item was successfully dequeued, and there may be more work items to process. Schedule a work item to
-            // parallelize processing of work items, before processing more work items. Following this, it is the responsibility
-            // of the new work item and the poller thread to schedule more work items as necessary. The parallelization may be
-            // necessary here if the user callback as part of handling the work item blocks for some reason that may have a
-            // dependency on other queued work items.
-            ScheduleForProcessing();
+            UpdateQueueProcessingStage(_workItems.IsEmpty);

             ThreadPoolWorkQueueThreadLocals tl = ThreadPoolWorkQueueThreadLocals.threadLocals!;
             Debug.Assert(tl != null);
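CompleteBatchEnqueue above is the batch counterpart of the single-item enqueue path: BatchEnqueue only appends to the queue, and the stage is examined once per batch, so a burst of N items pays for one interlocked operation rather than N. In terms of the earlier StagedQueueProcessor sketch (again with hypothetical members, under the same assumptions), the batch API would look like:

```csharp
// Hypothetical members added to the StagedQueueProcessor sketch above.
public void BatchEnqueue(int item) => _queue.Enqueue(item);

public void CompleteBatchEnqueue()
{
    // Same transition as Enqueue: only the completer that observed NotScheduled
    // schedules the worker, and the whole batch pays for this one interlocked op.
    if (Interlocked.Exchange(ref _stage, (int)Stage.Scheduled) == (int)Stage.NotScheduled)
    {
        ThreadPool.UnsafeQueueUserWorkItem(static s => s.Process(), this, preferLocal: false);
    }
}
```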