Skip to content

Commit

Permalink
Prevent WebSockets from throwing during graceful shutdown (#27123)
Browse files Browse the repository at this point in the history
- Fix canceled ReadResult handling in Http1UpgradeMessageBody
- Add UpgradeTests.DoesNotThrowGivenCanceledReadResult
  • Loading branch information
halter73 authored Oct 23, 2020
1 parent 074069f commit c15cd69
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http
/// </summary>
internal sealed class Http1UpgradeMessageBody : Http1MessageBody
{
private int _userCanceled;

public Http1UpgradeMessageBody(Http1Connection context, bool keepAlive)
: base(context, keepAlive)
{
Expand All @@ -26,13 +28,13 @@ public Http1UpgradeMessageBody(Http1Connection context, bool keepAlive)
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
ThrowIfCompleted();
return _context.Input.ReadAsync(cancellationToken);
return ReadAsyncInternal(cancellationToken);
}

public override bool TryRead(out ReadResult result)
{
ThrowIfCompleted();
return _context.Input.TryRead(out result);
return TryReadInternal(out result);
}

public override void AdvanceTo(SequencePosition consumed)
Expand All @@ -54,6 +56,7 @@ public override void Complete(Exception exception)

public override void CancelPendingRead()
{
Interlocked.Exchange(ref _userCanceled, 1);
_context.Input.CancelPendingRead();
}

Expand All @@ -69,12 +72,49 @@ public override Task StopAsync()

public override bool TryReadInternal(out ReadResult readResult)
{
return _context.Input.TryRead(out readResult);
// Ignore the canceled readResult unless it was canceled by the user.
do
{
if (!_context.Input.TryRead(out readResult))
{
return false;
}
} while (readResult.IsCanceled && Interlocked.Exchange(ref _userCanceled, 0) == 0);

return true;
}

public override ValueTask<ReadResult> ReadAsyncInternal(CancellationToken cancellationToken = default)
{
return _context.Input.ReadAsync(cancellationToken);
ReadResult readResult;

// Ignore the canceled readResult unless it was canceled by the user.
do
{
var readTask = _context.Input.ReadAsync(cancellationToken);

if (!readTask.IsCompletedSuccessfully)
{
return ReadAsyncInternalAwaited(readTask, cancellationToken);
}

readResult = readTask.GetAwaiter().GetResult();
} while (readResult.IsCanceled && Interlocked.Exchange(ref _userCanceled, 0) == 0);

return new ValueTask<ReadResult>(readResult);
}

private async ValueTask<ReadResult> ReadAsyncInternalAwaited(ValueTask<ReadResult> readTask, CancellationToken cancellationToken = default)
{
var readResult = await readTask;

// Ignore the canceled readResult unless it was canceled by the user.
while (readResult.IsCanceled && Interlocked.Exchange(ref _userCanceled, 0) == 0)
{
readResult = await _context.Input.ReadAsync(cancellationToken);
}

return readResult;
}
}
}
48 changes: 48 additions & 0 deletions src/Servers/Kestrel/test/InMemory.FunctionalTests/UpgradeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
Expand Down Expand Up @@ -375,5 +378,50 @@ await connection.Receive("HTTP/1.1 101 Switching Protocols",
}
}
}

[Fact]
public async Task DoesNotThrowGivenCanceledReadResult()
{
var appCompletedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

await using var server = new TestServer(async context =>
{
try
{
var upgradeFeature = context.Features.Get<IHttpUpgradeFeature>();
var duplexStream = await upgradeFeature.UpgradeAsync();

// Kestrel will call Transport.Input.CancelPendingRead() during shutdown so idle connections
// can wake up and shutdown gracefully. We manually call CancelPendingRead() to simulate this and
// ensure the Stream returned by UpgradeAsync doesn't throw in this case.
// https://github.com/dotnet/aspnetcore/issues/26482
var connectionTransportFeature = context.Features.Get<IConnectionTransportFeature>();
connectionTransportFeature.Transport.Input.CancelPendingRead();

// Use ReadAsync() instead of CopyToAsync() for this test since IsCanceled is only checked in
// HttpRequestStream.ReadAsync() and not HttpRequestStream.CopyToAsync()
Assert.Equal(0, await duplexStream.ReadAsync(new byte[1]));
appCompletedTcs.SetResult(null);
}
catch (Exception ex)
{
appCompletedTcs.SetException(ex);
throw;
}
},
new TestServiceContext(LoggerFactory));

using (var connection = server.CreateConnection())
{
await connection.SendEmptyGetWithUpgrade();
await connection.Receive("HTTP/1.1 101 Switching Protocols",
"Connection: Upgrade",
$"Date: {server.Context.DateHeaderValue}",
"",
"");
}

await appCompletedTcs.Task.DefaultTimeout();
}
}
}

0 comments on commit c15cd69

Please sign in to comment.