From e3ee55d570c51bb4aefa59bbe22073134ca079be Mon Sep 17 00:00:00 2001 From: "Enzo \"raskyld\" Nocera" Date: Thu, 24 Oct 2024 19:50:53 +0200 Subject: [PATCH] feat: migrate integration crate off-tree Signed-off-by: Enzo "raskyld" Nocera --- Cargo.toml | 27 +++ src/executor.rs | 280 ++++++++++++++++++++++++++ src/handler.rs | 524 ++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 60 ++++++ src/request.rs | 116 +++++++++++ src/response.rs | 144 +++++++++++++ src/utils.rs | 44 ++++ 7 files changed, 1195 insertions(+) create mode 100644 Cargo.toml create mode 100644 src/executor.rs create mode 100644 src/handler.rs create mode 100644 src/lib.rs create mode 100644 src/request.rs create mode 100644 src/response.rs create mode 100644 src/utils.rs diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..ec22e2a --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "leptos_wasi" +authors = ["Enzo Nocera"] +license = "MIT" +repository = "https://github.com/leptos-rs/leptos_wasi" +description = "WASI integrations for the Leptos web framework." +version = "0.0.0" +edition = "2021" + +[dependencies] +any_spawner = { version = "0.1.1", features = ["futures-executor"] } +throw_error = { version = "0.2.0-rc0" } +hydration_context = { version = "0.2.0-rc0" } +futures = "0.3.30" +wasi = "0.13.1+wasi-0.2.0" +leptos = { version = "0.7.0-rc0", features = ["nonce", "ssr"] } +leptos_meta = { version = "0.7.0-rc0", features = ["ssr"] } +leptos_router = { version = "0.7.0-rc0", features = ["ssr"] } +leptos_macro = { version = "0.7.0-rc0", features = ["generic"] } +leptos_integration_utils = {version = "0.7.0-rc0" } +server_fn = { version = "0.7.0-rc0", features = ["generic"] } +http = "1.1.0" +parking_lot = "0.12.3" +bytes = "1.7.2" +routefinder = "0.5.4" +mime_guess = "2.0" +thiserror = "1.0.65" diff --git a/src/executor.rs b/src/executor.rs new file mode 100644 index 0000000..8c604f8 --- /dev/null +++ b/src/executor.rs @@ -0,0 +1,280 @@ +//! This is (Yet Another) Async Runtime for WASI with first-class support +//! for `.await`-ing on [`Pollable`]. It is an ad-hoc implementation +//! tailored for Leptos but it could be exported into a standalone crate. +//! +//! It is based on the `futures` crate's [`LocalPool`] and makes use of +//! no `unsafe` code. +//! +//! # Performance Notes +//! +//! I haven't benchmarked this runtime but since it makes no use of unsafe code +//! and Rust `core`'s `Context` was prematurely optimised for multi-threading +//! environment, I had no choice but using synchronisation primitives to make +//! the API happy. +//! +//! IIRC, `wasm32` targets have an implementation of synchronisation primitives +//! that are just stubs, downgrading them to their single-threaded counterpart +//! so the overhead should be minimal. +//! +//! Also, you can customise the behaviour of the [`Executor`] using the +//! [`Mode`] enum to trade-off reactivity for less host context switch +//! with the [`Mode::Stalled`] variant. + +use std::{ + cell::RefCell, + future::Future, + mem, + rc::Rc, + sync::{Arc, OnceLock}, + task::{Context, Poll, Wake, Waker}, +}; + +use any_spawner::CustomExecutor; +use futures::{ + channel::mpsc::{UnboundedReceiver, UnboundedSender}, + executor::{LocalPool, LocalSpawner}, + task::{LocalSpawnExt, SpawnExt}, + FutureExt, Stream, +}; +use parking_lot::Mutex; +use wasi::{ + clocks::monotonic_clock::{subscribe_duration, Duration}, + io::poll::{poll, Pollable}, +}; + +struct TableEntry(Pollable, Waker); + +static POLLABLE_SINK: OnceLock> = OnceLock::new(); + +pub async fn sleep(duration: Duration) { + WaitPoll::new(subscribe_duration(duration)).await +} + +pub struct WaitPoll(WaitPollInner); + +enum WaitPollInner { + Unregistered(Pollable), + Registered(Arc), +} + +impl WaitPoll { + pub fn new(pollable: Pollable) -> Self { + Self(WaitPollInner::Unregistered(pollable)) + } +} + +impl Future for WaitPoll { + type Output = (); + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll { + match &mut self.get_mut().0 { + this @ WaitPollInner::Unregistered(_) => { + let waker = Arc::new(WaitPollWaker::new(cx.waker())); + + if let Some(sender) = POLLABLE_SINK.get() { + if let WaitPollInner::Unregistered(pollable) = mem::replace( + this, + WaitPollInner::Registered(waker.clone()), + ) { + sender + .clone() + .unbounded_send(TableEntry(pollable, waker.into())) + .expect("cannot spawn a new WaitPoll"); + + Poll::Pending + } else { + unreachable!(); + } + } else { + panic!( + "cannot create a WaitPoll before creating an Executor" + ); + } + } + WaitPollInner::Registered(waker) => { + let mut lock = waker.0.lock(); + if lock.done { + Poll::Ready(()) + } else { + // How can it happen?! :O + // Well, if, for some reason, the Task get woken up for + // another reason than the pollable associated with this + // WaitPoll got ready. + // + // We need to make sure we update the waker. + lock.task_waker = cx.waker().clone(); + Poll::Pending + } + } + } + } +} + +struct WaitPollWaker(Mutex); + +struct WaitPollWakerInner { + done: bool, + task_waker: Waker, +} + +impl WaitPollWaker { + fn new(waker: &Waker) -> Self { + Self(Mutex::new(WaitPollWakerInner { + done: false, + task_waker: waker.clone(), + })) + } +} + +impl Wake for WaitPollWaker { + fn wake(self: std::sync::Arc) { + self.wake_by_ref(); + } + + fn wake_by_ref(self: &std::sync::Arc) { + let mut lock = self.0.lock(); + lock.task_waker.wake_by_ref(); + lock.done = true; + } +} + +/// Controls how often the [`Executor`] checks for [`Pollable`] readiness. +pub enum Mode { + /// Will check as often as possible for readiness, this have some + /// performance overhead. + Premptive, + + /// Will only check for readiness when no more progress can be made + /// on pooled Futures. + Stalled, +} + +#[derive(Clone)] +pub struct Executor(Rc); + +struct ExecutorInner { + pool: RefCell, + spawner: LocalSpawner, + rx: RefCell>, + mode: Mode, +} + +impl Executor { + pub fn new(mode: Mode) -> Self { + let pool = LocalPool::new(); + let spawner = pool.spawner(); + let (tx, rx) = futures::channel::mpsc::unbounded(); + + POLLABLE_SINK + .set(tx.clone()) + .expect("calling Executor::new two times is not supported"); + + Self(Rc::new(ExecutorInner { + pool: RefCell::new(pool), + spawner, + rx: RefCell::new(rx), + mode, + })) + } + + pub fn run_until(&self, fut: T) -> T::Output + where + T: Future + 'static, + { + let (tx, mut rx) = futures::channel::oneshot::channel::(); + self.spawn_local(Box::pin(fut.then(|val| async move { + if tx.send(val).is_err() { + panic!("failed to send the return value of the future passed to run_until"); + } + }))); + + loop { + match rx.try_recv() { + Err(_) => panic!( + "internal error: sender of run until has been dropped" + ), + Ok(Some(val)) => return val, + Ok(None) => { + self.poll_local(); + } + } + } + } +} + +impl CustomExecutor for Executor { + fn spawn(&self, fut: any_spawner::PinnedFuture<()>) { + self.0.spawner.spawn(fut).unwrap(); + } + + fn spawn_local(&self, fut: any_spawner::PinnedLocalFuture<()>) { + self.0.spawner.spawn_local(fut).unwrap(); + } + + fn poll_local(&self) { + let mut pool = match self.0.pool.try_borrow_mut() { + Ok(pool) => pool, + // Nested call to poll_local(), noop. + Err(_) => return, + }; + + match self.0.mode { + Mode::Premptive => { + pool.try_run_one(); + } + Mode::Stalled => pool.run_until_stalled(), + }; + + let (lower, upper) = self.0.rx.borrow().size_hint(); + let capacity = upper.unwrap_or(lower); + let mut entries = Vec::with_capacity(capacity); + let mut rx = self.0.rx.borrow_mut(); + + loop { + match rx.try_next() { + Ok(None) => break, + Ok(Some(entry)) => { + entries.push(Some(entry)); + } + Err(_) => break, + } + } + + if entries.is_empty() { + // This could happen if some Futures use Waker that are not + // registered through [`WaitPoll`] or that we are blocked + // because some Future returned `Poll::Pending` without + // actually making sure their Waker is called at some point. + return; + } + + let pollables = entries + .iter() + .map(|entry| &entry.as_ref().unwrap().0) + .collect::>(); + + let ready = poll(&pollables); + + if let Some(sender) = POLLABLE_SINK.get() { + let sender = sender.clone(); + + // Wakes futures subscribed to ready pollable. + for index in ready { + let wake = entries[index as usize].take().unwrap().1; + wake.wake(); + } + + // Requeue not ready pollable. + for entry in entries.into_iter().flatten() { + sender + .unbounded_send(entry) + .expect("the sender channel is closed"); + } + } else { + unreachable!(); + } + } +} diff --git a/src/handler.rs b/src/handler.rs new file mode 100644 index 0000000..58be7a0 --- /dev/null +++ b/src/handler.rs @@ -0,0 +1,524 @@ +#![forbid(unsafe_code)] + +use std::sync::Arc; + +use bytes::Bytes; +use futures::{ + stream::{self, once}, + StreamExt, +}; +use http::{ + header::{ACCEPT, LOCATION, REFERER}, + request::Parts, + HeaderValue, StatusCode, Uri, +}; +use hydration_context::SsrSharedContext; +use leptos::{ + prelude::{provide_context, Owner, ScopedFuture}, + server_fn::{ + codec::Encoding, http_export::Request, + response::generic::Body as ServerFnBody, ServerFn, ServerFnTraitObj, + }, + IntoView, +}; +use leptos_integration_utils::{ExtendResponse, PinnedStream}; +use leptos_meta::ServerMetaContext; +use leptos_router::{ + components::provide_server_redirect, location::RequestUrl, PathSegment, + RouteList, RouteListing, SsrMode, +}; +use mime_guess::MimeGuess; +use routefinder::Router; +use server_fn::middleware::Service; +use thiserror::Error; + +use wasi::http::types::{ + IncomingRequest, OutgoingBody, OutgoingResponse, ResponseOutparam, +}; + +use crate::{ + response::{Body, Response, ResponseOptions}, + utils::redirect, + CHUNK_BYTE_SIZE, +}; + +/// Handle routing, static file serving and response tx using the low-level +/// `wasi:http` APIs. +/// +/// ## Usage +/// +/// Please, note that the handler expect to be run with a local Executor initiated. +/// +/// ``` +/// use leptos_wasi::prelude::Handler; +/// +/// let conf = get_configuration(None).unwrap(); +/// let leptos_options = conf.leptos_options; +/// +/// Handler::build(request, response_out) +/// .expect("could not create handler") +/// // Those two functions should be called first because they can +/// // *shortcut* the handler, see "Performance Considerations". +/// +/// // Any HTTP request prefixed with `/pkg` will call the passed +/// // `serve_static_files` function to deliver static files. +/// .static_files_handler("/pkg", serve_static_files) +/// .with_server_fn::() +/// // Fetch all available routes from your App. +/// .generate_routes(App) +/// // Actually process the request and write the response. +/// .handle_with_context(move || shell(leptos_options.clone()), || {}).await.expect("could not handle the request"); +/// ``` +/// +/// ## Performance Considerations +/// +/// This handler is optimised for the special case of WASI Components being spawned +/// on a per-request basis. That is, the lifetime of the component is bound to the +/// one of the request, so we don't do any fancy pre-setup: it means +/// **your Server-Side will always be cold-started**. +/// +/// While it could have a bad impact on the performance of your app, please, know +/// that there is a *shotcut* mechanism implemented that allows the [`Handler`] +/// to shortcut the whole HTTP Rendering and Reactivity logic to directly jump to +/// writting the response in those case: +/// +/// * The user request a static-file, then, calling [`Handler::static_files_handler`] +/// will *shortcut* the handler and all future calls are ignored to reach +/// [`Handler::handle_with_context`] *almost* instantly. +/// * The user reach a server function, then, calling [`Handler::with_server_fn`] +/// will check if the request's path matches the one from the passed server functions, +/// if so, *shortcut* the handler. +/// +/// This implementation ensures that, even though your component is cold-started +/// on each request, the performance are good. Please, note that this approach is +/// directly enabled by the fact WASI Components have under-millisecond start-up +/// times! It wouldn't be practical to do that with traditional container-based solutions. +/// +/// ## Limitations +/// +/// [`SsrMode::Static`] is not implemented yet, having one in your `` +/// will cause [`Handler::handle_with_context`] to panic! +pub struct Handler { + req: Request, + res_out: ResponseOutparam, + + // *shortcut* if any is set + server_fn: + Option, http::Response>>, + preset_res: Option, + should_404: bool, + + // built using the user-defined app_fn + ssr_router: Router, +} + +impl Handler { + /// Wraps the WASI resources to handle the request. + /// Could fail if the [`IncomingRequest`] cannot be converted to + /// a [`http:Request`]. + pub fn build( + req: IncomingRequest, + res_out: ResponseOutparam, + ) -> Result { + Ok(Self { + req: crate::request::Request(req).try_into()?, + res_out, + server_fn: None, + preset_res: None, + ssr_router: Router::new(), + should_404: false, + }) + } + + // Test whether we are ready to send a response to shortcut some + // code and provide a fast-path. + #[inline] + const fn shortcut(&self) -> bool { + self.server_fn.is_some() || self.preset_res.is_some() || self.should_404 + } + + /// Tests if the request path matches the bound server function + /// and *shortcut* the [`Handler`] to quickly reach + /// the call to [`Handler::handle_with_context`]. + pub fn with_server_fn(mut self) -> Self + where + T: ServerFn< + ServerRequest = Request, + ServerResponse = http::Response, + > + 'static, + { + if self.shortcut() { + return self; + } + + if self.req.method() == T::InputEncoding::METHOD + && self.req.uri().path() == T::PATH + { + self.server_fn = Some(ServerFnTraitObj::new( + T::PATH, + T::InputEncoding::METHOD, + |request| Box::pin(T::run_on_server(request)), + T::middlewares, + )); + } + + self + } + + /// If the request is prefixed with `prefix` [`Uri`], then + /// the handler will call the passed `handler` with the Uri trimmed of + /// the prefix. If the closure returns + /// None, the response will be 404, otherwise, the returned [`Body`] + /// will be served as-if. + /// + /// This function, when matching, *shortcut* the [`Handler`] to quickly reach + /// the call to [`Handler::handle_with_context`]. + pub fn static_files_handler( + mut self, + prefix: T, + handler: impl Fn(String) -> Option + 'static + Send + Clone, + ) -> Self + where + T: TryInto, + >::Error: std::error::Error, + { + if self.shortcut() { + return self; + } + + if let Some(trimmed_url) = self.req.uri().path().strip_prefix( + prefix.try_into().expect("you passed an invalid Uri").path(), + ) { + match handler(trimmed_url.to_string()) { + None => self.should_404 = true, + Some(body) => { + let mut res = http::Response::new(body); + let mime = MimeGuess::from_path(trimmed_url); + + res.headers_mut().insert( + http::header::CONTENT_TYPE, + HeaderValue::from_str( + mime.first_or_octet_stream().as_ref(), + ) + .expect("internal error: could not parse MIME type"), + ); + + self.preset_res = Some(Response(res)); + } + } + } + + self + } + + /// This mocks a request to the `app_fn` component to extract your + /// ``'s ``. + pub fn generate_routes( + self, + app_fn: impl Fn() -> IV + 'static + Send + Clone, + ) -> Self + where + IV: IntoView + 'static, + { + self.generate_routes_with_exclusions_and_context(app_fn, None, || {}) + } + + /// This mocks a request to the `app_fn` component to extract your + /// ``'s ``. + /// + /// You can pass an `additional_context` to [`provide_context`] to the + /// application. + pub fn generate_routes_with_context( + self, + app_fn: impl Fn() -> IV + 'static + Send + Clone, + additional_context: impl Fn() + 'static + Send + Clone, + ) -> Self + where + IV: IntoView + 'static, + { + self.generate_routes_with_exclusions_and_context( + app_fn, + None, + additional_context, + ) + } + + /// This mocks a request to the `app_fn` component to extract your + /// ``'s ``. + /// + /// You can pass an `additional_context` to [`provide_context`] to the + /// application. + /// + /// You can pass a list of `excluded_routes` to avoid generating them. + pub fn generate_routes_with_exclusions_and_context( + mut self, + app_fn: impl Fn() -> IV + 'static + Send + Clone, + excluded_routes: Option>, + additional_context: impl Fn() + 'static + Send + Clone, + ) -> Self + where + IV: IntoView + 'static, + { + // If we matched a server function, we do not need to go through + // all of that. + if self.shortcut() { + return self; + } + + if !self.ssr_router.is_empty() { + panic!("generate_routes was called twice"); + } + + let owner = Owner::new_root(Some(Arc::new(SsrSharedContext::new()))); + let (mock_meta, _) = ServerMetaContext::new(); + let routes = owner + .with(|| { + // as we are generating the app to extract + // the , we want to mock the root path. + provide_context(RequestUrl::new("")); + provide_context(ResponseOptions::default()); + provide_context(http::uri::Parts::default()); + provide_context(mock_meta); + additional_context(); + RouteList::generate(&app_fn) + }) + .unwrap_or_default() + .into_inner() + .into_iter() + .map(|rt| (rt.path().to_rf_str_representation(), rt)) + .filter(|route| { + excluded_routes.as_ref().map_or(true, |excluded_routes| { + !excluded_routes.iter().any(|ex_path| *ex_path == route.0) + }) + }); + + for (path, route_listing) in routes { + self.ssr_router + .add(path, route_listing) + .expect("internal error: impossible to parse a RouteListing"); + } + + self + } + + /// Consumes the [`Handler`] to actually perform all the request handling + /// logic. + /// + /// You can pass an `additional_context` to [`provide_context`] to the + /// application. + pub async fn handle_with_context( + self, + app: impl Fn() -> IV + 'static + Send + Clone, + additional_context: impl Fn() + 'static + Clone + Send, + ) -> Result<(), HandlerError> + where + IV: IntoView + 'static, + { + let path = self.req.uri().path().to_string(); + let best_match = self.ssr_router.best_match(&path); + let (parts, body) = self.req.into_parts(); + let context_parts = parts.clone(); + let req = Request::from_parts(parts, body); + + let owner = Owner::new(); + let response = owner.with(|| { + ScopedFuture::new(async move { + let res_opts = ResponseOptions::default(); + let response: Option = if self.should_404 { + None + } else if self.preset_res.is_some() { + self.preset_res + } else if let Some(mut sfn) = self.server_fn { + provide_contexts(additional_context, context_parts, res_opts.clone()); + + // store Accepts and Referer in case we need them for redirect (below) + let accepts_html = req + .headers() + .get(ACCEPT) + .and_then(|v| v.to_str().ok()) + .map(|v| v.contains("text/html")) + .unwrap_or(false); + let referrer = req.headers().get(REFERER).cloned(); + + let mut res = sfn.run(req).await; + + // if it accepts text/html (i.e., is a plain form post) and doesn't already have a + // Location set, then redirect to to Referer + if accepts_html { + if let Some(referrer) = referrer { + let has_location = + res.headers().get(LOCATION).is_some(); + if !has_location { + *res.status_mut() = StatusCode::FOUND; + res.headers_mut().insert(LOCATION, referrer); + } + } + } + + Some(res.into()) + } else if let Some(best_match) = best_match { + let listing = best_match.handler(); + let (meta_context, meta_output) = ServerMetaContext::new(); + + let add_ctx = additional_context.clone(); + let additional_context = { + let res_opts = res_opts.clone(); + let meta_ctx = meta_context.clone(); + move || { + provide_contexts(add_ctx, context_parts, res_opts); + provide_context(meta_ctx); + } + }; + + Some(Response::from_app( + app, + meta_output, + additional_context, + res_opts.clone(), + match listing.mode() { + SsrMode::Async => |app, chunks| { + Box::pin(async move { + let app = if cfg!(feature = "islands-router") { + app.to_html_stream_in_order_branching() + } else { + app.to_html_stream_in_order() + }; + let app = app.collect::().await; + let chunks = chunks(); + Box::pin(once(async move { app }).chain(chunks)) as PinnedStream + }) + }, + SsrMode::InOrder => |app, chunks| { + Box::pin(async move { + let app = if cfg!(feature = "islands-router") { + app.to_html_stream_in_order_branching() + } else { + app.to_html_stream_in_order() + }; + Box::pin(app.chain(chunks())) as PinnedStream + }) + }, + SsrMode::PartiallyBlocked | SsrMode::OutOfOrder => |app, chunks| { + Box::pin(async move { + let app = if cfg!(feature = "islands-router") { + app.to_html_stream_out_of_order_branching() + } else { + app.to_html_stream_out_of_order() + }; + Box::pin(app.chain(chunks())) as PinnedStream + }) + }, + SsrMode::Static(_) => panic!("SsrMode::Static routes are not supported yet!") + } + ).await) + } else { + None + }; + + response.map(|mut req| { + req.extend_response(&res_opts); + req + }) + }) + }).await; + + let response = response.unwrap_or_else(|| { + let body = Bytes::from("404 not found"); + let mut res = http::Response::new(Body::Sync(body)); + *res.status_mut() = http::StatusCode::NOT_FOUND; + Response(res) + }); + + let headers = response.headers()?; + let wasi_res = OutgoingResponse::new(headers); + + wasi_res + .set_status_code(response.0.status().as_u16()) + .expect("invalid http status code was returned"); + let body = wasi_res.body().expect("unable to take response body"); + ResponseOutparam::set(self.res_out, Ok(wasi_res)); + + let output_stream = body + .write() + .expect("unable to open writable stream on body"); + let mut input_stream = match response.0.into_body() { + Body::Sync(buf) => Box::pin(stream::once(async { Ok(buf) })), + Body::Async(stream) => stream, + }; + + while let Some(buf) = input_stream.next().await { + let buf = buf.map_err(HandlerError::ResponseStream)?; + let chunks = buf.chunks(CHUNK_BYTE_SIZE); + for chunk in chunks { + output_stream + .blocking_write_and_flush(chunk) + .map_err(HandlerError::from)?; + } + } + + drop(output_stream); + OutgoingBody::finish(body, None) + .map_err(HandlerError::WasiResponseBody)?; + + Ok(()) + } +} + +fn provide_contexts( + additional_context: impl Fn() + 'static + Clone + Send, + context_parts: Parts, + res_opts: ResponseOptions, +) { + additional_context(); + provide_context(RequestUrl::new(context_parts.uri.path())); + provide_context(context_parts); + provide_context(res_opts); + provide_server_redirect(redirect); +} + +trait RouterPathRepresentation { + fn to_rf_str_representation(&self) -> String; +} + +impl RouterPathRepresentation for &[PathSegment] { + fn to_rf_str_representation(&self) -> String { + let mut path = String::new(); + for segment in self.iter() { + // TODO trailing slash handling + let raw = segment.as_raw_str(); + if !raw.is_empty() && !raw.starts_with('/') { + path.push('/'); + } + match segment { + PathSegment::Static(s) => path.push_str(s), + PathSegment::Param(s) => { + path.push(':'); + path.push_str(s); + } + PathSegment::Splat(_) => { + path.push('*'); + } + PathSegment::Unit => {} + } + } + path + } +} + +#[derive(Error, Debug)] +pub enum HandlerError { + #[error("error handling request")] + Request(#[from] crate::request::RequestError), + + #[error("error handling response")] + Response(#[from] crate::response::ResponseError), + + #[error("response stream emitted an error")] + ResponseStream(throw_error::Error), + + #[error("wasi stream failure")] + WasiStream(#[from] wasi::io::streams::StreamError), + + #[error("failed to finish response body")] + WasiResponseBody(wasi::http::types::ErrorCode), +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..777a2df --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,60 @@ +//! A first-party support of the `wasm32-wasip1` target for the **Server-Side** +//! of Leptos using the [`wasi:http`][wasi-http] proposal. +//! +//! [wasi-http]: https://github.com/WebAssembly/wasi-http +//! +//! # `Handler` +//! +//! The [`prelude::Handler`] is the main abstraction you will use. +//! +//! It expects being run in the context of a Future Executor `Task`, +//! since WASI is, at the moment, a single-threaded environment, +//! we provide a simple abstraction in the form of [`leptos::spawn::Executor`] +//! that you can leverage to use this crate. +//! +//! ``` +//! use wasi::exports::http::incoming_handler::*; +//! +//! struct LeptosServer; +//! +//! // NB(raskyld): for now, the types to use for the HTTP handlers are the one from +//! // the `leptos_wasi` crate, not the one generated in your crate. +//! impl Guest for LeptosServer { +//! fn handle(request: IncomingRequest, response_out: ResponseOutparam) { +//! // Initiate a single-threaded [`Future`] Executor so we can run the +//! // rendering system and take advantage of bodies streaming. +//! Executor::init_futures_local_executor().expect("cannot init future executor"); +//! Executor::spawn(async { +//! // declare an async function called `handle_request` and +//! // use the Handler in this function. +//! handle_request(request, response_out).await; +//! }); +//! Executor::run(); +//! } +//! } +//! ``` +//! +//! # WASI Bindings +//! +//! We are using the bindings provided by the `wasi` crate. + +pub mod executor; +pub mod handler; +pub mod request; +pub mod response; +pub mod utils; + +#[allow(clippy::pub_use)] +pub mod prelude { + pub use crate::executor::Executor as WasiExecutor; + pub use crate::handler::Handler; + pub use crate::response::Body; + pub use crate::utils::redirect; + pub use wasi::exports::wasi::http::incoming_handler::{ + IncomingRequest, ResponseOutparam, + }; +} + +/// When working with streams, this crate will try to chunk bytes with +/// this size. +const CHUNK_BYTE_SIZE: usize = 64; diff --git a/src/request.rs b/src/request.rs new file mode 100644 index 0000000..3d10f25 --- /dev/null +++ b/src/request.rs @@ -0,0 +1,116 @@ +use bytes::Bytes; +use http::{uri::Parts, Uri}; +use thiserror::Error; + +use wasi::{ + http::types::{IncomingBody, IncomingRequest, Method, Scheme}, + io::streams::StreamError, +}; + +use crate::CHUNK_BYTE_SIZE; + +pub struct Request(pub IncomingRequest); + +impl TryFrom for http::Request { + type Error = RequestError; + + fn try_from(req: Request) -> Result { + let mut builder = http::Request::builder(); + let req = req.0; + let req_method = method_wasi_to_http(req.method())?; + let headers = req.headers(); + + for (header_name, header_value) in headers.entries() { + builder = builder.header(header_name, header_value); + } + + drop(headers); + + // NB(raskyld): consume could fail if, for some reason the caller + // manage to recreate an IncomingRequest backed by the same underlying + // resource handle (need to dig more to see if that's possible) + let incoming_body = req.consume().expect("could not consume body"); + + let body_stream = incoming_body + .stream() + .expect("could not create a stream from body"); + + let mut body_bytes = Vec::::with_capacity(CHUNK_BYTE_SIZE); + + loop { + match body_stream.blocking_read(CHUNK_BYTE_SIZE as u64) { + Err(StreamError::Closed) => break, + Err(StreamError::LastOperationFailed(err)) => { + return Err(StreamError::LastOperationFailed(err).into()) + } + Ok(data) => { + body_bytes.extend(data); + } + } + } + + let mut uri_parts = Parts::default(); + + uri_parts.scheme = req.scheme().map(scheme_wasi_to_http).transpose()?; + uri_parts.authority = req + .authority() + .map(|aut| { + http::uri::Authority::from_maybe_shared(aut.into_bytes()) + }) + .transpose() + .map_err(http::Error::from)?; + uri_parts.path_and_query = req + .path_with_query() + .map(|paq| { + http::uri::PathAndQuery::from_maybe_shared(paq.into_bytes()) + }) + .transpose() + .map_err(http::Error::from)?; + + drop(body_stream); + IncomingBody::finish(incoming_body); + builder + .method(req_method) + .uri(Uri::from_parts(uri_parts).map_err(http::Error::from)?) + .body(Bytes::from(body_bytes)) + .map_err(RequestError::from) + } +} + +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum RequestError { + #[error("failed to convert wasi bindings to http types")] + Http(#[from] http::Error), + + #[error("error while processing wasi:http body stream")] + WasiIo(#[from] wasi::io::streams::StreamError), +} + +pub fn method_wasi_to_http(value: Method) -> Result { + match value { + Method::Connect => Ok(http::Method::CONNECT), + Method::Delete => Ok(http::Method::DELETE), + Method::Get => Ok(http::Method::GET), + Method::Head => Ok(http::Method::HEAD), + Method::Options => Ok(http::Method::OPTIONS), + Method::Patch => Ok(http::Method::PATCH), + Method::Post => Ok(http::Method::POST), + Method::Put => Ok(http::Method::PUT), + Method::Trace => Ok(http::Method::TRACE), + Method::Other(mtd) => { + http::Method::from_bytes(mtd.as_bytes()).map_err(http::Error::from) + } + } +} + +pub fn scheme_wasi_to_http( + value: Scheme, +) -> Result { + match value { + Scheme::Http => Ok(http::uri::Scheme::HTTP), + Scheme::Https => Ok(http::uri::Scheme::HTTPS), + Scheme::Other(oth) => http::uri::Scheme::try_from(oth.as_bytes()) + .map_err(http::Error::from), + } +} diff --git a/src/response.rs b/src/response.rs new file mode 100644 index 0000000..605af7e --- /dev/null +++ b/src/response.rs @@ -0,0 +1,144 @@ +use std::{pin::Pin, sync::Arc}; + +use bytes::Bytes; +use futures::{Stream, StreamExt}; +use http::{HeaderMap, HeaderName, HeaderValue, StatusCode}; +use leptos_integration_utils::ExtendResponse; +use parking_lot::RwLock; + +use server_fn::response::generic::Body as ServerFnBody; +use thiserror::Error; + +use wasi::http::types::{HeaderError, Headers}; + +/// This crate uses platform-agnostic [`http::Response`] +/// with a custom [`Body`] and convert them under the hood to +/// WASI native types. +/// +/// It supports both [`Body::Sync`] and [`Body::Async`], +/// allowing you to choose between synchronous response +/// (i.e. sending the whole response) and asynchronous response +/// (i.e. streaming the response). +pub struct Response(pub http::Response); + +impl Response { + pub fn headers(&self) -> Result { + let headers = Headers::new(); + for (name, value) in self.0.headers() { + headers.append(&name.to_string(), &Vec::from(value.as_bytes()))?; + } + Ok(headers) + } +} + +impl From> for Response +where + T: Into, +{ + fn from(value: http::Response) -> Self { + Self(value.map(Into::into)) + } +} + +pub enum Body { + /// The response body will be written synchronously. + Sync(Bytes), + + /// The response body will be written asynchronously, + /// this execution model is also known as + /// "streaming". + Async( + Pin< + Box< + dyn Stream> + + Send + + 'static, + >, + >, + ), +} + +impl From for Body { + fn from(value: ServerFnBody) -> Self { + match value { + ServerFnBody::Sync(data) => Self::Sync(data), + ServerFnBody::Async(stream) => Self::Async(stream), + } + } +} + +/// This struct lets you define headers and override the status of the Response from an Element or a Server Function +/// Typically contained inside of a ResponseOptions. Setting this is useful for cookies and custom responses. +#[derive(Debug, Clone, Default)] +pub struct ResponseParts { + pub headers: HeaderMap, + pub status: Option, +} + +/// Allows you to override details of the HTTP response like the status code and add Headers/Cookies. +#[derive(Debug, Clone, Default)] +pub struct ResponseOptions(Arc>); + +impl ResponseOptions { + /// A simpler way to overwrite the contents of `ResponseOptions` with a new `ResponseParts`. + #[inline] + pub fn overwrite(&self, parts: ResponseParts) { + *self.0.write() = parts + } + /// Set the status of the returned Response. + #[inline] + pub fn set_status(&self, status: StatusCode) { + self.0.write().status = Some(status); + } + /// Insert a header, overwriting any previous value with the same key. + #[inline] + pub fn insert_header(&self, key: HeaderName, value: HeaderValue) { + self.0.write().headers.insert(key, value); + } + /// Append a header, leaving any header with the same key intact. + #[inline] + pub fn append_header(&self, key: HeaderName, value: HeaderValue) { + self.0.write().headers.append(key, value); + } +} + +impl ExtendResponse for Response { + type ResponseOptions = ResponseOptions; + + fn from_stream( + stream: impl Stream + Send + 'static, + ) -> Self { + let stream = stream.map(|data| { + Result::::Ok(Bytes::from(data)) + }); + + Self(http::Response::new(Body::Async(Box::pin(stream)))) + } + + fn extend_response(&mut self, opt: &Self::ResponseOptions) { + let mut opt = opt.0.write(); + if let Some(status_code) = opt.status { + *self.0.status_mut() = status_code; + } + self.0 + .headers_mut() + .extend(std::mem::take(&mut opt.headers)); + } + + fn set_default_content_type(&mut self, content_type: &str) { + let headers = self.0.headers_mut(); + if !headers.contains_key(http::header::CONTENT_TYPE) { + headers.insert( + http::header::CONTENT_TYPE, + HeaderValue::from_str(content_type).unwrap(), + ); + } + } +} + +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum ResponseError { + #[error("failed to parse http::Response's headers")] + WasiHeaders(#[from] HeaderError), +} diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..66ca0ac --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,44 @@ +use http::{header, request::Parts, HeaderName, HeaderValue, StatusCode}; +use leptos::prelude::use_context; +use server_fn::redirect::REDIRECT_HEADER; + +use crate::response::ResponseOptions; + +/// Allow to return an HTTP redirection from components. +pub fn redirect(path: &str) { + if let (Some(req), Some(res)) = + (use_context::(), use_context::()) + { + // insert the Location header in any case + res.insert_header( + header::LOCATION, + header::HeaderValue::from_str(path) + .expect("Failed to create HeaderValue"), + ); + + let accepts_html = req + .headers + .get(header::ACCEPT) + .and_then(|v| v.to_str().ok()) + .map(|v| v.contains("text/html")) + .unwrap_or(false); + if accepts_html { + // if the request accepts text/html, it's a plain form request and needs + // to have the 302 code set + res.set_status(StatusCode::FOUND); + } else { + // otherwise, we sent it from the server fn client and actually don't want + // to set a real redirect, as this will break the ability to return data + // instead, set the REDIRECT_HEADER to indicate that the client should redirect + res.insert_header( + HeaderName::from_static(REDIRECT_HEADER), + HeaderValue::from_str("").unwrap(), + ); + } + } else { + eprintln!( + "Couldn't retrieve either Parts or ResponseOptions while \ + trying to redirect()." + ); + } +}