diff --git a/crates/test-programs/src/bin/api_proxy_streaming.rs b/crates/test-programs/src/bin/api_proxy_streaming.rs index 328e8a12d525..39b17827264a 100644 --- a/crates/test-programs/src/bin/api_proxy_streaming.rs +++ b/crates/test-programs/src/bin/api_proxy_streaming.rs @@ -318,8 +318,8 @@ mod executor { http::{ outgoing_handler, types::{ - self, IncomingBody, IncomingResponse, InputStream, OutgoingBody, OutgoingRequest, - OutputStream, + self, FutureTrailers, IncomingBody, IncomingResponse, InputStream, OutgoingBody, + OutgoingRequest, OutputStream, }, }, io::{self, streams::StreamError}, @@ -465,42 +465,102 @@ mod executor { } pub fn incoming_body(body: IncomingBody) -> impl Stream>> { - struct Incoming(Option<(InputStream, IncomingBody)>); + enum Inner { + Stream { + stream: InputStream, + body: IncomingBody, + }, + Trailers(FutureTrailers), + Closed, + } + + struct Incoming(Inner); impl Drop for Incoming { fn drop(&mut self) { - if let Some((stream, body)) = self.0.take() { - drop(stream); - IncomingBody::finish(body); + match mem::replace(&mut self.0, Inner::Closed) { + Inner::Stream { stream, body } => { + drop(stream); + IncomingBody::finish(body); + } + Inner::Trailers(_) | Inner::Closed => {} } } } stream::poll_fn({ let stream = body.stream().expect("response body should be readable"); - let pair = Incoming(Some((stream, body))); + let mut incoming = Incoming(Inner::Stream { stream, body }); move |context| { - if let Some((stream, _)) = &pair.0 { - match stream.read(READ_SIZE) { - Ok(buffer) => { - if buffer.is_empty() { - WAKERS - .lock() - .unwrap() - .push((stream.subscribe(), context.waker().clone())); - Poll::Pending - } else { - Poll::Ready(Some(Ok(buffer))) + loop { + match &incoming.0 { + Inner::Stream { stream, .. } => match stream.read(READ_SIZE) { + Ok(buffer) => { + return if buffer.is_empty() { + WAKERS + .lock() + .unwrap() + .push((stream.subscribe(), context.waker().clone())); + Poll::Pending + } else { + Poll::Ready(Some(Ok(buffer))) + }; + } + Err(StreamError::Closed) => { + let Inner::Stream { stream, body } = + mem::replace(&mut incoming.0, Inner::Closed) + else { + unreachable!(); + }; + drop(stream); + incoming.0 = Inner::Trailers(IncomingBody::finish(body)); + } + Err(StreamError::LastOperationFailed(error)) => { + return Poll::Ready(Some(Err(anyhow!( + "{}", + error.to_debug_string() + )))); + } + }, + + Inner::Trailers(trailers) => { + match trailers.get() { + Some(Ok(trailers)) => { + incoming.0 = Inner::Closed; + match trailers { + Ok(Some(_)) => { + // Currently, we just ignore any trailers. TODO: Add a test that + // expects trailers and verify they match the expected contents. + } + Ok(None) => { + // No trailers; nothing else to do. + } + Err(error) => { + // Error reading the trailers: pass it on to the application. + return Poll::Ready(Some(Err(anyhow!("{error:?}")))); + } + } + } + Some(Err(_)) => { + // Should only happen if we try to retrieve the trailers twice, i.e. a bug in + // this code. + unreachable!(); + } + None => { + WAKERS + .lock() + .unwrap() + .push((trailers.subscribe(), context.waker().clone())); + return Poll::Pending; + } } } - Err(StreamError::Closed) => Poll::Ready(None), - Err(StreamError::LastOperationFailed(error)) => { - Poll::Ready(Some(Err(anyhow!("{}", error.to_debug_string())))) + + Inner::Closed => { + return Poll::Ready(None); } } - } else { - Poll::Ready(None) } } }) diff --git a/crates/wasi-http/src/body.rs b/crates/wasi-http/src/body.rs index 5f16f42393f0..5913a4922df9 100644 --- a/crates/wasi-http/src/body.rs +++ b/crates/wasi-http/src/body.rs @@ -347,12 +347,9 @@ impl Subscribe for HostFutureTrailers { // were reached. It's up to us now to complete the body. Ok(StreamEnd::Remaining(b)) => body.body = IncomingBodyState::Start(b), - // Technically this shouldn't be possible as the sender - // shouldn't get destroyed without receiving a message. Handle - // this just in case though. + // This means there were no trailers present. Err(_) => { - debug_assert!(false, "should be unreachable"); - *self = HostFutureTrailers::Done(Err(types::ErrorCode::ConnectionTerminated)); + *self = HostFutureTrailers::Done(Ok(None)); } } }