Skip to content

Commit

Permalink
Use non-generic TaskCompletionSource in SignalR and Kestrel (#22925)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennanConroy authored Jun 16, 2020
1 parent a67c217 commit 49aecc3
Show file tree
Hide file tree
Showing 71 changed files with 508 additions and 526 deletions.
2 changes: 1 addition & 1 deletion eng/helix/content/RunTests/ProcessUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public static async Task<ProcessResult> RunAsync(
string filename,
string arguments,
string? workingDirectory = null,
string dumpDirectoryPath = null,
string? dumpDirectoryPath = null,
bool throwOnError = true,
IDictionary<string, string?>? environmentVariables = null,
Action<string>? outputDataReceived = null,
Expand Down
6 changes: 3 additions & 3 deletions src/Hosting/Hosting/src/WebHostExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,11 @@ private static async Task WaitForTokenShutdownAsync(this IWebHost host, Cancella
},
applicationLifetime);

var waitForStop = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
var waitForStop = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
applicationLifetime.ApplicationStopping.Register(obj =>
{
var tcs = (TaskCompletionSource<object>)obj;
tcs.TrySetResult(null);
var tcs = (TaskCompletionSource)obj;
tcs.TrySetResult();
}, waitForStop);

await waitForStop.Task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,15 @@ public override async Task<DeploymentResult> DeployAsync()
AddEnvironmentVariablesToProcess(startInfo, DeploymentParameters.EnvironmentVariables);

Uri actualUrl = null;
var started = new TaskCompletionSource<object>();
var started = new TaskCompletionSource();

HostProcess = new Process() { StartInfo = startInfo };
HostProcess.EnableRaisingEvents = true;
HostProcess.OutputDataReceived += (sender, dataArgs) =>
{
if (string.Equals(dataArgs.Data, ApplicationStartedMessage))
{
started.TrySetResult(null);
started.TrySetResult();
}
else if (!string.IsNullOrEmpty(dataArgs.Data))
{
Expand Down
4 changes: 2 additions & 2 deletions src/Servers/Kestrel/Core/src/Internal/ConnectionDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal class ConnectionDispatcher<T> where T : BaseConnectionContext
private readonly ServiceContext _serviceContext;
private readonly Func<T, Task> _connectionDelegate;
private readonly TransportConnectionManager _transportConnectionManager;
private readonly TaskCompletionSource<object> _acceptLoopTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource _acceptLoopTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

public ConnectionDispatcher(ServiceContext serviceContext, Func<T, Task> connectionDelegate, TransportConnectionManager transportConnectionManager)
{
Expand Down Expand Up @@ -73,7 +73,7 @@ async Task AcceptConnectionsAsync()
}
finally
{
_acceptLoopTcs.TrySetResult(null);
_acceptLoopTcs.TrySetResult();
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ internal abstract class Http3Stream : HttpProtocol, IHttpHeadersHandler, IThread

private readonly Http3Connection _http3Connection;
private bool _receivedHeaders;
private TaskCompletionSource<object> _appCompleted;
private TaskCompletionSource _appCompleted;

public Pipe RequestBodyPipe { get; }

public Http3Stream(Http3Connection http3Connection, Http3StreamContext context)
public Http3Stream(Http3Connection http3Connection, Http3StreamContext context)
{
Initialize(context);

Expand Down Expand Up @@ -307,7 +307,7 @@ protected override void OnRequestProcessingEnded()
{
Debug.Assert(_appCompleted != null);

_appCompleted.SetResult(new object());
_appCompleted.SetResult();
}

private bool TryClose()
Expand Down Expand Up @@ -457,7 +457,7 @@ private Task ProcessHeadersFrameAsync<TContext>(IHttpApplication<TContext> appli
_receivedHeaders = true;
InputRemaining = HttpRequestHeaders.ContentLength;

_appCompleted = new TaskCompletionSource<object>();
_appCompleted = new TaskCompletionSource();

ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ internal abstract class KestrelConnection : IConnectionHeartbeatFeature, IConnec
private bool _completed;

private readonly CancellationTokenSource _connectionClosingCts = new CancellationTokenSource();
private readonly TaskCompletionSource<object> _completionTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource _completionTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
protected readonly long _id;
protected readonly ServiceContext _serviceContext;
protected readonly TransportConnectionManager _transportConnectionManager;
Expand Down Expand Up @@ -166,7 +166,7 @@ public void RequestClose()

public void Complete()
{
_completionTcs.TrySetResult(null);
_completionTcs.TrySetResult();

_connectionClosingCts.Dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ private static Task CancellationTokenAsTask(CancellationToken token)
return Task.CompletedTask;
}

var tcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
token.Register(() => tcs.SetResult(null));
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
token.Register(() => tcs.SetResult());
return tcs.Task;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/Servers/Kestrel/Core/src/KestrelServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class KestrelServer : IServer
private bool _hasStarted;
private int _stopping;
private readonly CancellationTokenSource _stopCts = new CancellationTokenSource();
private readonly TaskCompletionSource<object> _stoppedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource _stoppedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

private IDisposable _configChangedRegistration;

Expand Down Expand Up @@ -238,7 +238,7 @@ public async Task StopAsync(CancellationToken cancellationToken)
_bindSemaphore.Release();
}

_stoppedTcs.TrySetResult(null);
_stoppedTcs.TrySetResult();
}

// Ungraceful shutdown
Expand Down
10 changes: 5 additions & 5 deletions src/Servers/Kestrel/Core/test/ConcurrentPipeWriterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public async Task QueuesIfFlushIsNotAwaited()
Assert.False(flushTask0.IsCompleted);
Assert.False(flushTask1.IsCompleted);

mockPipeWriter.FlushTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
mockPipeWriter.FlushTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
pipeWriterFlushTcsArray[0].SetResult(default);

await mockPipeWriter.FlushTcs.Task.DefaultTimeout();
Expand All @@ -141,7 +141,7 @@ public async Task QueuesIfFlushIsNotAwaited()
Assert.False(flushTask0.IsCompleted);
Assert.False(flushTask1.IsCompleted);

mockPipeWriter.FlushTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
mockPipeWriter.FlushTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
pipeWriterFlushTcsArray[1].SetResult(default);

await mockPipeWriter.FlushTcs.Task.DefaultTimeout();
Expand Down Expand Up @@ -235,7 +235,7 @@ public async Task KeepsQueueIfInnerFlushFinishesBetweenGetMemoryAndAdvance()
Assert.Equal(3, mockPipeWriter.AdvanceCallCount);
Assert.Equal(2, mockPipeWriter.FlushCallCount);
Assert.False(flushTask1.IsCompleted);

pipeWriterFlushTcsArray[1].SetResult(default);

await flushTask1.DefaultTimeout();
Expand Down Expand Up @@ -426,14 +426,14 @@ public MockPipeWriter(TaskCompletionSource<FlushResult>[] flushResults)
public int FlushCallCount { get; set; }
public int CancelPendingFlushCallCount { get; set; }

public TaskCompletionSource<object> FlushTcs { get; set; }
public TaskCompletionSource FlushTcs { get; set; }

public Exception CompleteException { get; set; }

public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
{
FlushCallCount++;
FlushTcs?.TrySetResult(null);
FlushTcs?.TrySetResult();
return new ValueTask<FlushResult>(_flushResults[FlushCallCount - 1].Task);
}

Expand Down
4 changes: 2 additions & 2 deletions src/Servers/Kestrel/Core/test/ConnectionDispatcherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public async Task OnConnectionCreatesLogScopeWithConnectionId()
{
var serviceContext = new TestServiceContext();
// This needs to run inline
var tcs = new TaskCompletionSource<object>();
var tcs = new TaskCompletionSource();

var connection = new Mock<DefaultConnectionContext> { CallBase = true }.Object;
connection.ConnectionClosed = new CancellationToken(canceled: true);
Expand All @@ -47,7 +47,7 @@ public async Task OnConnectionCreatesLogScopeWithConnectionId()
Assert.True(pairs.ContainsKey("ConnectionId"));
Assert.Equal(connection.ConnectionId, pairs["ConnectionId"]);

tcs.TrySetResult(null);
tcs.TrySetResult();

await task;

Expand Down
8 changes: 4 additions & 4 deletions src/Servers/Kestrel/Core/test/HeartbeatTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ public async Task HeartbeatTakingLongerThanIntervalIsLoggedAsError()
var debugger = new Mock<IDebugger>();
var kestrelTrace = new Mock<IKestrelTrace>();
var handlerMre = new ManualResetEventSlim();
var handlerStartedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
var handlerStartedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var now = systemClock.UtcNow;

heartbeatHandler.Setup(h => h.OnHeartbeat(now)).Callback(() =>
{
handlerStartedTcs.SetResult(null);
handlerStartedTcs.SetResult();
handlerMre.Wait();
});
debugger.Setup(d => d.IsAttached).Returns(false);
Expand Down Expand Up @@ -67,12 +67,12 @@ public async Task HeartbeatTakingLongerThanIntervalIsNotLoggedAsErrorIfDebuggerA
var debugger = new Mock<IDebugger>();
var kestrelTrace = new Mock<IKestrelTrace>();
var handlerMre = new ManualResetEventSlim();
var handlerStartedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
var handlerStartedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var now = systemClock.UtcNow;

heartbeatHandler.Setup(h => h.OnHeartbeat(now)).Callback(() =>
{
handlerStartedTcs.SetResult(null);
handlerStartedTcs.SetResult();
handlerMre.Wait();
});

Expand Down
6 changes: 3 additions & 3 deletions src/Servers/Kestrel/Core/test/HttpConnectionTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.IO.Pipelines;
Expand Down Expand Up @@ -29,14 +29,14 @@ public async Task WriteDataRateTimeoutAbortsConnection()

var httpConnection = new HttpConnection(httpConnectionContext);

var aborted = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
var aborted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var http1Connection = new Http1Connection(httpConnectionContext);

httpConnection.Initialize(http1Connection);
http1Connection.Reset();
http1Connection.RequestAborted.Register(() =>
{
aborted.SetResult(null);
aborted.SetResult();
});

httpConnection.OnTimeout(TimeoutReason.WriteDataRate);
Expand Down
10 changes: 5 additions & 5 deletions src/Servers/Kestrel/Core/test/KestrelServerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public async Task StopAsyncDispatchesSubsequentStopAsyncContinuations()
}
};

var unbindTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
var unbindTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

var mockTransport = new Mock<IConnectionListener>();
var mockTransportFactory = new Mock<IConnectionListenerFactory>();
Expand Down Expand Up @@ -411,7 +411,7 @@ public async Task StopAsyncDispatchesSubsequentStopAsyncContinuations()
stopTask1.Wait();
});

unbindTcs.SetResult(null);
unbindTcs.SetResult();

// If stopTask2 is completed inline by the first call to StopAsync, stopTask1 will never complete.
await stopTask1.DefaultTimeout();
Expand Down Expand Up @@ -467,16 +467,16 @@ public async Task ReloadsOnConfigurationChangeWhenOptedIn()
}).Build();

Func<Task> changeCallback = null;
TaskCompletionSource<object> changeCallbackRegisteredTcs = null;
TaskCompletionSource changeCallbackRegisteredTcs = null;

var mockChangeToken = new Mock<IChangeToken>();
mockChangeToken.Setup(t => t.RegisterChangeCallback(It.IsAny<Action<object>>(), It.IsAny<object>())).Returns<Action<object>, object>((callback, state) =>
{
changeCallbackRegisteredTcs?.SetResult(null);
changeCallbackRegisteredTcs?.SetResult();

changeCallback = () =>
{
changeCallbackRegisteredTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
changeCallbackRegisteredTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
callback(state);
return changeCallbackRegisteredTcs.Task;
};
Expand Down
4 changes: 2 additions & 2 deletions src/Servers/Kestrel/Core/test/MessageBodyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -859,11 +859,11 @@ public async Task LogsWhenStopsReadingRequestBody()
{
using (var input = new TestInput())
{
var logEvent = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
var logEvent = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var mockLogger = new Mock<IKestrelTrace>();
mockLogger
.Setup(logger => logger.RequestBodyDone("ConnectionId", "RequestId"))
.Callback(() => logEvent.SetResult(null));
.Callback(() => logEvent.SetResult());
mockLogger
.Setup(logger => logger.IsEnabled(Extensions.Logging.LogLevel.Debug))
.Returns(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ internal partial class LibuvConnection : TransportConnection

private MemoryHandle _bufferHandle;
private Task _processingTask;
private readonly TaskCompletionSource<object> _waitForConnectionClosedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource _waitForConnectionClosedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
private bool _connectionClosed;

public LibuvConnection(UvStreamHandle socket,
Expand Down Expand Up @@ -253,7 +253,7 @@ private void FireConnectionClosed()
{
state.CancelConnectionClosedToken();

state._waitForConnectionClosedTcs.TrySetResult(null);
state._waitForConnectionClosedTcs.TrySetResult();
},
this,
preferLocal: false);
Expand Down
10 changes: 5 additions & 5 deletions src/Servers/Kestrel/Transport.Libuv/src/Internal/LibuvThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ internal class LibuvThread : PipeScheduler
private readonly LibuvFunctions _libuv;
private readonly IHostApplicationLifetime _appLifetime;
private readonly Thread _thread;
private readonly TaskCompletionSource<object> _threadTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource _threadTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly UvLoopHandle _loop;
private readonly UvAsyncHandle _post;
private Queue<Work> _workAdding = new Queue<Work>(1024);
Expand Down Expand Up @@ -224,7 +224,7 @@ public Task PostAsync<T>(Action<T> callback, T state)
return Task.CompletedTask;
}

var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var work = new Work
{
CallbackAdapter = CallbackAdapter<T>.PostAsyncCallbackAdapter,
Expand Down Expand Up @@ -344,7 +344,7 @@ private void ThreadStart(object parameter)
_closeError = _closeError == null ? ex : new AggregateException(_closeError, ex);
}
WriteReqPool.Dispose();
_threadTcs.SetResult(null);
_threadTcs.SetResult();

#if DEBUG && !INNER_LOOP
// Check for handle leaks after disposing everything
Expand Down Expand Up @@ -383,7 +383,7 @@ private bool DoPostWork()
try
{
work.CallbackAdapter(work.Callback, work.State);
work.Completion?.TrySetResult(null);
work.Completion?.TrySetResult();
}
catch (Exception ex)
{
Expand Down Expand Up @@ -446,7 +446,7 @@ private struct Work
public Action<object, object> CallbackAdapter;
public object Callback;
public object State;
public TaskCompletionSource<object> Completion;
public TaskCompletionSource Completion;
}

private struct CloseHandle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public async Task NonListenerPipeConnectionsAreLoggedAndIgnored()
}

// Create a pipe connection and keep it open without sending any data
var connectTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
var connectTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var connectionTrace = new LibuvTrace(new TestApplicationErrorLogger());
var pipe = new UvPipeHandle(connectionTrace);

Expand All @@ -147,7 +147,7 @@ public async Task NonListenerPipeConnectionsAreLoggedAndIgnored()
}
else
{
connectTcs.SetResult(null);
connectTcs.SetResult();
}
},
null);
Expand Down
Loading

0 comments on commit 49aecc3

Please sign in to comment.