From 620e9c217bd95a142d48f491009c8fae7f8e0779 Mon Sep 17 00:00:00 2001 From: Zelda Hessler Date: Wed, 6 Apr 2022 10:29:01 -0500 Subject: [PATCH 1/9] add: initial draft of bytestream callback RFC --- .../rfcs/rfc0012_bytestream_callback_api.md | 106 ++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 design/src/rfcs/rfc0012_bytestream_callback_api.md diff --git a/design/src/rfcs/rfc0012_bytestream_callback_api.md b/design/src/rfcs/rfc0012_bytestream_callback_api.md new file mode 100644 index 0000000000..fdd2394b76 --- /dev/null +++ b/design/src/rfcs/rfc0012_bytestream_callback_api.md @@ -0,0 +1,106 @@ +RFC: A callback API for `ByteStream` +==================================== + +> Status: RFC + +Adding a callback API to `ByteStream` will enable developers using the SDK to implement things like checksum validations and 'read progress' callbacks. + +## The Implementation + +*Note that comments starting with '//' are not necessarily going to be included in the actual implementation and are intended as clarifying comments for the purposes of this RFC.* + +```rust +// Each trait method defaults to doing nothing. It's up to implementors to +// implement one or both of the trait methods +/// Structs and enums implementing this trait can be inserted into a `ByteStream`, +/// and will then be called in reaction to various events during a `ByteStream`'s +/// lifecycle. +pub trait ByteStreamReadCallback +// I don't really like this bound but we need it because this will be inserted into an `Inner` +where + Self: Debug + Clone + PartialEq + Eq, +{ + /// This callback is called for each chunk **successfully** read. + /// If an error occurs while reading a chunk, this will not be called. + /// This function takes `&mut self` so that implementors may modify + /// an implementing struct/enum's internal state. + // In order to stop the compiler complaining about these empty default impls, + // we allow unused variables. + fn on_read_chunk(&mut self, #[allow(unused_variables)] chunk: &Bytes) {} + + /// This callback is called once all chunks have been successfully read. + /// It's passed a reference to the chunks in the form of an `AggregatedBytes`. + /// This function takes `&mut self` so that implementors may modify an + /// implementing struct/enum's internal state. + fn finally(&mut self, #[allow(unused_variables)] aggregated_bytes: &AggregatedBytes) {} +} + +// We add a new method to `ByteStream` for inserting callbacks +impl ByteStream { + // ...other impls omitted + + // Read callbacks can only be inserted, not removed or reordered. If users + // desire extra management functions, we can add them in a later update. + // Callbacks are actually stored and called from the `Inner` object + pub fn insert_read_callback(&mut self, callback: Box) { + self.inner.insert_read_callback(callback); + } +} + +// Callbacks actually get stored in the `Inner` struct because that's where +// the chunk-reading actually occurs. +#[pin_project] +#[derive(Debug, Clone, PartialEq, Eq)] +struct Inner { + #[pin] + body: B, + // This field is new. It's going to store callbacks that get called when we're + // reading the `SdkBody` chunk-by-chunk + callbacks: Vec> +} + +impl Inner { + // ...other impls omitted + + pub fn new(body: B) -> Self { + Self { body, callbacks: Vec::new() } + } + + pub fn insert_read_callback(&mut self, callback: Box) { + self.callbacks.push(callback); + } + + pub async fn collect(self) -> Result + where + B: http_body::Body, + { + let mut output = SegmentedBuf::new(); + let body = self.body; + crate::pin_mut!(body); + while let Some(buf) = body.data().await { + // If we successfully read some bytes, + // then we call each callback in turn, + // passing a ref to those bytes. + if let Ok(bytes) = buf.as_ref() { + self.callbacks.iter_mut().for_each(|callback| { + callback.on_read_chunk(bytes); + }) + } + output.push(buf?); + } + + let aggregated_bytes = AggregatedBytes(output); + + // We also call the callback at the end too. + self.callbacks.iter_mut().for_each(|callback| { + callback.finally(&aggregated_bytes) + }); + + Ok(aggregated_bytes) + } +} +``` + +The current version of `ByteStream` and `Inner` can be seen [here][ByteStream impls]. + +[ByteStream impls]: https://github.com/awslabs/smithy-rs/blob/f76bc159bf16510a0873f5fba691cb05816f4192/rust-runtime/aws-smithy-http/src/byte_stream.rs#L205 From 8e56bc0104be2e1f18f3343b566a3dba84b83452 Mon Sep 17 00:00:00 2001 From: Zelda Hessler Date: Wed, 6 Apr 2022 10:34:08 -0500 Subject: [PATCH 2/9] add: builder-style method --- design/src/rfcs/rfc0012_bytestream_callback_api.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/design/src/rfcs/rfc0012_bytestream_callback_api.md b/design/src/rfcs/rfc0012_bytestream_callback_api.md index fdd2394b76..29694b3191 100644 --- a/design/src/rfcs/rfc0012_bytestream_callback_api.md +++ b/design/src/rfcs/rfc0012_bytestream_callback_api.md @@ -45,6 +45,12 @@ impl ByteStream { pub fn insert_read_callback(&mut self, callback: Box) { self.inner.insert_read_callback(callback); } + + // Alternatively, we could add a "builder-style" method for setting callbacks + pub fn with_callback(&mut self) -> &mut Self { + self.inner.insert_read_callback(callback); + self + } } // Callbacks actually get stored in the `Inner` struct because that's where From a3bd2c9f50acf90140bba9917abac714cde01838 Mon Sep 17 00:00:00 2001 From: Zelda Hessler Date: Wed, 6 Apr 2022 11:55:17 -0500 Subject: [PATCH 3/9] update: remove callback insert method in favor of builder method update: Arc callbacks instead of boxing them remove: most bounds from ByteStreamReadCallback trait add: PartialEq/Eq impl for Inner that disregards callback list --- .../rfcs/rfc0012_bytestream_callback_api.md | 37 +++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/design/src/rfcs/rfc0012_bytestream_callback_api.md b/design/src/rfcs/rfc0012_bytestream_callback_api.md index 29694b3191..abd59e7965 100644 --- a/design/src/rfcs/rfc0012_bytestream_callback_api.md +++ b/design/src/rfcs/rfc0012_bytestream_callback_api.md @@ -18,7 +18,7 @@ Adding a callback API to `ByteStream` will enable developers using the SDK to im pub trait ByteStreamReadCallback // I don't really like this bound but we need it because this will be inserted into an `Inner` where - Self: Debug + Clone + PartialEq + Eq, + Self: Debug, { /// This callback is called for each chunk **successfully** read. /// If an error occurs while reading a chunk, this will not be called. @@ -33,22 +33,20 @@ where /// This function takes `&mut self` so that implementors may modify an /// implementing struct/enum's internal state. fn finally(&mut self, #[allow(unused_variables)] aggregated_bytes: &AggregatedBytes) {} + + /// return any trailers to be appended to this `ByteStream` if it's used to + /// create the body of an HTTP request. + // `HeaderMap`/`HeaderValue` are defined by `hyper` + fn trailers(&self) -> Option> {} } // We add a new method to `ByteStream` for inserting callbacks impl ByteStream { // ...other impls omitted - // Read callbacks can only be inserted, not removed or reordered. If users - // desire extra management functions, we can add them in a later update. - // Callbacks are actually stored and called from the `Inner` object - pub fn insert_read_callback(&mut self, callback: Box) { - self.inner.insert_read_callback(callback); - } - - // Alternatively, we could add a "builder-style" method for setting callbacks - pub fn with_callback(&mut self) -> &mut Self { - self.inner.insert_read_callback(callback); + // A "builder-style" method for setting callbacks + pub fn with_callback(&mut self, callback: Arc) -> &mut Self { + self.inner.with_callback(callback); self } } @@ -56,15 +54,22 @@ impl ByteStream { // Callbacks actually get stored in the `Inner` struct because that's where // the chunk-reading actually occurs. #[pin_project] -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] struct Inner { #[pin] body: B, // This field is new. It's going to store callbacks that get called when we're - // reading the `SdkBody` chunk-by-chunk - callbacks: Vec> + // reading the `SdkBody` chunk-by-chunk. This field doesn't get checked for + // equality purposes. + callbacks: Vec> } +impl PartialEq for Inner { + fn eq(&self, rhs: &Self) -> bool { self.body == rhs.body } +} + +impl Eq for Inner {} + impl Inner { // ...other impls omitted @@ -72,8 +77,10 @@ impl Inner { Self { body, callbacks: Vec::new() } } - pub fn insert_read_callback(&mut self, callback: Box) { + // `Inner` also gets a "builder-style" function for adding callbacks + pub fn with_callback(&mut self, callback: Arc) -> &mut self { self.callbacks.push(callback); + self } pub async fn collect(self) -> Result From 2cfb31df87894d9aa4cfebc46d6e3acb5084e195 Mon Sep 17 00:00:00 2001 From: Zelda Hessler Date: Mon, 11 Apr 2022 15:38:55 -0500 Subject: [PATCH 4/9] update: RFC with SdkBody impl update: RFC with checksum callback example --- design/src/rfcs/rfc0012_body_callback_apis.md | 259 ++++++++++++++++++ .../rfcs/rfc0012_bytestream_callback_api.md | 119 -------- 2 files changed, 259 insertions(+), 119 deletions(-) create mode 100644 design/src/rfcs/rfc0012_body_callback_apis.md delete mode 100644 design/src/rfcs/rfc0012_bytestream_callback_api.md diff --git a/design/src/rfcs/rfc0012_body_callback_apis.md b/design/src/rfcs/rfc0012_body_callback_apis.md new file mode 100644 index 0000000000..1ec08f5aff --- /dev/null +++ b/design/src/rfcs/rfc0012_body_callback_apis.md @@ -0,0 +1,259 @@ +RFC: Callback APIs for `ByteStream` and `SdkBody` +================================================= + +> Status: RFC + +Adding a callback APIs to `ByteStream` and `SdkBody` will enable developers using the SDK to implement things like checksum validations and 'read progress' callbacks. + +## The Implementation + +*Note that comments starting with '//' are not necessarily going to be included in the actual implementation and are intended as clarifying comments for the purposes of this RFC.* + +```rust +// in aws_smithy_http::read_callback... + +// Each trait method defaults to doing nothing. It's up to implementors to +// implement one or both of the trait methods +/// Structs and enums implementing this trait can be inserted into a `ByteStream`, +/// and will then be called in reaction to various events during a `ByteStream`'s +/// lifecycle. +pub trait ReadCallback: Send + Sync { + /// This callback is called for each chunk **successfully** read. + /// If an error occurs while reading a chunk, this will not be called. + /// This function takes `&mut self` so that implementors may modify + /// an implementing struct/enum's internal state. + // In order to stop the compiler complaining about these empty default impls, + // we allow unused variables. + fn update(&mut self, #[allow(unused_variables)] bytes: &[u8]) {} + + /// This callback is called once all chunks have been read. If the callback encountered 1 or more errors + /// while running `update`s, this is how those errors are raised. + fn finally(&self) -> Result<(), Box> { Ok(()) } + + /// return any trailers to be appended to this `ByteStream` if it's used to + /// create the body of an HTTP request. + // `HeaderMap`/`HeaderValue` are defined by `hyper` + fn trailers(&self) -> Option> { None } + + /// Create a new `ReadCallback` from an existing one. This is called when a `ReadCallback` need + /// to be re-initialized with default state. For example: when a request has a body that needs + /// to be rebuilt, all read callbacks on that body need to be run again but with a fresh internal state. + fn make_new(&self) -> Box; +} + +// We also impl `ReadCallback` for `Box` because it makes callback trait objects easier to work with. +``` + +The changes we need to make to `ByteStream`: + +*(The current version of `ByteStream` and `Inner` can be seen [here][ByteStream impls].)* + +```rust +// in `aws_smithy_http::byte_stream`... + +// We add a new method to `ByteStream` for inserting callbacks +impl ByteStream { + // ...other impls omitted + + // A "builder-style" method for setting callbacks + pub fn with_callback(&mut self, callback: Box) -> &mut Self { + self.inner.with_callback(callback); + self + } +} + +impl Inner { + // `Inner` wraps an `SdkBody` which has a "builder-style" function for adding callbacks. + pub fn with_read_callback(&mut self, read_callback: Box) -> &mut Self { + self.body.with_read_callback(read_callback); + self + } +} +``` + +The changes we need to make to `SdkBody`: + +*(The current version of `SdkBody` can be seen [here][SdkBody impls].)* + +```rust +// In aws_smithy_http::body... + +#[pin_project] +pub struct SdkBody { + #[pin] + inner: Inner, + rebuild: Option Inner) + Send + Sync>>, + // We add a `Vec` to store the callbacks + #[pin] + read_callbacks: Vec>, +} + +impl SdkBody { + // We update the various fns that create `SdkBody`s to create an empty `Vec` to store callbacks. + // Those updates are very simple so I've omitted them from this code example. + + fn poll_inner( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let mut this = self.project(); + // This block is old. I've included for context. + let polling_result = match this.inner.project() { + InnerProj::Once(ref mut opt) => { + let data = opt.take(); + match data { + Some(bytes) if bytes.is_empty() => Poll::Ready(None), + Some(bytes) => Poll::Ready(Some(Ok(bytes))), + None => Poll::Ready(None), + } + } + InnerProj::Streaming(body) => body.poll_data(cx).map_err(|e| e.into()), + InnerProj::Dyn(box_body) => box_body.poll_data(cx), + InnerProj::Taken => { + Poll::Ready(Some(Err("A `Taken` body should never be polled".into()))) + } + }; + + // This block is new. + match &polling_result { + // When we get some bytes back from polling, pass those bytes to each callback in turn + Poll::Ready(Some(Ok(bytes))) => { + this.read_callbacks + .iter_mut() + .for_each(|callback| callback.update(bytes)); + } + // When we're done polling for bytes, run each callback's `finally()` method. If any calls to + // `finally()` return an error, propagate that error up. Otherwise, continue. + Poll::Ready(None) => { + for callback_result in this.read_callbacks.iter().map(ReadCallback::finally) { + if let Err(e) = callback_result { + return Poll::Ready(Some(Err(e))); + } + } + } + _ => (), + } + + // Now that we've inspected the polling result, all that's left to do is to return it. + polling_result + } + + // This function now has the added responsibility of cloning callback functions (but with fresh state) + // in the case that the `SdkBody` needs to be rebuilt. + pub fn try_clone(&self) -> Option { + self.rebuild.as_ref().map(|rebuild| { + let next = rebuild(); + let read_callbacks = self + .read_callbacks + .iter() + .map(ReadCallback::make_new) + .collect(); + + Self { + inner: next, + rebuild: self.rebuild.clone(), + read_callbacks, + } + }) + } + + pub fn with_read_callback(&mut self, read_callback: Box) -> &mut Self { + self.read_callbacks.push(read_callback); + self + } +} + +impl http_body::Body for SdkBody { + // The other methods have been omitted because they haven't changed + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll>, Self::Error>> { + let mut last_header_key_seen = None; + let header_map = self + .read_callbacks + .iter() + .filter_map(|callback| callback.trailers()) + .reduce(|mut left_header_map, mut right_header_map| { + right_header_map.into_iter().for_each(|(key, value)| { + // For each yielded item that has None provided for the `HeaderName`, + // then the associated header name is the same as that of the previously + // yielded item. The first yielded item will have `HeaderName` set. + // https://docs.rs/http/latest/http/header/struct.HeaderMap.html#method.into_iter-2 + match (last_header_key_seen, key) { + (_, Some(key)) => { + left_header_map.append(key, value); + last_header_key_seen = Some(key); + } + (Some(key), None) => { + left_header_map.append(key, value); + } + (None, None) => unreachable!(), + }; + }); + + left_header_map + }); + + Poll::Ready(Ok(header_map)) + } +} +``` + +## Implementing Checksums + +What follows is a simplified example of how this API could be used to introduce checksum validation for outgoing request payloads. In this example, the checksum calculation is fallible and no validation takes place. All it does it calculate +the checksum of some data and then returns the checksum of that data when `trailers` is called. This is fine because it's +being used to calculate the checksum of a streaming body in a request. + +```rust +#[derive(Default)] +struct Crc32cChecksumCallback { + state: Option, +} + +impl ReadCallback for Crc32cChecksumCallback { + fn update(&mut self, bytes: &[u8]) { + self.state = match self.state { + Some(crc) => { self.state = Some(crc32c_append(crc, bytes)) } + None => { Some(crc32c(&bytes)) } + }; + } + + fn trailers(&self) -> Option> { + let mut header_map = HeaderMap::new(); + // This checksum name is an Amazon standard and would be a `const` in the real implementation + let key = HeaderName::from_static("x-amz-checksum-crc32c"); + // If no data was provided to this callback and no CRC was ever calculated, we return zero as the checksum. + let crc = self.state.unwrap_or_default(); + // Convert the CRC to a string, base 64 encode it, and then convert it into a `HeaderValue`. + let value = HeaderValue::from_str(&base64::encode(crc.to_string())).expect("base64 will always produce valid header values"); + + header_map.insert(key, value); + + Some(header_map) + } + + fn make_new(&self) -> Box { + Box::new(Crc32cChecksumCallback::default()) + } +} +``` + +*NOTE: If `Crc32cChecksumCallback` needed to validate a response, then we could modify it to check its internal state against a target checksum value and calling `finally` would produce an error if the values didn't match.* + +In order to use this in a request, we'd modify codegen for that request's service. + +1. We'd check if the user had requested validation and also check if they'd pre-calculated a checksum. +2. If validation was requested but no pre-calculated checksum was given, we'd create a callback similar to the one above +3. Then, we'd create a new checksum callback and: + - (if streaming) we'd set the checksum callback on the request body object + - (if non-streaming) we'd immediately read the body and call `ReadCallback::update` manually. Once all data was read, we'd get the checksum by calling `trailers` and insert that data as a request header. + +## Other thoughts + +- What if we defined a `headers` method on `ReadCallback` too? We could just have it default to calling `trailers` internally by default (or vice versa.) This would make it less confusing when we manually call the checksum callback in order to set headers. + +[ByteStream impls]: https://github.com/awslabs/smithy-rs/blob/f76bc159bf16510a0873f5fba691cb05816f4192/rust-runtime/aws-smithy-http/src/byte_stream.rs#L205 +[SdkBody impls]: https://github.com/awslabs/smithy-rs/blob/f76bc159bf16510a0873f5fba691cb05816f4192/rust-runtime/aws-smithy-http/src/body.rs#L71 diff --git a/design/src/rfcs/rfc0012_bytestream_callback_api.md b/design/src/rfcs/rfc0012_bytestream_callback_api.md deleted file mode 100644 index abd59e7965..0000000000 --- a/design/src/rfcs/rfc0012_bytestream_callback_api.md +++ /dev/null @@ -1,119 +0,0 @@ -RFC: A callback API for `ByteStream` -==================================== - -> Status: RFC - -Adding a callback API to `ByteStream` will enable developers using the SDK to implement things like checksum validations and 'read progress' callbacks. - -## The Implementation - -*Note that comments starting with '//' are not necessarily going to be included in the actual implementation and are intended as clarifying comments for the purposes of this RFC.* - -```rust -// Each trait method defaults to doing nothing. It's up to implementors to -// implement one or both of the trait methods -/// Structs and enums implementing this trait can be inserted into a `ByteStream`, -/// and will then be called in reaction to various events during a `ByteStream`'s -/// lifecycle. -pub trait ByteStreamReadCallback -// I don't really like this bound but we need it because this will be inserted into an `Inner` -where - Self: Debug, -{ - /// This callback is called for each chunk **successfully** read. - /// If an error occurs while reading a chunk, this will not be called. - /// This function takes `&mut self` so that implementors may modify - /// an implementing struct/enum's internal state. - // In order to stop the compiler complaining about these empty default impls, - // we allow unused variables. - fn on_read_chunk(&mut self, #[allow(unused_variables)] chunk: &Bytes) {} - - /// This callback is called once all chunks have been successfully read. - /// It's passed a reference to the chunks in the form of an `AggregatedBytes`. - /// This function takes `&mut self` so that implementors may modify an - /// implementing struct/enum's internal state. - fn finally(&mut self, #[allow(unused_variables)] aggregated_bytes: &AggregatedBytes) {} - - /// return any trailers to be appended to this `ByteStream` if it's used to - /// create the body of an HTTP request. - // `HeaderMap`/`HeaderValue` are defined by `hyper` - fn trailers(&self) -> Option> {} -} - -// We add a new method to `ByteStream` for inserting callbacks -impl ByteStream { - // ...other impls omitted - - // A "builder-style" method for setting callbacks - pub fn with_callback(&mut self, callback: Arc) -> &mut Self { - self.inner.with_callback(callback); - self - } -} - -// Callbacks actually get stored in the `Inner` struct because that's where -// the chunk-reading actually occurs. -#[pin_project] -#[derive(Debug, Clone)] -struct Inner { - #[pin] - body: B, - // This field is new. It's going to store callbacks that get called when we're - // reading the `SdkBody` chunk-by-chunk. This field doesn't get checked for - // equality purposes. - callbacks: Vec> -} - -impl PartialEq for Inner { - fn eq(&self, rhs: &Self) -> bool { self.body == rhs.body } -} - -impl Eq for Inner {} - -impl Inner { - // ...other impls omitted - - pub fn new(body: B) -> Self { - Self { body, callbacks: Vec::new() } - } - - // `Inner` also gets a "builder-style" function for adding callbacks - pub fn with_callback(&mut self, callback: Arc) -> &mut self { - self.callbacks.push(callback); - self - } - - pub async fn collect(self) -> Result - where - B: http_body::Body, - { - let mut output = SegmentedBuf::new(); - let body = self.body; - crate::pin_mut!(body); - while let Some(buf) = body.data().await { - // If we successfully read some bytes, - // then we call each callback in turn, - // passing a ref to those bytes. - if let Ok(bytes) = buf.as_ref() { - self.callbacks.iter_mut().for_each(|callback| { - callback.on_read_chunk(bytes); - }) - } - output.push(buf?); - } - - let aggregated_bytes = AggregatedBytes(output); - - // We also call the callback at the end too. - self.callbacks.iter_mut().for_each(|callback| { - callback.finally(&aggregated_bytes) - }); - - Ok(aggregated_bytes) - } -} -``` - -The current version of `ByteStream` and `Inner` can be seen [here][ByteStream impls]. - -[ByteStream impls]: https://github.com/awslabs/smithy-rs/blob/f76bc159bf16510a0873f5fba691cb05816f4192/rust-runtime/aws-smithy-http/src/byte_stream.rs#L205 From 0c3ab32e3f0f4ccaf14cfddeb599c5d60716e273 Mon Sep 17 00:00:00 2001 From: Zelda Hessler Date: Wed, 13 Apr 2022 09:47:43 -0500 Subject: [PATCH 5/9] update: respond to PR comments --- design/src/rfcs/rfc0012_body_callback_apis.md | 154 +++++++++++------- 1 file changed, 96 insertions(+), 58 deletions(-) diff --git a/design/src/rfcs/rfc0012_body_callback_apis.md b/design/src/rfcs/rfc0012_body_callback_apis.md index 1ec08f5aff..18c8847e1b 100644 --- a/design/src/rfcs/rfc0012_body_callback_apis.md +++ b/design/src/rfcs/rfc0012_body_callback_apis.md @@ -10,38 +10,35 @@ Adding a callback APIs to `ByteStream` and `SdkBody` will enable developers usin *Note that comments starting with '//' are not necessarily going to be included in the actual implementation and are intended as clarifying comments for the purposes of this RFC.* ```rust -// in aws_smithy_http::read_callback... +// in aws_smithy_http::callbacks... // Each trait method defaults to doing nothing. It's up to implementors to // implement one or both of the trait methods /// Structs and enums implementing this trait can be inserted into a `ByteStream`, /// and will then be called in reaction to various events during a `ByteStream`'s /// lifecycle. -pub trait ReadCallback: Send + Sync { - /// This callback is called for each chunk **successfully** read. - /// If an error occurs while reading a chunk, this will not be called. - /// This function takes `&mut self` so that implementors may modify - /// an implementing struct/enum's internal state. - // In order to stop the compiler complaining about these empty default impls, - // we allow unused variables. +pub trait BaseCallback: Send + Sync { + /// This callback is called for each chunk **successfully** read. If an error occurs while reading a chunk, + /// this will not be called. This function takes `&mut self` so that implementors may modify an implementing + /// struct/enum's internal state. + // In order to stop the compiler complaining about these empty default impls, we allow unused variables. fn update(&mut self, #[allow(unused_variables)] bytes: &[u8]) {} /// This callback is called once all chunks have been read. If the callback encountered 1 or more errors - /// while running `update`s, this is how those errors are raised. - fn finally(&self) -> Result<(), Box> { Ok(()) } + /// while running `update`s, this is how those errors are raised. Otherwise, this may optionally return + /// a [`HeaderMap`][HeaderMap] to be appended to an HTTP body as a trailer or inserted into a request's + /// headers. + fn finally(&self) -> Result>, Box> { Ok(()) } - /// return any trailers to be appended to this `ByteStream` if it's used to - /// create the body of an HTTP request. - // `HeaderMap`/`HeaderValue` are defined by `hyper` - fn trailers(&self) -> Option> { None } - - /// Create a new `ReadCallback` from an existing one. This is called when a `ReadCallback` need + /// Create a new `BaseCallback` from an existing one. This is called when a `BaseCallback` need /// to be re-initialized with default state. For example: when a request has a body that needs /// to be rebuilt, all read callbacks on that body need to be run again but with a fresh internal state. - fn make_new(&self) -> Box; + fn make_new(&self) -> Box; } -// We also impl `ReadCallback` for `Box` because it makes callback trait objects easier to work with. +// We also impl `BaseCallback` for `Box` because it makes callback trait objects easier to work with. + +// TODO add the ReadCallback and WriteCallback traits ``` The changes we need to make to `ByteStream`: @@ -56,8 +53,8 @@ impl ByteStream { // ...other impls omitted // A "builder-style" method for setting callbacks - pub fn with_callback(&mut self, callback: Box) -> &mut Self { - self.inner.with_callback(callback); + pub fn with_read_callback(&mut self, read_callback: Box) -> &mut Self { + self.inner.with_callback(read_callback); self } } @@ -65,7 +62,7 @@ impl ByteStream { impl Inner { // `Inner` wraps an `SdkBody` which has a "builder-style" function for adding callbacks. pub fn with_read_callback(&mut self, read_callback: Box) -> &mut Self { - self.body.with_read_callback(read_callback); + self.body.with_callback(read_callback); self } } @@ -85,7 +82,7 @@ pub struct SdkBody { rebuild: Option Inner) + Send + Sync>>, // We add a `Vec` to store the callbacks #[pin] - read_callbacks: Vec>, + callbacks: Vec>, } impl SdkBody { @@ -118,14 +115,14 @@ impl SdkBody { match &polling_result { // When we get some bytes back from polling, pass those bytes to each callback in turn Poll::Ready(Some(Ok(bytes))) => { - this.read_callbacks + this.callbacks .iter_mut() .for_each(|callback| callback.update(bytes)); } // When we're done polling for bytes, run each callback's `finally()` method. If any calls to // `finally()` return an error, propagate that error up. Otherwise, continue. Poll::Ready(None) => { - for callback_result in this.read_callbacks.iter().map(ReadCallback::finally) { + for callback_result in this.callbacks.iter().map(BaseCallback::finally) { if let Err(e) = callback_result { return Poll::Ready(Some(Err(e))); } @@ -143,26 +140,81 @@ impl SdkBody { pub fn try_clone(&self) -> Option { self.rebuild.as_ref().map(|rebuild| { let next = rebuild(); - let read_callbacks = self - .read_callbacks + let callbacks = self + .callbacks .iter() - .map(ReadCallback::make_new) + .map(BaseCallback::make_new) .collect(); Self { inner: next, rebuild: self.rebuild.clone(), - read_callbacks, + callbacks, } }) } - pub fn with_read_callback(&mut self, read_callback: Box) -> &mut Self { - self.read_callbacks.push(read_callback); + pub fn with_callback(&mut self, callback: Box) -> &mut Self { + self.callbacks.push(callback); self } } +/// Given two [`HeaderMap`][HeaderMap]s, merge them together and return the merged `HeaderMap`. If the +/// two `HeaderMap`s share any keys, values from the right `HeaderMap` be appended to the left `HeaderMap`. +/// +/// # Example +/// +/// ```rust +/// let header_name = HeaderName::from_static("some_key"); +/// +/// let mut left_hand_side_headers = HeaderMap::new(); +/// left_hand_side_headers.insert( +/// header_name.clone(), +/// HeaderValue::from_str("lhs value").unwrap(), +/// ); +/// +/// let mut right_hand_side_headers = HeaderMap::new(); +/// right_hand_side_headers.insert( +/// header_name.clone(), +/// HeaderValue::from_str("rhs value").unwrap(), +/// ); +/// +/// let merged_header_map = +/// append_merge_header_maps(left_hand_side_headers, right_hand_side_headers); +/// let merged_values: Vec<_> = merged_header_map +/// .get_all(header_name.clone()) +/// .into_iter() +/// .collect(); +/// +/// // Will print 'some_key: ["lhs value", "rhs value"]' +/// println!("{}: {:?}", header_name.as_str(), merged_values); +/// ``` +fn append_merge_header_maps( + mut lhs: HeaderMap, + rhs: HeaderMap, +) -> HeaderMap { + let mut last_header_name_seen = None; + for (header_name, header_value) in rhs.into_iter() { + // For each yielded item that has None provided for the `HeaderName`, + // then the associated header name is the same as that of the previously + // yielded item. The first yielded item will have `HeaderName` set. + // https://docs.rs/http/latest/http/header/struct.HeaderMap.html#method.into_iter-2 + match (&mut last_header_name_seen, header_name) { + (_, Some(header_name)) => { + lhs.append(header_name.clone(), header_value); + last_header_name_seen = Some(header_name); + } + (Some(header_name), None) => { + lhs.append(header_name.clone(), header_value); + } + (None, None) => unreachable!(), + }; + } + + lhs +} + impl http_body::Body for SdkBody { // The other methods have been omitted because they haven't changed @@ -170,31 +222,18 @@ impl http_body::Body for SdkBody { self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>, Self::Error>> { - let mut last_header_key_seen = None; let header_map = self .read_callbacks .iter() - .filter_map(|callback| callback.trailers()) - .reduce(|mut left_header_map, mut right_header_map| { - right_header_map.into_iter().for_each(|(key, value)| { - // For each yielded item that has None provided for the `HeaderName`, - // then the associated header name is the same as that of the previously - // yielded item. The first yielded item will have `HeaderName` set. - // https://docs.rs/http/latest/http/header/struct.HeaderMap.html#method.into_iter-2 - match (last_header_key_seen, key) { - (_, Some(key)) => { - left_header_map.append(key, value); - last_header_key_seen = Some(key); - } - (Some(key), None) => { - left_header_map.append(key, value); - } - (None, None) => unreachable!(), - }; - }); - - left_header_map - }); + .filter_map(|callback| { + match callback.finally() { + Ok(optional_header_map) => optional_header_map, + // early return if a callback encountered an error + Err(e) => { return e }, + } + }) + // Merge any `HeaderMap`s from the last step together, one by one. + .reduce(append_merge_header_maps); Poll::Ready(Ok(header_map)) } @@ -221,7 +260,10 @@ impl ReadCallback for Crc32cChecksumCallback { }; } - fn trailers(&self) -> Option> { + fn finally(&self) -> + Result>, + Box> + { let mut header_map = HeaderMap::new(); // This checksum name is an Amazon standard and would be a `const` in the real implementation let key = HeaderName::from_static("x-amz-checksum-crc32c"); @@ -249,11 +291,7 @@ In order to use this in a request, we'd modify codegen for that request's servic 2. If validation was requested but no pre-calculated checksum was given, we'd create a callback similar to the one above 3. Then, we'd create a new checksum callback and: - (if streaming) we'd set the checksum callback on the request body object - - (if non-streaming) we'd immediately read the body and call `ReadCallback::update` manually. Once all data was read, we'd get the checksum by calling `trailers` and insert that data as a request header. - -## Other thoughts - -- What if we defined a `headers` method on `ReadCallback` too? We could just have it default to calling `trailers` internally by default (or vice versa.) This would make it less confusing when we manually call the checksum callback in order to set headers. + - (if non-streaming) we'd immediately read the body and call `ReadCallback::update` manually. Once all data was read, we'd get the checksum by calling `finally` and insert that data as a request header. [ByteStream impls]: https://github.com/awslabs/smithy-rs/blob/f76bc159bf16510a0873f5fba691cb05816f4192/rust-runtime/aws-smithy-http/src/byte_stream.rs#L205 [SdkBody impls]: https://github.com/awslabs/smithy-rs/blob/f76bc159bf16510a0873f5fba691cb05816f4192/rust-runtime/aws-smithy-http/src/body.rs#L71 From 2b4d04c1ca0935751fa8e33934d71ccfb58ab5c2 Mon Sep 17 00:00:00 2001 From: Zelda Hessler Date: Thu, 14 Apr 2022 15:29:55 -0500 Subject: [PATCH 6/9] update: callback impls --- design/src/rfcs/rfc0012_body_callback_apis.md | 130 +++++++++++++----- 1 file changed, 93 insertions(+), 37 deletions(-) diff --git a/design/src/rfcs/rfc0012_body_callback_apis.md b/design/src/rfcs/rfc0012_body_callback_apis.md index 18c8847e1b..6b9b71d888 100644 --- a/design/src/rfcs/rfc0012_body_callback_apis.md +++ b/design/src/rfcs/rfc0012_body_callback_apis.md @@ -12,33 +12,80 @@ Adding a callback APIs to `ByteStream` and `SdkBody` will enable developers usin ```rust // in aws_smithy_http::callbacks... -// Each trait method defaults to doing nothing. It's up to implementors to -// implement one or both of the trait methods -/// Structs and enums implementing this trait can be inserted into a `ByteStream`, -/// and will then be called in reaction to various events during a `ByteStream`'s -/// lifecycle. -pub trait BaseCallback: Send + Sync { - /// This callback is called for each chunk **successfully** read. If an error occurs while reading a chunk, - /// this will not be called. This function takes `&mut self` so that implementors may modify an implementing - /// struct/enum's internal state. - // In order to stop the compiler complaining about these empty default impls, we allow unused variables. - fn update(&mut self, #[allow(unused_variables)] bytes: &[u8]) {} - - /// This callback is called once all chunks have been read. If the callback encountered 1 or more errors - /// while running `update`s, this is how those errors are raised. Otherwise, this may optionally return - /// a [`HeaderMap`][HeaderMap] to be appended to an HTTP body as a trailer or inserted into a request's - /// headers. - fn finally(&self) -> Result>, Box> { Ok(()) } - - /// Create a new `BaseCallback` from an existing one. This is called when a `BaseCallback` need - /// to be re-initialized with default state. For example: when a request has a body that needs - /// to be rebuilt, all read callbacks on that body need to be run again but with a fresh internal state. - fn make_new(&self) -> Box; +// An internal-only type that `SdkBody` interacts with in order to call callbacks +pub(crate) enum Callback { + // A callback to be called when sending requests + Send(Box), + // A callback to be called when receiving responses + Receive(Box), } -// We also impl `BaseCallback` for `Box` because it makes callback trait objects easier to work with. +impl Callback { + /// This lifecycle function is called for each chunk **successfully** read. If an error occurs while reading a chunk, + /// this will not be called. This function takes `&mut self` so that implementors may modify an implementing + /// struct/enum's internal state. Implementors may return an error. + fn update(&mut self, #[allow(unused_variables)] bytes: &[u8]) -> Result<(), Box> { + match self { + Callback::Send(send_callback) => send_callback.update(bytes), + Callback::Receive(receive_callback) => receive_callback.update(bytes), + } + } + + /// This callback is called once all chunks have been read. If the callback encountered 1 or more errors + /// while running `update`s, this is how those errors are raised. Otherwise, this may optionally return + /// a [`HeaderMap`][HeaderMap] to be appended to an HTTP body as a trailer or inserted into a request's + /// headers. + fn finally( + &self, + ) -> Result>, Box> { + match self { + Callback::Send(send_callback) => send_callback.headers(), + Callback::Receive(receive_callback) => receive_callback.trailers(), + } + } + + /// Create a new `Callback` from an existing one. This is called when a `Callback` needs to be + /// re-initialized with default state. For example: when a request has a body that need to be + /// rebuilt, all read callbacks on that body need to be run again but with a fresh internal state. + fn make_new(&self) -> Box { + match self { + Callback::Send(send_callback) => send_callback.make_new(), + Callback::Receive(receive_callback) => receive_callback.make_new(), + } + } +} + +/// A callback that, when inserted into a request body, will be called for corresponding lifecycle events. +// Docs for these methods will mostly be the same as the docs on `Callback` so I've omitted them. +trait SendCallback: Send + Sync { + fn update(&mut self, #[allow(unused_variables)] bytes: &[u8]) -> Result<(), BoxError> { Ok(()) } + fn headers( + &self, + ) -> Result>, BoxError> { Ok(None) } + fn make_new() -> Box; +} + +impl From> for Callback { + fn from(send_callback: Box) -> Self { + Self::Send(send_callback) + } +} -// TODO add the ReadCallback and WriteCallback traits +/// A callback that, when inserted into a response body, will be called for corresponding lifecycle events. +// Docs for these methods will mostly be the same as the docs on `Callback` so I've omitted them. +trait ReceiveCallback: Send + Sync { + fn update(&mut self, #[allow(unused_variables)] bytes: &[u8]) -> Result<(), BoxError> { Ok(()) } + fn trailers( + &self, + ) -> Result>, BoxError> { Ok(None) } + fn make_new() -> Box; +} + +impl From> for Callback { + fn from(receive_callback: Box) -> Self { + Self::Receive(receive_callback) + } +} ``` The changes we need to make to `ByteStream`: @@ -52,17 +99,23 @@ The changes we need to make to `ByteStream`: impl ByteStream { // ...other impls omitted - // A "builder-style" method for setting callbacks - pub fn with_read_callback(&mut self, read_callback: Box) -> &mut Self { - self.inner.with_callback(read_callback); + // A "builder-style" method for setting callbacks that will be triggered if this `ByteStream` is being used as a request body + pub fn with_send_callback(&mut self, send_callback: Box) -> &mut Self { + self.inner.with_callback(send_callback.into()); self } + + // A "builder-style" method for setting callbacks that will be triggered if this `ByteStream` is being used as a response body + pub fn with_receive_callback(&mut self, receive_callback: Box) -> &mut Self { + self.inner.with_callback(receive_callback.into()); + self + } } impl Inner { // `Inner` wraps an `SdkBody` which has a "builder-style" function for adding callbacks. - pub fn with_read_callback(&mut self, read_callback: Box) -> &mut Self { - self.body.with_callback(read_callback); + pub fn with_callback(&mut self, callback: Callback) -> &mut Self { + self.body.with_callback(callback); self } } @@ -82,7 +135,7 @@ pub struct SdkBody { rebuild: Option Inner) + Send + Sync>>, // We add a `Vec` to store the callbacks #[pin] - callbacks: Vec>, + callbacks: Vec, } impl SdkBody { @@ -115,14 +168,15 @@ impl SdkBody { match &polling_result { // When we get some bytes back from polling, pass those bytes to each callback in turn Poll::Ready(Some(Ok(bytes))) => { - this.callbacks - .iter_mut() - .for_each(|callback| callback.update(bytes)); + for callback in this.callbacks.iter_mut() { + // Callbacks can run into errors when reading bytes. They'll be surfaced here + callback.update(bytes)?; + } } // When we're done polling for bytes, run each callback's `finally()` method. If any calls to // `finally()` return an error, propagate that error up. Otherwise, continue. Poll::Ready(None) => { - for callback_result in this.callbacks.iter().map(BaseCallback::finally) { + for callback_result in this.callbacks.iter().map(Callback::finally) { if let Err(e) = callback_result { return Poll::Ready(Some(Err(e))); } @@ -143,7 +197,7 @@ impl SdkBody { let callbacks = self .callbacks .iter() - .map(BaseCallback::make_new) + .map(Callback::make_new) .collect(); Self { @@ -154,7 +208,7 @@ impl SdkBody { }) } - pub fn with_callback(&mut self, callback: Box) -> &mut Self { + pub fn with_callback(&mut self, callback: Callback) -> &mut Self { self.callbacks.push(callback); self } @@ -253,11 +307,13 @@ struct Crc32cChecksumCallback { } impl ReadCallback for Crc32cChecksumCallback { - fn update(&mut self, bytes: &[u8]) { + fn update(&mut self, bytes: &[u8]) -> Result<(), BoxError> { self.state = match self.state { Some(crc) => { self.state = Some(crc32c_append(crc, bytes)) } None => { Some(crc32c(&bytes)) } }; + + Ok(()) } fn finally(&self) -> From 6082b2180b6f71caaef2c69154b2b95bf155d9dd Mon Sep 17 00:00:00 2001 From: Zelda Hessler Date: Fri, 15 Apr 2022 11:35:01 -0500 Subject: [PATCH 7/9] remove: Sync bounds from callback traits --- design/src/rfcs/rfc0012_body_callback_apis.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/design/src/rfcs/rfc0012_body_callback_apis.md b/design/src/rfcs/rfc0012_body_callback_apis.md index 6b9b71d888..26c07efca3 100644 --- a/design/src/rfcs/rfc0012_body_callback_apis.md +++ b/design/src/rfcs/rfc0012_body_callback_apis.md @@ -57,7 +57,7 @@ impl Callback { /// A callback that, when inserted into a request body, will be called for corresponding lifecycle events. // Docs for these methods will mostly be the same as the docs on `Callback` so I've omitted them. -trait SendCallback: Send + Sync { +trait SendCallback: Send { fn update(&mut self, #[allow(unused_variables)] bytes: &[u8]) -> Result<(), BoxError> { Ok(()) } fn headers( &self, @@ -73,7 +73,7 @@ impl From> for Callback { /// A callback that, when inserted into a response body, will be called for corresponding lifecycle events. // Docs for these methods will mostly be the same as the docs on `Callback` so I've omitted them. -trait ReceiveCallback: Send + Sync { +trait ReceiveCallback: Send { fn update(&mut self, #[allow(unused_variables)] bytes: &[u8]) -> Result<(), BoxError> { Ok(()) } fn trailers( &self, From 96faa31f6d59ccd248cd692eed96c29397276939 Mon Sep 17 00:00:00 2001 From: Zelda Hessler Date: Fri, 15 Apr 2022 12:36:14 -0500 Subject: [PATCH 8/9] update: consolidate and simplify callback API --- design/src/rfcs/rfc0012_body_callback_apis.md | 106 +++++------------- 1 file changed, 25 insertions(+), 81 deletions(-) diff --git a/design/src/rfcs/rfc0012_body_callback_apis.md b/design/src/rfcs/rfc0012_body_callback_apis.md index 26c07efca3..defca812a6 100644 --- a/design/src/rfcs/rfc0012_body_callback_apis.md +++ b/design/src/rfcs/rfc0012_body_callback_apis.md @@ -3,7 +3,7 @@ RFC: Callback APIs for `ByteStream` and `SdkBody` > Status: RFC -Adding a callback APIs to `ByteStream` and `SdkBody` will enable developers using the SDK to implement things like checksum validations and 'read progress' callbacks. +Adding a callback API to `ByteStream` and `SdkBody` will enable developers using the SDK to implement things like checksum validations and 'read progress' callbacks. ## The Implementation @@ -12,79 +12,29 @@ Adding a callback APIs to `ByteStream` and `SdkBody` will enable developers usin ```rust // in aws_smithy_http::callbacks... -// An internal-only type that `SdkBody` interacts with in order to call callbacks -pub(crate) enum Callback { - // A callback to be called when sending requests - Send(Box), - // A callback to be called when receiving responses - Receive(Box), -} - -impl Callback { +/// A callback that, when inserted into a request body, will be called for corresponding lifecycle events. +// Docs for these methods will mostly be the same as the docs on `Callback` so I've omitted them. +trait BodyCallback: Send { /// This lifecycle function is called for each chunk **successfully** read. If an error occurs while reading a chunk, /// this will not be called. This function takes `&mut self` so that implementors may modify an implementing /// struct/enum's internal state. Implementors may return an error. - fn update(&mut self, #[allow(unused_variables)] bytes: &[u8]) -> Result<(), Box> { - match self { - Callback::Send(send_callback) => send_callback.update(bytes), - Callback::Receive(receive_callback) => receive_callback.update(bytes), - } - } + fn update(&mut self, #[allow(unused_variables)] bytes: &[u8]) -> Result<(), BoxError> { Ok(()) } /// This callback is called once all chunks have been read. If the callback encountered 1 or more errors - /// while running `update`s, this is how those errors are raised. Otherwise, this may optionally return - /// a [`HeaderMap`][HeaderMap] to be appended to an HTTP body as a trailer or inserted into a request's - /// headers. - fn finally( - &self, - ) -> Result>, Box> { - match self { - Callback::Send(send_callback) => send_callback.headers(), - Callback::Receive(receive_callback) => receive_callback.trailers(), - } - } + /// while running `update`s, this is how those errors are raised. Implementors may return a [`HeaderMap`][HeaderMap] + /// that will be appended to the HTTP body as a trailer. This is only useful to do for streaming requests. + fn trailers(&self) -> Result>, BoxError> { Ok(None) } /// Create a new `Callback` from an existing one. This is called when a `Callback` needs to be /// re-initialized with default state. For example: when a request has a body that need to be /// rebuilt, all read callbacks on that body need to be run again but with a fresh internal state. - fn make_new(&self) -> Box { - match self { - Callback::Send(send_callback) => send_callback.make_new(), - Callback::Receive(receive_callback) => receive_callback.make_new(), - } - } + fn make_new(&self) -> Box; } -/// A callback that, when inserted into a request body, will be called for corresponding lifecycle events. -// Docs for these methods will mostly be the same as the docs on `Callback` so I've omitted them. -trait SendCallback: Send { - fn update(&mut self, #[allow(unused_variables)] bytes: &[u8]) -> Result<(), BoxError> { Ok(()) } - fn headers( - &self, - ) -> Result>, BoxError> { Ok(None) } - fn make_new() -> Box; -} - -impl From> for Callback { - fn from(send_callback: Box) -> Self { - Self::Send(send_callback) - } -} - -/// A callback that, when inserted into a response body, will be called for corresponding lifecycle events. -// Docs for these methods will mostly be the same as the docs on `Callback` so I've omitted them. -trait ReceiveCallback: Send { - fn update(&mut self, #[allow(unused_variables)] bytes: &[u8]) -> Result<(), BoxError> { Ok(()) } - fn trailers( - &self, - ) -> Result>, BoxError> { Ok(None) } - fn make_new() -> Box; -} - -impl From> for Callback { - fn from(receive_callback: Box) -> Self { - Self::Receive(receive_callback) - } +impl BodyCallback for Box { + fn update(&mut self, bytes: &[u8]) -> Result<(), BoxError> { BodyCallback::update(self, bytes) } + fn trailers(&self) -> Result>, BoxError> { BodyCallback::trailers(self) } + fn make_new(&self) -> Box { BodyCallback::make_new(self) } } ``` @@ -99,23 +49,17 @@ The changes we need to make to `ByteStream`: impl ByteStream { // ...other impls omitted - // A "builder-style" method for setting callbacks that will be triggered if this `ByteStream` is being used as a request body - pub fn with_send_callback(&mut self, send_callback: Box) -> &mut Self { - self.inner.with_callback(send_callback.into()); + // A "builder-style" method for setting callbacks + pub fn with_body_callback(&mut self, body_callback: Box) -> &mut Self { + self.inner.with_body_callback(body_callback); self } - - // A "builder-style" method for setting callbacks that will be triggered if this `ByteStream` is being used as a response body - pub fn with_receive_callback(&mut self, receive_callback: Box) -> &mut Self { - self.inner.with_callback(receive_callback.into()); - self - } } impl Inner { // `Inner` wraps an `SdkBody` which has a "builder-style" function for adding callbacks. - pub fn with_callback(&mut self, callback: Callback) -> &mut Self { - self.body.with_callback(callback); + pub fn with_body_callback(&mut self, body_callback: Box) -> &mut Self { + self.body.with_body_callback(body_callback); self } } @@ -135,7 +79,7 @@ pub struct SdkBody { rebuild: Option Inner) + Send + Sync>>, // We add a `Vec` to store the callbacks #[pin] - callbacks: Vec, + callbacks: Vec>, } impl SdkBody { @@ -176,7 +120,7 @@ impl SdkBody { // When we're done polling for bytes, run each callback's `finally()` method. If any calls to // `finally()` return an error, propagate that error up. Otherwise, continue. Poll::Ready(None) => { - for callback_result in this.callbacks.iter().map(Callback::finally) { + for callback_result in this.callbacks.iter().map(Callback::trailers) { if let Err(e) = callback_result { return Poll::Ready(Some(Err(e))); } @@ -208,7 +152,7 @@ impl SdkBody { }) } - pub fn with_callback(&mut self, callback: Callback) -> &mut Self { + pub fn with_callback(&mut self, callback: BodyCallback) -> &mut Self { self.callbacks.push(callback); self } @@ -280,7 +224,7 @@ impl http_body::Body for SdkBody { .read_callbacks .iter() .filter_map(|callback| { - match callback.finally() { + match callback.trailers() { Ok(optional_header_map) => optional_header_map, // early return if a callback encountered an error Err(e) => { return e }, @@ -316,7 +260,7 @@ impl ReadCallback for Crc32cChecksumCallback { Ok(()) } - fn finally(&self) -> + fn trailers(&self) -> Result>, Box> { @@ -339,7 +283,7 @@ impl ReadCallback for Crc32cChecksumCallback { } ``` -*NOTE: If `Crc32cChecksumCallback` needed to validate a response, then we could modify it to check its internal state against a target checksum value and calling `finally` would produce an error if the values didn't match.* +*NOTE: If `Crc32cChecksumCallback` needed to validate a response, then we could modify it to check its internal state against a target checksum value and calling `trailers` would produce an error if the values didn't match.* In order to use this in a request, we'd modify codegen for that request's service. @@ -347,7 +291,7 @@ In order to use this in a request, we'd modify codegen for that request's servic 2. If validation was requested but no pre-calculated checksum was given, we'd create a callback similar to the one above 3. Then, we'd create a new checksum callback and: - (if streaming) we'd set the checksum callback on the request body object - - (if non-streaming) we'd immediately read the body and call `ReadCallback::update` manually. Once all data was read, we'd get the checksum by calling `finally` and insert that data as a request header. + - (if non-streaming) we'd immediately read the body and call `BodyCallback::update` manually. Once all data was read, we'd get the checksum by calling `trailers` and insert that data as a request header. [ByteStream impls]: https://github.com/awslabs/smithy-rs/blob/f76bc159bf16510a0873f5fba691cb05816f4192/rust-runtime/aws-smithy-http/src/byte_stream.rs#L205 [SdkBody impls]: https://github.com/awslabs/smithy-rs/blob/f76bc159bf16510a0873f5fba691cb05816f4192/rust-runtime/aws-smithy-http/src/body.rs#L71 From adb6042ae01a575ce3bb1d3013da9f74438f0bd5 Mon Sep 17 00:00:00 2001 From: Zelda Hessler Date: Fri, 15 Apr 2022 12:51:44 -0500 Subject: [PATCH 9/9] fix: leftover terminology --- design/src/rfcs/rfc0012_body_callback_apis.md | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/design/src/rfcs/rfc0012_body_callback_apis.md b/design/src/rfcs/rfc0012_body_callback_apis.md index defca812a6..b6b8e93e60 100644 --- a/design/src/rfcs/rfc0012_body_callback_apis.md +++ b/design/src/rfcs/rfc0012_body_callback_apis.md @@ -13,22 +13,21 @@ Adding a callback API to `ByteStream` and `SdkBody` will enable developers using // in aws_smithy_http::callbacks... /// A callback that, when inserted into a request body, will be called for corresponding lifecycle events. -// Docs for these methods will mostly be the same as the docs on `Callback` so I've omitted them. trait BodyCallback: Send { /// This lifecycle function is called for each chunk **successfully** read. If an error occurs while reading a chunk, - /// this will not be called. This function takes `&mut self` so that implementors may modify an implementing + /// this method will not be called. This method takes `&mut self` so that implementors may modify an implementing /// struct/enum's internal state. Implementors may return an error. fn update(&mut self, #[allow(unused_variables)] bytes: &[u8]) -> Result<(), BoxError> { Ok(()) } - /// This callback is called once all chunks have been read. If the callback encountered 1 or more errors + /// This callback is called once all chunks have been read. If the callback encountered one or more errors /// while running `update`s, this is how those errors are raised. Implementors may return a [`HeaderMap`][HeaderMap] /// that will be appended to the HTTP body as a trailer. This is only useful to do for streaming requests. fn trailers(&self) -> Result>, BoxError> { Ok(None) } - /// Create a new `Callback` from an existing one. This is called when a `Callback` needs to be - /// re-initialized with default state. For example: when a request has a body that need to be - /// rebuilt, all read callbacks on that body need to be run again but with a fresh internal state. - fn make_new(&self) -> Box; + /// Create a new `BodyCallback` from an existing one. This is called when a `BodyCallback` needs to be + /// re-initialized with default state. For example: when a request has a body that needs to be + /// rebuilt, all callbacks for that body need to be run again but with a fresh internal state. + fn make_new(&self) -> Box; } impl BodyCallback for Box { @@ -117,10 +116,10 @@ impl SdkBody { callback.update(bytes)?; } } - // When we're done polling for bytes, run each callback's `finally()` method. If any calls to - // `finally()` return an error, propagate that error up. Otherwise, continue. + // When we're done polling for bytes, run each callback's `trailers()` method. If any calls to + // `trailers()` return an error, propagate that error up. Otherwise, continue. Poll::Ready(None) => { - for callback_result in this.callbacks.iter().map(Callback::trailers) { + for callback_result in this.callbacks.iter().map(BodyCallback::trailers) { if let Err(e) = callback_result { return Poll::Ready(Some(Err(e))); } @@ -221,7 +220,7 @@ impl http_body::Body for SdkBody { _cx: &mut Context<'_>, ) -> Poll>, Self::Error>> { let header_map = self - .read_callbacks + .callbacks .iter() .filter_map(|callback| { match callback.trailers() { @@ -242,7 +241,7 @@ impl http_body::Body for SdkBody { What follows is a simplified example of how this API could be used to introduce checksum validation for outgoing request payloads. In this example, the checksum calculation is fallible and no validation takes place. All it does it calculate the checksum of some data and then returns the checksum of that data when `trailers` is called. This is fine because it's -being used to calculate the checksum of a streaming body in a request. +being used to calculate the checksum of a streaming body for a request. ```rust #[derive(Default)]