Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -87,23 +88,18 @@ private static SocketAsyncEngine[] CreateEngines()
//
private readonly ConcurrentQueue<SocketIOEvent> _eventQueue = new ConcurrentQueue<SocketIOEvent>();

// The scheme works as follows:
// - From NotScheduled, the only transition is to Scheduled when new events are enqueued and a work item is enqueued to process them.
// - From Scheduled, the only transition is to Determining right before trying to dequeue an event.
// - From Determining, it can go to either NotScheduled when no events are present in the queue (the previous work item processed all of them)
// or Scheduled if the queue is still not empty (let the current work item handle parallelization as convinient).
//
// The goal is to avoid enqueueing more work items than necessary, while still ensuring that all events are processed.
// Another work item isn't enqueued to the thread pool hastily while the state is Determining,
// instead the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time.
private enum EventQueueProcessingStage
{
NotScheduled,
Determining,
Scheduled
}

private EventQueueProcessingStage _eventQueueProcessingStage;
// This flag is used for communication between item enqueuing and workers that process the items.
// There are two states of this flag:
// 0: has no guarantees
// 1: means a worker will check work queues and ensure that
// any work items inserted in work queue before setting the flag
// are picked up.
// Note: The state must be cleared by the worker thread _before_
// checking. Otherwise there is a window between finding no work
// and resetting the flag, when the flag is in a wrong state.
// A new work item may be added right before the flag is reset
// without asking for a worker, while the last worker is quitting.
private int _hasOutstandingThreadRequest;

//
// Registers the Socket with a SocketAsyncEngine, and returns the associated engine.
Expand Down Expand Up @@ -201,14 +197,9 @@ private void EventLoop()
// The native shim is responsible for ensuring this condition.
Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}");

// Only enqueue a work item if the stage is NotScheduled.
// Otherwise there must be a work item already queued or another thread already handling parallelization.
if (handler.HandleSocketEvents(numEvents) &&
Interlocked.Exchange(
ref _eventQueueProcessingStage,
EventQueueProcessingStage.Scheduled) == EventQueueProcessingStage.NotScheduled)
if (handler.HandleSocketEvents(numEvents))
{
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
EnsureWorkerScheduled();
}
}
}
Expand All @@ -218,70 +209,40 @@ private void EventLoop()
}
}

private void UpdateEventQueueProcessingStage(bool isEventQueueEmpty)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void EnsureWorkerScheduled()
{
if (!isEventQueueEmpty)
{
// There are more events to process, set stage to Scheduled and enqueue a work item.
_eventQueueProcessingStage = EventQueueProcessingStage.Scheduled;
}
else
// Only one worker is requested at a time to mitigate Thundering Herd problem.
if (Interlocked.Exchange(ref _hasOutstandingThreadRequest, 1) == 0)
{
// The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
// otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
// would not have scheduled a work item to process the work, so schedule one now.
EventQueueProcessingStage stageBeforeUpdate =
Interlocked.CompareExchange(
ref _eventQueueProcessingStage,
EventQueueProcessingStage.NotScheduled,
EventQueueProcessingStage.Determining);
Debug.Assert(stageBeforeUpdate != EventQueueProcessingStage.NotScheduled);
if (stageBeforeUpdate == EventQueueProcessingStage.Determining)
{
return;
}
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}

ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}

void IThreadPoolWorkItem.Execute()
{
ConcurrentQueue<SocketIOEvent> eventQueue = _eventQueue;
SocketIOEvent ev;
while (true)
{
Debug.Assert(_eventQueueProcessingStage == EventQueueProcessingStage.Scheduled);
// We are asking for one worker at a time, thus the state should be 1.
Debug.Assert(_hasOutstandingThreadRequest == 1);
_hasOutstandingThreadRequest = 0;

// The change needs to be visible to other threads that may request a worker thread before a work item is attempted
// to be dequeued by the current thread. In particular, if an enqueuer queues a work item and does not request a
// thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing
// work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of
// Scheduled, and try to dequeue again or request another thread.
_eventQueueProcessingStage = EventQueueProcessingStage.Determining;
Interlocked.MemoryBarrier();
// Checking for items must happen after resetting the processing state.
Interlocked.MemoryBarrier();

if (eventQueue.TryDequeue(out ev))
{
break;
}

// The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
// otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
// would not have scheduled a work item to process the work, so try to dequeue a work item again.
EventQueueProcessingStage stageBeforeUpdate =
Interlocked.CompareExchange(
ref _eventQueueProcessingStage,
EventQueueProcessingStage.NotScheduled,
EventQueueProcessingStage.Determining);
Debug.Assert(stageBeforeUpdate != EventQueueProcessingStage.NotScheduled);
if (stageBeforeUpdate == EventQueueProcessingStage.Determining)
{
return;
}
ConcurrentQueue<SocketIOEvent> eventQueue = _eventQueue;
if (!eventQueue.TryDequeue(out SocketIOEvent ev))
{
return;
}

UpdateEventQueueProcessingStage(eventQueue.IsEmpty);
// The batch that is currently in the queue could have asked only for one worker.
// We are going to process a workitem, which may take unknown time or even block.
// In a worst case the current workitem will indirectly depend on progress of other
// items and that would lead to a deadlock if no one else checks the queue.
// We must ensure at least one more worker is coming if the queue is not empty.
if (!eventQueue.IsEmpty)
{
EnsureWorkerScheduled();
}

int startTimeMs = Environment.TickCount;
do
Expand Down
Loading
Loading