Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve io thread scheduler #4862

Merged
merged 3 commits into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions System.ServiceModel.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.28728.38
# Visual Studio Version 17
VisualStudioVersion = 17.2.32630.192
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.ServiceModel.Primitives.Facade", "src\System.ServiceModel.Primitives\src\System.ServiceModel.Primitives.Facade.csproj", "{1290FD2C-8148-43C3-817B-6D799EDD7E03}"
EndProject
Expand Down Expand Up @@ -255,14 +255,6 @@ Global
{161C03B5-B606-4DC2-BC10-5935D1DADDD5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{161C03B5-B606-4DC2-BC10-5935D1DADDD5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{161C03B5-B606-4DC2-BC10-5935D1DADDD5}.Release|Any CPU.Build.0 = Release|Any CPU
{EAB2959D-0A16-4F60-A453-765491E1C069}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EAB2959D-0A16-4F60-A453-765491E1C069}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EAB2959D-0A16-4F60-A453-765491E1C069}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EAB2959D-0A16-4F60-A453-765491E1C069}.Release|Any CPU.Build.0 = Release|Any CPU
{AECCDD96-7C28-4302-9F34-C9ADEE21C7E6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AECCDD96-7C28-4302-9F34-C9ADEE21C7E6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AECCDD96-7C28-4302-9F34-C9ADEE21C7E6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AECCDD96-7C28-4302-9F34-C9ADEE21C7E6}.Release|Any CPU.Build.0 = Release|Any CPU
{38F22923-DF54-4BD5-A40D-D4CA05DE3D17}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{38F22923-DF54-4BD5-A40D-D4CA05DE3D17}.Debug|Any CPU.Build.0 = Debug|Any CPU
{38F22923-DF54-4BD5-A40D-D4CA05DE3D17}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down
6 changes: 3 additions & 3 deletions eng/SendToHelix.proj
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
</ItemGroup>

<ItemGroup Condition="'$(TestJob)' == 'Windows' AND '$(RunAsPublic)' == 'true'" >
<HelixTargetQueue Include="Windows.10.Amd64.Client21H1.Open" />
<HelixTargetQueue Include="Windows.10.Amd64.Server20H2.Open" />
<HelixTargetQueue Include="windows.11.amd64.client.open" />
<HelixTargetQueue Include="windows.amd64.server2022.open" />
</ItemGroup>

<ItemGroup Condition="'$(TestJob)' == 'Windows' AND '$(RunAsInternal)'" >
<HelixTargetQueue Include="Windows.10.Amd64" />
<HelixTargetQueue Include="windows.11.amd64" />
<HelixTargetQueue Include="(Debian.11.Amd64)ubuntu.2004.amd64@mcr.microsoft.com/dotnet-buildtools/prereqs:debian-11-helix-amd64-20220511124750-0ece9b3" />
<HelixTargetQueue Include="RedHat.7.Amd64" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion eng/testing/runsettings.targets
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
.Replace('$$DISABLEPARALLELIZATION$$', '$([MSBuild]::ValueOrDefault('$(TestDisableParallelization)', 'false'))')
.Replace('$$DISABLEAPPDOMAIN$$', '$([MSBuild]::ValueOrDefault('$(TestDisableAppDomain)', 'false'))')
.Replace('$$TESTCASEFILTER$$', '$(_testFilter)')
.Replace('$$DOTNETHOSTPATH$$', '$(TestHostRootPath)$([System.IO.Path]::GetFileName('$(DotNetTool)'))'))</RunSettingsFileContent>
.Replace('$$DOTNETHOSTPATH$$', '$(DotNetTool)'))</RunSettingsFileContent>
</PropertyGroup>

<WriteLinesToFile File="$(RunSettingsOutputFilePath)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private static void ScheduleCallback(Func<object, Task> callback, object state)
// The trick here is using CallbackHelper.IOTaskSchedule as the TaskScheduler. This is a special TaskScheduler created from a sync context
// which posts action's to the IOThreadScheduler. So instead of directly posting a Task to the IOThreadScheduler, we let the TaskScheduler
// break up the Task into individual Action<object> delegates and post them for us.
Task<Task>.Factory.StartNew(callback, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, CallbackHelper.IOTaskScheduler);
Task<Task>.Factory.StartNew(callback, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, IOThreadScheduler.IOTaskScheduler);
}

private void ScheduleCallback(Action<object> callback)
Expand All @@ -98,26 +98,9 @@ private void ScheduleCallback(Func<object, Task> callback)

internal static class CallbackHelper
{
private static TaskScheduler s_IOTaskScheduler;
private static Action<object> s_invokeCallback;
private static Func<object, Task> s_invokeAsyncCallback;

public static TaskScheduler IOTaskScheduler
{
get
{
if (s_IOTaskScheduler == null)
{
using(TaskHelpers.RunTaskContinuationsOnOurThreads())
{
s_IOTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
}
}

return s_IOTaskScheduler;
}
}

public static Action<object> InvokeCallbackAction
{
get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Runtime.InteropServices;
using System.Threading;
Expand Down Expand Up @@ -51,8 +52,11 @@ public static bool IsComplete(int gate)
}

private static IOThreadScheduler s_current = new IOThreadScheduler(32);
private static SynchronizationContext s_syncContext = new IOThreadSchedulerSynchronizationContext();
private static TaskScheduler s_IOTaskScheduler;
private readonly ScheduledOverlapped _overlapped;
private readonly Slot[] _slots;
private static ThreadLocal<bool> s_isIoThread = new ThreadLocal<bool>();

// This field holds both the head (HiWord) and tail (LoWord) indicies into the slot array. This limits each
// value to 64k. In order to be able to distinguish wrapping the slot array (allowed) from wrapping the
Expand All @@ -78,6 +82,22 @@ private IOThreadScheduler(int capacity)
_overlapped = new ScheduledOverlapped();
}

public static TaskScheduler IOTaskScheduler
{
get
{
if (s_IOTaskScheduler == null)
{
var savedCtx = SynchronizationContext.Current;
SynchronizationContext.SetSynchronizationContext(s_syncContext);
s_IOTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
SynchronizationContext.SetSynchronizationContext(savedCtx);
}

return s_IOTaskScheduler;
}
}

public static void ScheduleCallbackNoFlow(Action<object> callback, object state)
{
if (callback == null)
Expand Down Expand Up @@ -209,6 +229,8 @@ private int SlotMask
}
}

public static bool IsRunningOnIOThread => s_isIoThread.IsValueCreated && s_isIoThread.Value;

~IOThreadScheduler()
{
// If the AppDomain is shutting down, we may still have pending ops. The AppDomain shutdown will clean
Expand Down Expand Up @@ -435,6 +457,32 @@ private void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* nativeO
}

private void Callback()
{
try
{
InitThreadDebugData();
CallbackCore();
}
finally
{
ClearThreadDebugData();
}
}

[Conditional("DEBUG")]
private static void InitThreadDebugData()
{
s_isIoThread.Value = true;
Thread.CurrentThread.Name = "IOThreadScheduler.IOCallback";
}

[Conditional("DEBUG")]
private static void ClearThreadDebugData()
{
s_isIoThread.Value = false;
}

private void CallbackCore()
{
// Unhook the IOThreadScheduler ASAP to prevent it from leaking.
IOThreadScheduler iots = _scheduler;
Expand Down Expand Up @@ -505,5 +553,12 @@ public void Cleanup()
}
}

private class IOThreadSchedulerSynchronizationContext : SynchronizationContext
{
public override void Post(SendOrPostCallback d, object state)
{
ScheduleCallbackNoFlow((s) => d(s), state);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
// See the LICENSE file in the project root for more information.

using System.Threading;
using System.Threading.Tasks;

namespace System.Runtime
{
public class ServiceModelSynchronizationContext : SynchronizationContext
internal class ServiceModelSynchronizationContext : SynchronizationContext
{
public static ServiceModelSynchronizationContext Instance = new ServiceModelSynchronizationContext();

public override void Post(SendOrPostCallback d, object state)
{
IOThreadScheduler.ScheduleCallbackNoFlow((s) => d(s), state);
Task.Factory.StartNew((s) => d(s), state, default, TaskCreationOptions.RunContinuationsAsynchronously, IOThreadScheduler.IOTaskScheduler);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ public static async Task<bool> AwaitWithTimeout(this Task task, TimeSpan timeout
// then use the NoSpin variant.
public static void WaitForCompletion(this Task task)
{
Fx.Assert(task.IsCompleted || !IOThreadScheduler.IsRunningOnIOThread, "Waiting on an IO Thread might cause problems");
// Waiting on an IO Thread can cause performance problems as we might block the IOThreadScheduler
// dequeuing loop.
task.GetAwaiter().GetResult();
}

Expand All @@ -260,6 +263,9 @@ public static void WaitForCompletionNoSpin(this Task task)
{
if (!task.IsCompleted)
{
Fx.Assert(!IOThreadScheduler.IsRunningOnIOThread, "Waiting on an IO Thread might cause problems");
// Waiting on an IO Thread can cause performance problems as we might block the IOThreadScheduler
// dequeuing loop.
((IAsyncResult)task).AsyncWaitHandle.WaitOne();
}

Expand All @@ -269,13 +275,19 @@ public static void WaitForCompletionNoSpin(this Task task)

public static TResult WaitForCompletion<TResult>(this Task<TResult> task)
{
Fx.Assert(task.IsCompleted || !IOThreadScheduler.IsRunningOnIOThread, "Waiting on an IO Thread might cause problems");
// Waiting on an IO Thread can cause performance problems as we might block the IOThreadScheduler
// dequeuing loop.
return task.GetAwaiter().GetResult();
}

public static TResult WaitForCompletionNoSpin<TResult>(this Task<TResult> task)
{
if (!task.IsCompleted)
{
Fx.Assert(!IOThreadScheduler.IsRunningOnIOThread, "Waiting on an IO Thread might cause problems");
// Waiting on an IO Thread can cause performance problems as we might block the IOThreadScheduler
// dequeuing loop.
((IAsyncResult)task).AsyncWaitHandle.WaitOne();
}

Expand All @@ -293,6 +305,9 @@ public static bool WaitForCompletionNoSpin(this Task task, TimeSpan timeout)
bool completed = true;
if (!task.IsCompleted)
{
Fx.Assert(!IOThreadScheduler.IsRunningOnIOThread, "Waiting on an IO Thread might cause problems");
// Waiting on an IO Thread can cause performance problems as we might block the IOThreadScheduler
// dequeuing loop.
completed = ((IAsyncResult)task).AsyncWaitHandle.WaitOne(timeout);
}

Expand Down Expand Up @@ -364,16 +379,25 @@ public static IDisposable RunTaskContinuationsOnOurThreads()
return new SyncContextScope();
}

// Calls the given Action asynchronously.
// Calls the given Action asynchronously on the ThreadPool.
public static async Task CallActionAsync<TArg>(Action<TArg> action, TArg argument)
{
using (var scope = TaskHelpers.RunTaskContinuationsOnOurThreads())
// Make sure any async tasks started from the action have their continuation
// execute on the IOThreadScheduler, but make sure the action itself is running
// on the thread pool.
if (!Thread.CurrentThread.IsThreadPoolThread)
{
if (scope != null) // No need to change threads if already off of thread pool
{
await Task.Yield(); // Move synchronous method off of thread pool
}
// Switch to a thread pool thread to run passed action
SynchronizationContext.SetSynchronizationContext(null);
await Task.Yield();
}

// Now we're running on the ThreadPool, we reset the SynchronizationContext to
// our sync context which posts to the IOThreadScheduler. We're not hopping threads
// so any synchronous blocking will occur on the current thread pool thread.
Fx.Assert(Thread.CurrentThread.IsThreadPoolThread, "We should be running on the thread pool");
using (var scope = RunTaskContinuationsOnOurThreads())
{
action(argument);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ private async Task OnCloseAsyncInternal(TimeSpan timeout)
else
{
// This type is an external type that cannot override OnCloseAsync.
// If this is a synchronous close, invoke the synchronous OnClose)
// If this is a synchronous close, invoke the synchronous OnClose.
if (_isSynchronousClose)
{
await TaskHelpers.CallActionAsync(OnClose, timeout);
Expand Down Expand Up @@ -563,7 +563,7 @@ private async Task OnOpenAsyncInternal(TimeSpan timeout)
else
{
// This type is an external type that cannot override OnOpenAsync.
// If this is a synchronous open, invoke the synchronous OnOpen)
// If this is a synchronous open, invoke the synchronous OnOpen
if (_isSynchronousOpen)
{
await TaskHelpers.CallActionAsync(OnOpen, timeout);
Expand Down Expand Up @@ -1014,10 +1014,24 @@ internal protected virtual Task OnOpenAsync(TimeSpan timeout)
// It is used to propagate the use of either the synchronous or asynchronous methods
internal Task OpenOtherAsync(ICommunicationObject other, TimeSpan timeout)
{
// If the other object supports async open/close, we know it's an
// internal implementation so we can continue the open asynchronously. We need
// to propagate _isSynchronousOpen in case the next inner channel isn't an internal
// implementation.
if (other is CommunicationObject communicationObject && communicationObject.SupportsAsyncOpenClose)
{
communicationObject._isSynchronousOpen = _isSynchronousOpen;
return ((IAsyncCommunicationObject)communicationObject).OpenAsync(timeout);
}

// Other object isn't an internal implementation so we need to match calling the
// sync/async pattern of the Open call that was initially made.
// If the current object is being opened synchronously, use the synchronous
// open path for the other object.
if (_isSynchronousOpen)
{
// We need to make sure the synchronous open which might block a thread happens on the
// thread pool so as not to block the IOThreadScheduler thread.
return TaskHelpers.CallActionAsync(other.Open, timeout);
}
else
Expand All @@ -1030,10 +1044,24 @@ internal Task OpenOtherAsync(ICommunicationObject other, TimeSpan timeout)
// It is used to propagate the use of either the synchronous or asynchronous methods
internal Task CloseOtherAsync(ICommunicationObject other, TimeSpan timeout)
{
// If the other object supports async open/close, we know it's an
// internal implementation so we can continue the close asynchronously. We need
// to propagate _isSynchronousClose in case the next inner channel isn't an internal
// implementation.
if (other is CommunicationObject communicationObject && communicationObject.SupportsAsyncOpenClose)
{
communicationObject._isSynchronousClose = _isSynchronousClose;
return ((IAsyncCommunicationObject)communicationObject).CloseAsync(timeout);
}

// Other object isn't an internal implementation so we need to match calling the
// sync/async pattern of the Close call that was initially made.
// If the current object is being closed synchronously, use the synchronous
// close path for the other object.
if (_isSynchronousClose)
{
// We need to make sure the synchronous close which might block a thread happens on the
// thread pool so as not to block the IOThreadScheduler thread.
return TaskHelpers.CallActionAsync(other.Close, timeout);
}
else
Expand Down