From 0b6cdb18133aafed67e9b06f5e519ac8c13ce129 Mon Sep 17 00:00:00 2001 From: Matt Connew Date: Thu, 21 Jul 2022 10:35:57 -0700 Subject: [PATCH 1/3] Fix VS debugging and workaround socket.ConnectAsync bug --- System.ServiceModel.sln | 12 ++---------- eng/testing/.runsettings | 2 +- eng/testing/runsettings.targets | 2 +- .../System/ServiceModel/Channels/SocketConnection.cs | 3 ++- 4 files changed, 6 insertions(+), 13 deletions(-) diff --git a/System.ServiceModel.sln b/System.ServiceModel.sln index 628d8b155cd..5b31dac0e38 100644 --- a/System.ServiceModel.sln +++ b/System.ServiceModel.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 16 -VisualStudioVersion = 16.0.28728.38 +# Visual Studio Version 17 +VisualStudioVersion = 17.2.32630.192 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "System.ServiceModel.Primitives.Facade", "src\System.ServiceModel.Primitives\src\System.ServiceModel.Primitives.Facade.csproj", "{1290FD2C-8148-43C3-817B-6D799EDD7E03}" EndProject @@ -255,14 +255,6 @@ Global {161C03B5-B606-4DC2-BC10-5935D1DADDD5}.Debug|Any CPU.Build.0 = Debug|Any CPU {161C03B5-B606-4DC2-BC10-5935D1DADDD5}.Release|Any CPU.ActiveCfg = Release|Any CPU {161C03B5-B606-4DC2-BC10-5935D1DADDD5}.Release|Any CPU.Build.0 = Release|Any CPU - {EAB2959D-0A16-4F60-A453-765491E1C069}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {EAB2959D-0A16-4F60-A453-765491E1C069}.Debug|Any CPU.Build.0 = Debug|Any CPU - {EAB2959D-0A16-4F60-A453-765491E1C069}.Release|Any CPU.ActiveCfg = Release|Any CPU - {EAB2959D-0A16-4F60-A453-765491E1C069}.Release|Any CPU.Build.0 = Release|Any CPU - {AECCDD96-7C28-4302-9F34-C9ADEE21C7E6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {AECCDD96-7C28-4302-9F34-C9ADEE21C7E6}.Debug|Any CPU.Build.0 = Debug|Any CPU - {AECCDD96-7C28-4302-9F34-C9ADEE21C7E6}.Release|Any CPU.ActiveCfg = Release|Any CPU - {AECCDD96-7C28-4302-9F34-C9ADEE21C7E6}.Release|Any CPU.Build.0 = Release|Any CPU {38F22923-DF54-4BD5-A40D-D4CA05DE3D17}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {38F22923-DF54-4BD5-A40D-D4CA05DE3D17}.Debug|Any CPU.Build.0 = Debug|Any CPU {38F22923-DF54-4BD5-A40D-D4CA05DE3D17}.Release|Any CPU.ActiveCfg = Release|Any CPU diff --git a/eng/testing/.runsettings b/eng/testing/.runsettings index ae33caf1711..176ef5b049f 100644 --- a/eng/testing/.runsettings +++ b/eng/testing/.runsettings @@ -16,7 +16,7 @@ $$TESTCASEFILTER$$ $$DOTNETHOSTPATH$$ - FrameworkCore10 + FrameworkCore10 diff --git a/eng/testing/runsettings.targets b/eng/testing/runsettings.targets index 66ccef4dd32..9fc0295f8ea 100644 --- a/eng/testing/runsettings.targets +++ b/eng/testing/runsettings.targets @@ -39,7 +39,7 @@ .Replace('$$DISABLEPARALLELIZATION$$', '$([MSBuild]::ValueOrDefault('$(TestDisableParallelization)', 'false'))') .Replace('$$DISABLEAPPDOMAIN$$', '$([MSBuild]::ValueOrDefault('$(TestDisableAppDomain)', 'false'))') .Replace('$$TESTCASEFILTER$$', '$(_testFilter)') - .Replace('$$DOTNETHOSTPATH$$', '$(TestHostRootPath)$([System.IO.Path]::GetFileName('$(DotNetTool)'))')) + .Replace('$$DOTNETHOSTPATH$$', '$(DotNetTool)')) CreateConnectionAsync(IPAddress address, int por { AddressFamily addressFamily = address.AddressFamily; socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp); - await socket.ConnectAsync(new IPEndPoint(address, port)); + await Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, new IPEndPoint(address, port), null); + //await socket.ConnectAsync(new IPEndPoint(address, port)); return new SocketConnection(socket, _connectionBufferPool, false); } catch From b98c25f5b0d10e632fdc301ca341d8c148cd440b Mon Sep 17 00:00:00 2001 From: Matt Connew Date: Fri, 22 Jul 2022 14:41:28 -0700 Subject: [PATCH 2/3] Fix IOThreadScheduler not being used after first await --- eng/testing/.runsettings | 2 +- .../Internals/System/Runtime/ActionItem.cs | 19 +------ .../System/Runtime/IOThreadScheduler.cs | 55 +++++++++++++++++++ .../ServiceModelSynchronizationContext.cs | 5 +- .../Internals/System/Runtime/TaskHelpers.cs | 36 ++++++++++-- .../Channels/CommunicationObject.cs | 32 ++++++++++- .../ServiceModel/Channels/SocketConnection.cs | 3 +- 7 files changed, 121 insertions(+), 31 deletions(-) diff --git a/eng/testing/.runsettings b/eng/testing/.runsettings index 176ef5b049f..ae33caf1711 100644 --- a/eng/testing/.runsettings +++ b/eng/testing/.runsettings @@ -16,7 +16,7 @@ $$TESTCASEFILTER$$ $$DOTNETHOSTPATH$$ - FrameworkCore10 + FrameworkCore10 diff --git a/src/System.Private.ServiceModel/src/Internals/System/Runtime/ActionItem.cs b/src/System.Private.ServiceModel/src/Internals/System/Runtime/ActionItem.cs index d2b6181a7ff..641114fead5 100644 --- a/src/System.Private.ServiceModel/src/Internals/System/Runtime/ActionItem.cs +++ b/src/System.Private.ServiceModel/src/Internals/System/Runtime/ActionItem.cs @@ -83,7 +83,7 @@ private static void ScheduleCallback(Func callback, object state) // The trick here is using CallbackHelper.IOTaskSchedule as the TaskScheduler. This is a special TaskScheduler created from a sync context // which posts action's to the IOThreadScheduler. So instead of directly posting a Task to the IOThreadScheduler, we let the TaskScheduler // break up the Task into individual Action delegates and post them for us. - Task.Factory.StartNew(callback, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, CallbackHelper.IOTaskScheduler); + Task.Factory.StartNew(callback, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, IOThreadScheduler.IOTaskScheduler); } private void ScheduleCallback(Action callback) @@ -98,26 +98,9 @@ private void ScheduleCallback(Func callback) internal static class CallbackHelper { - private static TaskScheduler s_IOTaskScheduler; private static Action s_invokeCallback; private static Func s_invokeAsyncCallback; - public static TaskScheduler IOTaskScheduler - { - get - { - if (s_IOTaskScheduler == null) - { - using(TaskHelpers.RunTaskContinuationsOnOurThreads()) - { - s_IOTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(); - } - } - - return s_IOTaskScheduler; - } - } - public static Action InvokeCallbackAction { get diff --git a/src/System.Private.ServiceModel/src/Internals/System/Runtime/IOThreadScheduler.cs b/src/System.Private.ServiceModel/src/Internals/System/Runtime/IOThreadScheduler.cs index 6b2a1ce8294..831ea75a64c 100644 --- a/src/System.Private.ServiceModel/src/Internals/System/Runtime/IOThreadScheduler.cs +++ b/src/System.Private.ServiceModel/src/Internals/System/Runtime/IOThreadScheduler.cs @@ -2,6 +2,7 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System.Diagnostics; using System.Diagnostics.Contracts; using System.Runtime.InteropServices; using System.Threading; @@ -51,8 +52,11 @@ public static bool IsComplete(int gate) } private static IOThreadScheduler s_current = new IOThreadScheduler(32); + private static SynchronizationContext s_syncContext = new IOThreadSchedulerSynchronizationContext(); + private static TaskScheduler s_IOTaskScheduler; private readonly ScheduledOverlapped _overlapped; private readonly Slot[] _slots; + private static ThreadLocal s_isIoThread = new ThreadLocal(); // This field holds both the head (HiWord) and tail (LoWord) indicies into the slot array. This limits each // value to 64k. In order to be able to distinguish wrapping the slot array (allowed) from wrapping the @@ -78,6 +82,22 @@ private IOThreadScheduler(int capacity) _overlapped = new ScheduledOverlapped(); } + public static TaskScheduler IOTaskScheduler + { + get + { + if (s_IOTaskScheduler == null) + { + var savedCtx = SynchronizationContext.Current; + SynchronizationContext.SetSynchronizationContext(s_syncContext); + s_IOTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(); + SynchronizationContext.SetSynchronizationContext(savedCtx); + } + + return s_IOTaskScheduler; + } + } + public static void ScheduleCallbackNoFlow(Action callback, object state) { if (callback == null) @@ -209,6 +229,8 @@ private int SlotMask } } + public static bool IsRunningOnIOThread => s_isIoThread.IsValueCreated && s_isIoThread.Value; + ~IOThreadScheduler() { // If the AppDomain is shutting down, we may still have pending ops. The AppDomain shutdown will clean @@ -435,6 +457,32 @@ private void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* nativeO } private void Callback() + { + try + { + InitThreadDebugData(); + CallbackCore(); + } + finally + { + ClearThreadDebugData(); + } + } + + [Conditional("DEBUG")] + private static void InitThreadDebugData() + { + s_isIoThread.Value = true; + Thread.CurrentThread.Name = "IOThreadScheduler.IOCallback"; + } + + [Conditional("DEBUG")] + private static void ClearThreadDebugData() + { + s_isIoThread.Value = false; + } + + private void CallbackCore() { // Unhook the IOThreadScheduler ASAP to prevent it from leaking. IOThreadScheduler iots = _scheduler; @@ -505,5 +553,12 @@ public void Cleanup() } } + private class IOThreadSchedulerSynchronizationContext : SynchronizationContext + { + public override void Post(SendOrPostCallback d, object state) + { + ScheduleCallbackNoFlow((s) => d(s), state); + } + } } } diff --git a/src/System.Private.ServiceModel/src/Internals/System/Runtime/ServiceModelSynchronizationContext.cs b/src/System.Private.ServiceModel/src/Internals/System/Runtime/ServiceModelSynchronizationContext.cs index e33014762a7..1c0affb6cbe 100644 --- a/src/System.Private.ServiceModel/src/Internals/System/Runtime/ServiceModelSynchronizationContext.cs +++ b/src/System.Private.ServiceModel/src/Internals/System/Runtime/ServiceModelSynchronizationContext.cs @@ -3,16 +3,17 @@ // See the LICENSE file in the project root for more information. using System.Threading; +using System.Threading.Tasks; namespace System.Runtime { - public class ServiceModelSynchronizationContext : SynchronizationContext + internal class ServiceModelSynchronizationContext : SynchronizationContext { public static ServiceModelSynchronizationContext Instance = new ServiceModelSynchronizationContext(); public override void Post(SendOrPostCallback d, object state) { - IOThreadScheduler.ScheduleCallbackNoFlow((s) => d(s), state); + Task.Factory.StartNew((s) => d(s), state, default, TaskCreationOptions.RunContinuationsAsynchronously, IOThreadScheduler.IOTaskScheduler); } } } diff --git a/src/System.Private.ServiceModel/src/Internals/System/Runtime/TaskHelpers.cs b/src/System.Private.ServiceModel/src/Internals/System/Runtime/TaskHelpers.cs index c5b049c5b5e..b6e98232343 100644 --- a/src/System.Private.ServiceModel/src/Internals/System/Runtime/TaskHelpers.cs +++ b/src/System.Private.ServiceModel/src/Internals/System/Runtime/TaskHelpers.cs @@ -249,6 +249,9 @@ public static async Task AwaitWithTimeout(this Task task, TimeSpan timeout // then use the NoSpin variant. public static void WaitForCompletion(this Task task) { + Fx.Assert(task.IsCompleted || !IOThreadScheduler.IsRunningOnIOThread, "Waiting on an IO Thread might cause problems"); + // Waiting on an IO Thread can cause performance problems as we might block the IOThreadScheduler + // dequeuing loop. task.GetAwaiter().GetResult(); } @@ -260,6 +263,9 @@ public static void WaitForCompletionNoSpin(this Task task) { if (!task.IsCompleted) { + Fx.Assert(!IOThreadScheduler.IsRunningOnIOThread, "Waiting on an IO Thread might cause problems"); + // Waiting on an IO Thread can cause performance problems as we might block the IOThreadScheduler + // dequeuing loop. ((IAsyncResult)task).AsyncWaitHandle.WaitOne(); } @@ -269,6 +275,9 @@ public static void WaitForCompletionNoSpin(this Task task) public static TResult WaitForCompletion(this Task task) { + Fx.Assert(task.IsCompleted || !IOThreadScheduler.IsRunningOnIOThread, "Waiting on an IO Thread might cause problems"); + // Waiting on an IO Thread can cause performance problems as we might block the IOThreadScheduler + // dequeuing loop. return task.GetAwaiter().GetResult(); } @@ -276,6 +285,9 @@ public static TResult WaitForCompletionNoSpin(this Task task) { if (!task.IsCompleted) { + Fx.Assert(!IOThreadScheduler.IsRunningOnIOThread, "Waiting on an IO Thread might cause problems"); + // Waiting on an IO Thread can cause performance problems as we might block the IOThreadScheduler + // dequeuing loop. ((IAsyncResult)task).AsyncWaitHandle.WaitOne(); } @@ -293,6 +305,9 @@ public static bool WaitForCompletionNoSpin(this Task task, TimeSpan timeout) bool completed = true; if (!task.IsCompleted) { + Fx.Assert(!IOThreadScheduler.IsRunningOnIOThread, "Waiting on an IO Thread might cause problems"); + // Waiting on an IO Thread can cause performance problems as we might block the IOThreadScheduler + // dequeuing loop. completed = ((IAsyncResult)task).AsyncWaitHandle.WaitOne(timeout); } @@ -364,16 +379,25 @@ public static IDisposable RunTaskContinuationsOnOurThreads() return new SyncContextScope(); } - // Calls the given Action asynchronously. + // Calls the given Action asynchronously on the ThreadPool. public static async Task CallActionAsync(Action action, TArg argument) { - using (var scope = TaskHelpers.RunTaskContinuationsOnOurThreads()) + // Make sure any async tasks started from the action have their continuation + // execute on the IOThreadScheduler, but make sure the action itself is running + // on the thread pool. + if (!Thread.CurrentThread.IsThreadPoolThread) { - if (scope != null) // No need to change threads if already off of thread pool - { - await Task.Yield(); // Move synchronous method off of thread pool - } + // Switch to a thread pool thread to run passed action + SynchronizationContext.SetSynchronizationContext(null); + await Task.Yield(); + } + // Now we're running on the ThreadPool, we reset the SynchronizationContext to + // our sync context which posts to the IOThreadScheduler. We're not hopping threads + // so any synchronous blocking will occur on the current thread pool thread. + Fx.Assert(Thread.CurrentThread.IsThreadPoolThread, "We should be running on the thread pool"); + using (var scope = RunTaskContinuationsOnOurThreads()) + { action(argument); } } diff --git a/src/System.Private.ServiceModel/src/System/ServiceModel/Channels/CommunicationObject.cs b/src/System.Private.ServiceModel/src/System/ServiceModel/Channels/CommunicationObject.cs index 308b73dfb5d..383d01c15fd 100644 --- a/src/System.Private.ServiceModel/src/System/ServiceModel/Channels/CommunicationObject.cs +++ b/src/System.Private.ServiceModel/src/System/ServiceModel/Channels/CommunicationObject.cs @@ -315,7 +315,7 @@ private async Task OnCloseAsyncInternal(TimeSpan timeout) else { // This type is an external type that cannot override OnCloseAsync. - // If this is a synchronous close, invoke the synchronous OnClose) + // If this is a synchronous close, invoke the synchronous OnClose. if (_isSynchronousClose) { await TaskHelpers.CallActionAsync(OnClose, timeout); @@ -563,7 +563,7 @@ private async Task OnOpenAsyncInternal(TimeSpan timeout) else { // This type is an external type that cannot override OnOpenAsync. - // If this is a synchronous open, invoke the synchronous OnOpen) + // If this is a synchronous open, invoke the synchronous OnOpen if (_isSynchronousOpen) { await TaskHelpers.CallActionAsync(OnOpen, timeout); @@ -1014,10 +1014,24 @@ internal protected virtual Task OnOpenAsync(TimeSpan timeout) // It is used to propagate the use of either the synchronous or asynchronous methods internal Task OpenOtherAsync(ICommunicationObject other, TimeSpan timeout) { + // If the other object supports async open/close, we know it's an + // internal implementation so we can continue the open asynchronously. We need + // to propagate _isSynchronousOpen in case the next inner channel isn't an internal + // implementation. + if (other is CommunicationObject communicationObject && communicationObject.SupportsAsyncOpenClose) + { + communicationObject._isSynchronousOpen = _isSynchronousOpen; + return ((IAsyncCommunicationObject)communicationObject).OpenAsync(timeout); + } + + // Other object isn't an internal implementation so we need to match calling the + // sync/async pattern of the Open call that was initially made. // If the current object is being opened synchronously, use the synchronous // open path for the other object. if (_isSynchronousOpen) { + // We need to make sure the synchronous open which might block a thread happens on the + // thread pool so as not to block the IOThreadScheduler thread. return TaskHelpers.CallActionAsync(other.Open, timeout); } else @@ -1030,10 +1044,24 @@ internal Task OpenOtherAsync(ICommunicationObject other, TimeSpan timeout) // It is used to propagate the use of either the synchronous or asynchronous methods internal Task CloseOtherAsync(ICommunicationObject other, TimeSpan timeout) { + // If the other object supports async open/close, we know it's an + // internal implementation so we can continue the close asynchronously. We need + // to propagate _isSynchronousClose in case the next inner channel isn't an internal + // implementation. + if (other is CommunicationObject communicationObject && communicationObject.SupportsAsyncOpenClose) + { + communicationObject._isSynchronousClose = _isSynchronousClose; + return ((IAsyncCommunicationObject)communicationObject).CloseAsync(timeout); + } + + // Other object isn't an internal implementation so we need to match calling the + // sync/async pattern of the Close call that was initially made. // If the current object is being closed synchronously, use the synchronous // close path for the other object. if (_isSynchronousClose) { + // We need to make sure the synchronous close which might block a thread happens on the + // thread pool so as not to block the IOThreadScheduler thread. return TaskHelpers.CallActionAsync(other.Close, timeout); } else diff --git a/src/System.Private.ServiceModel/src/System/ServiceModel/Channels/SocketConnection.cs b/src/System.Private.ServiceModel/src/System/ServiceModel/Channels/SocketConnection.cs index ce26360eccc..e699f2f8398 100644 --- a/src/System.Private.ServiceModel/src/System/ServiceModel/Channels/SocketConnection.cs +++ b/src/System.Private.ServiceModel/src/System/ServiceModel/Channels/SocketConnection.cs @@ -1414,8 +1414,7 @@ private async Task CreateConnectionAsync(IPAddress address, int por { AddressFamily addressFamily = address.AddressFamily; socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp); - await Task.Factory.FromAsync(socket.BeginConnect, socket.EndConnect, new IPEndPoint(address, port), null); - //await socket.ConnectAsync(new IPEndPoint(address, port)); + await socket.ConnectAsync(new IPEndPoint(address, port)); return new SocketConnection(socket, _connectionBufferPool, false); } catch From 806689c2faf3b376bc030e653112d5c1ff336f09 Mon Sep 17 00:00:00 2001 From: Matt Connew Date: Mon, 25 Jul 2022 13:41:12 -0700 Subject: [PATCH 3/3] Update test helix queues --- eng/SendToHelix.proj | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/eng/SendToHelix.proj b/eng/SendToHelix.proj index 329a320cb59..a7aa7ca3a04 100644 --- a/eng/SendToHelix.proj +++ b/eng/SendToHelix.proj @@ -20,12 +20,12 @@ - - + + - +