Skip to content

Commit

Permalink
Fix service monitoring of tedge-mapper
Browse files Browse the repository at this point in the history
  • Loading branch information
albinsuresh committed Apr 27, 2023
1 parent 700b2c0 commit 5450cac
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 51 deletions.
2 changes: 1 addition & 1 deletion crates/core/tedge_mapper/src/c8y/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use tedge_mqtt_ext::TopicFilter;
use tedge_timer_ext::SetTimeout;
use tedge_timer_ext::Timeout;

const SYNC_WINDOW: Duration = Duration::from_secs(3);
const SYNC_WINDOW: Duration = Duration::from_secs(300);

pub type SyncStart = SetTimeout<()>;
pub type SyncComplete = Timeout<()>;
Expand Down
70 changes: 21 additions & 49 deletions crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,55 +8,24 @@ use c8y_api::smartrest::operations::Operations;
use c8y_api::smartrest::topic::C8yTopic;
use c8y_http_proxy::credentials::C8YJwtRetriever;
use c8y_http_proxy::C8YHttpProxyBuilder;
use mqtt_channel::Connection;
use mqtt_channel::Config;
use mqtt_channel::TopicFilter;
use std::path::Path;
use tedge_actors::MessageSource;
use tedge_actors::ServiceConsumer;
use tedge_api::topic::ResponseTopic;
use tedge_config::ConfigSettingAccessor;
use tedge_config::DeviceIdSetting;
use tedge_config::ServiceTypeSetting;
use tedge_config::TEdgeConfig;
use tedge_file_system_ext::FsWatchActorBuilder;
use tedge_http_ext::HttpActor;
use tedge_mqtt_ext::MqttActorBuilder;
use tedge_timer_ext::TimerActor;
use tedge_utils::file::*;
use tracing::info;

const CUMULOCITY_MAPPER_NAME: &str = "tedge-mapper-c8y";

pub struct CumulocityMapper {}

impl CumulocityMapper {
pub fn new() -> CumulocityMapper {
CumulocityMapper {}
}

pub fn subscriptions(operations: &Operations) -> Result<TopicFilter, anyhow::Error> {
let mut topic_filter: TopicFilter = vec![
"tedge/measurements",
"tedge/measurements/+",
"tedge/alarms/+/+",
"tedge/alarms/+/+/+",
"c8y-internal/alarms/+/+",
"c8y-internal/alarms/+/+/+",
"tedge/events/+",
"tedge/events/+/+",
"tedge/health/+",
"tedge/health/+/+",
C8yTopic::SmartRestRequest.to_string().as_str(),
ResponseTopic::SoftwareListResponse.as_str(),
ResponseTopic::SoftwareUpdateResponse.as_str(),
ResponseTopic::RestartResponse.as_str(),
]
.try_into()
.expect("topics that mapper should subscribe to");

for topic in operations.topics_for_operations() {
topic_filter.add(&topic)?
}

Ok(topic_filter)
}
}
pub struct CumulocityMapper;

#[async_trait]
impl TEdgeComponent for CumulocityMapper {
Expand Down Expand Up @@ -97,38 +66,41 @@ impl TEdgeComponent for CumulocityMapper {
&mut fs_watch_actor,
);

// MQTT client dedicated to set service down status on shutdown, using a last-will message
// A separate MQTT actor/client is required as the last will message of the main MQTT actor
// is used to send down status to tedge/health topic
let service_monitor_actor =
MqttActorBuilder::new(service_monitor_client_config(&tedge_config)?);

runtime.spawn(mqtt_actor).await?;
runtime.spawn(jwt_actor).await?;
runtime.spawn(http_actor).await?;
runtime.spawn(c8y_http_proxy_actor).await?;
runtime.spawn(fs_watch_actor).await?;
runtime.spawn(timer_actor).await?;
runtime.spawn(c8y_mapper_actor).await?;
runtime.spawn(service_monitor_actor).await?;
runtime.run_to_completion().await?;

Ok(())
}
}

pub async fn create_mqtt_client_will_message(
device_name: &str,
app_name: &str,
service_type: &str,
tedge_config: &TEdgeConfig,
) -> Result<Connection, anyhow::Error> {
pub fn service_monitor_client_config(tedge_config: &TEdgeConfig) -> Result<Config, anyhow::Error> {
let device_name = tedge_config.query(DeviceIdSetting)?;
let service_type = tedge_config.query(ServiceTypeSetting)?;

let mqtt_config = tedge_config
.mqtt_config()?
.with_session_name("last_will_c8y_mapper")
.with_last_will_message(service_monitor_status_message(
device_name,
app_name,
&device_name,
CUMULOCITY_MAPPER_NAME,
"down",
service_type,
&service_type,
None,
));
let mqtt_client = Connection::new(&mqtt_config).await?;

Ok(mqtt_client)
Ok(mqtt_config)
}

fn create_directories(config_dir: &Path) -> Result<(), anyhow::Error> {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_mapper/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn lookup_component(component_name: &MapperName) -> Box<dyn TEdgeComponent> {
MapperName::Az => Box::new(AzureMapper::new()),
MapperName::Aws => Box::new(AwsMapper),
MapperName::Collectd => Box::new(CollectdMapper),
MapperName::C8y => Box::new(CumulocityMapper::new()),
MapperName::C8y => Box::new(CumulocityMapper),
}
}

Expand Down

0 comments on commit 5450cac

Please sign in to comment.