From 24c30b53fa82c289ebdfc5fe18028ca23003d082 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Sun, 14 Aug 2022 06:41:26 -0700 Subject: [PATCH 01/30] impl conversion btwn AmqpMessage & AmqpCloudEvent Signed-off-by: minghuaw --- Cargo.toml | 2 ++ src/binding/fe2o3_amqp/mod.rs | 54 +++++++++++++++++++++++++++++++++++ src/binding/mod.rs | 2 ++ 3 files changed, 58 insertions(+) create mode 100644 src/binding/fe2o3_amqp/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 5a7b5051..63a7e087 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ warp = ["warp-lib", "bytes", "http", "hyper"] axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"] poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"] nats = ["nats-lib"] +fe2o3-amqp = ["fe2o3-amqp-lib"] [dependencies] serde = { version = "^1.0", features = ["derive"] } @@ -52,6 +53,7 @@ axum-lib = { version = "^0.5", optional = true , package="axum"} http-body = { version = "^0.4", optional = true } poem-lib = { version = "=1.2.34", optional = true, package = "poem" } nats-lib = { version = "0.21.0", optional = true, package = "nats" } +fe2o3-amqp-lib = { version = "0.4.0", optional = true, package = "fe2o3-amqp" } [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.3" diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs new file mode 100644 index 00000000..7b900e9b --- /dev/null +++ b/src/binding/fe2o3_amqp/mod.rs @@ -0,0 +1,54 @@ +//! Implements AMQP 1.0 binding for CloudEvents + +use std::convert::TryFrom; + +use fe2o3_amqp_lib::types::messaging::{ApplicationProperties, Message, Body, Data as AmqpData, Properties}; +use fe2o3_amqp_lib::types::primitives::{Value, Binary}; + +use crate::Event; +use crate::message::Error; + +/// Type alias for an AMQP 1.0 message +/// +/// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of +/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For +/// convenience, this type alias chose `Value` as the value of the generic parameter +pub type AmqpMessage = Message; + +pub struct AmqpCloudEvent { + properties: Properties, + application_properties: ApplicationProperties, + data: Binary, +} + +impl From for AmqpMessage { + fn from(event: AmqpCloudEvent) -> Self { + Message::builder() + .properties(event.properties) + .application_properties(event.application_properties) + .data(event.data) + .build() + } +} + +impl TryFrom for AmqpCloudEvent { + type Error = Error; + + fn try_from(value: AmqpMessage) -> Result { + let data = match value.body { + Body::Data(AmqpData(data)) => data, + _ => return Err(Error::WrongEncoding { }) + }; + let properties = value.properties + .ok_or(Error::WrongEncoding { })?; + let application_properties = value.application_properties + .ok_or(Error::WrongEncoding { })?; + Ok(Self { + properties, + application_properties, + data, + }) + } +} + + diff --git a/src/binding/mod.rs b/src/binding/mod.rs index 282008e8..7e9cad97 100644 --- a/src/binding/mod.rs +++ b/src/binding/mod.rs @@ -23,6 +23,8 @@ pub mod rdkafka; pub mod reqwest; #[cfg(feature = "warp")] pub mod warp; +#[cfg(feature = "fe2o3-amqp")] +pub mod fe2o3_amqp; #[cfg(feature = "rdkafka")] pub(crate) mod kafka { From 8b7e2a2ef580f06717d907e7569864ce75b786e1 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Sun, 14 Aug 2022 09:14:38 -0700 Subject: [PATCH 02/30] implemented From for AmqpCloudEvent Signed-off-by: minghuaw --- Cargo.toml | 3 +- src/binding/fe2o3_amqp/mod.rs | 208 ++++++++++++++++++++++++++++++---- 2 files changed, 190 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 63a7e087..921985ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ warp = ["warp-lib", "bytes", "http", "hyper"] axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"] poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"] nats = ["nats-lib"] -fe2o3-amqp = ["fe2o3-amqp-lib"] +fe2o3-amqp = ["fe2o3-amqp-lib", "fe2o3-amqp-types"] [dependencies] serde = { version = "^1.0", features = ["derive"] } @@ -54,6 +54,7 @@ http-body = { version = "^0.4", optional = true } poem-lib = { version = "=1.2.34", optional = true, package = "poem" } nats-lib = { version = "0.21.0", optional = true, package = "nats" } fe2o3-amqp-lib = { version = "0.4.0", optional = true, package = "fe2o3-amqp" } +fe2o3-amqp-types = { version = "0.3.3", optional = true } [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.3" diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index 7b900e9b..f21ffd6f 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -2,32 +2,41 @@ use std::convert::TryFrom; -use fe2o3_amqp_lib::types::messaging::{ApplicationProperties, Message, Body, Data as AmqpData, Properties}; -use fe2o3_amqp_lib::types::primitives::{Value, Binary}; +use fe2o3_amqp_lib::types::messaging::{ + ApplicationProperties, Body, Data as AmqpData, Message, Properties, +}; +use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value}; -use crate::Event; -use crate::message::Error; +use crate::event::{AttributeValue, Attributes}; +use crate::message::{Error, MessageAttributeValue}; +use crate::{Event}; /// Type alias for an AMQP 1.0 message -/// +/// /// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of -/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For +/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For /// convenience, this type alias chose `Value` as the value of the generic parameter pub type AmqpMessage = Message; +pub type AmqpBody = Body; + pub struct AmqpCloudEvent { properties: Properties, application_properties: ApplicationProperties, - data: Binary, + body: AmqpBody, } impl From for AmqpMessage { fn from(event: AmqpCloudEvent) -> Self { - Message::builder() - .properties(event.properties) - .application_properties(event.application_properties) - .data(event.data) - .build() + Message { + header: None, + delivery_annotations: None, + message_annotations: None, + properties: Some(event.properties), + application_properties: Some(event.application_properties), + body: event.body, + footer: None, + } } } @@ -35,20 +44,179 @@ impl TryFrom for AmqpCloudEvent { type Error = Error; fn try_from(value: AmqpMessage) -> Result { - let data = match value.body { - Body::Data(AmqpData(data)) => data, - _ => return Err(Error::WrongEncoding { }) + let body = match value.body { + Body::Data(data) => Body::Data(data), + _ => return Err(Error::WrongEncoding {}), }; - let properties = value.properties - .ok_or(Error::WrongEncoding { })?; - let application_properties = value.application_properties - .ok_or(Error::WrongEncoding { })?; + let properties = value.properties.ok_or(Error::WrongEncoding {})?; + let application_properties = value + .application_properties + .ok_or(Error::WrongEncoding {})?; Ok(Self { properties, application_properties, - data, + body, }) } } +impl<'a> From> for SimpleValue { + fn from(value: AttributeValue) -> Self { + match value { + AttributeValue::SpecVersion(spec_ver) => { + SimpleValue::String(String::from(spec_ver.as_str())) + } + AttributeValue::String(s) => SimpleValue::String(String::from(s)), + AttributeValue::URI(uri) => SimpleValue::String(String::from(uri.as_str())), + AttributeValue::URIRef(uri) => SimpleValue::String(uri.clone()), + AttributeValue::Boolean(val) => SimpleValue::Bool(*val), + AttributeValue::Integer(val) => SimpleValue::Long(*val), + AttributeValue::Time(datetime) => { + let millis = datetime.timestamp_millis(); + let timestamp = Timestamp::from_milliseconds(millis); + SimpleValue::Timestamp(timestamp) + } + } + } +} + +impl<'a> From> for Value { + fn from(value: AttributeValue) -> Self { + match value { + AttributeValue::SpecVersion(spec_ver) => Value::String(String::from(spec_ver.as_str())), + AttributeValue::String(s) => Value::String(String::from(s)), + AttributeValue::URI(uri) => Value::String(String::from(uri.as_str())), + AttributeValue::URIRef(uri) => Value::String(uri.clone()), + AttributeValue::Boolean(val) => Value::Bool(*val), + AttributeValue::Integer(val) => Value::Long(*val), + AttributeValue::Time(datetime) => { + let millis = datetime.timestamp_millis(); + let timestamp = Timestamp::from_milliseconds(millis); + Value::Timestamp(timestamp) + } + } + } +} + +impl From for SimpleValue { + fn from(value: MessageAttributeValue) -> Self { + match value { + MessageAttributeValue::String(s) => SimpleValue::String(String::from(s)), + MessageAttributeValue::Uri(uri) => SimpleValue::String(String::from(uri.as_str())), + MessageAttributeValue::UriRef(uri) => SimpleValue::String(uri.clone()), + MessageAttributeValue::Boolean(val) => SimpleValue::Bool(val), + MessageAttributeValue::Integer(val) => SimpleValue::Long(val), + MessageAttributeValue::DateTime(datetime) => { + let millis = datetime.timestamp_millis(); + let timestamp = Timestamp::from_milliseconds(millis); + SimpleValue::Timestamp(timestamp) + } + MessageAttributeValue::Binary(val) => SimpleValue::Binary(Binary::from(val)), + } + } +} + +impl From for Value { + fn from(value: MessageAttributeValue) -> Self { + match value { + MessageAttributeValue::String(s) => Value::String(String::from(s)), + MessageAttributeValue::Uri(uri) => Value::String(String::from(uri.as_str())), + MessageAttributeValue::UriRef(uri) => Value::String(uri.clone()), + MessageAttributeValue::Boolean(val) => Value::Bool(val), + MessageAttributeValue::Integer(val) => Value::Long(val), + MessageAttributeValue::DateTime(datetime) => { + let millis = datetime.timestamp_millis(); + let timestamp = Timestamp::from_milliseconds(millis); + Value::Timestamp(timestamp) + } + MessageAttributeValue::Binary(val) => Value::Binary(Binary::from(val)), + } + } +} + +/// The `BinarySerializer`/`StructuredSerializer` traits are not implemented because +/// "datacontenttype" needs special treatment in AMQP. However, `StructureSerializer` doesn't +/// provide access to "datacontenttype" +impl TryFrom for AmqpCloudEvent { + type Error = Error; + + fn try_from(mut event: Event) -> Result { + let mut properties = Properties::default(); + properties.content_type = match &mut event.attributes { + Attributes::V03(attributes) => attributes.datacontenttype.take(), + Attributes::V10(attributes) => attributes.datacontenttype.take(), + }.map(Symbol::from); + + let mut application_properties = ApplicationProperties::default(); + for (key, value) in event.attributes.iter() { + if key == "datacontenttype" { + continue; + } else { + let key = format!("cloudEvents:{}", key); + application_properties.insert(key, SimpleValue::from(value)); + } + } + + let body = match event.data { + Some(data) => match data { + crate::Data::Binary(data) => Body::Data(AmqpData(Binary::from(data))), + crate::Data::String(val) => Body::Data(AmqpData(Binary::from(val))), + crate::Data::Json(val) => { + let bytes = serde_json::to_vec(&val)?; + Body::Data(AmqpData(Binary::from(bytes))) + }, + }, + None => AmqpBody::Nothing, + }; + + Ok(Self { + properties, + application_properties, + body, + }) + } +} + +// impl BinarySerializer for AmqpCloudEvent { +// fn set_spec_version(mut self, spec_version: SpecVersion) -> crate::message::Result { +// let key = String::from("cloudEvents:specversion"); +// let value = String::from(spec_version.as_str()); +// self.application_properties.insert(key, SimpleValue::from(value)); +// Ok(self) +// } + +// fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result { +// if name == "datacontenttype" { +// self.properties.content_type = match value { +// MessageAttributeValue::String(s) => Some(Symbol::from(s)), +// _ => return Err(Error::WrongEncoding { }) +// } +// } else { +// let key = format!("cloudEvents:{}", name); +// let value = SimpleValue::from(value); +// self.application_properties.insert(key, value); +// } + +// Ok(self) +// } + +// fn set_extension(self, name: &str, value: MessageAttributeValue) -> crate::message::Result { +// todo!() +// } + +// fn end_with_data(mut self, bytes: Vec) -> crate::message::Result { +// let data = Binary::from(bytes); +// self.body = Body::Data(AmqpData(data)); +// Ok(self) +// } + +// fn end(self) -> crate::message::Result { +// Ok(self) +// } +// } +// impl StructuredSerializer for AmqpCloudEvent { +// fn set_structured_event(self, bytes: Vec) -> crate::message::Result { +// todo!() +// } +// } From 4017ffc9009ec979645e028a6e3eaa36eed7978d Mon Sep 17 00:00:00 2001 From: minghuaw Date: Sun, 14 Aug 2022 19:59:45 -0700 Subject: [PATCH 03/30] added udf to handle extensions Signed-off-by: minghuaw --- src/binding/fe2o3_amqp/extensions.rs | 77 ++++++++++++++++++++++ src/binding/fe2o3_amqp/mod.rs | 98 ++++++++++++++++++---------- 2 files changed, 140 insertions(+), 35 deletions(-) create mode 100644 src/binding/fe2o3_amqp/extensions.rs diff --git a/src/binding/fe2o3_amqp/extensions.rs b/src/binding/fe2o3_amqp/extensions.rs new file mode 100644 index 00000000..64280ad5 --- /dev/null +++ b/src/binding/fe2o3_amqp/extensions.rs @@ -0,0 +1,77 @@ +use std::collections::HashMap; + +use fe2o3_amqp_types::messaging::{ApplicationProperties, Properties}; + +use crate::{event::ExtensionValue, message::Error, Event}; + +use super::{ + from_event_attributes, from_event_data, AmqpBody, AmqpCloudEvent, AmqpMessage, Extensions, +}; + +pub struct ExtensionsHandler +where + F: FnOnce(Extensions) -> AmqpMessage, +{ + handler: F, +} + +impl ExtensionsHandler +where + F: FnOnce(Extensions) -> AmqpMessage, +{ + pub fn new(handler: F) -> Self { + Self { handler } + } + + pub fn from_event(self, event: Event) -> Result, Error> { + let (content_type, application_properties) = from_event_attributes(event.attributes); + let body = from_event_data(event.data)?; + let inner = AmqpCloudEvent { + content_type, + application_properties, + body, + }; + Ok(AmqpCloudEventExt { + inner, + extensions: event.extensions, + handler: self.handler, + }) + } +} + +pub struct AmqpCloudEventExt +where + F: FnOnce(Extensions) -> AmqpMessage, +{ + inner: AmqpCloudEvent, + extensions: Extensions, + handler: F, +} + +impl AmqpCloudEventExt where F: FnOnce(Extensions) -> AmqpMessage {} + +impl From> for AmqpMessage +where + F: FnOnce(Extensions) -> AmqpMessage, +{ + fn from(mut value: AmqpCloudEventExt) -> Self { + let mut message = (value.handler)(value.extensions); + + // Set content_type to "datacontenttype" + let properties = message.properties.get_or_insert(Properties::default()); + properties.content_type = value.inner.content_type; + + // Append ApplicationProperties + let application_properties = message + .application_properties + .get_or_insert(ApplicationProperties::default()); + application_properties + .0 + .append(&mut value.inner.application_properties.0); + + // Overrite the message body + message.body = value.inner.body; + + message + } +} diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index f21ffd6f..0850cca5 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -1,5 +1,6 @@ //! Implements AMQP 1.0 binding for CloudEvents +use std::collections::HashMap; use std::convert::TryFrom; use fe2o3_amqp_lib::types::messaging::{ @@ -7,9 +8,13 @@ use fe2o3_amqp_lib::types::messaging::{ }; use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value}; -use crate::event::{AttributeValue, Attributes}; +use crate::event::{AttributeValue, Attributes, ExtensionValue}; use crate::message::{Error, MessageAttributeValue}; -use crate::{Event}; +use crate::{Event, AttributesReader, Data}; + +use self::extensions::ExtensionsHandler; + +pub mod extensions; /// Type alias for an AMQP 1.0 message /// @@ -20,19 +25,36 @@ pub type AmqpMessage = Message; pub type AmqpBody = Body; +pub type Extensions = HashMap; + pub struct AmqpCloudEvent { - properties: Properties, + content_type: Option, application_properties: ApplicationProperties, body: AmqpBody, } +impl AmqpCloudEvent { + pub fn with_extensions_handler(handler: F) -> ExtensionsHandler + where + F: FnOnce(Extensions) -> AmqpMessage + { + ExtensionsHandler::new(handler) + } + + pub fn from_event(event: Event) -> Result { + Self::try_from(event) + } +} + impl From for AmqpMessage { fn from(event: AmqpCloudEvent) -> Self { + let mut properties = Properties::default(); + properties.content_type = event.content_type; Message { header: None, delivery_annotations: None, message_annotations: None, - properties: Some(event.properties), + properties: Some(properties), application_properties: Some(event.application_properties), body: event.body, footer: None, @@ -48,12 +70,13 @@ impl TryFrom for AmqpCloudEvent { Body::Data(data) => Body::Data(data), _ => return Err(Error::WrongEncoding {}), }; - let properties = value.properties.ok_or(Error::WrongEncoding {})?; + let content_type = value.properties.ok_or(Error::WrongEncoding {})? + .content_type.take(); let application_properties = value .application_properties .ok_or(Error::WrongEncoding {})?; Ok(Self { - properties, + content_type, application_properties, body, }) @@ -140,43 +163,48 @@ impl From for Value { impl TryFrom for AmqpCloudEvent { type Error = Error; - fn try_from(mut event: Event) -> Result { - let mut properties = Properties::default(); - properties.content_type = match &mut event.attributes { - Attributes::V03(attributes) => attributes.datacontenttype.take(), - Attributes::V10(attributes) => attributes.datacontenttype.take(), - }.map(Symbol::from); - - let mut application_properties = ApplicationProperties::default(); - for (key, value) in event.attributes.iter() { - if key == "datacontenttype" { - continue; - } else { - let key = format!("cloudEvents:{}", key); - application_properties.insert(key, SimpleValue::from(value)); - } - } - - let body = match event.data { - Some(data) => match data { - crate::Data::Binary(data) => Body::Data(AmqpData(Binary::from(data))), - crate::Data::String(val) => Body::Data(AmqpData(Binary::from(val))), - crate::Data::Json(val) => { - let bytes = serde_json::to_vec(&val)?; - Body::Data(AmqpData(Binary::from(bytes))) - }, - }, - None => AmqpBody::Nothing, - }; + fn try_from(event: Event) -> Result { + let (content_type, application_properties) = from_event_attributes(event.attributes); + let body = from_event_data(event.data)?; Ok(Self { - properties, + content_type, application_properties, body, }) } } +fn from_event_attributes(attributes: Attributes) -> (Option, ApplicationProperties) { + let content_type = attributes.datacontenttype().map(Symbol::from); + + let mut application_properties = ApplicationProperties::default(); + for (key, value) in attributes.iter() { + if key == "datacontenttype" { + continue; + } else { + let key = format!("cloudEvents:{}", key); + application_properties.insert(key, SimpleValue::from(value)); + } + } + (content_type, application_properties) +} + +fn from_event_data(data: Option) -> Result { + let body = match data { + Some(data) => match data { + crate::Data::Binary(data) => Body::Data(AmqpData(Binary::from(data))), + crate::Data::String(val) => Body::Data(AmqpData(Binary::from(val))), + crate::Data::Json(val) => { + let bytes = serde_json::to_vec(&val)?; + Body::Data(AmqpData(Binary::from(bytes))) + }, + }, + None => AmqpBody::Nothing, + }; + Ok(body) +} + // impl BinarySerializer for AmqpCloudEvent { // fn set_spec_version(mut self, spec_version: SpecVersion) -> crate::message::Result { // let key = String::from("cloudEvents:specversion"); From e51648938c0e3c98224d6e734fcf3f3ca5d1309d Mon Sep 17 00:00:00 2001 From: minghuaw Date: Wed, 17 Aug 2022 23:35:27 -0700 Subject: [PATCH 04/30] initial impl of deserializers Signed-off-by: minghuaw --- Cargo.toml | 2 +- src/binding/fe2o3_amqp/constants.rs | 25 ++++ src/binding/fe2o3_amqp/deserializer.rs | 101 ++++++++++++++ src/binding/fe2o3_amqp/extensions.rs | 77 ---------- src/binding/fe2o3_amqp/mod.rs | 186 +++++++++---------------- src/binding/fe2o3_amqp/serializer.rs | 70 ++++++++++ 6 files changed, 265 insertions(+), 196 deletions(-) create mode 100644 src/binding/fe2o3_amqp/constants.rs create mode 100644 src/binding/fe2o3_amqp/deserializer.rs delete mode 100644 src/binding/fe2o3_amqp/extensions.rs create mode 100644 src/binding/fe2o3_amqp/serializer.rs diff --git a/Cargo.toml b/Cargo.toml index 921985ba..b272905c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ http-body = { version = "^0.4", optional = true } poem-lib = { version = "=1.2.34", optional = true, package = "poem" } nats-lib = { version = "0.21.0", optional = true, package = "nats" } fe2o3-amqp-lib = { version = "0.4.0", optional = true, package = "fe2o3-amqp" } -fe2o3-amqp-types = { version = "0.3.3", optional = true } +fe2o3-amqp-types = { version = "0.3.4", optional = true } [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.3" diff --git a/src/binding/fe2o3_amqp/constants.rs b/src/binding/fe2o3_amqp/constants.rs new file mode 100644 index 00000000..827491bc --- /dev/null +++ b/src/binding/fe2o3_amqp/constants.rs @@ -0,0 +1,25 @@ + +// Required +pub(super) const ID: &str = "id"; +pub(super) const SOURCE: &str = "source"; +pub(super) const SPECVERSION: &str = "specversion"; +pub(super) const TYPE: &str = "type"; + +// Optional +pub(super) const DATACONTENTTYPE: &str = "datacontenttype"; +pub(super) const DATASCHEMA: &str = "dataschema"; +pub(super) const SUBJECT: &str = "subject"; +pub(super) const TIME: &str = "time"; + +pub(super) mod prefixed { + // Required + pub const ID: &str = "cloudEvents:id"; + pub const SOURCE: &str = "cloudEvents:source"; + pub const SPECVERSION: &str = "cloudEvents:specversion"; + pub const TYPE: &str = "cloudEvents:type"; + + // Optional + pub const DATASCHEMA: &str = "cloudEvents:dataschema"; + pub const SUBJECT: &str = "cloudEvents:subject"; + pub const TIME: &str = "cloudEvents:time"; +} \ No newline at end of file diff --git a/src/binding/fe2o3_amqp/deserializer.rs b/src/binding/fe2o3_amqp/deserializer.rs new file mode 100644 index 00000000..ec53e3f3 --- /dev/null +++ b/src/binding/fe2o3_amqp/deserializer.rs @@ -0,0 +1,101 @@ +use std::convert::TryFrom; + +use fe2o3_amqp_types::primitives::{SimpleValue, Symbol}; + +use crate::{ + binding::CLOUDEVENTS_JSON_HEADER, + event::SpecVersion, + message::{ + BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue, + MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer, + }, +}; + +use super::{ + constants::{self, prefixed, DATACONTENTTYPE}, + AmqpCloudEvent, ATTRIBUTE_PREFIX, +}; + +impl BinaryDeserializer for AmqpCloudEvent { + fn deserialize_binary>( + mut self, + mut serializer: V, + ) -> Result { + use fe2o3_amqp_types::messaging::Body; + + // specversion + let spec_version = { + let value = self + .application_properties + .remove(prefixed::SPECVERSION) + .ok_or(Error::WrongEncoding {}) + .map(|val| match val { + SimpleValue::String(s) => Ok(s), + _ => Err(Error::WrongEncoding {}), + })??; + SpecVersion::try_from(&value[..])? + }; + serializer = serializer.set_spec_version(spec_version.clone())?; + + // datacontenttype + serializer = match self.content_type { + Some(Symbol(content_type)) => serializer + .set_attribute(DATACONTENTTYPE, MessageAttributeValue::String(content_type))?, + None => serializer, + }; + + // remaining attributes + let attributes = spec_version.attribute_names(); + + for (key, value) in self.application_properties.0.into_iter() { + if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) { + if attributes.contains(&key) { + let value = MessageAttributeValue::try_from((key, value))?; + serializer = serializer.set_attribute(key, value)?; + } else { + let value = MessageAttributeValue::try_from(value)?; + serializer = serializer.set_extension(key, value)?; + } + } + } + + match self.body { + Body::Data(data) => { + let bytes = data.0.into_vec(); + serializer.end_with_data(bytes) + } + Body::Nothing => serializer.end(), + Body::Sequence(_) | Body::Value(_) => Err(Error::WrongEncoding {}), + } + } +} + +impl StructuredDeserializer for AmqpCloudEvent { + fn deserialize_structured>( + self, + serializer: V, + ) -> Result { + use fe2o3_amqp_types::messaging::Body; + let bytes = match self.body { + Body::Data(data) => data.0.into_vec(), + Body::Nothing => vec![], + Body::Sequence(_) + | Body::Value(_) => return Err(Error::WrongEncoding { }), + }; + serializer.set_structured_event(bytes) + } +} + +impl MessageDeserializer for AmqpCloudEvent { + fn encoding(&self) -> Encoding { + match self + .content_type + .as_ref() + .map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER)) + { + Some(true) => Encoding::STRUCTURED, + Some(false) => Encoding::BINARY, + None => Encoding::UNKNOWN, + } + } +} diff --git a/src/binding/fe2o3_amqp/extensions.rs b/src/binding/fe2o3_amqp/extensions.rs deleted file mode 100644 index 64280ad5..00000000 --- a/src/binding/fe2o3_amqp/extensions.rs +++ /dev/null @@ -1,77 +0,0 @@ -use std::collections::HashMap; - -use fe2o3_amqp_types::messaging::{ApplicationProperties, Properties}; - -use crate::{event::ExtensionValue, message::Error, Event}; - -use super::{ - from_event_attributes, from_event_data, AmqpBody, AmqpCloudEvent, AmqpMessage, Extensions, -}; - -pub struct ExtensionsHandler -where - F: FnOnce(Extensions) -> AmqpMessage, -{ - handler: F, -} - -impl ExtensionsHandler -where - F: FnOnce(Extensions) -> AmqpMessage, -{ - pub fn new(handler: F) -> Self { - Self { handler } - } - - pub fn from_event(self, event: Event) -> Result, Error> { - let (content_type, application_properties) = from_event_attributes(event.attributes); - let body = from_event_data(event.data)?; - let inner = AmqpCloudEvent { - content_type, - application_properties, - body, - }; - Ok(AmqpCloudEventExt { - inner, - extensions: event.extensions, - handler: self.handler, - }) - } -} - -pub struct AmqpCloudEventExt -where - F: FnOnce(Extensions) -> AmqpMessage, -{ - inner: AmqpCloudEvent, - extensions: Extensions, - handler: F, -} - -impl AmqpCloudEventExt where F: FnOnce(Extensions) -> AmqpMessage {} - -impl From> for AmqpMessage -where - F: FnOnce(Extensions) -> AmqpMessage, -{ - fn from(mut value: AmqpCloudEventExt) -> Self { - let mut message = (value.handler)(value.extensions); - - // Set content_type to "datacontenttype" - let properties = message.properties.get_or_insert(Properties::default()); - properties.content_type = value.inner.content_type; - - // Append ApplicationProperties - let application_properties = message - .application_properties - .get_or_insert(ApplicationProperties::default()); - application_properties - .0 - .append(&mut value.inner.application_properties.0); - - // Overrite the message body - message.body = value.inner.body; - - message - } -} diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index 0850cca5..e014e0f0 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -3,18 +3,24 @@ use std::collections::HashMap; use std::convert::TryFrom; -use fe2o3_amqp_lib::types::messaging::{ - ApplicationProperties, Body, Data as AmqpData, Message, Properties, -}; +use chrono::{DateTime, Utc, TimeZone}; +use fe2o3_amqp_lib::types::messaging::{ApplicationProperties, Body, Message, Properties}; use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value}; -use crate::event::{AttributeValue, Attributes, ExtensionValue}; +use crate::event::{AttributeValue, ExtensionValue, UriReference}; use crate::message::{Error, MessageAttributeValue}; -use crate::{Event, AttributesReader, Data}; +use crate::Event; + +use self::constants::{ + prefixed, DATACONTENTTYPE, DATASCHEMA, ID, SOURCE, SPECVERSION, SUBJECT, TIME, TYPE, +}; + +const ATTRIBUTE_PREFIX: &str = "cloudEvents:"; -use self::extensions::ExtensionsHandler; +pub mod deserializer; +pub mod serializer; -pub mod extensions; +mod constants; /// Type alias for an AMQP 1.0 message /// @@ -27,6 +33,10 @@ pub type AmqpBody = Body; pub type Extensions = HashMap; +/// The receiver of the event can distinguish between the two modes by inspecting the content-type +/// message property field. If the value is prefixed with the CloudEvents media type +/// application/cloudevents, indicating the use of a known event format, the receiver uses +/// structured mode, otherwise it defaults to binary mode. pub struct AmqpCloudEvent { content_type: Option, application_properties: ApplicationProperties, @@ -34,15 +44,8 @@ pub struct AmqpCloudEvent { } impl AmqpCloudEvent { - pub fn with_extensions_handler(handler: F) -> ExtensionsHandler - where - F: FnOnce(Extensions) -> AmqpMessage - { - ExtensionsHandler::new(handler) - } - pub fn from_event(event: Event) -> Result { - Self::try_from(event) + todo!() } } @@ -62,27 +65,6 @@ impl From for AmqpMessage { } } -impl TryFrom for AmqpCloudEvent { - type Error = Error; - - fn try_from(value: AmqpMessage) -> Result { - let body = match value.body { - Body::Data(data) => Body::Data(data), - _ => return Err(Error::WrongEncoding {}), - }; - let content_type = value.properties.ok_or(Error::WrongEncoding {})? - .content_type.take(); - let application_properties = value - .application_properties - .ok_or(Error::WrongEncoding {})?; - Ok(Self { - content_type, - application_properties, - body, - }) - } -} - impl<'a> From> for SimpleValue { fn from(value: AttributeValue) -> Self { match value { @@ -157,94 +139,62 @@ impl From for Value { } } -/// The `BinarySerializer`/`StructuredSerializer` traits are not implemented because -/// "datacontenttype" needs special treatment in AMQP. However, `StructureSerializer` doesn't -/// provide access to "datacontenttype" -impl TryFrom for AmqpCloudEvent { +impl TryFrom for MessageAttributeValue { type Error = Error; - fn try_from(event: Event) -> Result { - let (content_type, application_properties) = from_event_attributes(event.attributes); - let body = from_event_data(event.data)?; - - Ok(Self { - content_type, - application_properties, - body, - }) + fn try_from(value: SimpleValue) -> Result { + match value { + SimpleValue::Bool(val) => Ok(MessageAttributeValue::Boolean(val)), + SimpleValue::Long(val) => Ok(MessageAttributeValue::Integer(val)), + SimpleValue::Timestamp(val) => { + let datetime = Utc.timestamp_millis(val.into_inner()); + Ok(MessageAttributeValue::DateTime(datetime)) + }, + SimpleValue::Binary(val) => Ok(MessageAttributeValue::Binary(val.into_vec())), + SimpleValue::String(val) => Ok(MessageAttributeValue::String(val)), + _ => Err(Error::WrongEncoding { }) + } } } -fn from_event_attributes(attributes: Attributes) -> (Option, ApplicationProperties) { - let content_type = attributes.datacontenttype().map(Symbol::from); +impl<'a> TryFrom<(&'a str, SimpleValue)> for MessageAttributeValue { + type Error = Error; - let mut application_properties = ApplicationProperties::default(); - for (key, value) in attributes.iter() { - if key == "datacontenttype" { - continue; - } else { - let key = format!("cloudEvents:{}", key); - application_properties.insert(key, SimpleValue::from(value)); + fn try_from((key, value): (&'a str, SimpleValue)) -> Result { + match key { + // String + ID | prefixed::ID + // String + | SPECVERSION | prefixed::SPECVERSION + // String + | TYPE | prefixed::TYPE + // String + | DATACONTENTTYPE + // String + | SUBJECT | prefixed::SUBJECT => { + let val = String::try_from(value).map_err(|_| Error::WrongEncoding {})?; + Ok(MessageAttributeValue::String(val)) + }, + // URI-reference + SOURCE | prefixed::SOURCE => { + let val = String::try_from(value).map_err(|_| Error::WrongEncoding {})?; + Ok(MessageAttributeValue::UriRef(val)) + }, + // URI + DATASCHEMA | prefixed::DATASCHEMA => { + let val = String::try_from(value).map_err(|_| Error::WrongEncoding { })?; + let url_val = url::Url::parse(&val)?; + Ok(MessageAttributeValue::Uri(url_val)) + } + // Timestamp + TIME | prefixed::TIME => { + let val = Timestamp::try_from(value).map_err(|_| Error::WrongEncoding { })?; + let datetime = Utc.timestamp_millis(val.into_inner()); + Ok(MessageAttributeValue::DateTime(datetime)) + } + _ => { + MessageAttributeValue::try_from(value) + } } } - (content_type, application_properties) -} - -fn from_event_data(data: Option) -> Result { - let body = match data { - Some(data) => match data { - crate::Data::Binary(data) => Body::Data(AmqpData(Binary::from(data))), - crate::Data::String(val) => Body::Data(AmqpData(Binary::from(val))), - crate::Data::Json(val) => { - let bytes = serde_json::to_vec(&val)?; - Body::Data(AmqpData(Binary::from(bytes))) - }, - }, - None => AmqpBody::Nothing, - }; - Ok(body) } - -// impl BinarySerializer for AmqpCloudEvent { -// fn set_spec_version(mut self, spec_version: SpecVersion) -> crate::message::Result { -// let key = String::from("cloudEvents:specversion"); -// let value = String::from(spec_version.as_str()); -// self.application_properties.insert(key, SimpleValue::from(value)); -// Ok(self) -// } - -// fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result { -// if name == "datacontenttype" { -// self.properties.content_type = match value { -// MessageAttributeValue::String(s) => Some(Symbol::from(s)), -// _ => return Err(Error::WrongEncoding { }) -// } -// } else { -// let key = format!("cloudEvents:{}", name); -// let value = SimpleValue::from(value); -// self.application_properties.insert(key, value); -// } - -// Ok(self) -// } - -// fn set_extension(self, name: &str, value: MessageAttributeValue) -> crate::message::Result { -// todo!() -// } - -// fn end_with_data(mut self, bytes: Vec) -> crate::message::Result { -// let data = Binary::from(bytes); -// self.body = Body::Data(AmqpData(data)); -// Ok(self) -// } - -// fn end(self) -> crate::message::Result { -// Ok(self) -// } -// } - -// impl StructuredSerializer for AmqpCloudEvent { -// fn set_structured_event(self, bytes: Vec) -> crate::message::Result { -// todo!() -// } -// } diff --git a/src/binding/fe2o3_amqp/serializer.rs b/src/binding/fe2o3_amqp/serializer.rs new file mode 100644 index 00000000..365c5d16 --- /dev/null +++ b/src/binding/fe2o3_amqp/serializer.rs @@ -0,0 +1,70 @@ +use fe2o3_amqp_types::primitives::{SimpleValue, Symbol, Binary}; +use fe2o3_amqp_types::messaging::{Data as AmqpData}; + +use crate::message::StructuredSerializer; +use crate::{message::{BinarySerializer, MessageAttributeValue, Error}, event::SpecVersion}; + +use super::constants::DATACONTENTTYPE; +use super::{AmqpCloudEvent, ATTRIBUTE_PREFIX, AmqpBody}; + +impl BinarySerializer for AmqpCloudEvent { + fn set_spec_version(mut self, spec_version: SpecVersion) -> crate::message::Result { + let key = String::from("cloudEvents:specversion"); + let value = String::from(spec_version.as_str()); + self.application_properties.insert(key, SimpleValue::from(value)); + Ok(self) + } + + fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result { + // For the binary mode, the AMQP content-type property field value maps directly to the + // CloudEvents datacontenttype attribute. + // + // All CloudEvents attributes with exception of datacontenttype MUST be individually mapped + // to and from the AMQP application-properties section. + if name == DATACONTENTTYPE { + self.content_type = match value { + MessageAttributeValue::String(s) => Some(Symbol::from(s)), + _ => return Err(Error::WrongEncoding { }) + } + } else { + // CloudEvent attributes are prefixed with "cloudEvents:" for use in the + // application-properties section + let key = format!("{}:{}", ATTRIBUTE_PREFIX, name); + let value = SimpleValue::from(value); + self.application_properties.insert(key, value); + } + + Ok(self) + } + + // Extension attributes are always serialized according to binding rules like standard + // attributes. However this specification does not prevent an extension from copying event + // attribute values to other parts of a message, in order to interact with non-CloudEvents + // systems that also process the message. Extension specifications that do this SHOULD specify + // how receivers are to interpret messages if the copied values differ from the cloud-event + // serialized values. + fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result { + let key = format!("{}:{}", ATTRIBUTE_PREFIX, name); + let value = SimpleValue::from(value); + self.application_properties.insert(key, value); + Ok(self) + } + + fn end_with_data(mut self, bytes: Vec) -> crate::message::Result { + let data = Binary::from(bytes); + self.body = AmqpBody::Data(AmqpData(data)); + Ok(self) + } + + fn end(self) -> crate::message::Result { + Ok(self) + } +} + +impl StructuredSerializer for AmqpCloudEvent { + fn set_structured_event(mut self, bytes: Vec) -> crate::message::Result { + self.content_type = Some(Symbol::from("application/cloudevents+json; charset=utf-8")); + self.body = AmqpBody::Data(AmqpData(Binary::from(bytes))); + Ok(self) + } +} From 64693d17b317b7c3a2c3fb92c8944d646a7a6e1f Mon Sep 17 00:00:00 2001 From: minghuaw Date: Wed, 17 Aug 2022 23:36:08 -0700 Subject: [PATCH 05/30] removed unused imports Signed-off-by: minghuaw --- src/binding/fe2o3_amqp/deserializer.rs | 2 +- src/binding/fe2o3_amqp/mod.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/binding/fe2o3_amqp/deserializer.rs b/src/binding/fe2o3_amqp/deserializer.rs index ec53e3f3..708f0ba7 100644 --- a/src/binding/fe2o3_amqp/deserializer.rs +++ b/src/binding/fe2o3_amqp/deserializer.rs @@ -12,7 +12,7 @@ use crate::{ }; use super::{ - constants::{self, prefixed, DATACONTENTTYPE}, + constants::{prefixed, DATACONTENTTYPE}, AmqpCloudEvent, ATTRIBUTE_PREFIX, }; diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index e014e0f0..8e3b483e 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -3,11 +3,11 @@ use std::collections::HashMap; use std::convert::TryFrom; -use chrono::{DateTime, Utc, TimeZone}; +use chrono::{Utc, TimeZone}; use fe2o3_amqp_lib::types::messaging::{ApplicationProperties, Body, Message, Properties}; use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value}; -use crate::event::{AttributeValue, ExtensionValue, UriReference}; +use crate::event::{AttributeValue, ExtensionValue}; use crate::message::{Error, MessageAttributeValue}; use crate::Event; From f9b50586bdb3a5798a233c77db67fb517b503f2d Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 00:42:48 -0700 Subject: [PATCH 06/30] removed AmqpCloudEvent Signed-off-by: minghuaw --- src/binding/fe2o3_amqp/deserializer.rs | 41 ++++++++++--------- src/binding/fe2o3_amqp/mod.rs | 47 +++------------------ src/binding/fe2o3_amqp/serializer.rs | 56 ++++++++++++++++++-------- 3 files changed, 67 insertions(+), 77 deletions(-) diff --git a/src/binding/fe2o3_amqp/deserializer.rs b/src/binding/fe2o3_amqp/deserializer.rs index 708f0ba7..195dcbaa 100644 --- a/src/binding/fe2o3_amqp/deserializer.rs +++ b/src/binding/fe2o3_amqp/deserializer.rs @@ -13,10 +13,10 @@ use crate::{ use super::{ constants::{prefixed, DATACONTENTTYPE}, - AmqpCloudEvent, ATTRIBUTE_PREFIX, + ATTRIBUTE_PREFIX, AmqpMessage, }; -impl BinaryDeserializer for AmqpCloudEvent { +impl BinaryDeserializer for AmqpMessage { fn deserialize_binary>( mut self, mut serializer: V, @@ -27,6 +27,8 @@ impl BinaryDeserializer for AmqpCloudEvent { let spec_version = { let value = self .application_properties + .as_mut() + .ok_or(Error::WrongEncoding { })? .remove(prefixed::SPECVERSION) .ok_or(Error::WrongEncoding {}) .map(|val| match val { @@ -38,23 +40,24 @@ impl BinaryDeserializer for AmqpCloudEvent { serializer = serializer.set_spec_version(spec_version.clone())?; // datacontenttype - serializer = match self.content_type { - Some(Symbol(content_type)) => serializer + serializer = match self.properties.map(|p| p.content_type) { + Some(Some(Symbol(content_type))) => serializer .set_attribute(DATACONTENTTYPE, MessageAttributeValue::String(content_type))?, - None => serializer, + _ => serializer, }; // remaining attributes let attributes = spec_version.attribute_names(); - - for (key, value) in self.application_properties.0.into_iter() { - if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) { - if attributes.contains(&key) { - let value = MessageAttributeValue::try_from((key, value))?; - serializer = serializer.set_attribute(key, value)?; - } else { - let value = MessageAttributeValue::try_from(value)?; - serializer = serializer.set_extension(key, value)?; + if let Some(application_properties) = self.application_properties { + for (key, value) in application_properties.0.into_iter() { + if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) { + if attributes.contains(&key) { + let value = MessageAttributeValue::try_from((key, value))?; + serializer = serializer.set_attribute(key, value)?; + } else { + let value = MessageAttributeValue::try_from(value)?; + serializer = serializer.set_extension(key, value)?; + } } } } @@ -70,7 +73,7 @@ impl BinaryDeserializer for AmqpCloudEvent { } } -impl StructuredDeserializer for AmqpCloudEvent { +impl StructuredDeserializer for AmqpMessage { fn deserialize_structured>( self, serializer: V, @@ -86,12 +89,14 @@ impl StructuredDeserializer for AmqpCloudEvent { } } -impl MessageDeserializer for AmqpCloudEvent { +impl MessageDeserializer for AmqpMessage { fn encoding(&self) -> Encoding { match self - .content_type + .properties .as_ref() - .map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER)) + .map(|p| p.content_type.as_ref() + .map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER)) + ).flatten() { Some(true) => Encoding::STRUCTURED, Some(false) => Encoding::BINARY, diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index 8e3b483e..5a6332fa 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -1,15 +1,13 @@ //! Implements AMQP 1.0 binding for CloudEvents -use std::collections::HashMap; use std::convert::TryFrom; use chrono::{Utc, TimeZone}; -use fe2o3_amqp_lib::types::messaging::{ApplicationProperties, Body, Message, Properties}; -use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value}; +use fe2o3_amqp_lib::types::messaging::{Body, Message}; +use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Timestamp, Value}; -use crate::event::{AttributeValue, ExtensionValue}; +use crate::event::{AttributeValue}; use crate::message::{Error, MessageAttributeValue}; -use crate::Event; use self::constants::{ prefixed, DATACONTENTTYPE, DATASCHEMA, ID, SOURCE, SPECVERSION, SUBJECT, TIME, TYPE, @@ -27,43 +25,8 @@ mod constants; /// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of /// no importance because all CloudEvents are using the `Body::Data` as the body section type. For /// convenience, this type alias chose `Value` as the value of the generic parameter -pub type AmqpMessage = Message; - -pub type AmqpBody = Body; - -pub type Extensions = HashMap; - -/// The receiver of the event can distinguish between the two modes by inspecting the content-type -/// message property field. If the value is prefixed with the CloudEvents media type -/// application/cloudevents, indicating the use of a known event format, the receiver uses -/// structured mode, otherwise it defaults to binary mode. -pub struct AmqpCloudEvent { - content_type: Option, - application_properties: ApplicationProperties, - body: AmqpBody, -} - -impl AmqpCloudEvent { - pub fn from_event(event: Event) -> Result { - todo!() - } -} - -impl From for AmqpMessage { - fn from(event: AmqpCloudEvent) -> Self { - let mut properties = Properties::default(); - properties.content_type = event.content_type; - Message { - header: None, - delivery_annotations: None, - message_annotations: None, - properties: Some(properties), - application_properties: Some(event.application_properties), - body: event.body, - footer: None, - } - } -} +type AmqpMessage = Message; +type AmqpBody = Body; impl<'a> From> for SimpleValue { fn from(value: AttributeValue) -> Self { diff --git a/src/binding/fe2o3_amqp/serializer.rs b/src/binding/fe2o3_amqp/serializer.rs index 365c5d16..6fa23829 100644 --- a/src/binding/fe2o3_amqp/serializer.rs +++ b/src/binding/fe2o3_amqp/serializer.rs @@ -1,37 +1,51 @@ -use fe2o3_amqp_types::primitives::{SimpleValue, Symbol, Binary}; -use fe2o3_amqp_types::messaging::{Data as AmqpData}; +use fe2o3_amqp_types::messaging::{Data as AmqpData, Properties, ApplicationProperties}; +use fe2o3_amqp_types::primitives::{Binary, SimpleValue, Symbol}; +use crate::binding::header_prefix; use crate::message::StructuredSerializer; -use crate::{message::{BinarySerializer, MessageAttributeValue, Error}, event::SpecVersion}; +use crate::{ + event::SpecVersion, + message::{BinarySerializer, Error, MessageAttributeValue}, +}; use super::constants::DATACONTENTTYPE; -use super::{AmqpCloudEvent, ATTRIBUTE_PREFIX, AmqpBody}; +use super::{AmqpBody, AmqpMessage, ATTRIBUTE_PREFIX}; -impl BinarySerializer for AmqpCloudEvent { +impl BinarySerializer for AmqpMessage { fn set_spec_version(mut self, spec_version: SpecVersion) -> crate::message::Result { let key = String::from("cloudEvents:specversion"); let value = String::from(spec_version.as_str()); - self.application_properties.insert(key, SimpleValue::from(value)); + self.application_properties + .get_or_insert(ApplicationProperties::default()) + .insert(key, SimpleValue::from(value)); Ok(self) } - fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result { + fn set_attribute( + mut self, + name: &str, + value: MessageAttributeValue, + ) -> crate::message::Result { // For the binary mode, the AMQP content-type property field value maps directly to the // CloudEvents datacontenttype attribute. - // + // // All CloudEvents attributes with exception of datacontenttype MUST be individually mapped // to and from the AMQP application-properties section. if name == DATACONTENTTYPE { - self.content_type = match value { + self.properties + .get_or_insert(Properties::default()) + .content_type = match value { MessageAttributeValue::String(s) => Some(Symbol::from(s)), - _ => return Err(Error::WrongEncoding { }) + _ => return Err(Error::WrongEncoding {}), } } else { // CloudEvent attributes are prefixed with "cloudEvents:" for use in the // application-properties section - let key = format!("{}:{}", ATTRIBUTE_PREFIX, name); + let key = header_prefix(ATTRIBUTE_PREFIX, name); let value = SimpleValue::from(value); - self.application_properties.insert(key, value); + self.application_properties + .get_or_insert(ApplicationProperties::default()) + .insert(key, value); } Ok(self) @@ -43,10 +57,16 @@ impl BinarySerializer for AmqpCloudEvent { // systems that also process the message. Extension specifications that do this SHOULD specify // how receivers are to interpret messages if the copied values differ from the cloud-event // serialized values. - fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result { - let key = format!("{}:{}", ATTRIBUTE_PREFIX, name); + fn set_extension( + mut self, + name: &str, + value: MessageAttributeValue, + ) -> crate::message::Result { + let key = header_prefix(ATTRIBUTE_PREFIX, name); let value = SimpleValue::from(value); - self.application_properties.insert(key, value); + self.application_properties + .get_or_insert(ApplicationProperties::default()) + .insert(key, value); Ok(self) } @@ -61,9 +81,11 @@ impl BinarySerializer for AmqpCloudEvent { } } -impl StructuredSerializer for AmqpCloudEvent { +impl StructuredSerializer for AmqpMessage { fn set_structured_event(mut self, bytes: Vec) -> crate::message::Result { - self.content_type = Some(Symbol::from("application/cloudevents+json; charset=utf-8")); + self.properties + .get_or_insert(Properties::default()) + .content_type = Some(Symbol::from("application/cloudevents+json; charset=utf-8")); self.body = AmqpBody::Data(AmqpData(Binary::from(bytes))); Ok(self) } From 73e1f6253b7f77285e97ca2d400946eeaba6b65f Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 01:02:25 -0700 Subject: [PATCH 07/30] Revert "removed AmqpCloudEvent" This reverts commit 1214976b3cbb1fcec8dc72066cc3da56bb192733. Signed-off-by: minghuaw --- src/binding/fe2o3_amqp/deserializer.rs | 41 +++++++++---------- src/binding/fe2o3_amqp/mod.rs | 47 ++++++++++++++++++--- src/binding/fe2o3_amqp/serializer.rs | 56 ++++++++------------------ 3 files changed, 77 insertions(+), 67 deletions(-) diff --git a/src/binding/fe2o3_amqp/deserializer.rs b/src/binding/fe2o3_amqp/deserializer.rs index 195dcbaa..708f0ba7 100644 --- a/src/binding/fe2o3_amqp/deserializer.rs +++ b/src/binding/fe2o3_amqp/deserializer.rs @@ -13,10 +13,10 @@ use crate::{ use super::{ constants::{prefixed, DATACONTENTTYPE}, - ATTRIBUTE_PREFIX, AmqpMessage, + AmqpCloudEvent, ATTRIBUTE_PREFIX, }; -impl BinaryDeserializer for AmqpMessage { +impl BinaryDeserializer for AmqpCloudEvent { fn deserialize_binary>( mut self, mut serializer: V, @@ -27,8 +27,6 @@ impl BinaryDeserializer for AmqpMessage { let spec_version = { let value = self .application_properties - .as_mut() - .ok_or(Error::WrongEncoding { })? .remove(prefixed::SPECVERSION) .ok_or(Error::WrongEncoding {}) .map(|val| match val { @@ -40,24 +38,23 @@ impl BinaryDeserializer for AmqpMessage { serializer = serializer.set_spec_version(spec_version.clone())?; // datacontenttype - serializer = match self.properties.map(|p| p.content_type) { - Some(Some(Symbol(content_type))) => serializer + serializer = match self.content_type { + Some(Symbol(content_type)) => serializer .set_attribute(DATACONTENTTYPE, MessageAttributeValue::String(content_type))?, - _ => serializer, + None => serializer, }; // remaining attributes let attributes = spec_version.attribute_names(); - if let Some(application_properties) = self.application_properties { - for (key, value) in application_properties.0.into_iter() { - if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) { - if attributes.contains(&key) { - let value = MessageAttributeValue::try_from((key, value))?; - serializer = serializer.set_attribute(key, value)?; - } else { - let value = MessageAttributeValue::try_from(value)?; - serializer = serializer.set_extension(key, value)?; - } + + for (key, value) in self.application_properties.0.into_iter() { + if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) { + if attributes.contains(&key) { + let value = MessageAttributeValue::try_from((key, value))?; + serializer = serializer.set_attribute(key, value)?; + } else { + let value = MessageAttributeValue::try_from(value)?; + serializer = serializer.set_extension(key, value)?; } } } @@ -73,7 +70,7 @@ impl BinaryDeserializer for AmqpMessage { } } -impl StructuredDeserializer for AmqpMessage { +impl StructuredDeserializer for AmqpCloudEvent { fn deserialize_structured>( self, serializer: V, @@ -89,14 +86,12 @@ impl StructuredDeserializer for AmqpMessage { } } -impl MessageDeserializer for AmqpMessage { +impl MessageDeserializer for AmqpCloudEvent { fn encoding(&self) -> Encoding { match self - .properties + .content_type .as_ref() - .map(|p| p.content_type.as_ref() - .map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER)) - ).flatten() + .map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER)) { Some(true) => Encoding::STRUCTURED, Some(false) => Encoding::BINARY, diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index 5a6332fa..8e3b483e 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -1,13 +1,15 @@ //! Implements AMQP 1.0 binding for CloudEvents +use std::collections::HashMap; use std::convert::TryFrom; use chrono::{Utc, TimeZone}; -use fe2o3_amqp_lib::types::messaging::{Body, Message}; -use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Timestamp, Value}; +use fe2o3_amqp_lib::types::messaging::{ApplicationProperties, Body, Message, Properties}; +use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value}; -use crate::event::{AttributeValue}; +use crate::event::{AttributeValue, ExtensionValue}; use crate::message::{Error, MessageAttributeValue}; +use crate::Event; use self::constants::{ prefixed, DATACONTENTTYPE, DATASCHEMA, ID, SOURCE, SPECVERSION, SUBJECT, TIME, TYPE, @@ -25,8 +27,43 @@ mod constants; /// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of /// no importance because all CloudEvents are using the `Body::Data` as the body section type. For /// convenience, this type alias chose `Value` as the value of the generic parameter -type AmqpMessage = Message; -type AmqpBody = Body; +pub type AmqpMessage = Message; + +pub type AmqpBody = Body; + +pub type Extensions = HashMap; + +/// The receiver of the event can distinguish between the two modes by inspecting the content-type +/// message property field. If the value is prefixed with the CloudEvents media type +/// application/cloudevents, indicating the use of a known event format, the receiver uses +/// structured mode, otherwise it defaults to binary mode. +pub struct AmqpCloudEvent { + content_type: Option, + application_properties: ApplicationProperties, + body: AmqpBody, +} + +impl AmqpCloudEvent { + pub fn from_event(event: Event) -> Result { + todo!() + } +} + +impl From for AmqpMessage { + fn from(event: AmqpCloudEvent) -> Self { + let mut properties = Properties::default(); + properties.content_type = event.content_type; + Message { + header: None, + delivery_annotations: None, + message_annotations: None, + properties: Some(properties), + application_properties: Some(event.application_properties), + body: event.body, + footer: None, + } + } +} impl<'a> From> for SimpleValue { fn from(value: AttributeValue) -> Self { diff --git a/src/binding/fe2o3_amqp/serializer.rs b/src/binding/fe2o3_amqp/serializer.rs index 6fa23829..365c5d16 100644 --- a/src/binding/fe2o3_amqp/serializer.rs +++ b/src/binding/fe2o3_amqp/serializer.rs @@ -1,51 +1,37 @@ -use fe2o3_amqp_types::messaging::{Data as AmqpData, Properties, ApplicationProperties}; -use fe2o3_amqp_types::primitives::{Binary, SimpleValue, Symbol}; +use fe2o3_amqp_types::primitives::{SimpleValue, Symbol, Binary}; +use fe2o3_amqp_types::messaging::{Data as AmqpData}; -use crate::binding::header_prefix; use crate::message::StructuredSerializer; -use crate::{ - event::SpecVersion, - message::{BinarySerializer, Error, MessageAttributeValue}, -}; +use crate::{message::{BinarySerializer, MessageAttributeValue, Error}, event::SpecVersion}; use super::constants::DATACONTENTTYPE; -use super::{AmqpBody, AmqpMessage, ATTRIBUTE_PREFIX}; +use super::{AmqpCloudEvent, ATTRIBUTE_PREFIX, AmqpBody}; -impl BinarySerializer for AmqpMessage { +impl BinarySerializer for AmqpCloudEvent { fn set_spec_version(mut self, spec_version: SpecVersion) -> crate::message::Result { let key = String::from("cloudEvents:specversion"); let value = String::from(spec_version.as_str()); - self.application_properties - .get_or_insert(ApplicationProperties::default()) - .insert(key, SimpleValue::from(value)); + self.application_properties.insert(key, SimpleValue::from(value)); Ok(self) } - fn set_attribute( - mut self, - name: &str, - value: MessageAttributeValue, - ) -> crate::message::Result { + fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result { // For the binary mode, the AMQP content-type property field value maps directly to the // CloudEvents datacontenttype attribute. - // + // // All CloudEvents attributes with exception of datacontenttype MUST be individually mapped // to and from the AMQP application-properties section. if name == DATACONTENTTYPE { - self.properties - .get_or_insert(Properties::default()) - .content_type = match value { + self.content_type = match value { MessageAttributeValue::String(s) => Some(Symbol::from(s)), - _ => return Err(Error::WrongEncoding {}), + _ => return Err(Error::WrongEncoding { }) } } else { // CloudEvent attributes are prefixed with "cloudEvents:" for use in the // application-properties section - let key = header_prefix(ATTRIBUTE_PREFIX, name); + let key = format!("{}:{}", ATTRIBUTE_PREFIX, name); let value = SimpleValue::from(value); - self.application_properties - .get_or_insert(ApplicationProperties::default()) - .insert(key, value); + self.application_properties.insert(key, value); } Ok(self) @@ -57,16 +43,10 @@ impl BinarySerializer for AmqpMessage { // systems that also process the message. Extension specifications that do this SHOULD specify // how receivers are to interpret messages if the copied values differ from the cloud-event // serialized values. - fn set_extension( - mut self, - name: &str, - value: MessageAttributeValue, - ) -> crate::message::Result { - let key = header_prefix(ATTRIBUTE_PREFIX, name); + fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result { + let key = format!("{}:{}", ATTRIBUTE_PREFIX, name); let value = SimpleValue::from(value); - self.application_properties - .get_or_insert(ApplicationProperties::default()) - .insert(key, value); + self.application_properties.insert(key, value); Ok(self) } @@ -81,11 +61,9 @@ impl BinarySerializer for AmqpMessage { } } -impl StructuredSerializer for AmqpMessage { +impl StructuredSerializer for AmqpCloudEvent { fn set_structured_event(mut self, bytes: Vec) -> crate::message::Result { - self.properties - .get_or_insert(Properties::default()) - .content_type = Some(Symbol::from("application/cloudevents+json; charset=utf-8")); + self.content_type = Some(Symbol::from("application/cloudevents+json; charset=utf-8")); self.body = AmqpBody::Data(AmqpData(Binary::from(bytes))); Ok(self) } From 90f84e397dcf0e4760bb79b52516b6bddd7ce3fb Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 01:05:48 -0700 Subject: [PATCH 08/30] replace format with header_prefix Signed-off-by: minghuaw --- src/binding/fe2o3_amqp/serializer.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/binding/fe2o3_amqp/serializer.rs b/src/binding/fe2o3_amqp/serializer.rs index 365c5d16..eca3c111 100644 --- a/src/binding/fe2o3_amqp/serializer.rs +++ b/src/binding/fe2o3_amqp/serializer.rs @@ -1,6 +1,7 @@ use fe2o3_amqp_types::primitives::{SimpleValue, Symbol, Binary}; use fe2o3_amqp_types::messaging::{Data as AmqpData}; +use crate::binding::header_prefix; use crate::message::StructuredSerializer; use crate::{message::{BinarySerializer, MessageAttributeValue, Error}, event::SpecVersion}; @@ -29,7 +30,7 @@ impl BinarySerializer for AmqpCloudEvent { } else { // CloudEvent attributes are prefixed with "cloudEvents:" for use in the // application-properties section - let key = format!("{}:{}", ATTRIBUTE_PREFIX, name); + let key = header_prefix(ATTRIBUTE_PREFIX, name); let value = SimpleValue::from(value); self.application_properties.insert(key, value); } @@ -44,7 +45,7 @@ impl BinarySerializer for AmqpCloudEvent { // how receivers are to interpret messages if the copied values differ from the cloud-event // serialized values. fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result { - let key = format!("{}:{}", ATTRIBUTE_PREFIX, name); + let key = header_prefix(ATTRIBUTE_PREFIX, name); let value = SimpleValue::from(value); self.application_properties.insert(key, value); Ok(self) From 73ff402e4fb4301e84f06be19cc71f5772085b9c Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 01:21:26 -0700 Subject: [PATCH 09/30] impl From for AmqpCloudEvent Signed-off-by: minghuaw --- src/binding/fe2o3_amqp/constants.rs | 3 +- src/binding/fe2o3_amqp/deserializer.rs | 23 ++++++++------- src/binding/fe2o3_amqp/mod.rs | 41 +++++++++++++++++++------- src/binding/fe2o3_amqp/serializer.rs | 39 +++++++++++++++++------- src/binding/mod.rs | 4 +-- 5 files changed, 74 insertions(+), 36 deletions(-) diff --git a/src/binding/fe2o3_amqp/constants.rs b/src/binding/fe2o3_amqp/constants.rs index 827491bc..dddf1c07 100644 --- a/src/binding/fe2o3_amqp/constants.rs +++ b/src/binding/fe2o3_amqp/constants.rs @@ -1,4 +1,3 @@ - // Required pub(super) const ID: &str = "id"; pub(super) const SOURCE: &str = "source"; @@ -22,4 +21,4 @@ pub(super) mod prefixed { pub const DATASCHEMA: &str = "cloudEvents:dataschema"; pub const SUBJECT: &str = "cloudEvents:subject"; pub const TIME: &str = "cloudEvents:time"; -} \ No newline at end of file +} diff --git a/src/binding/fe2o3_amqp/deserializer.rs b/src/binding/fe2o3_amqp/deserializer.rs index 708f0ba7..17d5f8e2 100644 --- a/src/binding/fe2o3_amqp/deserializer.rs +++ b/src/binding/fe2o3_amqp/deserializer.rs @@ -27,6 +27,8 @@ impl BinaryDeserializer for AmqpCloudEvent { let spec_version = { let value = self .application_properties + .as_mut() + .ok_or(Error::WrongEncoding {})? .remove(prefixed::SPECVERSION) .ok_or(Error::WrongEncoding {}) .map(|val| match val { @@ -47,14 +49,16 @@ impl BinaryDeserializer for AmqpCloudEvent { // remaining attributes let attributes = spec_version.attribute_names(); - for (key, value) in self.application_properties.0.into_iter() { - if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) { - if attributes.contains(&key) { - let value = MessageAttributeValue::try_from((key, value))?; - serializer = serializer.set_attribute(key, value)?; - } else { - let value = MessageAttributeValue::try_from(value)?; - serializer = serializer.set_extension(key, value)?; + if let Some(application_properties) = self.application_properties { + for (key, value) in application_properties.0.into_iter() { + if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) { + if attributes.contains(&key) { + let value = MessageAttributeValue::try_from((key, value))?; + serializer = serializer.set_attribute(key, value)?; + } else { + let value = MessageAttributeValue::try_from(value)?; + serializer = serializer.set_extension(key, value)?; + } } } } @@ -79,8 +83,7 @@ impl StructuredDeserializer for AmqpCloudEvent { let bytes = match self.body { Body::Data(data) => data.0.into_vec(), Body::Nothing => vec![], - Body::Sequence(_) - | Body::Value(_) => return Err(Error::WrongEncoding { }), + Body::Sequence(_) | Body::Value(_) => return Err(Error::WrongEncoding {}), }; serializer.set_structured_event(bytes) } diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index 8e3b483e..79db0d31 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -3,12 +3,12 @@ use std::collections::HashMap; use std::convert::TryFrom; -use chrono::{Utc, TimeZone}; +use chrono::{TimeZone, Utc}; use fe2o3_amqp_lib::types::messaging::{ApplicationProperties, Body, Message, Properties}; use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value}; use crate::event::{AttributeValue, ExtensionValue}; -use crate::message::{Error, MessageAttributeValue}; +use crate::message::{BinaryDeserializer, Error, MessageAttributeValue}; use crate::Event; use self::constants::{ @@ -39,13 +39,21 @@ pub type Extensions = HashMap; /// structured mode, otherwise it defaults to binary mode. pub struct AmqpCloudEvent { content_type: Option, - application_properties: ApplicationProperties, + application_properties: Option, body: AmqpBody, } impl AmqpCloudEvent { - pub fn from_event(event: Event) -> Result { - todo!() + fn new() -> Self { + Self { + content_type: None, + application_properties: None, + body: Body::Nothing, + } + } + + pub fn from_binary_event(event: Event) -> Result { + BinaryDeserializer::deserialize_binary(event, Self::new()) } } @@ -58,13 +66,24 @@ impl From for AmqpMessage { delivery_annotations: None, message_annotations: None, properties: Some(properties), - application_properties: Some(event.application_properties), + application_properties: event.application_properties, body: event.body, footer: None, } } } +impl From for AmqpCloudEvent { + fn from(message: AmqpMessage) -> Self { + let content_type = message.properties.map(|p| p.content_type).flatten(); + Self { + content_type, + application_properties: message.application_properties, + body: message.body, + } + } +} + impl<'a> From> for SimpleValue { fn from(value: AttributeValue) -> Self { match value { @@ -149,10 +168,10 @@ impl TryFrom for MessageAttributeValue { SimpleValue::Timestamp(val) => { let datetime = Utc.timestamp_millis(val.into_inner()); Ok(MessageAttributeValue::DateTime(datetime)) - }, + } SimpleValue::Binary(val) => Ok(MessageAttributeValue::Binary(val.into_vec())), SimpleValue::String(val) => Ok(MessageAttributeValue::String(val)), - _ => Err(Error::WrongEncoding { }) + _ => Err(Error::WrongEncoding {}), } } } @@ -163,13 +182,13 @@ impl<'a> TryFrom<(&'a str, SimpleValue)> for MessageAttributeValue { fn try_from((key, value): (&'a str, SimpleValue)) -> Result { match key { // String - ID | prefixed::ID + ID | prefixed::ID // String - | SPECVERSION | prefixed::SPECVERSION + | SPECVERSION | prefixed::SPECVERSION // String | TYPE | prefixed::TYPE // String - | DATACONTENTTYPE + | DATACONTENTTYPE // String | SUBJECT | prefixed::SUBJECT => { let val = String::try_from(value).map_err(|_| Error::WrongEncoding {})?; diff --git a/src/binding/fe2o3_amqp/serializer.rs b/src/binding/fe2o3_amqp/serializer.rs index eca3c111..e5b206be 100644 --- a/src/binding/fe2o3_amqp/serializer.rs +++ b/src/binding/fe2o3_amqp/serializer.rs @@ -1,38 +1,49 @@ -use fe2o3_amqp_types::primitives::{SimpleValue, Symbol, Binary}; -use fe2o3_amqp_types::messaging::{Data as AmqpData}; +use fe2o3_amqp_types::messaging::{ApplicationProperties, Data as AmqpData}; +use fe2o3_amqp_types::primitives::{Binary, SimpleValue, Symbol}; use crate::binding::header_prefix; use crate::message::StructuredSerializer; -use crate::{message::{BinarySerializer, MessageAttributeValue, Error}, event::SpecVersion}; +use crate::{ + event::SpecVersion, + message::{BinarySerializer, Error, MessageAttributeValue}, +}; use super::constants::DATACONTENTTYPE; -use super::{AmqpCloudEvent, ATTRIBUTE_PREFIX, AmqpBody}; +use super::{AmqpBody, AmqpCloudEvent, ATTRIBUTE_PREFIX}; impl BinarySerializer for AmqpCloudEvent { fn set_spec_version(mut self, spec_version: SpecVersion) -> crate::message::Result { let key = String::from("cloudEvents:specversion"); let value = String::from(spec_version.as_str()); - self.application_properties.insert(key, SimpleValue::from(value)); + self.application_properties + .get_or_insert(ApplicationProperties::default()) + .insert(key, SimpleValue::from(value)); Ok(self) } - fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result { + fn set_attribute( + mut self, + name: &str, + value: MessageAttributeValue, + ) -> crate::message::Result { // For the binary mode, the AMQP content-type property field value maps directly to the // CloudEvents datacontenttype attribute. - // + // // All CloudEvents attributes with exception of datacontenttype MUST be individually mapped // to and from the AMQP application-properties section. if name == DATACONTENTTYPE { self.content_type = match value { MessageAttributeValue::String(s) => Some(Symbol::from(s)), - _ => return Err(Error::WrongEncoding { }) + _ => return Err(Error::WrongEncoding {}), } } else { // CloudEvent attributes are prefixed with "cloudEvents:" for use in the // application-properties section let key = header_prefix(ATTRIBUTE_PREFIX, name); let value = SimpleValue::from(value); - self.application_properties.insert(key, value); + self.application_properties + .get_or_insert(ApplicationProperties::default()) + .insert(key, value); } Ok(self) @@ -44,10 +55,16 @@ impl BinarySerializer for AmqpCloudEvent { // systems that also process the message. Extension specifications that do this SHOULD specify // how receivers are to interpret messages if the copied values differ from the cloud-event // serialized values. - fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> crate::message::Result { + fn set_extension( + mut self, + name: &str, + value: MessageAttributeValue, + ) -> crate::message::Result { let key = header_prefix(ATTRIBUTE_PREFIX, name); let value = SimpleValue::from(value); - self.application_properties.insert(key, value); + self.application_properties + .get_or_insert(ApplicationProperties::default()) + .insert(key, value); Ok(self) } diff --git a/src/binding/mod.rs b/src/binding/mod.rs index 7e9cad97..6eae5eb6 100644 --- a/src/binding/mod.rs +++ b/src/binding/mod.rs @@ -4,6 +4,8 @@ pub mod actix; #[cfg(feature = "axum")] pub mod axum; +#[cfg(feature = "fe2o3-amqp")] +pub mod fe2o3_amqp; #[cfg(any( feature = "http-binding", feature = "actix", @@ -23,8 +25,6 @@ pub mod rdkafka; pub mod reqwest; #[cfg(feature = "warp")] pub mod warp; -#[cfg(feature = "fe2o3-amqp")] -pub mod fe2o3_amqp; #[cfg(feature = "rdkafka")] pub(crate) mod kafka { From 6e90ddba6be29d6bc68d738038479b5a6fff3fb0 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 01:24:26 -0700 Subject: [PATCH 10/30] renamed to AmqpBinding Signed-off-by: minghuaw --- src/binding/fe2o3_amqp/deserializer.rs | 8 ++++---- src/binding/fe2o3_amqp/mod.rs | 16 ++++++++++------ src/binding/fe2o3_amqp/serializer.rs | 6 +++--- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/src/binding/fe2o3_amqp/deserializer.rs b/src/binding/fe2o3_amqp/deserializer.rs index 17d5f8e2..41c9e02d 100644 --- a/src/binding/fe2o3_amqp/deserializer.rs +++ b/src/binding/fe2o3_amqp/deserializer.rs @@ -13,10 +13,10 @@ use crate::{ use super::{ constants::{prefixed, DATACONTENTTYPE}, - AmqpCloudEvent, ATTRIBUTE_PREFIX, + AmqpBinding, ATTRIBUTE_PREFIX, }; -impl BinaryDeserializer for AmqpCloudEvent { +impl BinaryDeserializer for AmqpBinding { fn deserialize_binary>( mut self, mut serializer: V, @@ -74,7 +74,7 @@ impl BinaryDeserializer for AmqpCloudEvent { } } -impl StructuredDeserializer for AmqpCloudEvent { +impl StructuredDeserializer for AmqpBinding { fn deserialize_structured>( self, serializer: V, @@ -89,7 +89,7 @@ impl StructuredDeserializer for AmqpCloudEvent { } } -impl MessageDeserializer for AmqpCloudEvent { +impl MessageDeserializer for AmqpBinding { fn encoding(&self) -> Encoding { match self .content_type diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index 79db0d31..223e174d 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -8,7 +8,7 @@ use fe2o3_amqp_lib::types::messaging::{ApplicationProperties, Body, Message, Pro use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value}; use crate::event::{AttributeValue, ExtensionValue}; -use crate::message::{BinaryDeserializer, Error, MessageAttributeValue}; +use crate::message::{BinaryDeserializer, Error, MessageAttributeValue, StructuredDeserializer}; use crate::Event; use self::constants::{ @@ -37,13 +37,13 @@ pub type Extensions = HashMap; /// message property field. If the value is prefixed with the CloudEvents media type /// application/cloudevents, indicating the use of a known event format, the receiver uses /// structured mode, otherwise it defaults to binary mode. -pub struct AmqpCloudEvent { +pub struct AmqpBinding { content_type: Option, application_properties: Option, body: AmqpBody, } -impl AmqpCloudEvent { +impl AmqpBinding { fn new() -> Self { Self { content_type: None, @@ -55,10 +55,14 @@ impl AmqpCloudEvent { pub fn from_binary_event(event: Event) -> Result { BinaryDeserializer::deserialize_binary(event, Self::new()) } + + pub fn from_structured_event(event: Event) -> Result { + StructuredDeserializer::deserialize_structured(event, Self::new()) + } } -impl From for AmqpMessage { - fn from(event: AmqpCloudEvent) -> Self { +impl From for AmqpMessage { + fn from(event: AmqpBinding) -> Self { let mut properties = Properties::default(); properties.content_type = event.content_type; Message { @@ -73,7 +77,7 @@ impl From for AmqpMessage { } } -impl From for AmqpCloudEvent { +impl From for AmqpBinding { fn from(message: AmqpMessage) -> Self { let content_type = message.properties.map(|p| p.content_type).flatten(); Self { diff --git a/src/binding/fe2o3_amqp/serializer.rs b/src/binding/fe2o3_amqp/serializer.rs index e5b206be..39f97c83 100644 --- a/src/binding/fe2o3_amqp/serializer.rs +++ b/src/binding/fe2o3_amqp/serializer.rs @@ -9,9 +9,9 @@ use crate::{ }; use super::constants::DATACONTENTTYPE; -use super::{AmqpBody, AmqpCloudEvent, ATTRIBUTE_PREFIX}; +use super::{AmqpBody, AmqpBinding, ATTRIBUTE_PREFIX}; -impl BinarySerializer for AmqpCloudEvent { +impl BinarySerializer for AmqpBinding { fn set_spec_version(mut self, spec_version: SpecVersion) -> crate::message::Result { let key = String::from("cloudEvents:specversion"); let value = String::from(spec_version.as_str()); @@ -79,7 +79,7 @@ impl BinarySerializer for AmqpCloudEvent { } } -impl StructuredSerializer for AmqpCloudEvent { +impl StructuredSerializer for AmqpBinding { fn set_structured_event(mut self, bytes: Vec) -> crate::message::Result { self.content_type = Some(Symbol::from("application/cloudevents+json; charset=utf-8")); self.body = AmqpBody::Data(AmqpData(Binary::from(bytes))); From 508976c547e497f3a2257351ee012ce671165f1e Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 01:42:46 -0700 Subject: [PATCH 11/30] renamed to EventMessage Signed-off-by: minghuaw --- src/binding/fe2o3_amqp/deserializer.rs | 8 ++++---- src/binding/fe2o3_amqp/mod.rs | 12 +++++++----- src/binding/fe2o3_amqp/serializer.rs | 6 +++--- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/binding/fe2o3_amqp/deserializer.rs b/src/binding/fe2o3_amqp/deserializer.rs index 41c9e02d..9c0c5f35 100644 --- a/src/binding/fe2o3_amqp/deserializer.rs +++ b/src/binding/fe2o3_amqp/deserializer.rs @@ -13,10 +13,10 @@ use crate::{ use super::{ constants::{prefixed, DATACONTENTTYPE}, - AmqpBinding, ATTRIBUTE_PREFIX, + EventMessage, ATTRIBUTE_PREFIX, }; -impl BinaryDeserializer for AmqpBinding { +impl BinaryDeserializer for EventMessage { fn deserialize_binary>( mut self, mut serializer: V, @@ -74,7 +74,7 @@ impl BinaryDeserializer for AmqpBinding { } } -impl StructuredDeserializer for AmqpBinding { +impl StructuredDeserializer for EventMessage { fn deserialize_structured>( self, serializer: V, @@ -89,7 +89,7 @@ impl StructuredDeserializer for AmqpBinding { } } -impl MessageDeserializer for AmqpBinding { +impl MessageDeserializer for EventMessage { fn encoding(&self) -> Encoding { match self .content_type diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index 223e174d..08d23f3e 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -37,13 +37,13 @@ pub type Extensions = HashMap; /// message property field. If the value is prefixed with the CloudEvents media type /// application/cloudevents, indicating the use of a known event format, the receiver uses /// structured mode, otherwise it defaults to binary mode. -pub struct AmqpBinding { +pub struct EventMessage { content_type: Option, application_properties: Option, body: AmqpBody, } -impl AmqpBinding { +impl EventMessage { fn new() -> Self { Self { content_type: None, @@ -52,17 +52,19 @@ impl AmqpBinding { } } + /// Create an [`EventMessage`] from an event using a binary serializer pub fn from_binary_event(event: Event) -> Result { BinaryDeserializer::deserialize_binary(event, Self::new()) } + /// Create an [`EventMessage`] from an event using a structured serializer pub fn from_structured_event(event: Event) -> Result { StructuredDeserializer::deserialize_structured(event, Self::new()) } } -impl From for AmqpMessage { - fn from(event: AmqpBinding) -> Self { +impl From for AmqpMessage { + fn from(event: EventMessage) -> Self { let mut properties = Properties::default(); properties.content_type = event.content_type; Message { @@ -77,7 +79,7 @@ impl From for AmqpMessage { } } -impl From for AmqpBinding { +impl From for EventMessage { fn from(message: AmqpMessage) -> Self { let content_type = message.properties.map(|p| p.content_type).flatten(); Self { diff --git a/src/binding/fe2o3_amqp/serializer.rs b/src/binding/fe2o3_amqp/serializer.rs index 39f97c83..66051d42 100644 --- a/src/binding/fe2o3_amqp/serializer.rs +++ b/src/binding/fe2o3_amqp/serializer.rs @@ -9,9 +9,9 @@ use crate::{ }; use super::constants::DATACONTENTTYPE; -use super::{AmqpBody, AmqpBinding, ATTRIBUTE_PREFIX}; +use super::{AmqpBody, EventMessage, ATTRIBUTE_PREFIX}; -impl BinarySerializer for AmqpBinding { +impl BinarySerializer for EventMessage { fn set_spec_version(mut self, spec_version: SpecVersion) -> crate::message::Result { let key = String::from("cloudEvents:specversion"); let value = String::from(spec_version.as_str()); @@ -79,7 +79,7 @@ impl BinarySerializer for AmqpBinding { } } -impl StructuredSerializer for AmqpBinding { +impl StructuredSerializer for EventMessage { fn set_structured_event(mut self, bytes: Vec) -> crate::message::Result { self.content_type = Some(Symbol::from("application/cloudevents+json; charset=utf-8")); self.body = AmqpBody::Data(AmqpData(Binary::from(bytes))); From 832098c03e2a50863549c529e4a752dec49d3785 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 02:00:23 -0700 Subject: [PATCH 12/30] removed prefix from extension name Signed-off-by: minghuaw --- Cargo.toml | 3 +-- src/binding/fe2o3_amqp/mod.rs | 4 ++-- src/binding/fe2o3_amqp/serializer.rs | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b272905c..b4ea879e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ warp = ["warp-lib", "bytes", "http", "hyper"] axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"] poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"] nats = ["nats-lib"] -fe2o3-amqp = ["fe2o3-amqp-lib", "fe2o3-amqp-types"] +fe2o3-amqp = ["fe2o3-amqp-types"] [dependencies] serde = { version = "^1.0", features = ["derive"] } @@ -53,7 +53,6 @@ axum-lib = { version = "^0.5", optional = true , package="axum"} http-body = { version = "^0.4", optional = true } poem-lib = { version = "=1.2.34", optional = true, package = "poem" } nats-lib = { version = "0.21.0", optional = true, package = "nats" } -fe2o3-amqp-lib = { version = "0.4.0", optional = true, package = "fe2o3-amqp" } fe2o3-amqp-types = { version = "0.3.4", optional = true } [target."cfg(not(target_arch = \"wasm32\"))".dependencies] diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index 08d23f3e..bfddd140 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -4,8 +4,8 @@ use std::collections::HashMap; use std::convert::TryFrom; use chrono::{TimeZone, Utc}; -use fe2o3_amqp_lib::types::messaging::{ApplicationProperties, Body, Message, Properties}; -use fe2o3_amqp_lib::types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value}; +use fe2o3_amqp_types::messaging::{ApplicationProperties, Body, Message, Properties}; +use fe2o3_amqp_types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value}; use crate::event::{AttributeValue, ExtensionValue}; use crate::message::{BinaryDeserializer, Error, MessageAttributeValue, StructuredDeserializer}; diff --git a/src/binding/fe2o3_amqp/serializer.rs b/src/binding/fe2o3_amqp/serializer.rs index 66051d42..b412725a 100644 --- a/src/binding/fe2o3_amqp/serializer.rs +++ b/src/binding/fe2o3_amqp/serializer.rs @@ -60,7 +60,7 @@ impl BinarySerializer for EventMessage { name: &str, value: MessageAttributeValue, ) -> crate::message::Result { - let key = header_prefix(ATTRIBUTE_PREFIX, name); + let key = name.to_string(); let value = SimpleValue::from(value); self.application_properties .get_or_insert(ApplicationProperties::default()) From 3d835a6250064e29ca678395145db822ca7dd295 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 02:55:52 -0700 Subject: [PATCH 13/30] added amqp example Signed-off-by: minghuaw --- .../fe2o3-amqp-example/Cargo.toml | 12 ++++ .../fe2o3-amqp-example/src/main.rs | 56 +++++++++++++++++++ src/binding/fe2o3_amqp/mod.rs | 2 +- 3 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 example-projects/fe2o3-amqp-example/Cargo.toml create mode 100644 example-projects/fe2o3-amqp-example/src/main.rs diff --git a/example-projects/fe2o3-amqp-example/Cargo.toml b/example-projects/fe2o3-amqp-example/Cargo.toml new file mode 100644 index 00000000..c49e1c15 --- /dev/null +++ b/example-projects/fe2o3-amqp-example/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "fe2o3-amqp-example" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +cloudevents-sdk = { path = "../..", features = ["fe2o3-amqp"] } +fe2o3-amqp = "0.4.0" +tokio = { version = "1", features = ["macros", "net", "rt", "rt-multi-thread"] } +serde_json = "1" \ No newline at end of file diff --git a/example-projects/fe2o3-amqp-example/src/main.rs b/example-projects/fe2o3-amqp-example/src/main.rs new file mode 100644 index 00000000..59c9fad0 --- /dev/null +++ b/example-projects/fe2o3-amqp-example/src/main.rs @@ -0,0 +1,56 @@ +//! AMQP 1.0 binding example +//! +//! You need a running AMQP 1.0 broker to try out this example. +//! With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis + +use cloudevents::{binding::fe2o3_amqp::EventMessage, message::BinaryDeserializer, Event, EventBuilderV10, EventBuilder}; +use fe2o3_amqp::{Connection, Sender, Receiver, types::messaging::Message, Session}; +use serde_json::json; + +type BoxError = Box; +type Result = std::result::Result; + +async fn send_event(sender: &mut Sender, i: usize) -> Result<()> { + let event = EventBuilderV10::new() + .id(i.to_string()) + .ty("example.test") + .source("localhost") + .data("application/json", json!({"hello": "world"})) + .build()?; + let event_message = EventMessage::from_binary_event(event)?; + let message = Message::from(event_message); + sender.send(message).await? + .accepted_or("not accepted")?; + Ok(()) +} + +async fn recv_event(receiver: &mut Receiver) -> Result { + use fe2o3_amqp::types::primitives::Value; + + let delivery = receiver.recv::().await?; + receiver.accept(&delivery).await?; + + let event_message = EventMessage::from(delivery.into_message()); + let event = event_message.into_event()?; + Ok(event) +} + +#[tokio::main] +async fn main() { + let mut connection = + Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672") + .await + .unwrap(); + let mut session = Session::begin(&mut connection).await.unwrap(); + let mut sender = Sender::attach(&mut session, "sender", "q1").await.unwrap(); + let mut receiver = Receiver::attach(&mut session, "receiver", "q1").await.unwrap(); + + send_event(&mut sender, 1).await.unwrap(); + let event = recv_event(&mut receiver).await.unwrap(); + println!("{:?}", event); + + sender.close().await.unwrap(); + receiver.close().await.unwrap(); + session.end().await.unwrap(); + connection.close().await.unwrap(); +} diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index bfddd140..628e556c 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -222,4 +222,4 @@ impl<'a> TryFrom<(&'a str, SimpleValue)> for MessageAttributeValue { } } } -} +} \ No newline at end of file From 880da6983d5359411daf65919205c68ced7c1fa6 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 03:01:19 -0700 Subject: [PATCH 14/30] removed unused type alias Signed-off-by: minghuaw --- src/binding/fe2o3_amqp/mod.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index 628e556c..4787d22b 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -1,13 +1,12 @@ //! Implements AMQP 1.0 binding for CloudEvents -use std::collections::HashMap; use std::convert::TryFrom; use chrono::{TimeZone, Utc}; use fe2o3_amqp_types::messaging::{ApplicationProperties, Body, Message, Properties}; use fe2o3_amqp_types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value}; -use crate::event::{AttributeValue, ExtensionValue}; +use crate::event::{AttributeValue}; use crate::message::{BinaryDeserializer, Error, MessageAttributeValue, StructuredDeserializer}; use crate::Event; @@ -29,10 +28,13 @@ mod constants; /// convenience, this type alias chose `Value` as the value of the generic parameter pub type AmqpMessage = Message; +/// Type alias for an AMQP 1.0 Body +/// +/// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of +/// no importance because all CloudEvents are using the `Body::Data` as the body section type. For +/// convenience, this type alias chose `Value` as the value of the generic parameter pub type AmqpBody = Body; -pub type Extensions = HashMap; - /// The receiver of the event can distinguish between the two modes by inspecting the content-type /// message property field. If the value is prefixed with the CloudEvents media type /// application/cloudevents, indicating the use of a known event format, the receiver uses From b193012953a7806294c48e39b6cab5720a524916 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 03:12:34 -0700 Subject: [PATCH 15/30] fixed example Signed-off-by: minghuaw --- example-projects/fe2o3-amqp-example/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/example-projects/fe2o3-amqp-example/src/main.rs b/example-projects/fe2o3-amqp-example/src/main.rs index 59c9fad0..3aa5ed18 100644 --- a/example-projects/fe2o3-amqp-example/src/main.rs +++ b/example-projects/fe2o3-amqp-example/src/main.rs @@ -3,7 +3,7 @@ //! You need a running AMQP 1.0 broker to try out this example. //! With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis -use cloudevents::{binding::fe2o3_amqp::EventMessage, message::BinaryDeserializer, Event, EventBuilderV10, EventBuilder}; +use cloudevents::{binding::fe2o3_amqp::EventMessage, Event, EventBuilderV10, EventBuilder, message::MessageDeserializer}; use fe2o3_amqp::{Connection, Sender, Receiver, types::messaging::Message, Session}; use serde_json::json; @@ -17,7 +17,7 @@ async fn send_event(sender: &mut Sender, i: usize) -> Result<()> { .source("localhost") .data("application/json", json!({"hello": "world"})) .build()?; - let event_message = EventMessage::from_binary_event(event)?; + let event_message = EventMessage::from_structured_event(event)?; let message = Message::from(event_message); sender.send(message).await? .accepted_or("not accepted")?; @@ -31,7 +31,7 @@ async fn recv_event(receiver: &mut Receiver) -> Result { receiver.accept(&delivery).await?; let event_message = EventMessage::from(delivery.into_message()); - let event = event_message.into_event()?; + let event = MessageDeserializer::into_event(event_message)?; Ok(event) } From 55c5c8f81b226032cd639a6ee49eb4bfd2c88a04 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 03:35:58 -0700 Subject: [PATCH 16/30] treat missing cloutEvents prefix as extension Signed-off-by: minghuaw --- example-projects/fe2o3-amqp-example/src/main.rs | 17 +++++++++++------ src/binding/fe2o3_amqp/deserializer.rs | 3 +++ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/example-projects/fe2o3-amqp-example/src/main.rs b/example-projects/fe2o3-amqp-example/src/main.rs index 3aa5ed18..f4fab61c 100644 --- a/example-projects/fe2o3-amqp-example/src/main.rs +++ b/example-projects/fe2o3-amqp-example/src/main.rs @@ -3,8 +3,11 @@ //! You need a running AMQP 1.0 broker to try out this example. //! With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis -use cloudevents::{binding::fe2o3_amqp::EventMessage, Event, EventBuilderV10, EventBuilder, message::MessageDeserializer}; -use fe2o3_amqp::{Connection, Sender, Receiver, types::messaging::Message, Session}; +use cloudevents::{ + binding::fe2o3_amqp::EventMessage, message::MessageDeserializer, Event, EventBuilder, + EventBuilderV10, +}; +use fe2o3_amqp::{types::messaging::Message, Connection, Receiver, Sender, Session}; use serde_json::json; type BoxError = Box; @@ -15,12 +18,12 @@ async fn send_event(sender: &mut Sender, i: usize) -> Result<()> { .id(i.to_string()) .ty("example.test") .source("localhost") + .extension("ext-name", "AMQP") .data("application/json", json!({"hello": "world"})) .build()?; - let event_message = EventMessage::from_structured_event(event)?; + let event_message = EventMessage::from_binary_event(event)?; let message = Message::from(event_message); - sender.send(message).await? - .accepted_or("not accepted")?; + sender.send(message).await?.accepted_or("not accepted")?; Ok(()) } @@ -43,7 +46,9 @@ async fn main() { .unwrap(); let mut session = Session::begin(&mut connection).await.unwrap(); let mut sender = Sender::attach(&mut session, "sender", "q1").await.unwrap(); - let mut receiver = Receiver::attach(&mut session, "receiver", "q1").await.unwrap(); + let mut receiver = Receiver::attach(&mut session, "receiver", "q1") + .await + .unwrap(); send_event(&mut sender, 1).await.unwrap(); let event = recv_event(&mut receiver).await.unwrap(); diff --git a/src/binding/fe2o3_amqp/deserializer.rs b/src/binding/fe2o3_amqp/deserializer.rs index 9c0c5f35..b3d74f35 100644 --- a/src/binding/fe2o3_amqp/deserializer.rs +++ b/src/binding/fe2o3_amqp/deserializer.rs @@ -59,6 +59,9 @@ impl BinaryDeserializer for EventMessage { let value = MessageAttributeValue::try_from(value)?; serializer = serializer.set_extension(key, value)?; } + } else { + let value = MessageAttributeValue::try_from(value)?; + serializer = serializer.set_extension(&key, value)?; } } } From 8739e9d206afb9c319d49d3800fbaf93ff5ea03b Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 09:37:44 -0700 Subject: [PATCH 17/30] prefix extension values like standard attr values Signed-off-by: minghuaw --- src/binding/fe2o3_amqp/deserializer.rs | 3 --- src/binding/fe2o3_amqp/serializer.rs | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/binding/fe2o3_amqp/deserializer.rs b/src/binding/fe2o3_amqp/deserializer.rs index b3d74f35..9c0c5f35 100644 --- a/src/binding/fe2o3_amqp/deserializer.rs +++ b/src/binding/fe2o3_amqp/deserializer.rs @@ -59,9 +59,6 @@ impl BinaryDeserializer for EventMessage { let value = MessageAttributeValue::try_from(value)?; serializer = serializer.set_extension(key, value)?; } - } else { - let value = MessageAttributeValue::try_from(value)?; - serializer = serializer.set_extension(&key, value)?; } } } diff --git a/src/binding/fe2o3_amqp/serializer.rs b/src/binding/fe2o3_amqp/serializer.rs index b412725a..66051d42 100644 --- a/src/binding/fe2o3_amqp/serializer.rs +++ b/src/binding/fe2o3_amqp/serializer.rs @@ -60,7 +60,7 @@ impl BinarySerializer for EventMessage { name: &str, value: MessageAttributeValue, ) -> crate::message::Result { - let key = name.to_string(); + let key = header_prefix(ATTRIBUTE_PREFIX, name); let value = SimpleValue::from(value); self.application_properties .get_or_insert(ApplicationProperties::default()) From 22a88e8c7a2d56e740e8a5731fda572527d5f6d9 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 09:51:29 -0700 Subject: [PATCH 18/30] updated example Signed-off-by: minghuaw --- .../fe2o3-amqp-example/src/main.rs | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/example-projects/fe2o3-amqp-example/src/main.rs b/example-projects/fe2o3-amqp-example/src/main.rs index f4fab61c..a639b29c 100644 --- a/example-projects/fe2o3-amqp-example/src/main.rs +++ b/example-projects/fe2o3-amqp-example/src/main.rs @@ -8,18 +8,18 @@ use cloudevents::{ EventBuilderV10, }; use fe2o3_amqp::{types::messaging::Message, Connection, Receiver, Sender, Session}; -use serde_json::json; +use serde_json::{json, from_slice, from_str}; type BoxError = Box; type Result = std::result::Result; -async fn send_event(sender: &mut Sender, i: usize) -> Result<()> { +async fn send_event(sender: &mut Sender, i: usize, value: serde_json::Value) -> Result<()> { let event = EventBuilderV10::new() .id(i.to_string()) .ty("example.test") .source("localhost") .extension("ext-name", "AMQP") - .data("application/json", json!({"hello": "world"})) + .data("application/json", value) .build()?; let event_message = EventMessage::from_binary_event(event)?; let message = Message::from(event_message); @@ -28,9 +28,7 @@ async fn send_event(sender: &mut Sender, i: usize) -> Result<()> { } async fn recv_event(receiver: &mut Receiver) -> Result { - use fe2o3_amqp::types::primitives::Value; - - let delivery = receiver.recv::().await?; + let delivery = receiver.recv().await?; receiver.accept(&delivery).await?; let event_message = EventMessage::from(delivery.into_message()); @@ -50,9 +48,16 @@ async fn main() { .await .unwrap(); - send_event(&mut sender, 1).await.unwrap(); + let expected = json!({"hello": "world"}); + send_event(&mut sender, 1, expected.clone()).await.unwrap(); let event = recv_event(&mut receiver).await.unwrap(); - println!("{:?}", event); + let data: serde_json::Value = match event.data().unwrap() { + cloudevents::Data::Binary(bytes) => from_slice(bytes).unwrap(), + cloudevents::Data::String(s) => from_str(s).unwrap(), + cloudevents::Data::Json(value) => value.clone(), + }; + + assert_eq!(data, expected); sender.close().await.unwrap(); receiver.close().await.unwrap(); From 1188f17aea27c0e2c8889a80f7a4b284ffe942f5 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 10:08:21 -0700 Subject: [PATCH 19/30] added assertions in example Signed-off-by: minghuaw --- .../fe2o3-amqp-example/src/main.rs | 65 ++++++++++++++++--- 1 file changed, 55 insertions(+), 10 deletions(-) diff --git a/example-projects/fe2o3-amqp-example/src/main.rs b/example-projects/fe2o3-amqp-example/src/main.rs index a639b29c..286397c9 100644 --- a/example-projects/fe2o3-amqp-example/src/main.rs +++ b/example-projects/fe2o3-amqp-example/src/main.rs @@ -5,7 +5,7 @@ use cloudevents::{ binding::fe2o3_amqp::EventMessage, message::MessageDeserializer, Event, EventBuilder, - EventBuilderV10, + EventBuilderV10, AttributesReader, event::ExtensionValue, }; use fe2o3_amqp::{types::messaging::Message, Connection, Receiver, Sender, Session}; use serde_json::{json, from_slice, from_str}; @@ -13,7 +13,26 @@ use serde_json::{json, from_slice, from_str}; type BoxError = Box; type Result = std::result::Result; -async fn send_event(sender: &mut Sender, i: usize, value: serde_json::Value) -> Result<()> { +const EXAMPLE_TYPE: &str = "example.test"; +const EXAMPLE_SOURCE: &str = "localhost"; +const EXTENSION_NAME: &str = "ext-name"; +const EXTENSION_VALUE: &str = "AMQP"; + +async fn send_binary_event(sender: &mut Sender, i: usize, value: serde_json::Value) -> Result<()> { + let event = EventBuilderV10::new() + .id(i.to_string()) + .ty(EXAMPLE_TYPE) + .source(EXAMPLE_SOURCE) + .extension(EXTENSION_NAME, EXTENSION_VALUE) + .data("application/json", value) + .build()?; + let event_message = EventMessage::from_binary_event(event)?; + let message = Message::from(event_message); + sender.send(message).await?.accepted_or("not accepted")?; + Ok(()) +} + +async fn send_structured_event(sender: &mut Sender, i: usize, value: serde_json::Value) -> Result<()> { let event = EventBuilderV10::new() .id(i.to_string()) .ty("example.test") @@ -21,7 +40,7 @@ async fn send_event(sender: &mut Sender, i: usize, value: serde_json::Value) -> .extension("ext-name", "AMQP") .data("application/json", value) .build()?; - let event_message = EventMessage::from_binary_event(event)?; + let event_message = EventMessage::from_structured_event(event)?; let message = Message::from(event_message); sender.send(message).await?.accepted_or("not accepted")?; Ok(()) @@ -36,6 +55,15 @@ async fn recv_event(receiver: &mut Receiver) -> Result { Ok(event) } +fn convert_data_into_json_value(data: &cloudevents::Data) -> Result { + let value = match data { + cloudevents::Data::Binary(bytes) => from_slice(bytes)?, + cloudevents::Data::String(s) => from_str(s)?, + cloudevents::Data::Json(value) => value.clone(), + }; + Ok(value) +} + #[tokio::main] async fn main() { let mut connection = @@ -49,15 +77,32 @@ async fn main() { .unwrap(); let expected = json!({"hello": "world"}); - send_event(&mut sender, 1, expected.clone()).await.unwrap(); + + // Binary content mode + send_binary_event(&mut sender, 1, expected.clone()).await.unwrap(); let event = recv_event(&mut receiver).await.unwrap(); - let data: serde_json::Value = match event.data().unwrap() { - cloudevents::Data::Binary(bytes) => from_slice(bytes).unwrap(), - cloudevents::Data::String(s) => from_str(s).unwrap(), - cloudevents::Data::Json(value) => value.clone(), - }; + let value = convert_data_into_json_value(event.data().unwrap()).unwrap(); + assert_eq!(event.id(), "1"); + assert_eq!(event.ty(), EXAMPLE_TYPE); + assert_eq!(event.source(), EXAMPLE_SOURCE); + match event.extension(EXTENSION_NAME).unwrap() { + ExtensionValue::String(value) => assert_eq!(value, EXTENSION_VALUE), + _ => panic!("Expect a String"), + } + assert_eq!(value, expected); - assert_eq!(data, expected); + // Structured content mode + send_structured_event(&mut sender, 2, expected.clone()).await.unwrap(); + let event = recv_event(&mut receiver).await.unwrap(); + let value = convert_data_into_json_value(event.data().unwrap()).unwrap(); + assert_eq!(event.id(), "2"); + assert_eq!(event.ty(), EXAMPLE_TYPE); + assert_eq!(event.source(), EXAMPLE_SOURCE); + match event.extension(EXTENSION_NAME).unwrap() { + ExtensionValue::String(value) => assert_eq!(value, EXTENSION_VALUE), + _ => panic!("Expect a String"), + } + assert_eq!(value, expected); sender.close().await.unwrap(); receiver.close().await.unwrap(); From f7d3b8696be2e5637085ce200bf84aa061e812f2 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 10:15:49 -0700 Subject: [PATCH 20/30] updated docs Signed-off-by: minghuaw --- .../fe2o3-amqp-example/src/main.rs | 8 ++-- src/binding/fe2o3_amqp/mod.rs | 39 ++++++++++++++----- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/example-projects/fe2o3-amqp-example/src/main.rs b/example-projects/fe2o3-amqp-example/src/main.rs index 286397c9..025e96b8 100644 --- a/example-projects/fe2o3-amqp-example/src/main.rs +++ b/example-projects/fe2o3-amqp-example/src/main.rs @@ -4,10 +4,10 @@ //! With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis use cloudevents::{ - binding::fe2o3_amqp::EventMessage, message::MessageDeserializer, Event, EventBuilder, + binding::fe2o3_amqp::{EventMessage, AmqpMessage}, message::MessageDeserializer, Event, EventBuilder, EventBuilderV10, AttributesReader, event::ExtensionValue, }; -use fe2o3_amqp::{types::messaging::Message, Connection, Receiver, Sender, Session}; +use fe2o3_amqp::{Connection, Receiver, Sender, Session}; use serde_json::{json, from_slice, from_str}; type BoxError = Box; @@ -27,7 +27,7 @@ async fn send_binary_event(sender: &mut Sender, i: usize, value: serde_json::Val .data("application/json", value) .build()?; let event_message = EventMessage::from_binary_event(event)?; - let message = Message::from(event_message); + let message = AmqpMessage::from(event_message); sender.send(message).await?.accepted_or("not accepted")?; Ok(()) } @@ -41,7 +41,7 @@ async fn send_structured_event(sender: &mut Sender, i: usize, value: serde_json: .data("application/json", value) .build()?; let event_message = EventMessage::from_structured_event(event)?; - let message = Message::from(event_message); + let message = AmqpMessage::from(event_message); sender.send(message).await?.accepted_or("not accepted")?; Ok(()) } diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index 4787d22b..c980fc64 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -25,24 +25,45 @@ mod constants; /// /// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of /// no importance because all CloudEvents are using the `Body::Data` as the body section type. For -/// convenience, this type alias chose `Value` as the value of the generic parameter +/// convenience, this type alias chooses `Value` as the value of the generic parameter pub type AmqpMessage = Message; /// Type alias for an AMQP 1.0 Body /// /// The generic parameter can be anything that implements `Serialize` and `Deserialize` but is of /// no importance because all CloudEvents are using the `Body::Data` as the body section type. For -/// convenience, this type alias chose `Value` as the value of the generic parameter +/// convenience, this type alias chooses `Value` as the value of the generic parameter pub type AmqpBody = Body; -/// The receiver of the event can distinguish between the two modes by inspecting the content-type -/// message property field. If the value is prefixed with the CloudEvents media type -/// application/cloudevents, indicating the use of a known event format, the receiver uses -/// structured mode, otherwise it defaults to binary mode. +/// This struct contains the necessary fields required for AMQP 1.0 binding. +/// It provides conversion between [`Event`] and [`AmqpMessage`] +/// +/// # Examples +/// +/// ## [`Event`] -> [`AmqpMessage`] in binary content mode +/// +/// ```rust +/// let event_message = EventMessage::from_binary_event(event).unwrap(); +/// let amqp_message = AmqpMessage:from(event_message); +/// ``` +/// +/// ## [`Event`] -> [`AmqpMessage`] in structured content mode +/// +/// ```rust +/// let event_message = EventMessage::from_structured_event(event).unwrap(); +/// let amqp_message = AmqpMessage:from(event_message); +/// ``` +/// +/// ## [`AmqpMessage`] -> [`Event`] +/// +/// ```rust +/// let event_message = EventMessage::from(amqp_message); +/// let event = MessageDeserializer::into_event(event_message).unwrap(); +/// ``` pub struct EventMessage { - content_type: Option, - application_properties: Option, - body: AmqpBody, + pub content_type: Option, + pub application_properties: Option, + pub body: AmqpBody, } impl EventMessage { From c26131f8cba9ef4ded92ba76be231c00fe14d89a Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 10:44:17 -0700 Subject: [PATCH 21/30] updated mod level doc Signed-off-by: minghuaw --- .../fe2o3-amqp-example/src/main.rs | 6 +- src/binding/fe2o3_amqp/mod.rs | 71 ++++++++++++++++++- src/binding/mod.rs | 1 + src/lib.rs | 1 + 4 files changed, 75 insertions(+), 4 deletions(-) diff --git a/example-projects/fe2o3-amqp-example/src/main.rs b/example-projects/fe2o3-amqp-example/src/main.rs index 025e96b8..498673dc 100644 --- a/example-projects/fe2o3-amqp-example/src/main.rs +++ b/example-projects/fe2o3-amqp-example/src/main.rs @@ -35,9 +35,9 @@ async fn send_binary_event(sender: &mut Sender, i: usize, value: serde_json::Val async fn send_structured_event(sender: &mut Sender, i: usize, value: serde_json::Value) -> Result<()> { let event = EventBuilderV10::new() .id(i.to_string()) - .ty("example.test") - .source("localhost") - .extension("ext-name", "AMQP") + .ty(EXAMPLE_TYPE) + .source(EXAMPLE_SOURCE) + .extension(EXTENSION_NAME, EXTENSION_VALUE) .data("application/json", value) .build()?; let event_message = EventMessage::from_structured_event(event)?; diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index c980fc64..b26cf288 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -1,4 +1,73 @@ -//! Implements AMQP 1.0 binding for CloudEvents +//! This module integrated the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with +//! [fe2o3-amqp](https://docs.rs/fe2o3-amqp/) to easily send and receiver CloudEvents +//! +//! To send CloudEvents +//! +//! ```rust +//! use fe2o3_amqp::{Connection, Sender, Session}; +//! use cloudevents::{EventBuilderV10, binding::fe2o3_amqp::{EventMessage, AmqpMessage}}; +//! +//! // You need a running AMQP 1.0 broker to try out this example. +//! // With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis +//! +//! #[tokio::main] +//! async fn main() { +//! let mut connection = +//! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672") +//! .await +//! .unwrap(); +//! let mut session = Session::begin(&mut connection).await.unwrap(); +//! let mut sender = Sender::attach(&mut session, "sender", "q1").await.unwrap(); +//! +//! let event = EventBuilderV10::new() +//! .id(i.to_string()) +//! .ty("example.test") +//! .source("localhost") +//! .extension("ext-name", "AMQP") +//! .data("application/json", value) +//! .build() +//! .unwrap(); +//! +//! let event_message = EventMessage::from_binary_event(event).unwrap(); +//! let message = AmqpMessage::from(event_message); +//! sender.send(message).await.unwrap() +//! .accepted_or("not accepted").unwrap(); +//! +//! sender.close().await.unwrap(); +//! session.end().await.unwrap(); +//! connection.close().await.unwrap(); +//! } +//! ``` +//! +//! To receiver CloudEvents +//! +//! ```rust +//! use fe2o3_amqp::{Connection, Receiver, Session}; +//! use cloudevents::{EventBuilderV10, binding::fe2o3_amqp::{EventMessage, AmqpMessage}}; +//! +//! // You need a running AMQP 1.0 broker to try out this example. +//! // With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis +//! +//! #[tokio::main] +//! async fn main() { +//! let mut connection = +//! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672") +//! .await +//! .unwrap(); +//! let mut session = Session::begin(&mut connection).await.unwrap(); +//! let mut receiver = Receiver::attach(&mut session, "receiver", "q1").await.unwrap(); +//! +//! let delivery = receiver.recv().await.unwrap(); +//! receiver.accept(&delivery).await.unwrap(); +//! +//! let event_message = EventMessage::from(delivery.into_message()); +//! let event = MessageDeserializer::into_event(event_message).unwrap(); +//! +//! sender.close().await.unwrap(); +//! session.end().await.unwrap(); +//! connection.close().await.unwrap(); +//! } +//! ``` use std::convert::TryFrom; diff --git a/src/binding/mod.rs b/src/binding/mod.rs index 6eae5eb6..48de36f7 100644 --- a/src/binding/mod.rs +++ b/src/binding/mod.rs @@ -4,6 +4,7 @@ pub mod actix; #[cfg(feature = "axum")] pub mod axum; +#[cfg_attr(docrs, doc(cfg(feature = "fe2o3-amqp")))] #[cfg(feature = "fe2o3-amqp")] pub mod fe2o3_amqp; #[cfg(any( diff --git a/src/lib.rs b/src/lib.rs index a1837528..4f2a6de8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,6 +56,7 @@ #![doc(html_root_url = "https://docs.rs/cloudevents-sdk/0.5.0")] #![deny(rustdoc::broken_intra_doc_links)] +#![cfg_attr(docsrs, feature(doc_cfg))] // Show feature gate in doc pub mod binding; pub mod event; From c477db43052dc11ae5f41294473380dae89dc75d Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 10:55:11 -0700 Subject: [PATCH 22/30] updated mod level doc Signed-off-by: minghuaw --- src/binding/fe2o3_amqp/mod.rs | 82 +++++++++++++++++------------------ src/binding/mod.rs | 1 - src/lib.rs | 1 - 3 files changed, 41 insertions(+), 43 deletions(-) diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index b26cf288..9d2cf7b9 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -1,5 +1,5 @@ //! This module integrated the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with -//! [fe2o3-amqp](https://docs.rs/fe2o3-amqp/) to easily send and receiver CloudEvents +//! [fe2o3-amqp](https://docs.rs/fe2o3-amqp/) to easily send and receive CloudEvents //! //! To send CloudEvents //! @@ -12,30 +12,30 @@ //! //! #[tokio::main] //! async fn main() { -//! let mut connection = -//! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672") -//! .await -//! .unwrap(); -//! let mut session = Session::begin(&mut connection).await.unwrap(); -//! let mut sender = Sender::attach(&mut session, "sender", "q1").await.unwrap(); -//! -//! let event = EventBuilderV10::new() -//! .id(i.to_string()) -//! .ty("example.test") -//! .source("localhost") -//! .extension("ext-name", "AMQP") -//! .data("application/json", value) -//! .build() -//! .unwrap(); -//! -//! let event_message = EventMessage::from_binary_event(event).unwrap(); -//! let message = AmqpMessage::from(event_message); -//! sender.send(message).await.unwrap() -//! .accepted_or("not accepted").unwrap(); -//! -//! sender.close().await.unwrap(); -//! session.end().await.unwrap(); -//! connection.close().await.unwrap(); +//! let mut connection = +//! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672") +//! .await +//! .unwrap(); +//! let mut session = Session::begin(&mut connection).await.unwrap(); +//! let mut sender = Sender::attach(&mut session, "sender", "q1").await.unwrap(); +//! +//! let event = EventBuilderV10::new() +//! .id(i.to_string()) +//! .ty("example.test") +//! .source("localhost") +//! .extension("ext-name", "AMQP") +//! .data("application/json", value) +//! .build() +//! .unwrap(); +//! +//! let event_message = EventMessage::from_binary_event(event).unwrap(); +//! let message = AmqpMessage::from(event_message); +//! sender.send(message).await.unwrap() +//! .accepted_or("not accepted").unwrap(); +//! +//! sender.close().await.unwrap(); +//! session.end().await.unwrap(); +//! connection.close().await.unwrap(); //! } //! ``` //! @@ -50,22 +50,22 @@ //! //! #[tokio::main] //! async fn main() { -//! let mut connection = -//! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672") -//! .await -//! .unwrap(); -//! let mut session = Session::begin(&mut connection).await.unwrap(); -//! let mut receiver = Receiver::attach(&mut session, "receiver", "q1").await.unwrap(); -//! -//! let delivery = receiver.recv().await.unwrap(); -//! receiver.accept(&delivery).await.unwrap(); -//! -//! let event_message = EventMessage::from(delivery.into_message()); -//! let event = MessageDeserializer::into_event(event_message).unwrap(); -//! -//! sender.close().await.unwrap(); -//! session.end().await.unwrap(); -//! connection.close().await.unwrap(); +//! let mut connection = +//! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672") +//! .await +//! .unwrap(); +//! let mut session = Session::begin(&mut connection).await.unwrap(); +//! let mut receiver = Receiver::attach(&mut session, "receiver", "q1").await.unwrap(); +//! +//! let delivery = receiver.recv().await.unwrap(); +//! receiver.accept(&delivery).await.unwrap(); +//! +//! let event_message = EventMessage::from(delivery.into_message()); +//! let event = MessageDeserializer::into_event(event_message).unwrap(); +//! +//! sender.close().await.unwrap(); +//! session.end().await.unwrap(); +//! connection.close().await.unwrap(); //! } //! ``` diff --git a/src/binding/mod.rs b/src/binding/mod.rs index 48de36f7..6eae5eb6 100644 --- a/src/binding/mod.rs +++ b/src/binding/mod.rs @@ -4,7 +4,6 @@ pub mod actix; #[cfg(feature = "axum")] pub mod axum; -#[cfg_attr(docrs, doc(cfg(feature = "fe2o3-amqp")))] #[cfg(feature = "fe2o3-amqp")] pub mod fe2o3_amqp; #[cfg(any( diff --git a/src/lib.rs b/src/lib.rs index 4f2a6de8..a1837528 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,7 +56,6 @@ #![doc(html_root_url = "https://docs.rs/cloudevents-sdk/0.5.0")] #![deny(rustdoc::broken_intra_doc_links)] -#![cfg_attr(docsrs, feature(doc_cfg))] // Show feature gate in doc pub mod binding; pub mod event; From d8df9c03c906d2fa3220b4f8d27373f6b79837e4 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 12:28:55 -0700 Subject: [PATCH 23/30] cargo fmt Signed-off-by: minghuaw --- src/binding/fe2o3_amqp/mod.rs | 36 +++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index 9d2cf7b9..e4381580 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -1,15 +1,15 @@ -//! This module integrated the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with +//! This module integrated the [cloudevents-sdk](https://docs.rs/cloudevents-sdk) with //! [fe2o3-amqp](https://docs.rs/fe2o3-amqp/) to easily send and receive CloudEvents -//! +//! //! To send CloudEvents -//! +//! //! ```rust //! use fe2o3_amqp::{Connection, Sender, Session}; //! use cloudevents::{EventBuilderV10, binding::fe2o3_amqp::{EventMessage, AmqpMessage}}; -//! +//! //! // You need a running AMQP 1.0 broker to try out this example. //! // With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis -//! +//! //! #[tokio::main] //! async fn main() { //! let mut connection = @@ -38,16 +38,16 @@ //! connection.close().await.unwrap(); //! } //! ``` -//! +//! //! To receiver CloudEvents -//! +//! //! ```rust //! use fe2o3_amqp::{Connection, Receiver, Session}; //! use cloudevents::{EventBuilderV10, binding::fe2o3_amqp::{EventMessage, AmqpMessage}}; -//! +//! //! // You need a running AMQP 1.0 broker to try out this example. //! // With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis -//! +//! //! #[tokio::main] //! async fn main() { //! let mut connection = @@ -75,7 +75,7 @@ use chrono::{TimeZone, Utc}; use fe2o3_amqp_types::messaging::{ApplicationProperties, Body, Message, Properties}; use fe2o3_amqp_types::primitives::{Binary, SimpleValue, Symbol, Timestamp, Value}; -use crate::event::{AttributeValue}; +use crate::event::AttributeValue; use crate::message::{BinaryDeserializer, Error, MessageAttributeValue, StructuredDeserializer}; use crate::Event; @@ -106,25 +106,25 @@ pub type AmqpBody = Body; /// This struct contains the necessary fields required for AMQP 1.0 binding. /// It provides conversion between [`Event`] and [`AmqpMessage`] -/// +/// /// # Examples -/// +/// /// ## [`Event`] -> [`AmqpMessage`] in binary content mode -/// +/// /// ```rust /// let event_message = EventMessage::from_binary_event(event).unwrap(); /// let amqp_message = AmqpMessage:from(event_message); /// ``` -/// +/// /// ## [`Event`] -> [`AmqpMessage`] in structured content mode -/// +/// /// ```rust /// let event_message = EventMessage::from_structured_event(event).unwrap(); /// let amqp_message = AmqpMessage:from(event_message); /// ``` -/// +/// /// ## [`AmqpMessage`] -> [`Event`] -/// +/// /// ```rust /// let event_message = EventMessage::from(amqp_message); /// let event = MessageDeserializer::into_event(event_message).unwrap(); @@ -314,4 +314,4 @@ impl<'a> TryFrom<(&'a str, SimpleValue)> for MessageAttributeValue { } } } -} \ No newline at end of file +} From 2ab34e4a97cc5d53bd4002308f412bae16ba7630 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 18 Aug 2022 12:33:59 -0700 Subject: [PATCH 24/30] fixed cargo fmt check and clippy warnings Signed-off-by: minghuaw --- src/binding/fe2o3_amqp/mod.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index e4381580..3dcdb2dc 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -157,8 +157,10 @@ impl EventMessage { impl From for AmqpMessage { fn from(event: EventMessage) -> Self { - let mut properties = Properties::default(); - properties.content_type = event.content_type; + let properties = Properties { + content_type: event.content_type, + ..Default::default() + }; Message { header: None, delivery_annotations: None, @@ -173,7 +175,7 @@ impl From for AmqpMessage { impl From for EventMessage { fn from(message: AmqpMessage) -> Self { - let content_type = message.properties.map(|p| p.content_type).flatten(); + let content_type = message.properties.and_then(|p| p.content_type); Self { content_type, application_properties: message.application_properties, @@ -223,9 +225,9 @@ impl<'a> From> for Value { impl From for SimpleValue { fn from(value: MessageAttributeValue) -> Self { match value { - MessageAttributeValue::String(s) => SimpleValue::String(String::from(s)), + MessageAttributeValue::String(s) => SimpleValue::String(s), MessageAttributeValue::Uri(uri) => SimpleValue::String(String::from(uri.as_str())), - MessageAttributeValue::UriRef(uri) => SimpleValue::String(uri.clone()), + MessageAttributeValue::UriRef(uri) => SimpleValue::String(uri), MessageAttributeValue::Boolean(val) => SimpleValue::Bool(val), MessageAttributeValue::Integer(val) => SimpleValue::Long(val), MessageAttributeValue::DateTime(datetime) => { @@ -241,9 +243,9 @@ impl From for SimpleValue { impl From for Value { fn from(value: MessageAttributeValue) -> Self { match value { - MessageAttributeValue::String(s) => Value::String(String::from(s)), + MessageAttributeValue::String(s) => Value::String(s), MessageAttributeValue::Uri(uri) => Value::String(String::from(uri.as_str())), - MessageAttributeValue::UriRef(uri) => Value::String(uri.clone()), + MessageAttributeValue::UriRef(uri) => Value::String(uri), MessageAttributeValue::Boolean(val) => Value::Bool(val), MessageAttributeValue::Integer(val) => Value::Long(val), MessageAttributeValue::DateTime(datetime) => { From 730b94ed9fb5e59d8f6d03048e37c9c44396e1a6 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Wed, 24 Aug 2022 14:30:43 -0700 Subject: [PATCH 25/30] Show feature gated bindings in documentaion (#187) * show feature gated bindings in docsrs * moved crate root docsrs feature * fixed cargo fmt check * removed files that should go with another PR Signed-off-by: minghuaw --- Cargo.toml | 5 +++++ src/binding/mod.rs | 19 +++++++++++++++++++ src/lib.rs | 1 + 3 files changed, 25 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index b4ea879e..c7ff12af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,11 @@ exclude = [ ] categories = ["web-programming", "encoding", "data-structures"] +# Enable all features when building on docs.rs to show feature gated bindings +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + [lib] name = "cloudevents" diff --git a/src/binding/mod.rs b/src/binding/mod.rs index 6eae5eb6..f57ff3e7 100644 --- a/src/binding/mod.rs +++ b/src/binding/mod.rs @@ -1,11 +1,25 @@ //! Provides protocol binding implementations for [`crate::Event`]. +#[cfg_attr(docsrs, doc(cfg(feature = "actix")))] #[cfg(feature = "actix")] pub mod actix; +#[cfg_attr(docsrs, doc(cfg(feature = "axum")))] #[cfg(feature = "axum")] pub mod axum; #[cfg(feature = "fe2o3-amqp")] pub mod fe2o3_amqp; + +#[cfg_attr( + docsrs, + doc(cfg(any( + feature = "http-binding", + feature = "actix", + feature = "warp", + feature = "reqwest", + feature = "axum", + feature = "poem" + ))) +)] #[cfg(any( feature = "http-binding", feature = "actix", @@ -15,14 +29,19 @@ pub mod fe2o3_amqp; feature = "poem" ))] pub mod http; +#[cfg_attr(docsrs, doc(cfg(feature = "nats")))] #[cfg(feature = "nats")] pub mod nats; +#[cfg_attr(docsrs, doc(cfg(feature = "poem")))] #[cfg(feature = "poem")] pub mod poem; +#[cfg_attr(docsrs, doc(cfg(feature = "rdkafka")))] #[cfg(feature = "rdkafka")] pub mod rdkafka; +#[cfg_attr(docsrs, doc(cfg(feature = "reqwest")))] #[cfg(feature = "reqwest")] pub mod reqwest; +#[cfg_attr(docsrs, doc(cfg(feature = "warp")))] #[cfg(feature = "warp")] pub mod warp; diff --git a/src/lib.rs b/src/lib.rs index a1837528..4f2a6de8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,6 +56,7 @@ #![doc(html_root_url = "https://docs.rs/cloudevents-sdk/0.5.0")] #![deny(rustdoc::broken_intra_doc_links)] +#![cfg_attr(docsrs, feature(doc_cfg))] // Show feature gate in doc pub mod binding; pub mod event; From 04362098197d5e922d067e3907b7bd12f71afb92 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Fri, 26 Aug 2022 00:33:39 -0700 Subject: [PATCH 26/30] fixed doctest Signed-off-by: minghuaw --- Cargo.toml | 1 + src/binding/fe2o3_amqp/mod.rs | 73 +++++++++++++++++++++++++++-------- src/binding/mod.rs | 1 + 3 files changed, 59 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c7ff12af..ff59c395 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,3 +81,4 @@ mockito = "0.25.1" tokio = { version = "^1.0", features = ["full"] } mime = "0.3" tower = { version = "0.4", features = ["util"] } +fe2o3-amqp = { version = "0.4" } diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index 3dcdb2dc..267bf149 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -4,14 +4,17 @@ //! To send CloudEvents //! //! ```rust +//! use serde_json::json; //! use fe2o3_amqp::{Connection, Sender, Session}; -//! use cloudevents::{EventBuilderV10, binding::fe2o3_amqp::{EventMessage, AmqpMessage}}; +//! use cloudevents::{ +//! EventBuilder, EventBuilderV10, +//! binding::fe2o3_amqp::{EventMessage, AmqpMessage} +//! }; //! //! // You need a running AMQP 1.0 broker to try out this example. //! // With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis //! -//! #[tokio::main] -//! async fn main() { +//! # async fn send_event() { //! let mut connection = //! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672") //! .await @@ -20,11 +23,11 @@ //! let mut sender = Sender::attach(&mut session, "sender", "q1").await.unwrap(); //! //! let event = EventBuilderV10::new() -//! .id(i.to_string()) +//! .id("1") //! .ty("example.test") //! .source("localhost") //! .extension("ext-name", "AMQP") -//! .data("application/json", value) +//! .data("application/json", json!({"hello": "world"})) //! .build() //! .unwrap(); //! @@ -36,20 +39,22 @@ //! sender.close().await.unwrap(); //! session.end().await.unwrap(); //! connection.close().await.unwrap(); -//! } +//! # } //! ``` //! //! To receiver CloudEvents //! //! ```rust //! use fe2o3_amqp::{Connection, Receiver, Session}; -//! use cloudevents::{EventBuilderV10, binding::fe2o3_amqp::{EventMessage, AmqpMessage}}; +//! use cloudevents::{ +//! EventBuilderV10, message::MessageDeserializer, +//! binding::fe2o3_amqp::{EventMessage, AmqpMessage} +//! }; //! //! // You need a running AMQP 1.0 broker to try out this example. //! // With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis //! -//! #[tokio::main] -//! async fn main() { +//! # async fn receive_event() { //! let mut connection = //! Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672") //! .await @@ -60,13 +65,14 @@ //! let delivery = receiver.recv().await.unwrap(); //! receiver.accept(&delivery).await.unwrap(); //! -//! let event_message = EventMessage::from(delivery.into_message()); +//! let message: AmqpMessage = delivery.into_message(); +//! let event_message = EventMessage::from(message); //! let event = MessageDeserializer::into_event(event_message).unwrap(); //! -//! sender.close().await.unwrap(); +//! receiver.close().await.unwrap(); //! session.end().await.unwrap(); //! connection.close().await.unwrap(); -//! } +//! # } //! ``` use std::convert::TryFrom; @@ -112,22 +118,57 @@ pub type AmqpBody = Body; /// ## [`Event`] -> [`AmqpMessage`] in binary content mode /// /// ```rust +/// use serde_json::json; +/// use fe2o3_amqp_types::messaging::Message; +/// use cloudevents::{EventBuilder, EventBuilderV10, binding::fe2o3_amqp::EventMessage}; +/// +/// let event = EventBuilderV10::new() +/// .id("1") +/// .ty("example.test") +/// .source("localhost") +/// .extension("ext-name", "AMQP") +/// .data("application/json", json!({"hello": "world"})) +/// .build() +/// .unwrap(); /// let event_message = EventMessage::from_binary_event(event).unwrap(); -/// let amqp_message = AmqpMessage:from(event_message); +/// let amqp_message = Message::from(event_message); /// ``` /// /// ## [`Event`] -> [`AmqpMessage`] in structured content mode /// /// ```rust +/// use serde_json::json; +/// use fe2o3_amqp_types::messaging::Message; +/// use cloudevents::{EventBuilder, EventBuilderV10, binding::fe2o3_amqp::EventMessage}; +/// +/// let event = EventBuilderV10::new() +/// .id("1") +/// .ty("example.test") +/// .source("localhost") +/// .extension("ext-name", "AMQP") +/// .data("application/json", json!({"hello": "world"})) +/// .build() +/// .unwrap(); /// let event_message = EventMessage::from_structured_event(event).unwrap(); -/// let amqp_message = AmqpMessage:from(event_message); +/// let amqp_message = Message::from(event_message); /// ``` /// /// ## [`AmqpMessage`] -> [`Event`] /// /// ```rust -/// let event_message = EventMessage::from(amqp_message); -/// let event = MessageDeserializer::into_event(event_message).unwrap(); +/// use fe2o3_amqp::Receiver; +/// use cloudevents::{ +/// message::MessageDeserializer, +/// binding::fe2o3_amqp::{AmqpMessage, EventMessage} +/// }; +/// +/// # async fn receive_event(receiver: &mut Receiver) { +/// let delivery = receiver.recv().await.unwrap(); +/// receiver.accept(&delivery).await.unwrap(); +/// let amqp_message: AmqpMessage = delivery.into_message(); +/// let event_message = EventMessage::from(amqp_message); +/// let event = MessageDeserializer::into_event(event_message).unwrap(); +/// # } /// ``` pub struct EventMessage { pub content_type: Option, diff --git a/src/binding/mod.rs b/src/binding/mod.rs index f57ff3e7..9c2736f4 100644 --- a/src/binding/mod.rs +++ b/src/binding/mod.rs @@ -6,6 +6,7 @@ pub mod actix; #[cfg_attr(docsrs, doc(cfg(feature = "axum")))] #[cfg(feature = "axum")] pub mod axum; +#[cfg_attr(docsrs, doc(cfg(feature = "fe2o3-amqp")))] #[cfg(feature = "fe2o3-amqp")] pub mod fe2o3_amqp; From f623f15acf2e2ef1123ef9e69bc6ede16e559644 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Fri, 26 Aug 2022 00:35:40 -0700 Subject: [PATCH 27/30] cargo fix --all && cargo fmt --all Signed-off-by: minghuaw --- src/binding/fe2o3_amqp/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index 267bf149..e326820e 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -7,7 +7,7 @@ //! use serde_json::json; //! use fe2o3_amqp::{Connection, Sender, Session}; //! use cloudevents::{ -//! EventBuilder, EventBuilderV10, +//! EventBuilder, EventBuilderV10, //! binding::fe2o3_amqp::{EventMessage, AmqpMessage} //! }; //! @@ -121,7 +121,7 @@ pub type AmqpBody = Body; /// use serde_json::json; /// use fe2o3_amqp_types::messaging::Message; /// use cloudevents::{EventBuilder, EventBuilderV10, binding::fe2o3_amqp::EventMessage}; -/// +/// /// let event = EventBuilderV10::new() /// .id("1") /// .ty("example.test") @@ -140,7 +140,7 @@ pub type AmqpBody = Body; /// use serde_json::json; /// use fe2o3_amqp_types::messaging::Message; /// use cloudevents::{EventBuilder, EventBuilderV10, binding::fe2o3_amqp::EventMessage}; -/// +/// /// let event = EventBuilderV10::new() /// .id("1") /// .ty("example.test") @@ -161,7 +161,7 @@ pub type AmqpBody = Body; /// message::MessageDeserializer, /// binding::fe2o3_amqp::{AmqpMessage, EventMessage} /// }; -/// +/// /// # async fn receive_event(receiver: &mut Receiver) { /// let delivery = receiver.recv().await.unwrap(); /// receiver.accept(&delivery).await.unwrap(); From 18385a6aa3f24d4849123818dd348a5fb5102b30 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Sun, 28 Aug 2022 04:50:04 -0700 Subject: [PATCH 28/30] updated fe2o3-amqp to latest version Signed-off-by: minghuaw --- Cargo.toml | 4 ++-- example-projects/fe2o3-amqp-example/Cargo.toml | 2 +- example-projects/fe2o3-amqp-example/src/main.rs | 8 +++----- src/binding/fe2o3_amqp/deserializer.rs | 4 ++-- src/binding/fe2o3_amqp/mod.rs | 2 +- 5 files changed, 9 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ff59c395..fd05552d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,7 +58,7 @@ axum-lib = { version = "^0.5", optional = true , package="axum"} http-body = { version = "^0.4", optional = true } poem-lib = { version = "=1.2.34", optional = true, package = "poem" } nats-lib = { version = "0.21.0", optional = true, package = "nats" } -fe2o3-amqp-types = { version = "0.3.4", optional = true } +fe2o3-amqp-types = { version = "0.4.0", optional = true } [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.3" @@ -81,4 +81,4 @@ mockito = "0.25.1" tokio = { version = "^1.0", features = ["full"] } mime = "0.3" tower = { version = "0.4", features = ["util"] } -fe2o3-amqp = { version = "0.4" } +fe2o3-amqp = { version = "0.5.1" } diff --git a/example-projects/fe2o3-amqp-example/Cargo.toml b/example-projects/fe2o3-amqp-example/Cargo.toml index c49e1c15..ae8d8b46 100644 --- a/example-projects/fe2o3-amqp-example/Cargo.toml +++ b/example-projects/fe2o3-amqp-example/Cargo.toml @@ -7,6 +7,6 @@ edition = "2021" [dependencies] cloudevents-sdk = { path = "../..", features = ["fe2o3-amqp"] } -fe2o3-amqp = "0.4.0" +fe2o3-amqp = "0.5.1" tokio = { version = "1", features = ["macros", "net", "rt", "rt-multi-thread"] } serde_json = "1" \ No newline at end of file diff --git a/example-projects/fe2o3-amqp-example/src/main.rs b/example-projects/fe2o3-amqp-example/src/main.rs index 498673dc..5ee6f50f 100644 --- a/example-projects/fe2o3-amqp-example/src/main.rs +++ b/example-projects/fe2o3-amqp-example/src/main.rs @@ -4,7 +4,7 @@ //! With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis use cloudevents::{ - binding::fe2o3_amqp::{EventMessage, AmqpMessage}, message::MessageDeserializer, Event, EventBuilder, + binding::fe2o3_amqp::{EventMessage}, message::MessageDeserializer, Event, EventBuilder, EventBuilderV10, AttributesReader, event::ExtensionValue, }; use fe2o3_amqp::{Connection, Receiver, Sender, Session}; @@ -27,8 +27,7 @@ async fn send_binary_event(sender: &mut Sender, i: usize, value: serde_json::Val .data("application/json", value) .build()?; let event_message = EventMessage::from_binary_event(event)?; - let message = AmqpMessage::from(event_message); - sender.send(message).await?.accepted_or("not accepted")?; + sender.send(event_message).await?.accepted_or("not accepted")?; Ok(()) } @@ -41,8 +40,7 @@ async fn send_structured_event(sender: &mut Sender, i: usize, value: serde_json: .data("application/json", value) .build()?; let event_message = EventMessage::from_structured_event(event)?; - let message = AmqpMessage::from(event_message); - sender.send(message).await?.accepted_or("not accepted")?; + sender.send(event_message).await?.accepted_or("not accepted")?; Ok(()) } diff --git a/src/binding/fe2o3_amqp/deserializer.rs b/src/binding/fe2o3_amqp/deserializer.rs index 9c0c5f35..3590e56c 100644 --- a/src/binding/fe2o3_amqp/deserializer.rs +++ b/src/binding/fe2o3_amqp/deserializer.rs @@ -68,7 +68,7 @@ impl BinaryDeserializer for EventMessage { let bytes = data.0.into_vec(); serializer.end_with_data(bytes) } - Body::Nothing => serializer.end(), + Body::Empty => serializer.end(), Body::Sequence(_) | Body::Value(_) => Err(Error::WrongEncoding {}), } } @@ -82,7 +82,7 @@ impl StructuredDeserializer for EventMessage { use fe2o3_amqp_types::messaging::Body; let bytes = match self.body { Body::Data(data) => data.0.into_vec(), - Body::Nothing => vec![], + Body::Empty => vec![], Body::Sequence(_) | Body::Value(_) => return Err(Error::WrongEncoding {}), }; serializer.set_structured_event(bytes) diff --git a/src/binding/fe2o3_amqp/mod.rs b/src/binding/fe2o3_amqp/mod.rs index e326820e..c696b06d 100644 --- a/src/binding/fe2o3_amqp/mod.rs +++ b/src/binding/fe2o3_amqp/mod.rs @@ -181,7 +181,7 @@ impl EventMessage { Self { content_type: None, application_properties: None, - body: Body::Nothing, + body: Body::Empty, } } From 26b41f1dbe70d4c08e290c5fbd9552ab653206ff Mon Sep 17 00:00:00 2001 From: minghuaw Date: Tue, 30 Aug 2022 13:31:35 -0700 Subject: [PATCH 29/30] updated fe2o3-amqp-types Signed-off-by: minghuaw --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index fd05552d..b1a9b77b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,7 +58,7 @@ axum-lib = { version = "^0.5", optional = true , package="axum"} http-body = { version = "^0.4", optional = true } poem-lib = { version = "=1.2.34", optional = true, package = "poem" } nats-lib = { version = "0.21.0", optional = true, package = "nats" } -fe2o3-amqp-types = { version = "0.4.0", optional = true } +fe2o3-amqp-types = { version = "0.4.1", optional = true } [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.3" From 695abf4425c22115040a87e4902ccbd644173781 Mon Sep 17 00:00:00 2001 From: minghuaw Date: Thu, 1 Sep 2022 05:56:12 -0700 Subject: [PATCH 30/30] updated AMQP deps Signed-off-by: minghuaw --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b1a9b77b..8d0e1e21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,7 +58,7 @@ axum-lib = { version = "^0.5", optional = true , package="axum"} http-body = { version = "^0.4", optional = true } poem-lib = { version = "=1.2.34", optional = true, package = "poem" } nats-lib = { version = "0.21.0", optional = true, package = "nats" } -fe2o3-amqp-types = { version = "0.4.1", optional = true } +fe2o3-amqp-types = { version = "0.5.1", optional = true } [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.3" @@ -81,4 +81,4 @@ mockito = "0.25.1" tokio = { version = "^1.0", features = ["full"] } mime = "0.3" tower = { version = "0.4", features = ["util"] } -fe2o3-amqp = { version = "0.5.1" } +fe2o3-amqp = { version = "0.6.1" }