Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop doing sync over async in Produce100Continue #31650

Merged
merged 13 commits into from
Apr 13, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public override void AdvanceTo(SequencePosition consumed, SequencePosition exami

public override bool TryReadInternal(out ReadResult readResult)
{
TryStart();
TryStartAsync();
wtgodbe marked this conversation as resolved.
Show resolved Hide resolved

var boolResult = _requestBodyPipe.Reader.TryRead(out _readResult);

Expand All @@ -61,7 +61,7 @@ public override bool TryReadInternal(out ReadResult readResult)

public override async ValueTask<ReadResult> ReadAsyncInternal(CancellationToken cancellationToken = default)
{
TryStart();
await TryStartAsync();

try
{
Expand Down Expand Up @@ -101,7 +101,7 @@ private async Task PumpAsync()

if (!awaitable.IsCompleted)
{
TryProduceContinue();
await TryProduceContinueAsync();
}

while (true)
Expand Down Expand Up @@ -171,7 +171,7 @@ protected override ValueTask OnStopAsync()
// call complete here on the reader
_requestBodyPipe.Reader.Complete();

Debug.Assert(_pumpTask != null, "OnReadStarted must have been called.");
Debug.Assert(_pumpTask != null, "OnReadStartedAsync must have been called.");

// PumpTask catches all Exceptions internally.
if (_pumpTask.IsCompleted)
Expand All @@ -195,9 +195,10 @@ private async ValueTask StopAsyncAwaited(Task pumpTask)
_requestBodyPipe.Reset();
}

protected override void OnReadStarted()
protected override Task OnReadStartedAsync()
{
_pumpTask = PumpAsync();
return Task.CompletedTask;
}

private bool Read(ReadOnlySequence<byte> readableBuffer, PipeWriter writableBuffer, out SequencePosition consumed, out SequencePosition examined)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public override async ValueTask<ReadResult> ReadAsyncInternal(CancellationToken
KestrelBadHttpRequestException.Throw(RequestRejectionReason.RequestBodyTimeout);
}

TryStart();
await TryStartAsync();

// The while(true) loop is required because the Http1 connection calls CancelPendingRead to unblock
// the call to StartTimingReadAsync to check if the request timed out.
Expand Down Expand Up @@ -132,7 +132,7 @@ public override bool TryReadInternal(out ReadResult readResult)
KestrelBadHttpRequestException.Throw(RequestRejectionReason.RequestBodyTimeout);
}

TryStart();
TryStartAsync();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this a behavior change to go from blocking to fire-and-forget? Can TryStartAsync throw asynchronously like if the client has disconnected?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Write100ContinueAsync() could realistically return an incomplete ValueTask this is a change in behavior. Not sure how that could ever happen though. If it could happen, not blocking is waaay better than block because at least you return the thread to the pool. TryStartAsync should never be able to throw asynchronously because it's a write without a canceled token.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add an assert. I dislike the fire and forget as well unless we have an explicit check that it's always completed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, plus a10c9d9

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love that bug 😄

Copy link
Member

@halter73 halter73 Apr 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dislike the fire and forget

I totally agree. I also dislike fire-and-forget with a passion, but what's even worse is blocking on I/O. This isn't the only place we don't await output-writing ValueTasks we expect to be usually completed.

Now unlike with HTTP/2 where you could conceive of a scenario where output backpressure has built up prior to sending a WINDOW_UPDATE or RST_STREAM, it's hard to come up with one for an HTTP/1.1 "100 continue" response. That said, it's not a mathematical invariant. I could write a unit test with a custom transport and write buffering disabled to force it. For that reason, I don't think we should add this Debug.Assert like we don't for WINDOW_UPDATE or RST_STREAM. If we think it's worthwhile we can store the task and await it in ConsumeAsync(), but I don't think that's necessary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense to me - if it matches what we do for RST_STREAM and WINDOW_UPDATE seems like we can remove the assert here


// The while(true) because we don't want to return a canceled ReadResult if the user themselves didn't cancel it.
while (true)
Expand Down
16 changes: 5 additions & 11 deletions src/Servers/Kestrel/Core/src/Internal/Http/HttpProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -909,27 +909,21 @@ protected bool VerifyResponseContentLength([NotNullWhen(false)] out Exception? e
return true;
}

public void ProduceContinue()
public ValueTask<FlushResult> ProduceContinueAsync()
{
if (HasResponseStarted)
{
return;
return default;
}

if (_httpVersion != Http.HttpVersion.Http10 &&
((IHeaderDictionary)HttpRequestHeaders).TryGetValue(HeaderNames.Expect, out var expect) &&
(expect.FirstOrDefault() ?? "").Equals("100-continue", StringComparison.OrdinalIgnoreCase))
{
ValueTask<FlushResult> vt = Output.Write100ContinueAsync();
if (vt.IsCompleted)
{
vt.GetAwaiter().GetResult();
}
else
{
vt.AsTask().GetAwaiter().GetResult();
}
return Output.Write100ContinueAsync();
}

return default;
}

public Task InitializeResponseAsync(int firstWriteByteCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
{
internal interface IHttpResponseControl
{
void ProduceContinue();
ValueTask<FlushResult> ProduceContinueAsync();
Memory<byte> GetMemory(int sizeHint = 0);
Span<byte> GetSpan(int sizeHint = 0);
void Advance(int bytes);
Expand Down
50 changes: 42 additions & 8 deletions src/Servers/Kestrel/Core/src/Internal/Http/MessageBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,21 @@ public virtual ValueTask CompleteAsync(Exception? exception)

public virtual Task ConsumeAsync()
{
TryStart();
Task startTask = TryStartAsync();
if (!startTask.IsCompletedSuccessfully)
{
return ConsumeAwaited(startTask);
}

return OnConsumeAsync();
}

private async Task ConsumeAwaited(Task startTask)
{
await startTask;
await OnConsumeAsync();
}

public virtual ValueTask StopAsync()
{
TryStop();
Expand All @@ -93,20 +103,22 @@ public virtual void Reset()
_examinedUnconsumedBytes = 0;
}

protected void TryProduceContinue()
protected ValueTask<FlushResult> TryProduceContinueAsync()
{
if (_send100Continue)
{
_context.HttpResponseControl.ProduceContinue();
_send100Continue = false;
return _context.HttpResponseControl.ProduceContinueAsync();
}

return default;
}

protected void TryStart()
protected Task TryStartAsync()
{
if (_context.HasStartedConsumingRequestBody)
{
return;
return Task.CompletedTask;
}

OnReadStarting();
Expand All @@ -128,7 +140,7 @@ protected void TryStart()
}
}

OnReadStarted();
return OnReadStartedAsync();
}

protected void TryStop()
Expand Down Expand Up @@ -165,8 +177,9 @@ protected virtual void OnReadStarting()
{
}

protected virtual void OnReadStarted()
protected virtual Task OnReadStartedAsync()
{
return Task.CompletedTask;
}

protected void AddAndCheckObservedBytes(long observedBytes)
Expand All @@ -183,7 +196,15 @@ protected ValueTask<ReadResult> StartTimingReadAsync(ValueTask<ReadResult> readA
{
if (!readAwaitable.IsCompleted)
{
TryProduceContinue();
ValueTask<FlushResult> continueTask = TryProduceContinueAsync();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we indicate that the ValueTask result has been consumed? @stephentoub submitted a PR to do that for historical code: #31221

if (!continueTask.IsCompletedSuccessfully)
{
    return StartTimingReadAwaited(continueTask, readAwaitable, cancellationToken);
}
else
{
    continueTask.GetAwaiter().GetResult();
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I'd have expected the code in the PR to trigger CA2012, providing the same feedback as James. Were no warnings issued for this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your .editconfig changes are still there: https://github.com/dotnet/aspnetcore/blob/main/.editorconfig

I'm not familiar with configuring analyzers in editconfigs or CA2021. I'm not sure why it wasn't prompted.

if (!continueTask.IsCompletedSuccessfully)
{
return StartTimingReadAwaited(continueTask, readAwaitable, cancellationToken);
}
else
{
continueTask.GetAwaiter().GetResult();
}

if (_timingEnabled)
{
Expand All @@ -195,6 +216,19 @@ protected ValueTask<ReadResult> StartTimingReadAsync(ValueTask<ReadResult> readA
return readAwaitable;
}

protected async ValueTask<ReadResult> StartTimingReadAwaited(ValueTask<FlushResult> continueTask, ValueTask<ReadResult> readAwaitable, CancellationToken cancellationToken)
{
await continueTask;

if (_timingEnabled)
{
_backpressure = true;
_context.TimeoutControl.StartTimingRead();
}

return await readAwaitable;
}

protected void CountBytesRead(long bytesInReadResult)
{
var numFirstSeenBytes = bytesInReadResult - _alreadyTimedBytes;
Expand Down
16 changes: 12 additions & 4 deletions src/Servers/Kestrel/Core/src/Internal/Http2/Http2MessageBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
Expand All @@ -30,13 +32,19 @@ protected override void OnReadStarting()
}
}

protected override void OnReadStarted()
protected override Task OnReadStartedAsync()
{
// Produce 100-continue if no request body data for the stream has arrived yet.
if (!_context.RequestBodyStarted)
{
TryProduceContinue();
ValueTask<FlushResult> continueTask = TryProduceContinueAsync();
if (!continueTask.IsCompletedSuccessfully)
{
return continueTask.GetAsTask();
}
}

return Task.CompletedTask;
}

public override void Reset()
Expand All @@ -59,7 +67,7 @@ public override void AdvanceTo(SequencePosition consumed, SequencePosition exami

public override bool TryRead(out ReadResult readResult)
{
TryStart();
TryStartAsync();
wtgodbe marked this conversation as resolved.
Show resolved Hide resolved

var hasResult = _context.RequestBodyPipe.Reader.TryRead(out readResult);

Expand All @@ -80,7 +88,7 @@ public override bool TryRead(out ReadResult readResult)

public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
TryStart();
await TryStartAsync();

try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public override void AdvanceTo(SequencePosition consumed, SequencePosition exami

public override bool TryRead(out ReadResult readResult)
{
TryStart();
TryStartAsync();

var hasResult = _context.RequestBodyPipe.Reader.TryRead(out readResult);

Expand All @@ -67,7 +67,7 @@ public override bool TryRead(out ReadResult readResult)

public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
TryStart();
await TryStartAsync();

try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ public async Task RemoveConnectionSpecificHeaders()
}

[Fact]
[QuarantinedTest("https://github.com/dotnet/aspnetcore/issues/31777")]
public async Task ContentLength_Received_NoDataFrames_Reset()
{
var headers = new[]
Expand Down