Skip to content

Commit

Permalink
[wasi-http] Return none from future-trailers.get when trailers ab…
Browse files Browse the repository at this point in the history
…sent (#9208)

* [wasi-http] Return `none` from `future-trailers.get` when trailers absent

Previously, we would return `error-code::connection-terminated` in this case,
whereas the `wasi-http` spec says we should simply return `none` (i.e. it's not
an error if there are no trailers present).

This also adds code to `api_proxy_streaming.rs` to check for trailers after
reading the body content.

Signed-off-by: Joel Dice <joel.dice@fermyon.com>

* minor code cleanup

Signed-off-by: Joel Dice <joel.dice@fermyon.com>

---------

Signed-off-by: Joel Dice <joel.dice@fermyon.com>
  • Loading branch information
dicej authored Sep 6, 2024
1 parent 53a909c commit 75ed0b6
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 28 deletions.
106 changes: 83 additions & 23 deletions crates/test-programs/src/bin/api_proxy_streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,8 @@ mod executor {
http::{
outgoing_handler,
types::{
self, IncomingBody, IncomingResponse, InputStream, OutgoingBody, OutgoingRequest,
OutputStream,
self, FutureTrailers, IncomingBody, IncomingResponse, InputStream, OutgoingBody,
OutgoingRequest, OutputStream,
},
},
io::{self, streams::StreamError},
Expand Down Expand Up @@ -465,42 +465,102 @@ mod executor {
}

pub fn incoming_body(body: IncomingBody) -> impl Stream<Item = Result<Vec<u8>>> {
struct Incoming(Option<(InputStream, IncomingBody)>);
enum Inner {
Stream {
stream: InputStream,
body: IncomingBody,
},
Trailers(FutureTrailers),
Closed,
}

struct Incoming(Inner);

impl Drop for Incoming {
fn drop(&mut self) {
if let Some((stream, body)) = self.0.take() {
drop(stream);
IncomingBody::finish(body);
match mem::replace(&mut self.0, Inner::Closed) {
Inner::Stream { stream, body } => {
drop(stream);
IncomingBody::finish(body);
}
Inner::Trailers(_) | Inner::Closed => {}
}
}
}

stream::poll_fn({
let stream = body.stream().expect("response body should be readable");
let pair = Incoming(Some((stream, body)));
let mut incoming = Incoming(Inner::Stream { stream, body });

move |context| {
if let Some((stream, _)) = &pair.0 {
match stream.read(READ_SIZE) {
Ok(buffer) => {
if buffer.is_empty() {
WAKERS
.lock()
.unwrap()
.push((stream.subscribe(), context.waker().clone()));
Poll::Pending
} else {
Poll::Ready(Some(Ok(buffer)))
loop {
match &incoming.0 {
Inner::Stream { stream, .. } => match stream.read(READ_SIZE) {
Ok(buffer) => {
return if buffer.is_empty() {
WAKERS
.lock()
.unwrap()
.push((stream.subscribe(), context.waker().clone()));
Poll::Pending
} else {
Poll::Ready(Some(Ok(buffer)))
};
}
Err(StreamError::Closed) => {
let Inner::Stream { stream, body } =
mem::replace(&mut incoming.0, Inner::Closed)
else {
unreachable!();
};
drop(stream);
incoming.0 = Inner::Trailers(IncomingBody::finish(body));
}
Err(StreamError::LastOperationFailed(error)) => {
return Poll::Ready(Some(Err(anyhow!(
"{}",
error.to_debug_string()
))));
}
},

Inner::Trailers(trailers) => {
match trailers.get() {
Some(Ok(trailers)) => {
incoming.0 = Inner::Closed;
match trailers {
Ok(Some(_)) => {
// Currently, we just ignore any trailers. TODO: Add a test that
// expects trailers and verify they match the expected contents.
}
Ok(None) => {
// No trailers; nothing else to do.
}
Err(error) => {
// Error reading the trailers: pass it on to the application.
return Poll::Ready(Some(Err(anyhow!("{error:?}"))));
}
}
}
Some(Err(_)) => {
// Should only happen if we try to retrieve the trailers twice, i.e. a bug in
// this code.
unreachable!();
}
None => {
WAKERS
.lock()
.unwrap()
.push((trailers.subscribe(), context.waker().clone()));
return Poll::Pending;
}
}
}
Err(StreamError::Closed) => Poll::Ready(None),
Err(StreamError::LastOperationFailed(error)) => {
Poll::Ready(Some(Err(anyhow!("{}", error.to_debug_string()))))

Inner::Closed => {
return Poll::Ready(None);
}
}
} else {
Poll::Ready(None)
}
}
})
Expand Down
7 changes: 2 additions & 5 deletions crates/wasi-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,9 @@ impl Subscribe for HostFutureTrailers {
// were reached. It's up to us now to complete the body.
Ok(StreamEnd::Remaining(b)) => body.body = IncomingBodyState::Start(b),

// Technically this shouldn't be possible as the sender
// shouldn't get destroyed without receiving a message. Handle
// this just in case though.
// This means there were no trailers present.
Err(_) => {
debug_assert!(false, "should be unreachable");
*self = HostFutureTrailers::Done(Err(types::ErrorCode::ConnectionTerminated));
*self = HostFutureTrailers::Done(Ok(None));
}
}
}
Expand Down

0 comments on commit 75ed0b6

Please sign in to comment.