Skip to content

Commit

Permalink
Cumulocity mapper using actors #1909
Browse files Browse the repository at this point in the history
  • Loading branch information
albinsuresh committed Apr 21, 2023
1 parent 1eb4842 commit c027b33
Show file tree
Hide file tree
Showing 16 changed files with 840 additions and 945 deletions.
205 changes: 95 additions & 110 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/core/tedge_actors/src/message_boxes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ pub fn log_message_sent<I: Debug>(target: &str, message: I) {
/// The basic message box
pub struct SimpleMessageBox<Input: Debug, Output> {
input_receiver: LoggingReceiver<Input>,
output_sender: LoggingSender<Output>,
pub output_sender: LoggingSender<Output>,
}

impl<Input: Message, Output: Message> SimpleMessageBox<Input, Output> {
Expand Down
5 changes: 5 additions & 0 deletions crates/core/tedge_mapper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ async-trait = "0.1"
aws_mapper_ext = { path = "../../extensions/aws_mapper_ext" }
az_mapper_ext = { path = "../../extensions/az_mapper_ext" }
batcher = { path = "../../common/batcher" }
camino = "1.1.4"
c8y_api = { path = "../c8y_api" }
c8y_http_proxy = { path = "../../extensions/c8y_http_proxy" }
clap = { version = "3.2", features = ["cargo", "derive"] }
clock = { path = "../../common/clock" }
collectd_ext = { path = "../../extensions/collectd_ext" }
Expand All @@ -85,8 +87,11 @@ tedge_actors = { path = "../../core/tedge_actors" }
tedge_api = { path = "../tedge_api" }
tedge_config = { path = "../../common/tedge_config" }
tedge_health_ext = { path = "../../extensions/tedge_health_ext" }
tedge_http_ext = { path = "../../extensions/tedge_http_ext" }
tedge_mqtt_ext = { path = "../../extensions/tedge_mqtt_ext" }
tedge_file_system_ext = { path = "../../extensions/tedge_file_system_ext" }
tedge_signal_ext = { path = "../../extensions/tedge_signal_ext" }
tedge_timer_ext = { path = "../../extensions/tedge_timer_ext" }
tedge_utils = { path = "../../common/tedge_utils", features = [
"logging",
"fs-notify",
Expand Down
315 changes: 315 additions & 0 deletions crates/core/tedge_mapper/src/c8y/actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,315 @@
use super::config::C8yMapperConfig;
use super::config::MQTT_MESSAGE_SIZE_THRESHOLD;
use super::converter::CumulocityConverter;
use super::converter::CumulocityDeviceInfo;
use super::dynamic_discovery::process_inotify_events;
use super::mapper::CumulocityMapper;
use crate::core::converter::Converter;
use crate::core::converter::MapperConfig;
use crate::core::size_threshold::SizeThreshold;
use async_trait::async_trait;
use c8y_api::smartrest::operations::Operations;
use c8y_api::smartrest::topic::SMARTREST_PUBLISH_TOPIC;
use c8y_http_proxy::handle::C8YHttpProxy;
use c8y_http_proxy::messages::C8YRestRequest;
use c8y_http_proxy::messages::C8YRestResult;
use mqtt_channel::Message;
use mqtt_channel::Topic;
use std::path::PathBuf;
use std::time::Duration;
use tedge_actors::adapt;
use tedge_actors::fan_in_message_type;
use tedge_actors::Actor;
use tedge_actors::Builder;
use tedge_actors::DynSender;
use tedge_actors::LinkError;
use tedge_actors::LoggingSender;
use tedge_actors::MessageReceiver;
use tedge_actors::MessageSink;
use tedge_actors::NoConfig;
use tedge_actors::RuntimeError;
use tedge_actors::RuntimeRequest;
use tedge_actors::RuntimeRequestSink;
use tedge_actors::Sender;
use tedge_actors::ServiceConsumer;
use tedge_actors::ServiceProvider;
use tedge_actors::SimpleMessageBox;
use tedge_actors::SimpleMessageBoxBuilder;
use tedge_file_system_ext::FsWatchEvent;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::TopicFilter;
use tedge_timer_ext::SetTimeout;
use tedge_timer_ext::Timeout;

const SYNC_WINDOW: Duration = Duration::from_secs(3);

pub type SyncStart = SetTimeout<()>;
pub type SyncComplete = Timeout<()>;

fan_in_message_type!(C8yMapperInput[MqttMessage, FsWatchEvent, SyncComplete] : Debug);
type C8yMapperOutput = MqttMessage;

pub struct C8yMapperActor {
config: C8yMapperConfig,
converter: CumulocityConverter,
messages: SimpleMessageBox<C8yMapperInput, C8yMapperOutput>,
mqtt_publisher: LoggingSender<MqttMessage>,
timer_sender: LoggingSender<SyncStart>,
}

#[async_trait]
impl Actor for C8yMapperActor {
fn name(&self) -> &str {
"CumulocityMapper"
}

async fn run(&mut self) -> Result<(), RuntimeError> {
let init_messages = self.converter.init_messages();
for init_message in init_messages.into_iter() {
let _ = self.mqtt_publisher.send(init_message).await?;
}

// Start the sync phase
self.timer_sender
.send(SyncStart::new(SYNC_WINDOW, ()))
.await?;

while let Some(event) = self.messages.recv().await {
match event {
C8yMapperInput::MqttMessage(message) => {
self.process_mqtt_message(message).await?;
}
C8yMapperInput::FsWatchEvent(event) => {
self.process_file_watch_event(event).await?;
}
C8yMapperInput::SyncComplete(_) => {
self.process_sync_timeout().await?;
}
}
}
Ok(())
}
}

impl C8yMapperActor {
pub fn new(
config: C8yMapperConfig,
converter: CumulocityConverter,
messages: SimpleMessageBox<C8yMapperInput, C8yMapperOutput>,
mqtt_publisher: LoggingSender<MqttMessage>,
timer_sender: LoggingSender<SyncStart>,
) -> Self {
Self {
config,
converter,
messages,
mqtt_publisher,
timer_sender,
}
}

async fn process_mqtt_message(&mut self, message: MqttMessage) -> Result<(), RuntimeError> {
let converted_messages = self.converter.convert(&message).await;

for converted_message in converted_messages.into_iter() {
let _ = self.mqtt_publisher.send(converted_message).await;
}

Ok(())
}

async fn process_file_watch_event(
&mut self,
file_event: FsWatchEvent,
) -> Result<(), RuntimeError> {
match file_event.clone() {
FsWatchEvent::DirectoryCreated(path) => {
if let Some(directory_name) = path.file_name() {
let child_id = directory_name.to_string_lossy().to_string();
let message = Message::new(
&Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC),
format!("101,{child_id},{child_id},thin-edge.io-child"),
);
self.mqtt_publisher.send(message).await?;
}
}
FsWatchEvent::FileCreated(path)
| FsWatchEvent::FileDeleted(path)
| FsWatchEvent::Modified(path)
| FsWatchEvent::DirectoryDeleted(path) => {
match process_inotify_events(&path, file_event) {
Ok(Some(discovered_ops)) => {
self.mqtt_publisher
.send(
self.converter
.process_operation_update_message(discovered_ops),
)
.await?;
}
Ok(None) => {}
Err(e) => {
eprintln!("Processing inotify event failed due to {}", e);
}
}
}
}

Ok(())
}

pub async fn process_sync_timeout(&mut self) -> Result<(), RuntimeError> {
// Once the sync phase is complete, retrieve all sync messages from the converter and process them
let sync_messages = self.converter.sync_messages();
for message in sync_messages {
self.process_mqtt_message(message).await?;
}

Ok(())
}
}

pub struct C8yMapperBuilder {
config: C8yMapperConfig,
box_builder: SimpleMessageBoxBuilder<C8yMapperInput, C8yMapperOutput>,
mqtt_publisher: Option<DynSender<MqttMessage>>,
http_proxy: Option<C8YHttpProxy>,
timer_sender: Option<DynSender<SyncStart>>,
}

impl C8yMapperBuilder {
//HIPPO: Accept all the providers as arguments
pub fn new(config: C8yMapperConfig) -> Self {
let box_builder = SimpleMessageBoxBuilder::new("CumulocityMapper", 16);

Self {
config,
box_builder,
mqtt_publisher: None,
http_proxy: None,
timer_sender: None,
}
}

pub fn with_c8y_http_proxy(
&mut self,
http: &mut impl ServiceProvider<C8YRestRequest, C8YRestResult, NoConfig>,
) -> Result<(), LinkError> {
self.http_proxy = Some(C8YHttpProxy::new("C8yMapper => C8Y", http));
Ok(())
}
}

impl ServiceConsumer<MqttMessage, MqttMessage, TopicFilter> for C8yMapperBuilder {
fn get_config(&self) -> TopicFilter {
let operations = Operations::try_new(format!(
"{}/operations/c8y",
self.config.config_dir.display()
))
.unwrap(); //HIPPO

CumulocityMapper::subscriptions(&operations).unwrap() //HIPPO
}

fn set_request_sender(&mut self, sender: DynSender<MqttMessage>) {
self.mqtt_publisher = Some(sender);
}

fn get_response_sender(&self) -> DynSender<MqttMessage> {
adapt(&self.box_builder.get_sender())
}
}

impl ServiceConsumer<SetTimeout<()>, Timeout<()>, NoConfig> for C8yMapperBuilder {
fn get_config(&self) -> NoConfig {
NoConfig
}

fn set_request_sender(&mut self, sender: DynSender<SetTimeout<()>>) {
self.timer_sender = Some(sender);
}

fn get_response_sender(&self) -> DynSender<Timeout<()>> {
adapt(&self.box_builder.get_sender())
}
}

impl MessageSink<FsWatchEvent, PathBuf> for C8yMapperBuilder {
fn get_config(&self) -> PathBuf {
self.config.ops_dir.clone()
}

fn get_sender(&self) -> DynSender<FsWatchEvent> {
tedge_actors::adapt(&self.box_builder.get_sender())
}
}

impl RuntimeRequestSink for C8yMapperBuilder {
fn get_signal_sender(&self) -> DynSender<RuntimeRequest> {
self.box_builder.get_signal_sender()
}
}

impl Builder<C8yMapperActor> for C8yMapperBuilder {
type Error = RuntimeError;

fn try_build(self) -> Result<C8yMapperActor, Self::Error> {
let mqtt_publisher = self
.mqtt_publisher
.ok_or_else(|| LinkError::MissingPeer {
role: "mqtt".into(),
})
.map(|mqtt_publisher| {
LoggingSender::new("CumulocityMapper MQTT".into(), mqtt_publisher)
})?;

let http_proxy = self.http_proxy.ok_or_else(|| LinkError::MissingPeer {
role: "http".to_string(),
})?;

let timer_sender = self
.timer_sender
.ok_or_else(|| LinkError::MissingPeer {
role: "timer".to_string(),
})
.map(|timer_sender| {
LoggingSender::new("CumulocityMapper Timer".into(), timer_sender)
})?;

let operations = Operations::try_new(self.config.ops_dir.clone()).unwrap(); //HIPPO
let child_ops = Operations::get_child_ops(self.config.ops_dir.clone()).unwrap(); //HIPPO
let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD);
let device_info = CumulocityDeviceInfo {
device_name: self.config.device_id.clone(),
device_type: self.config.device_type.clone(),
operations,
service_type: self.config.service_type.clone(),
};

let mapper_config = MapperConfig {
out_topic: Topic::new_unchecked("c8y/measurement/measurements/create"),
errors_topic: Topic::new_unchecked("tedge/errors"),
};

let converter = CumulocityConverter::new(
size_threshold,
device_info,
mqtt_publisher.clone(),
http_proxy,
&self.config.config_dir,
self.config.logs_path.clone(),
child_ops,
mapper_config,
)
.unwrap(); //HIPPO map to RuntimeError or a new BuildError

let message_box = self.box_builder.build();

Ok(C8yMapperActor::new(
self.config,
converter,
message_box,
mqtt_publisher,
timer_sender,
))
}
}
Loading

0 comments on commit c027b33

Please sign in to comment.