Skip to content

Commit

Permalink
Merge pull request #327 from rgallor/dynamic-introspection
Browse files Browse the repository at this point in the history
Dynamic introspection
  • Loading branch information
harlem88 authored May 13, 2024
2 parents bb9681e + 1d8a70d commit 9cbf280
Show file tree
Hide file tree
Showing 13 changed files with 639 additions and 209 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- Update the Dynamic Introspection to support adding or removing interfaces from a MessageHub Node [#330](https://github.com/astarte-platform/astarte-device-sdk-rust/issues/330)

## [0.8.1] - 2024-05-03
### Fixed
- Correct the interfaces iterator logic to send the correct device
Expand Down
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ interface-strict = []
[workspace.dependencies]
astarte-device-sdk = { path = "./", version = "=0.8.1" }
astarte-device-sdk-derive = { version = "=0.8.1", path = "./astarte-device-sdk-derive" }
astarte-message-hub-proto = "0.6.2"
astarte-message-hub-proto = { git = "https://github.com/astarte-platform/astarte-message-hub-proto"}
async-trait = "0.1.50"
base64 = "0.22.0"
bson = "2.7.0"
Expand Down
55 changes: 55 additions & 0 deletions astarte-device-sdk-mock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ pub trait DynamicIntrospection {
async fn add_interface_from_str(&self, json_str: &str) -> Result<bool, Error>;

async fn remove_interface(&self, interface_name: &str) -> Result<bool, Error>;

async fn remove_interfaces<I>(&self, interfaces_name: I) -> Result<Vec<String>, Error>
where
I: IntoIterator<Item = String> + Send + 'static,
I::IntoIter: Send;

async fn remove_interfaces_vec(
&self,
interfaces_name: Vec<String>,
) -> Result<Vec<String>, Error>;
}

mock! {
Expand Down Expand Up @@ -172,6 +182,13 @@ mock! {
async fn add_interface_from_str(&self, json_str: &str) -> Result<bool, Error>;

async fn remove_interface(&self, interface_name: &str) -> Result<bool, Error>;

async fn remove_interfaces<I>(&self, interfaces_name: I) -> Result<Vec<String>, Error>
where
I: IntoIterator<Item = String> + Send + 'static,
I::IntoIter: Send;

async fn remove_interfaces_vec(&self, interfaces_name: Vec<String>) -> Result<Vec<String>, Error>;
}

impl<C> Clone for DeviceClient<C> {
Expand Down Expand Up @@ -417,6 +434,29 @@ mod tests {
)
.await
}

async fn remove_interfaces<I>(&self, interfaces_name: I) -> Result<Vec<String>, Error>
where
I: IntoIterator<Item = String> + Send + 'static,
I::IntoIter: Send,
{
astarte_device_sdk::introspection::DynamicIntrospection::remove_interfaces(
self,
interfaces_name,
)
.await
}

async fn remove_interfaces_vec(
&self,
interfaces_name: Vec<String>,
) -> Result<Vec<String>, Error> {
astarte_device_sdk::introspection::DynamicIntrospection::remove_interfaces_vec(
self,
interfaces_name,
)
.await
}
}

#[async_trait]
Expand Down Expand Up @@ -453,6 +493,21 @@ mod tests {
async fn remove_interface(&self, _interface_name: &str) -> Result<bool, Error> {
Ok(Default::default())
}

async fn remove_interfaces<I>(&self, _interfaces_name: I) -> Result<Vec<String>, Error>
where
I: IntoIterator<Item = String> + Send,
I::IntoIter: Send,
{
Ok(Default::default())
}

async fn remove_interfaces_vec(
&self,
_interfaces_name: Vec<String>,
) -> Result<Vec<String>, Error> {
Ok(Default::default())
}
}

#[test]
Expand Down
25 changes: 25 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,4 +564,29 @@ where

rx.await.map_err(|_| Error::Disconnected)?
}

async fn remove_interfaces<I>(&self, interfaces_name: I) -> Result<Vec<String>, Error>
where
I: IntoIterator<Item = String> + Send,
I::IntoIter: Send,
{
let interfaces = interfaces_name.into_iter().collect();

self.remove_interfaces_vec(interfaces).await
}

async fn remove_interfaces_vec(
&self,
interfaces_name: Vec<String>,
) -> Result<Vec<String>, Error> {
let (tx, rx) = oneshot::channel();

self.send_msg(ClientMessage::RemoveInterfaces {
interfaces: interfaces_name.into_iter().collect(),
response: tx,
})
.await?;

rx.await.map_err(|_| Error::Disconnected)?
}
}
69 changes: 66 additions & 3 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

//! Connection to Astarte, for handling events and reconnection on error.

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
Expand Down Expand Up @@ -285,8 +286,7 @@ impl<S, C> DeviceConnection<S, C> {
let mut interfaces = self.interfaces.write().await;

let Some(to_remove) = interfaces.get(interface_name) else {
debug!("interface {interface_name} not found");

debug!("{interface_name} not found, skipping");
return Ok(false);
};

Expand All @@ -295,7 +295,7 @@ impl<S, C> DeviceConnection<S, C> {
.await?;

if let Some(prop) = to_remove.as_prop() {
// We cannot error here since we already unsubscribed to the interface
// We cannot error here since we have already unsubscribed from the interface
if let Err(err) = self.store.delete_interface(prop.interface_name()).await {
error!("failed to remove property {err}");
}
Expand All @@ -306,6 +306,51 @@ impl<S, C> DeviceConnection<S, C> {
Ok(true)
}

async fn remove_interfaces<'a, I>(&mut self, interfaces_name: I) -> Result<Vec<String>, Error>
where
C: Register,
S: PropertyStore,
I: IntoIterator<Item = &'a str> + Send,
{
let mut interfaces = self.interfaces.write().await;

let to_remove: HashMap<&str, &Interface> = interfaces_name
.into_iter()
.filter_map(|iface_name| {
let interface = interfaces.get(iface_name).map(|i| (i.interface_name(), i));

if interface.is_none() {
debug!("{iface_name} not found, skipping");
}

interface
})
.collect();

if to_remove.is_empty() {
return Ok(Vec::new());
}

self.connection
.remove_interfaces(&interfaces, &to_remove)
.await?;

for (_, iface) in to_remove.iter() {
// We cannot error here since we have already unsubscribed from the interface
if let Some(prop) = iface.as_prop() {
if let Err(err) = self.store.delete_interface(prop.interface_name()).await {
error!("failed to remove property {err}");
}
}
}

let removed_names = to_remove.keys().map(|k| k.to_string()).collect_vec();

interfaces.remove_many(&removed_names);

Ok(removed_names)
}

async fn handle_connection_event(&self, event: ReceivedEvent<C::Payload>) -> Result<(), Error>
where
C: Receive + Sync,
Expand Down Expand Up @@ -403,6 +448,20 @@ impl<S, C> DeviceConnection<S, C> {
error!("client disconnected while failing to remove interfaces: {err}");
}

Ok(())
}
ClientMessage::RemoveInterfaces {
interfaces,
response,
} => {
let res = self
.remove_interfaces(interfaces.iter().map(|s| s.as_str()))
.await;

if let Err(Err(err)) = response.send(res) {
error!("client disconnected while failing to remove interfaces: {err}");
}

Ok(())
}
}
Expand Down Expand Up @@ -476,4 +535,8 @@ pub(crate) enum ClientMessage {
interface: String,
response: oneshot::Sender<Result<bool, Error>>,
},
RemoveInterfaces {
interfaces: Vec<String>,
response: oneshot::Sender<Result<Vec<String>, Error>>,
},
}
21 changes: 21 additions & 0 deletions src/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ impl Interfaces {
self.interfaces.remove(interface_name)
}

/// Remove the interfaces which name is in the HashSet
pub(crate) fn remove_many<I, S>(&mut self, to_remove: I)
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
for i in to_remove {
self.remove(i.as_ref());
}
}

pub(crate) fn get_introspection_string(&self) -> String {
Introspection::new(self.interfaces.values()).to_string()
}
Expand Down Expand Up @@ -215,6 +226,16 @@ impl Interfaces {
.values()
.filter(|i| i.interface_name() != removed.interface_name())
}

/// Iter without many removed interfaces
pub(crate) fn iter_without_removed_many<'a>(
&'a self,
removed: &'a HashMap<&str, &Interface>,
) -> impl Iterator<Item = &'a Interface> + Clone {
self.interfaces
.values()
.filter(|i| !removed.contains_key(i.interface_name()))
}
}

impl FromIterator<Interface> for Interfaces {
Expand Down
Loading

0 comments on commit 9cbf280

Please sign in to comment.