Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[browser] [wasm] Request Streaming upload #91295

Merged
merged 23 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
d94e62d
progress
campersau Aug 27, 2023
c52325d
progress try out byob
campersau Aug 28, 2023
a85a8c8
back to enqueue again
campersau Aug 28, 2023
ec46048
finalize request stream test
campersau Aug 28, 2023
d9e12ce
checkpoint
campersau Aug 29, 2023
5297b09
reject fetch with read stream exception
campersau Aug 29, 2023
df3b0c9
fix build
campersau Aug 30, 2023
cdfeab2
use createPromiseController in pull
campersau Aug 30, 2023
e2193e6
dispose controller
campersau Aug 30, 2023
92ae132
swap delegate allocation with state allocation
campersau Aug 30, 2023
487ef33
add response / request streaming test
campersau Aug 30, 2023
1ebb07d
use createPromiseController for fetch
campersau Aug 30, 2023
2fce6de
Revert "use createPromiseController for fetch"
campersau Aug 30, 2023
cd4957a
rename pull to pull_delegate
campersau Aug 30, 2023
a69fd0a
move reading into ReadableStreamPullState
campersau Aug 30, 2023
2da7d3d
pass HttpCompletionOption.ResponseHeadersRead in streaming test
campersau Aug 31, 2023
7648544
- don't pass controller to C#, only pass pull_state
pavelsavara Aug 31, 2023
dd83582
fix build by passing the correct length
campersau Aug 31, 2023
f91c045
feedback
pavelsavara Aug 31, 2023
f578966
type: "bytes" need be there
pavelsavara Aug 31, 2023
dad30c5
call ReadableStreamControllerEnqueueUnsafe again
campersau Aug 31, 2023
ee3a4a7
BrowserHttpHandler_Streaming not OuterLoop
pavelsavara Aug 31, 2023
a594c92
add comment for streaming request feature detection logic
campersau Sep 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public HttpClientHandler_Cancellation_Test(ITestOutputHelper output) : base(outp
[Theory]
[InlineData(false, CancellationMode.Token)]
[InlineData(true, CancellationMode.Token)]
[ActiveIssue("https://github.com/dotnet/runtime/issues/36634", TestPlatforms.Browser)] // out of memory
public async Task PostAsync_CancelDuringRequestContentSend_TaskCanceledQuickly(bool chunkedTransfer, CancellationMode mode)
{
if (LoopbackServerFactory.Version >= HttpVersion20.Value && chunkedTransfer)
Expand All @@ -42,6 +41,12 @@ public async Task PostAsync_CancelDuringRequestContentSend_TaskCanceledQuickly(b
return;
}

if (PlatformDetection.IsBrowser && LoopbackServerFactory.Version < HttpVersion20.Value)
{
// Browser request streaming is only supported on HTTP/2 or higher
return;
}

var serverRelease = new TaskCompletionSource<bool>();
await LoopbackServerFactory.CreateClientAndServerAsync(async uri =>
{
Expand All @@ -58,6 +63,13 @@ await LoopbackServerFactory.CreateClientAndServerAsync(async uri =>
req.Content = new ByteAtATimeContent(int.MaxValue, waitToSend.Task, contentSending, millisecondDelayBetweenBytes: 1);
req.Headers.TransferEncodingChunked = chunkedTransfer;

if (PlatformDetection.IsBrowser)
{
#if !NETFRAMEWORK
req.Options.Set(new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingRequest"), true);
#endif
}

Task<HttpResponseMessage> resp = client.SendAsync(TestAsync, req, HttpCompletionOption.ResponseHeadersRead, cts.Token);
waitToSend.SetResult(true);
await Task.WhenAny(contentSending.Task, resp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1886,16 +1886,30 @@ await connection.ReadRequestHeaderAndSendCustomResponseAsync(
}

[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task PostAsync_ThrowFromContentCopy_RequestFails(bool syncFailure)
[InlineData(false, false)]
[InlineData(false, true)]
[InlineData(true, false)]
[InlineData(true, true)]
public async Task PostAsync_ThrowFromContentCopy_RequestFails(bool syncFailure, bool enableWasmStreaming)
{
if (UseVersion == HttpVersion30)
{
// TODO: Make this version-indepdendent
return;
}

if (enableWasmStreaming && !PlatformDetection.IsBrowser)
{
// enableWasmStreaming makes only sense on Browser platform
return;
}

if (enableWasmStreaming && PlatformDetection.IsBrowser && UseVersion < HttpVersion20.Value)
{
// Browser request streaming is only supported on HTTP/2 or higher
return;
}

await LoopbackServer.CreateServerAsync(async (server, uri) =>
{
Task responseTask = server.AcceptConnectionAsync(async connection =>
Expand All @@ -1914,8 +1928,20 @@ await LoopbackServer.CreateServerAsync(async (server, uri) =>
canReadFunc: () => true,
readFunc: (buffer, offset, count) => throw error,
readAsyncFunc: (buffer, offset, count, cancellationToken) => syncFailure ? throw error : Task.Delay(1).ContinueWith<int>(_ => throw error)));
var request = new HttpRequestMessage(HttpMethod.Post, uri);
request.Content = content;

if (PlatformDetection.IsBrowser)
{
if (enableWasmStreaming)
{
#if !NETFRAMEWORK
request.Options.Set(new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingRequest"), true);
#endif
}
}

Assert.Same(error, await Assert.ThrowsAsync<FormatException>(() => client.PostAsync(uri, content)));
Assert.Same(error, await Assert.ThrowsAsync<FormatException>(() => client.SendAsync(request)));
}
});
}
Expand Down
85 changes: 84 additions & 1 deletion src/libraries/Common/tests/System/Net/Http/ResponseStreamTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,92 @@ await client.GetAsync(remoteServer.EchoUri, HttpCompletionOption.ResponseHeaders
}

#if NETCOREAPP

[OuterLoop]
pavelsavara marked this conversation as resolved.
Show resolved Hide resolved
[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))]
[InlineData(true)]
[InlineData(false)]
public async Task BrowserHttpHandler_StreamingRequest(bool useStringContent)
{
var WebAssemblyEnableStreamingRequestKey = new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingRequest");

var req = new HttpRequestMessage(HttpMethod.Post, Configuration.Http.Http2RemoteVerifyUploadServer);

req.Options.Set(WebAssemblyEnableStreamingRequestKey, true);

int expectedBodyLength;
if (useStringContent)
{
string bodyContent = "Hello World";
expectedBodyLength = bodyContent.Length;
req.Content = new StringContent(bodyContent);
}
else
{
expectedBodyLength = 1500 * 1024 * 1024;
int remaining = expectedBodyLength;
req.Content = new StreamContent(new DelegateStream(
readAsyncFunc: (buffer, offset, count, cancellationToken) =>
{
if (remaining > 0)
{
int send = Math.Min(remaining, count);
buffer.AsSpan(offset, send).Fill(65);
remaining -= send;
return Task.FromResult(send);
}
return Task.FromResult(0);
}));
}

req.Content.Headers.Add("Content-MD5-Skip", "browser");

using (HttpClient client = CreateHttpClientForRemoteServer(Configuration.Http.RemoteHttp2Server))
using (HttpResponseMessage response = await client.SendAsync(req))
{
Assert.Equal(HttpStatusCode.OK, response.StatusCode);
Assert.Equal(expectedBodyLength.ToString(), Assert.Single(response.Headers.GetValues("X-HttpRequest-Body-Length")));
Assert.Equal(useStringContent, response.Headers.Contains("X-HttpRequest-Headers-ContentLength"));
if (useStringContent)
{
Assert.Equal(expectedBodyLength.ToString(), Assert.Single(response.Headers.GetValues("X-HttpRequest-Headers-ContentLength")));
}
}
}

// Duplicate of PostAsync_ThrowFromContentCopy_RequestFails using remote server
[OuterLoop]
[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))]
[InlineData(false)]
[InlineData(true)]
public async Task BrowserHttpHandler_StreamingRequest_ThrowFromContentCopy_RequestFails(bool syncFailure)
{
var WebAssemblyEnableStreamingRequestKey = new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingRequest");

var req = new HttpRequestMessage(HttpMethod.Post, Configuration.Http.Http2RemoteEchoServer);

req.Options.Set(WebAssemblyEnableStreamingRequestKey, true);

Exception error = new FormatException();
var content = new StreamContent(new DelegateStream(
canSeekFunc: () => true,
lengthFunc: () => 12345678,
positionGetFunc: () => 0,
canReadFunc: () => true,
readFunc: (buffer, offset, count) => throw error,
readAsyncFunc: (buffer, offset, count, cancellationToken) => syncFailure ? throw error : Task.Delay(1).ContinueWith<int>(_ => throw error)));

req.Content = content;

using (HttpClient client = CreateHttpClientForRemoteServer(Configuration.Http.RemoteHttp2Server))
{
Assert.Same(error, await Assert.ThrowsAsync<FormatException>(() => client.SendAsync(req)));
}
}

[OuterLoop]
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))]
public async Task BrowserHttpHandler_Streaming()
public async Task BrowserHttpHandler_StreamingResponse()
{
var WebAssemblyEnableStreamingResponseKey = new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingResponse");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@
using System.Security.Cryptography;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;

namespace NetCoreServer
{
public class VerifyUploadHandler
{
public static async Task InvokeAsync(HttpContext context)
{
context.Features.Get<IHttpMaxRequestBodySizeFeature>().MaxRequestBodySize = null;

// Report back original request method verb.
context.Response.Headers["X-HttpRequest-Method"] = context.Request.Method;

Expand All @@ -29,12 +32,15 @@ public static async Task InvokeAsync(HttpContext context)
context.Response.Headers["X-HttpRequest-Headers-TransferEncoding"] = transferEncoding;
}

// Get request body.
byte[] requestBodyBytes = await ReadAllRequestBytesAsync(context);
// Compute MD5 hash of received request body.
(byte[] md5Bytes, int bodyLength) = await ComputeMD5HashRequestBodyAsync(context);

// Report back the actual body length.
context.Response.Headers["X-HttpRequest-Body-Length"] = bodyLength.ToString();

// Skip MD5 checksum for empty request body
// Skip MD5 checksum for empty request body
// or for requests which opt to skip it due to [ActiveIssue("https://github.com/dotnet/runtime/issues/37669", TestPlatforms.Browser)]
if (requestBodyBytes.Length == 0 || !string.IsNullOrEmpty(context.Request.Headers["Content-MD5-Skip"]))
if (bodyLength == 0 || !string.IsNullOrEmpty(context.Request.Headers["Content-MD5-Skip"]))
{
context.Response.StatusCode = 200;
return;
Expand All @@ -49,13 +55,7 @@ public static async Task InvokeAsync(HttpContext context)
return;
}

// Compute MD5 hash of received request body.
string actualHash;
using (MD5 md5 = MD5.Create())
{
byte[] hash = md5.ComputeHash(requestBodyBytes);
actualHash = Convert.ToBase64String(hash);
}
string actualHash = Convert.ToBase64String(md5Bytes);

if (expectedHash == actualHash)
{
Expand All @@ -66,21 +66,22 @@ public static async Task InvokeAsync(HttpContext context)
context.Response.StatusCode = 400;
context.Response.SetStatusDescription("Received request body fails MD5 checksum");
}

}

private static async Task<byte[]> ReadAllRequestBytesAsync(HttpContext context)
private static async Task<(byte[] MD5Hash, int BodyLength)> ComputeMD5HashRequestBodyAsync(HttpContext context)
{
Stream requestStream = context.Request.Body;
byte[] buffer = new byte[16 * 1024];
using (MemoryStream ms = new MemoryStream())
using (MD5 md5 = MD5.Create())
{
int read;
int read, size = 0;
while ((read = await requestStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
ms.Write(buffer, 0, read);
size += read;
md5.TransformBlock(buffer, 0, read, buffer, 0);
}
return ms.ToArray();
md5.TransformFinalBlock(buffer, 0, read);
return (md5.Hash, size);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace System.Net.Http
// the JavaScript objects have thread affinity, it is necessary that the continuations run the same thread as the start of the async method.
internal sealed class BrowserHttpHandler : HttpMessageHandler
{
private static readonly HttpRequestOptionsKey<bool> EnableStreamingRequest = new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingRequest");
private static readonly HttpRequestOptionsKey<bool> EnableStreamingResponse = new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingResponse");
private static readonly HttpRequestOptionsKey<IDictionary<string, object>> FetchOptions = new HttpRequestOptionsKey<IDictionary<string, object>>("WebAssemblyFetchOptions");
private bool _allowAutoRedirect = HttpHandlerDefaults.DefaultAutomaticRedirection;
Expand Down Expand Up @@ -220,10 +221,64 @@ private static async Task<WasmFetchResponse> CallFetch(HttpRequestMessage reques
}
else
{
byte[] buffer = await request.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(true);
cancellationToken.ThrowIfCancellationRequested();
bool streamingEnabled = false;
if (BrowserHttpInterop.SupportsStreamingRequest())
{
request.Options.TryGetValue(EnableStreamingRequest, out streamingEnabled);
}

if (streamingEnabled)
{
Stream stream = await request.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(true);
cancellationToken.ThrowIfCancellationRequested();

byte[]? buffer = null;

var pull = async void (JSObject controller, int desiredSize) =>
{
Memory<byte> view;
if (desiredSize > 0)
{
if (buffer is null || buffer.Length < desiredSize)
{
view = buffer = new byte[desiredSize];
}
else
{
view = buffer.AsMemory(0, desiredSize);
}
}
else
{
view = buffer ??= new byte[65536];
}

try
{
int length = await stream.ReadAsync(view, cancellationToken).ConfigureAwait(true);
using (Buffers.MemoryHandle handle = view.Pin())
{
ReadableStreamControllerEnqueueUnsafe(controller, handle, length);
}
}
catch (Exception ex)
{
BrowserHttpInterop.ReadableStreamControllerError(controller, ex);
}
};

unsafe static void ReadableStreamControllerEnqueueUnsafe(JSObject controller, Buffers.MemoryHandle handle, int length) =>
BrowserHttpInterop.ReadableStreamControllerEnqueue(controller, (IntPtr)handle.Pointer, length);

promise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, pull);
}
else
{
byte[] buffer = await request.Content.ReadAsByteArrayAsync(cancellationToken).ConfigureAwait(true);
cancellationToken.ThrowIfCancellationRequested();

promise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, buffer);
promise = BrowserHttpInterop.Fetch(uri, headerNames.ToArray(), headerValues.ToArray(), optionNames, optionValues, abortController, buffer);
}
}
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ namespace System.Net.Http
{
internal static partial class BrowserHttpInterop
{
[JSImport("INTERNAL.http_wasm_supports_streaming_request")]
public static partial bool SupportsStreamingRequest();

[JSImport("INTERNAL.http_wasm_supports_streaming_response")]
public static partial bool SupportsStreamingResponse();

Expand All @@ -25,6 +28,17 @@ public static partial void AbortRequest(
public static partial void AbortResponse(
JSObject fetchResponse);

[JSImport("INTERNAL.http_wasm_readable_stream_controller_enqueue")]
public static partial void ReadableStreamControllerEnqueue(
JSObject controller,
IntPtr bufferPtr,
int bufferLength);

[JSImport("INTERNAL.http_wasm_readable_stream_controller_error")]
public static partial void ReadableStreamControllerError(
JSObject controller,
Exception error);

[JSImport("INTERNAL.http_wasm_get_response_header_names")]
private static partial string[] _GetResponseHeaderNames(
JSObject fetchResponse);
Expand Down Expand Up @@ -58,6 +72,16 @@ public static partial Task<JSObject> Fetch(
JSObject abortControler,
string? body = null);

[JSImport("INTERNAL.http_wasm_fetch_stream")]
public static partial Task<JSObject> Fetch(
string uri,
string[] headerNames,
string[] headerValues,
string[] optionNames,
[JSMarshalAs<JSType.Array<JSType.Any>>] object?[] optionValues,
JSObject abortControler,
[JSMarshalAs<JSType.Function<JSType.Object, JSType.Number>>] Action<JSObject, int> pull);

[JSImport("INTERNAL.http_wasm_fetch_bytes")]
private static partial Task<JSObject> FetchBytes(
string uri,
Expand All @@ -67,8 +91,7 @@ private static partial Task<JSObject> FetchBytes(
[JSMarshalAs<JSType.Array<JSType.Any>>] object?[] optionValues,
JSObject abortControler,
IntPtr bodyPtr,
int bodyLength
);
int bodyLength);

public static unsafe Task<JSObject> Fetch(string uri, string[] headerNames, string[] headerValues, string[] optionNames, object?[] optionValues, JSObject abortControler, byte[] body)
{
Expand Down
Loading
Loading