Skip to content

Commit

Permalink
Implement Event Stream Sender/Receiver for use in generated types (#639)
Browse files Browse the repository at this point in the history
* Implement Event Stream Sender/Receiver for use in generated types

* Update CHANGELOG

* Fix AWS runtime build

* Fix warning

* Make `Sender` create the body channel and add signing

* Replace `Sender` with `MessageStreamAdapter` to take a `Stream` input

* Make signer mutable and change Arc to Box
  • Loading branch information
jdisanti authored Aug 10, 2021
1 parent 21df723 commit dced8c2
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ vNext (Month Day Year)
- (When complete) Add profile file provider for region (#594, #xyz)
- Add AssumeRoleProvider parser implementation. (#632)
- The closure passed to `async_provide_credentials_fn` can now borrow values (#637)
- Add `Sender`/`Receiver` implementations for Event Stream (#639)

v0.19 (August 3rd, 2021)
------------------------
Expand Down
19 changes: 10 additions & 9 deletions aws/sdk/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,25 @@ val smithyVersion: String by project

val sdkOutputDir = buildDir.resolve("aws-sdk")
val runtimeModules = listOf(
"protocol-test-helpers",
"smithy-async",
"smithy-types",
"smithy-json",
"smithy-query",
"smithy-xml",
"smithy-client",
"smithy-eventstream",
"smithy-http",
"smithy-http-tower",
"smithy-client",
"protocol-test-helpers"
"smithy-json",
"smithy-query",
"smithy-types",
"smithy-xml"
)
val awsModules = listOf(
"aws-auth",
"aws-auth-providers",
"aws-endpoint",
"aws-types",
"aws-http",
"aws-hyper",
"aws-sig-auth",
"aws-http",
"aws-auth-providers"
"aws-types"
)

buildscript {
Expand Down
24 changes: 24 additions & 0 deletions rust-runtime/smithy-eventstream/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::error::Error;
use crate::str_bytes::StrBytes;
use bytes::{Buf, BufMut, Bytes};
use std::convert::{TryFrom, TryInto};
use std::error::Error as StdError;
use std::mem::size_of;

const PRELUDE_LENGTH_BYTES: u32 = 3 * size_of::<u32>() as u32;
Expand All @@ -19,6 +20,29 @@ const MESSAGE_CRC_LENGTH_BYTES: u32 = size_of::<u32>() as u32;
const MAX_HEADER_NAME_LEN: usize = 255;
const MIN_HEADER_LEN: usize = 2;

pub type SignMessageError = Box<dyn StdError + Send + Sync + 'static>;

/// Signs an Event Stream message.
pub trait SignMessage {
fn sign(&mut self, message: Message) -> Result<Message, SignMessageError>;
}

/// Converts a Smithy modeled Event Stream type into a [`Message`](Message).
pub trait MarshallMessage {
/// Smithy modeled input type to convert from.
type Input;

fn marshall(&self, input: Self::Input) -> Result<Message, Error>;
}

/// Converts an Event Stream [`Message`](Message) into a Smithy modeled type.
pub trait UnmarshallMessage {
/// Smithy modeled type to convert into.
type Output;

fn unmarshall(&self, message: Message) -> Result<Self::Output, Error>;
}

mod value {
use crate::error::Error;
use crate::frame::checked;
Expand Down
9 changes: 7 additions & 2 deletions rust-runtime/smithy-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ license = "Apache-2.0"

[features]
bytestream-util = ["tokio/fs", "tokio-util/io"]
default = ["bytestream-util"]
event-stream = ["smithy-eventstream"]
default = ["bytestream-util", "event-stream"]

[dependencies]
smithy-types = { path = "../smithy-types" }
smithy-eventstream = { path = "../smithy-eventstream", optional = true }
bytes = "1"
bytes-utils = "0.1"
http-body = "0.4.0"
http = "0.2.3"
thiserror = "1"
Expand All @@ -23,12 +26,14 @@ tracing = "0.1"
hyper = "0.14.5"

# ByteStream internals
bytes-utils = "0.1.1"
futures-core = "0.3.14"
tokio = { version = "1.6", optional = true }
tokio-util = { version = "0.6", optional = true}

[dev-dependencies]
async-stream = "0.3"
futures-util = "0.3"
hyper = { version = "0.14.5", features = ["stream"] }
proptest = "1"
tokio = {version = "1.6", features = ["macros", "rt", "fs", "io-util"]}
tokio-stream = "0.1.5"
Expand Down
Loading

0 comments on commit dced8c2

Please sign in to comment.