From 4d96adfca5b0dfa84172b3ad2fe5773e669439b5 Mon Sep 17 00:00:00 2001 From: Dillen Meijboom Date: Thu, 5 Jan 2023 15:49:19 +0100 Subject: [PATCH] Implement bytes_stream() for wasm. (#1713) Co-authored-by: muji --- Cargo.toml | 10 ++++++---- src/wasm/response.rs | 26 ++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 528b5399e..3e2c7ca17 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,7 +58,7 @@ multipart = ["mime_guess"] trust-dns = ["trust-dns-resolver"] -stream = ["tokio/fs", "tokio-util"] +stream = ["tokio/fs", "tokio-util", "wasm-streams"] socks = ["tokio-socks"] @@ -83,6 +83,8 @@ bytes = "1.0" serde = "1.0" serde_urlencoded = "0.7.1" tower-service = "0.3" +futures-core = { version = "0.3.0", default-features = false } +futures-util = { version = "0.3.0", default-features = false } # Optional deps... @@ -93,8 +95,6 @@ mime_guess = { version = "2.0", default-features = false, optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] encoding_rs = "0.8" -futures-core = { version = "0.3.0", default-features = false } -futures-util = { version = "0.3.0", default-features = false } http-body = "0.4.0" hyper = { version = "0.14.18", default-features = false, features = ["tcp", "http1", "http2", "client", "runtime"] } h2 = "0.3.10" @@ -154,6 +154,7 @@ js-sys = "0.3.45" serde_json = "1.0" wasm-bindgen = "0.2.68" wasm-bindgen-futures = "0.4.18" +wasm-streams = { version = "0.2", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies.web-sys] version = "0.3.25" @@ -169,7 +170,8 @@ features = [ "BlobPropertyBag", "ServiceWorkerGlobalScope", "RequestCredentials", - "File" + "File", + "ReadableStream" ] [target.'cfg(target_arch = "wasm32")'.dev-dependencies] diff --git a/src/wasm/response.rs b/src/wasm/response.rs index 44041e274..83c7a98c1 100644 --- a/src/wasm/response.rs +++ b/src/wasm/response.rs @@ -5,6 +5,12 @@ use http::{HeaderMap, StatusCode}; use js_sys::Uint8Array; use url::Url; +#[cfg(feature = "stream")] +use wasm_bindgen::JsCast; + +#[cfg(feature = "stream")] +use futures_util::stream::StreamExt; + #[cfg(feature = "json")] use serde::de::DeserializeOwned; @@ -118,6 +124,26 @@ impl Response { Ok(bytes.into()) } + /// Convert the response into a `Stream` of `Bytes` from the body. + #[cfg(feature = "stream")] + pub fn bytes_stream(self) -> impl futures_core::Stream> { + let web_response = self.http.into_body(); + let body = web_response + .body() + .expect("could not create wasm byte stream"); + let body = wasm_streams::ReadableStream::from_raw(body.unchecked_into()); + Box::pin(body.into_stream().map(|buf_js| { + let buffer = Uint8Array::new( + &buf_js + .map_err(crate::error::wasm) + .map_err(crate::error::decode)?, + ); + let mut bytes = vec![0; buffer.length() as usize]; + buffer.copy_to(&mut bytes); + Ok(bytes.into()) + })) + } + // util methods /// Turn a response into an error if the server returned an error.