Skip to content

Commit

Permalink
adding metadata to mqtt connectors
Browse files Browse the repository at this point in the history
  • Loading branch information
vaibhawvipul committed Oct 30, 2024
1 parent 28360fe commit e95d8b6
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 5 deletions.
34 changes: 30 additions & 4 deletions crates/arroyo-connectors/src/mqtt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl Connector for MqttConnector {
config: MqttConfig,
table: MqttTable,
schema: Option<&ConnectionSchema>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let (typ, desc) = match table.type_ {
TableType::Source { .. } => (
Expand All @@ -178,14 +178,21 @@ impl Connector for MqttConnector {
.map(|t| t.to_owned())
.ok_or_else(|| anyhow!("'format' must be set for Mqtt connection"))?;

let metadata_fields = metadata_fields.map(|fields| {
fields
.into_iter()
.map(|(k, (v, _))| (k, v))
.collect::<HashMap<String, String>>()
});

let config = OperatorConfig {
connection: serde_json::to_value(config).unwrap(),
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
additional_fields: None,
additional_fields: metadata_fields,
};

Ok(Connection {
Expand Down Expand Up @@ -246,7 +253,7 @@ impl Connector for MqttConnector {
options: &mut HashMap<String, String>,
schema: Option<&ConnectionSchema>,
profile: Option<&ConnectionProfile>,
_metadata_fields: Option<HashMap<String, (String, DataType)>>,
metadata_fields: Option<HashMap<String, (String, DataType)>>,
) -> anyhow::Result<Connection> {
let connection = profile
.map(|p| {
Expand All @@ -258,7 +265,25 @@ impl Connector for MqttConnector {

let table = Self::table_from_options(options)?;

Self::from_config(self, None, name, connection, table, schema, None)
if let Some(fields) = &metadata_fields {
for (k, (v, t)) in fields {
if v != "topic" {
return Err(anyhow!(
"Invalid metadata field name for mqtt connector: {}",
k
));
}
if *t != DataType::Utf8 {
return Err(anyhow!(
"Invalid datatype: {} for metadata field: {} for mqtt connector",
k,
v
));
}
}
}

Self::from_config(self, None, name, connection, table, schema, metadata_fields)
}

fn make_operator(
Expand Down Expand Up @@ -286,6 +311,7 @@ impl Connector for MqttConnector {
)
.unwrap(),
subscribed: Arc::new(AtomicBool::new(false)),
metadata_fields: config.additional_fields,
})),
TableType::Sink { retain } => OperatorNode::from_operator(Box::new(MqttSinkFunc {
config: profile,
Expand Down
20 changes: 19 additions & 1 deletion crates/arroyo-connectors/src/mqtt/source/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use arroyo_formats::de::FieldValueType;
use async_trait::async_trait;
use std::collections::HashMap;
use std::num::NonZeroU32;
Expand Down Expand Up @@ -33,6 +34,7 @@ pub struct MqttSourceFunc {
pub bad_data: Option<BadData>,
pub messages_per_second: NonZeroU32,
pub subscribed: Arc<AtomicBool>,
pub metadata_fields: Option<HashMap<String, String>>,
}

#[async_trait]
Expand Down Expand Up @@ -65,6 +67,7 @@ impl SourceOperator for MqttSourceFunc {
}
}

#[allow(clippy::too_many_arguments)]
impl MqttSourceFunc {
pub fn new(
config: MqttConfig,
Expand All @@ -74,6 +77,7 @@ impl MqttSourceFunc {
framing: Option<Framing>,
bad_data: Option<BadData>,
messages_per_second: u32,
metadata_fields: Option<HashMap<String, String>>,
) -> Self {
Self {
config,
Expand All @@ -84,6 +88,7 @@ impl MqttSourceFunc {
bad_data,
messages_per_second: NonZeroU32::new(messages_per_second).unwrap(),
subscribed: Arc::new(AtomicBool::new(false)),
metadata_fields,
}
}

Expand Down Expand Up @@ -143,7 +148,20 @@ impl MqttSourceFunc {
event = eventloop.poll() => {
match event {
Ok(MqttEvent::Incoming(Incoming::Publish(p))) => {
ctx.deserialize_slice(&p.payload, SystemTime::now(), None).await?;
let topic = String::from_utf8_lossy(&p.topic).to_string();
let connector_metadata: Option<HashMap<&String, FieldValueType<'_>>> =
self.metadata_fields.as_ref().map(|fields| {
fields.iter()
.filter_map(|(k, v)| {
if v == "topic" {
Some((k, FieldValueType::String(&topic)))
} else {
None
}
})
.collect()
});
ctx.deserialize_slice(&p.payload, SystemTime::now(), connector_metadata).await?;
rate_limiter.until_ready().await;
}
Ok(MqttEvent::Outgoing(Outgoing::Subscribe(_))) => {
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-connectors/src/mqtt/source/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ impl MqttTopicTester {
None,
None,
10,
None,
);

let (to_control_tx, control_rx) = channel(128);
Expand Down

0 comments on commit e95d8b6

Please sign in to comment.