@@ -87,12 +87,23 @@ private static SocketAsyncEngine[] CreateEngines()
87
87
//
88
88
private readonly ConcurrentQueue < SocketIOEvent > _eventQueue = new ConcurrentQueue < SocketIOEvent > ( ) ;
89
89
90
+ // The scheme works as follows:
91
+ // - From NotScheduled, the only transition is to Scheduled when new events are enqueued and a work item is enqueued to process them.
92
+ // - From Scheduled, the only transition is to Determining right before trying to dequeue an event.
93
+ // - From Determining, it can go to either NotScheduled when no events are present in the queue (the previous work item processed all of them)
94
+ // or Scheduled if the queue is still not empty (let the current work item handle parallelization as convinient).
90
95
//
91
- // This field is set to 1 to indicate that a thread pool work item is scheduled to process events in _eventQueue. It is
92
- // set to 0 when the scheduled work item starts running, to indicate that a thread pool work item to process events is
93
- // not scheduled. Changes are protected by atomic operations as appropriate.
94
- //
95
- private int _eventQueueProcessingRequested ;
96
+ // The goal is to avoid enqueueing more work items than necessary, while still ensuring that all events are processed.
97
+ // Another work item isn't enqueued to the thread pool hastily while the state is Determining,
98
+ // instead the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time.
99
+ private enum EventQueueProcessingStage
100
+ {
101
+ NotScheduled ,
102
+ Determining ,
103
+ Scheduled
104
+ }
105
+
106
+ private int _eventQueueProcessingStage ;
96
107
97
108
//
98
109
// Registers the Socket with a SocketAsyncEngine, and returns the associated engine.
@@ -190,9 +201,14 @@ private void EventLoop()
190
201
// The native shim is responsible for ensuring this condition.
191
202
Debug . Assert ( numEvents > 0 , $ "Unexpected numEvents: { numEvents } ") ;
192
203
193
- if ( handler . HandleSocketEvents ( numEvents ) )
204
+ // Only enqueue a work item if the stage is NotScheduled.
205
+ // Otherwise there must be a work item already queued or another thread already handling parallelization.
206
+ if ( handler . HandleSocketEvents ( numEvents ) &&
207
+ Interlocked . Exchange (
208
+ ref _eventQueueProcessingStage ,
209
+ ( int ) EventQueueProcessingStage . Scheduled ) == ( int ) EventQueueProcessingStage . NotScheduled )
194
210
{
195
- ScheduleToProcessEvents ( ) ;
211
+ ThreadPool . UnsafeQueueUserWorkItem ( this , preferLocal : false ) ;
196
212
}
197
213
}
198
214
}
@@ -202,42 +218,73 @@ private void EventLoop()
202
218
}
203
219
}
204
220
205
- [ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
206
- private void ScheduleToProcessEvents ( )
221
+ private void UpdateEventQueueProcessingStage ( bool isEventQueueEmpty )
207
222
{
208
- // Schedule a thread pool work item to process events. Only one work item is scheduled at any given time to avoid
209
- // over-parallelization. When the work item begins running, this field is reset to 0, allowing for another work item
210
- // to be scheduled for parallelizing processing of events.
211
- if ( Interlocked . CompareExchange ( ref _eventQueueProcessingRequested , 1 , 0 ) == 0 )
223
+ if ( ! isEventQueueEmpty )
212
224
{
213
- ThreadPool . UnsafeQueueUserWorkItem ( this , preferLocal : false ) ;
225
+ // There are more events to process, set stage to Scheduled and enqueue a work item.
226
+ _eventQueueProcessingStage = ( int ) EventQueueProcessingStage . Scheduled ;
227
+ }
228
+ else
229
+ {
230
+ // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
231
+ // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
232
+ // would not have scheduled a work item to process the work, so schedule one now.
233
+ int stageBeforeUpdate =
234
+ Interlocked . CompareExchange (
235
+ ref _eventQueueProcessingStage ,
236
+ ( int ) EventQueueProcessingStage . NotScheduled ,
237
+ ( int ) EventQueueProcessingStage . Determining ) ;
238
+ Debug . Assert ( stageBeforeUpdate != ( int ) EventQueueProcessingStage . NotScheduled ) ;
239
+ if ( stageBeforeUpdate == ( int ) EventQueueProcessingStage . Determining )
240
+ {
241
+ return ;
242
+ }
214
243
}
244
+
245
+ ThreadPool . UnsafeQueueUserWorkItem ( this , preferLocal : false ) ;
215
246
}
216
247
217
248
void IThreadPoolWorkItem . Execute ( )
218
249
{
219
- // Indicate that a work item is no longer scheduled to process events. The change needs to be visible to enqueuer
220
- // threads (only for EventLoop() currently) before an event is attempted to be dequeued. In particular, if an
221
- // enqueuer queues an event and does not schedule a work item because it is already scheduled, and this thread is
222
- // the last thread processing events, it must see the event queued by the enqueuer.
223
- Interlocked . Exchange ( ref _eventQueueProcessingRequested , 0 ) ;
224
-
225
250
ConcurrentQueue < SocketIOEvent > eventQueue = _eventQueue ;
226
- if ( ! eventQueue . TryDequeue ( out SocketIOEvent ev ) )
251
+ SocketIOEvent ev ;
252
+ while ( true )
227
253
{
228
- return ;
229
- }
254
+ Debug . Assert ( _eventQueueProcessingStage == ( int ) EventQueueProcessingStage . Scheduled ) ;
230
255
231
- int startTimeMs = Environment . TickCount ;
256
+ // The change needs to be visible to other threads that may request a worker thread before a work item is attempted
257
+ // to be dequeued by the current thread. In particular, if an enqueuer queues a work item and does not request a
258
+ // thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing
259
+ // work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of
260
+ // Scheduled, and try to dequeue again or request another thread.
261
+ _eventQueueProcessingStage = ( int ) EventQueueProcessingStage . Determining ;
262
+ Interlocked . MemoryBarrier ( ) ;
263
+
264
+ if ( eventQueue . TryDequeue ( out ev ) )
265
+ {
266
+ break ;
267
+ }
232
268
233
- // An event was successfully dequeued, and there may be more events to process. Schedule a work item to parallelize
234
- // processing of events, before processing more events. Following this, it is the responsibility of the new work
235
- // item and the epoll thread to schedule more work items as necessary. The parallelization may be necessary here if
236
- // the user callback as part of handling the event blocks for some reason that may have a dependency on other queued
237
- // socket events.
238
- ScheduleToProcessEvents ( ) ;
269
+ // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
270
+ // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
271
+ // would not have scheduled a work item to process the work, so try to dequeue a work item again.
272
+ int stageBeforeUpdate =
273
+ Interlocked . CompareExchange (
274
+ ref _eventQueueProcessingStage ,
275
+ ( int ) EventQueueProcessingStage . NotScheduled ,
276
+ ( int ) EventQueueProcessingStage . Determining ) ;
277
+ Debug . Assert ( stageBeforeUpdate != ( int ) EventQueueProcessingStage . NotScheduled ) ;
278
+ if ( stageBeforeUpdate == ( int ) EventQueueProcessingStage . Determining )
279
+ {
280
+ return ;
281
+ }
282
+ }
239
283
240
- while ( true )
284
+ UpdateEventQueueProcessingStage ( eventQueue . IsEmpty ) ;
285
+
286
+ int startTimeMs = Environment . TickCount ;
287
+ do
241
288
{
242
289
ev . Context . HandleEvents ( ev . Events ) ;
243
290
@@ -253,19 +300,7 @@ void IThreadPoolWorkItem.Execute()
253
300
// using Stopwatch instead (like 1 ms, 5 ms, etc.), from quick tests they appeared to have a slightly greater
254
301
// impact on throughput compared to the threshold chosen below, though it is slight enough that it may not
255
302
// matter much. Higher thresholds didn't seem to have any noticeable effect.
256
- if ( Environment . TickCount - startTimeMs >= 15 )
257
- {
258
- break ;
259
- }
260
-
261
- if ( ! eventQueue . TryDequeue ( out ev ) )
262
- {
263
- return ;
264
- }
265
- }
266
-
267
- // The queue was not observed to be empty, schedule another work item before yielding the thread
268
- ScheduleToProcessEvents ( ) ;
303
+ } while ( Environment . TickCount - startTimeMs < 15 && eventQueue . TryDequeue ( out ev ) ) ;
269
304
}
270
305
271
306
private void FreeNativeResources ( )
0 commit comments