-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Parallelize epoll events on thread pool and process events in the same thread #35330
Changes from all commits
bc829b7
27992a1
46dc10d
df0caf0
723e7f2
f12febc
9c0e0f1
6f414e8
7ac0395
acd9c4d
aea8e26
079e962
99fc37b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -274,11 +274,21 @@ public void Dispatch() | |
} | ||
else | ||
{ | ||
// Async operation. Process the IO on the threadpool. | ||
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); | ||
// Async operation. | ||
Schedule(); | ||
} | ||
} | ||
|
||
public void Schedule() | ||
{ | ||
Debug.Assert(Event == null); | ||
|
||
// Async operation. Process the IO on the threadpool. | ||
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); | ||
} | ||
|
||
public void Process() => ((IThreadPoolWorkItem)this).Execute(); | ||
|
||
void IThreadPoolWorkItem.Execute() | ||
{ | ||
// ReadOperation and WriteOperation, the only two types derived from | ||
|
@@ -706,6 +716,7 @@ private enum QueueState : byte | |
// These fields define the queue state. | ||
|
||
private QueueState _state; // See above | ||
private bool _isNextOperationSynchronous; | ||
private int _sequenceNumber; // This sequence number is updated when we receive an epoll notification. | ||
// It allows us to detect when a new epoll notification has arrived | ||
// since the last time we checked the state of the queue. | ||
|
@@ -720,6 +731,8 @@ private enum QueueState : byte | |
|
||
private LockToken Lock() => new LockToken(_queueLock); | ||
|
||
public bool IsNextOperationSynchronous_Speculative => _isNextOperationSynchronous; | ||
|
||
public void Init() | ||
{ | ||
Debug.Assert(_queueLock == null); | ||
|
@@ -779,7 +792,12 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation | |
// Enqueue the operation. | ||
Debug.Assert(operation.Next == operation, "Expected operation.Next == operation"); | ||
|
||
if (_tail != null) | ||
if (_tail == null) | ||
{ | ||
Debug.Assert(!_isNextOperationSynchronous); | ||
_isNextOperationSynchronous = operation.Event != null; | ||
} | ||
else | ||
{ | ||
operation.Next = _tail.Next; | ||
_tail.Next = operation; | ||
|
@@ -825,8 +843,7 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation | |
} | ||
} | ||
|
||
// Called on the epoll thread whenever we receive an epoll notification. | ||
public void HandleEvent(SocketAsyncContext context) | ||
public AsyncOperation? ProcessSyncEventOrGetAsyncEvent(SocketAsyncContext context) | ||
{ | ||
AsyncOperation op; | ||
using (Lock()) | ||
|
@@ -839,7 +856,7 @@ public void HandleEvent(SocketAsyncContext context) | |
Debug.Assert(_tail == null, "State == Ready but queue is not empty!"); | ||
_sequenceNumber++; | ||
Trace(context, $"Exit (previously ready)"); | ||
return; | ||
return null; | ||
|
||
case QueueState.Waiting: | ||
Debug.Assert(_tail != null, "State == Waiting but queue is empty!"); | ||
|
@@ -852,21 +869,31 @@ public void HandleEvent(SocketAsyncContext context) | |
Debug.Assert(_tail != null, "State == Processing but queue is empty!"); | ||
_sequenceNumber++; | ||
Trace(context, $"Exit (currently processing)"); | ||
return; | ||
return null; | ||
|
||
case QueueState.Stopped: | ||
Debug.Assert(_tail == null); | ||
Trace(context, $"Exit (stopped)"); | ||
return; | ||
return null; | ||
|
||
default: | ||
Environment.FailFast("unexpected queue state"); | ||
return; | ||
return null; | ||
} | ||
} | ||
|
||
// Dispatch the op so we can try to process it. | ||
op.Dispatch(); | ||
ManualResetEventSlim? e = op.Event; | ||
if (e != null) | ||
{ | ||
// Sync operation. Signal waiting thread to continue processing. | ||
e.Set(); | ||
return null; | ||
} | ||
else | ||
{ | ||
// Async operation. The caller will figure out how to process the IO. | ||
return op; | ||
} | ||
} | ||
|
||
internal void ProcessAsyncOperation(TOperation op) | ||
|
@@ -991,6 +1018,7 @@ public OperationResult ProcessQueuedOperation(TOperation op) | |
{ | ||
// No more operations to process | ||
_tail = null; | ||
_isNextOperationSynchronous = false; | ||
_state = QueueState.Ready; | ||
_sequenceNumber++; | ||
Trace(context, $"Exit (finished queue)"); | ||
|
@@ -999,6 +1027,7 @@ public OperationResult ProcessQueuedOperation(TOperation op) | |
{ | ||
// Pop current operation and advance to next | ||
nextOp = _tail.Next = op.Next; | ||
_isNextOperationSynchronous = nextOp.Event != null; | ||
} | ||
} | ||
} | ||
|
@@ -1033,11 +1062,13 @@ public void CancelAndContinueProcessing(TOperation op) | |
{ | ||
// No more operations | ||
_tail = null; | ||
_isNextOperationSynchronous = false; | ||
} | ||
else | ||
{ | ||
// Pop current operation and advance to next | ||
_tail.Next = op.Next; | ||
_isNextOperationSynchronous = op.Next.Event != null; | ||
} | ||
|
||
// We're the first op in the queue. | ||
|
@@ -1112,6 +1143,7 @@ public bool StopAndAbort(SocketAsyncContext context) | |
} | ||
|
||
_tail = null; | ||
_isNextOperationSynchronous = false; | ||
|
||
Trace(context, $"Exit"); | ||
} | ||
|
@@ -1946,6 +1978,41 @@ public SocketError SendFileAsync(SafeFileHandle fileHandle, long offset, long co | |
return SocketError.IOPending; | ||
} | ||
|
||
// Called on the epoll thread, speculatively tries to process synchronous events and errors for synchronous events, and | ||
// returns any remaining events that remain to be processed. Taking a lock for each operation queue to deterministically | ||
// handle synchronous events on the epoll thread seems to significantly reduce throughput in benchmarks. On the other | ||
// hand, the speculative checks make it nondeterministic, where it would be possible for the epoll thread to think that | ||
// the next operation in a queue is not synchronous when it is (due to a race, old caches, etc.) and cause the event to | ||
// be scheduled instead. It's not functionally incorrect to schedule the release of a synchronous operation, just it may | ||
// lead to thread pool starvation issues if the synchronous operations are blocking thread pool threads (typically not | ||
// advised) and more threads are not immediately available to run work items that would release those operations. | ||
public unsafe Interop.Sys.SocketEvents HandleSyncEventsSpeculatively(Interop.Sys.SocketEvents events) | ||
{ | ||
if ((events & Interop.Sys.SocketEvents.Error) != 0) | ||
{ | ||
// Set the Read and Write flags; the processing for these events | ||
// will pick up the error. | ||
events ^= Interop.Sys.SocketEvents.Error; | ||
events |= Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write; | ||
} | ||
|
||
if ((events & Interop.Sys.SocketEvents.Read) != 0 && | ||
_receiveQueue.IsNextOperationSynchronous_Speculative && | ||
_receiveQueue.ProcessSyncEventOrGetAsyncEvent(this) == null) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kouvel @stephentoub @adamsitnik I think there is an issue when There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice catch, will fix |
||
{ | ||
events ^= Interop.Sys.SocketEvents.Read; | ||
} | ||
|
||
if ((events & Interop.Sys.SocketEvents.Write) != 0 && | ||
_sendQueue.IsNextOperationSynchronous_Speculative && | ||
_sendQueue.ProcessSyncEventOrGetAsyncEvent(this) == null) | ||
{ | ||
events ^= Interop.Sys.SocketEvents.Write; | ||
} | ||
|
||
return events; | ||
} | ||
|
||
public unsafe void HandleEvents(Interop.Sys.SocketEvents events) | ||
{ | ||
if ((events & Interop.Sys.SocketEvents.Error) != 0) | ||
|
@@ -1955,14 +2022,23 @@ public unsafe void HandleEvents(Interop.Sys.SocketEvents events) | |
events |= Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write; | ||
} | ||
|
||
if ((events & Interop.Sys.SocketEvents.Read) != 0) | ||
AsyncOperation? receiveOperation = | ||
stephentoub marked this conversation as resolved.
Show resolved
Hide resolved
|
||
(events & Interop.Sys.SocketEvents.Read) != 0 ? _receiveQueue.ProcessSyncEventOrGetAsyncEvent(this) : null; | ||
AsyncOperation? sendOperation = | ||
(events & Interop.Sys.SocketEvents.Write) != 0 ? _sendQueue.ProcessSyncEventOrGetAsyncEvent(this) : null; | ||
|
||
// This method is called from a thread pool thread. When we have only one operation to process, process it | ||
// synchronously to avoid an extra thread pool work item. When we have two operations to process, processing both | ||
// synchronously may delay the second operation, so schedule one onto the thread pool and process the other | ||
// synchronously. There might be better ways of doing this. | ||
if (sendOperation == null) | ||
{ | ||
_receiveQueue.HandleEvent(this); | ||
receiveOperation?.Process(); | ||
} | ||
|
||
if ((events & Interop.Sys.SocketEvents.Write) != 0) | ||
else | ||
{ | ||
_sendQueue.HandleEvent(this); | ||
receiveOperation?.Schedule(); | ||
sendOperation.Process(); | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Am I reading the code correctly that we'll likely end up calling Set twice for a sync operation (with the second call just being a nop)? I think that's probably fine, but it'd be worth a comment calling that out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you think it might be called twice? My intention was that either the epoll thread handles an event or it will queue it for processing in the background, but not both. If the epoll thread correctly sees that there is a pending sync operation next, then it would call set and not queue an operation for that. Otherwise, it would not process the event and queue it instead.