Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wasm] Cancel pending ReadableStream on Dispose or cancelationToken #64285

Merged
merged 9 commits into from
Jan 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ public void Close()
CloseWebSocket();
}

public async Task WaitForCloseAsync(CancellationToken cancellationToken)
{
while (_websocket != null
? _websocket.State != WebSocketState.Closed
: !(_socket.Poll(1, SelectMode.SelectRead) && _socket.Available == 0))
{
cancellationToken.ThrowIfCancellationRequested();
await Task.Delay(100);
}
}

public void Shutdown(SocketShutdown how)
{
_socket?.Shutdown(how);
Expand Down Expand Up @@ -138,6 +149,9 @@ public abstract class GenericLoopbackConnection : IDisposable
/// <summary>Waits for the client to signal cancellation.</summary>
public abstract Task WaitForCancellationAsync(bool ignoreIncomingData = true);

/// <summary>Waits for the client to signal cancellation.</summary>
public abstract Task WaitForCloseAsync(CancellationToken cancellationToken);

/// <summary>Helper function to make it easier to convert old test with strings.</summary>
public async Task SendResponseBodyAsync(string content, bool isFinal = true)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -984,5 +984,10 @@ public override async Task WaitForCancellationAsync(bool ignoreIncomingData = tr
RstStreamFrame rstStreamFrame = Assert.IsType<RstStreamFrame>(frame);
Assert.Equal((int)ProtocolErrors.CANCEL, rstStreamFrame.ErrorCode);
}

public override Task WaitForCloseAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Linq;
using System.Net.Http.Functional.Tests;
using Xunit;
using System.Threading;

namespace System.Net.Test.Common
{
Expand Down Expand Up @@ -305,6 +306,11 @@ public override async Task WaitForCancellationAsync(bool ignoreIncomingData = tr
{
await GetOpenRequest().WaitForCancellationAsync(ignoreIncomingData).ConfigureAwait(false);
}

public override Task WaitForCloseAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public HttpClientHandler_Cancellation_Test(ITestOutputHelper output) : base(outp
[Theory]
[InlineData(false, CancellationMode.Token)]
[InlineData(true, CancellationMode.Token)]
[ActiveIssue("https://github.com/dotnet/runtime/issues/36634", TestPlatforms.Browser)] // out of memory
public async Task PostAsync_CancelDuringRequestContentSend_TaskCanceledQuickly(bool chunkedTransfer, CancellationMode mode)
{
if (LoopbackServerFactory.Version >= HttpVersion20.Value && chunkedTransfer)
Expand Down Expand Up @@ -228,10 +229,21 @@ await LoopbackServerFactory.CreateServerAsync(async (server, url) =>
await connection.ReadRequestDataAsync();
await connection.SendResponseAsync(HttpStatusCode.OK, headers: headers, isFinal: false);
await clientFinished.Task;

#if TARGET_BROWSER
// make sure that the browser closed the connection
await connection.WaitForCloseAsync(CancellationToken.None);
#endif
});

var req = new HttpRequestMessage(HttpMethod.Get, url) { Version = UseVersion };
req.Headers.ConnectionClose = connectionClose;

#if TARGET_BROWSER
var WebAssemblyEnableStreamingResponseKey = new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingResponse");
req.Options.Set(WebAssemblyEnableStreamingResponseKey, true);
#endif

Task<HttpResponseMessage> getResponse = client.SendAsync(TestAsync, req, HttpCompletionOption.ResponseHeadersRead, cts.Token);
await ValidateClientCancellationAsync(async () =>
{
Expand All @@ -247,7 +259,6 @@ await ValidateClientCancellationAsync(async () =>
cts.Cancel();
await readTask;
});

try
{
clientFinished.SetResult(true);
Expand All @@ -256,11 +267,13 @@ await ValidateClientCancellationAsync(async () =>
});
}
}

[Theory]
[InlineData(CancellationMode.CancelPendingRequests, false)]
[InlineData(CancellationMode.DisposeHttpClient, false)]
[InlineData(CancellationMode.CancelPendingRequests, true)]
[InlineData(CancellationMode.DisposeHttpClient, true)]
[SkipOnPlatform(TestPlatforms.Browser, "Browser doesn't have blocking synchronous Stream.ReadByte and so it waits for whole body")]
public async Task GetAsync_CancelPendingRequests_DoesntCancelReadAsyncOnResponseStream(CancellationMode mode, bool copyToAsync)
{
if (IsWinHttpHandler && UseVersion >= HttpVersion20.Value)
Expand Down
5 changes: 5 additions & 0 deletions src/libraries/Common/tests/System/Net/Http/LoopbackServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,11 @@ public override async Task WaitForCancellationAsync(bool ignoreIncomingData = tr
}
}
}

public override Task WaitForCloseAsync(CancellationToken cancellationToken)
{
return _socket.WaitForCloseAsync(cancellationToken);
}
}

public override async Task<HttpRequestData> HandleRequestAsync(HttpStatusCode statusCode = HttpStatusCode.OK, IList<HttpHeaderData> headers = null, string content = "")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ private static async Task ProcessWebSocketRequest(HttpContext context, WebSocket
var slice = new ArraySegment<byte>(testedBuffer, 0, testedNext.Result);
await control.SendAsync(slice, WebSocketMessageType.Binary, true, cts.Token).ConfigureAwait(false);
}
// did we get TCP FIN?
if (!close && (tested.Poll(1, SelectMode.SelectRead) && tested.Available == 0))
{
close = true;
}
if (!close)
{
testedNext = tested.ReceiveAsync(new Memory<byte>(testedBuffer), SocketFlags.None, cts.Token).AsTask();
Expand Down Expand Up @@ -142,14 +147,14 @@ private static async Task ProcessWebSocketRequest(HttpContext context, WebSocket
}
catch (WebSocketException ex)
{
logger.LogWarning("ProcessWebSocketRequest closing failed", ex);
logger.LogWarning("RemoteLoopHandler.ProcessWebSocketRequest closing failed", ex);
}
}
cts.Cancel();
}
catch (Exception ex)
{
logger.LogError("ProcessWebSocketRequest failed", ex);
logger.LogError("RemoteLoopHandler.ProcessWebSocketRequest failed", ex);
}
finally
{
Expand Down
Loading