Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tedge-agent: move to new health topic #2271

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions crates/core/tedge_agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,19 @@ impl Agent {
let signal_actor_builder = SignalActor::builder(&runtime.get_handle());

// Health actor
let health_actor = HealthMonitorBuilder::new(TEDGE_AGENT, &mut mqtt_actor_builder);
let service_topic_id = self.config.mqtt_device_topic_id.to_service_topic_id("tedge-agent")
.with_context(|| format!("Device topic id {} currently needs default scheme, e.g: 'device/DEVICE_NAME//'", self.config.mqtt_device_topic_id))?;
let health_actor = HealthMonitorBuilder::from_service_topic_id(
service_topic_id,
&mut mqtt_actor_builder,
self.config.mqtt_topic_root.clone(),
);

// Tedge to Te topic converter
let tedge_to_te_converter = create_tedge_to_te_converter(&mut mqtt_actor_builder)?;
let tedge_to_te_converter = create_tedge_to_te_converter(
&mut mqtt_actor_builder,
self.config.mqtt_topic_root.to_string(),
)?;

// Spawn all
runtime.spawn(signal_actor_builder).await?;
Expand Down Expand Up @@ -213,8 +222,10 @@ impl Agent {

pub fn create_tedge_to_te_converter(
mqtt_actor_builder: &mut MqttActorBuilder,
mqtt_topic_root: String,
) -> Result<ConvertingActorBuilder<TedgetoTeConverter, TopicFilter>, anyhow::Error> {
let tedge_to_te_converter = TedgetoTeConverter::new();
let tedge_to_te_converter = TedgetoTeConverter::with_root(mqtt_topic_root.clone());

let subscriptions: TopicFilter = vec![
"tedge/measurements",
"tedge/measurements/+",
Expand All @@ -224,6 +235,8 @@ pub fn create_tedge_to_te_converter(
"tedge/alarms/+/+/+",
"tedge/health/+",
"tedge/health/+/+",
// currently only tedge-agent uses new topics
&format!("{mqtt_topic_root}/device/main/service/tedge-agent/status/health"),
]
.try_into()?;

Expand Down
149 changes: 144 additions & 5 deletions crates/core/tedge_agent/src/tedge_to_te_converter/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ use serde_json::Value;
use std::collections::HashMap;
use std::convert::Infallible;
use tedge_actors::Converter;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::Topic;

pub struct TedgetoTeConverter {}
pub struct TedgetoTeConverter {
mqtt_schema: MqttSchema,
}

impl Converter for TedgetoTeConverter {
type Input = MqttMessage;
Expand All @@ -19,20 +23,56 @@ impl Converter for TedgetoTeConverter {
}

impl TedgetoTeConverter {
/// Creates a new converter with default prefix `"te"`.
#[cfg(test)]
pub fn new() -> Self {
TedgetoTeConverter {}
Self {
mqtt_schema: MqttSchema::new(),
}
}

pub fn with_root(root: String) -> Self {
Self {
mqtt_schema: MqttSchema::with_root(root),
}
}

fn try_convert(&mut self, message: MqttMessage) -> Vec<tedge_mqtt_ext::Message> {
match message.topic.clone() {
match &message.topic {
topic if topic.name.starts_with("tedge/measurements") => {
self.convert_measurement(message)
}
topic if topic.name.starts_with("tedge/events") => self.convert_event(message),
topic if topic.name.starts_with("tedge/alarms") => self.convert_alarm(message),
topic if topic.name.starts_with("tedge/health") => {

// to be able to move different services to new health topic at different times, we
// selectively map services either from old to new topics, and vice versa.
// tedge-agent publishes on new health topic, exclude it from old mapping not to produce
// a publish loop
topic
if topic.name.starts_with("tedge/health")
&& !topic.name.contains("tedge-agent") =>
{
self.convert_health_status_message(message)
}
Comment on lines +48 to 57
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would not be simpler to update all the services in a single PR?

Or even simpler:

  • to start with the service consuming health check messages (the mapper and tedge-watchdog, so all the health status are consumed from the te topics.
  • to update one after the other the services (so they publish health status on the te topics).

I realized for that to work, one needs to update the tedge_to_te_converter to also converts health check requests. So, we will be able to update the services (as the agent) one after the other so they react to te/+/+/+/+/cmd/health/check.

The very last step will be to update tedge-watchdog to send health check requests on the te topics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, it would be much simpler and it's something that we have to do anyway, so better to do it now.


// Convert messages from new topics to old topics for backward compatibility
topic if topic.name.starts_with(&self.mqtt_schema.root) => {
match self.mqtt_schema.entity_channel_of(topic) {
Ok((entity_topic_id, Channel::Health))
if entity_topic_id.as_str().contains("tedge-agent") =>
{
self.convert_new_health_status_message_to_old(message)
}
Ok(_) => vec![],
Err(_) => vec![],
}
}

topic if topic.name.starts_with("tedge/health-check") => {
self.convert_health_check_command(message)
}

_ => vec![],
}
}
Expand Down Expand Up @@ -118,19 +158,118 @@ impl TedgetoTeConverter {
message.retain = true;
vec![message]
}

/// Maps health messages from a new topic scheme to the old.
///
/// The `message` is assumed to be a health message coming from a service under a new topic
/// scheme. This function should be called for services already ported to the new topic scheme
/// and these services should be excluded from mapping old -> new, or else a message outputted
/// by this function will cause `convert_health_status_message`, which in turn will output
/// a message which will cause this function to be called again, resultin in a loop.
fn convert_new_health_status_message_to_old(
&mut self,
mut message: MqttMessage,
) -> Vec<MqttMessage> {
// message fits new topic scheme
let topic = message.topic;
let (entity_topic_id, channel) = self
.mqtt_schema
.entity_channel_of(&topic)
.expect("topic should be confirmed to fit new schema in try_convert");

if channel != Channel::Health {
return vec![];
}

// TODO: move topic schema mapping into tedge-api
let topic = match entity_topic_id.as_str().split('/').collect::<Vec<&str>>()[..] {
["device", "main", "service", service_name] => format!("tedge/health/{service_name}"),
["device", cid, "service", service_name] => {
format!("tedge/health/{cid}/{service_name}")
}
// topics which do not fit a default schema are not mapped
_ => return vec![],
};

let topic = Topic::new_unchecked(&topic);
message.topic = topic;
message.retain = true;
vec![message]
}

fn convert_health_check_command(
&self,
mut message: tedge_mqtt_ext::Message,
) -> Vec<tedge_mqtt_ext::Message> {
let topic = match message.topic.name.split('/').collect::<Vec<_>>()[..] {
["tedge", "health-check"] => Topic::new_unchecked("te/device/main///cmd/health/check"),
["tedge", "health-check", service_name] => Topic::new_unchecked(
format!("te/device/main/service/{service_name}/cmd/health/check").as_str(),
),
_ => return vec![],
};
message.topic = topic;
message.retain = true;
vec![message]
}
}

#[cfg(test)]
mod tests {
use super::*;

use crate::tedge_to_te_converter::converter::TedgetoTeConverter;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::Topic;

#[test]

fn convert_incoming_wrong_topic() {
let mqtt_message = MqttMessage::new(&Topic::new_unchecked("tedge///MyCustomAlarm"), "");
let mut converter = TedgetoTeConverter::new();
let res = converter.try_convert(mqtt_message);
assert!(res.is_empty())
}

/// Ensures that private method `convert_new_health_status_message_to_old` only converts new
/// health messages to old messages for updated components.
// this test will have to be altered as components are updated to work with new health topics
#[test]
fn converts_health_status_messages_for_agent() {
let mut converter = TedgetoTeConverter::new();

let entities_incorrect = [
"device/main/service/other-service",
"device/child001/service/other-service",
"factory01/hallA/packaging/belt001",
];

for topic in entities_incorrect {
let topic = converter
.mqtt_schema
.topic_for(&topic.parse().unwrap(), &Channel::Health);
let message = MqttMessage::new(&topic, "");

assert_eq!(converter.convert(&message).unwrap(), vec![]);
}

let topic = converter.mqtt_schema.topic_for(
&"device/main/service/tedge-agent".parse().unwrap(),
&Channel::Health,
);
let message = MqttMessage::new(&topic, "");
let expected_topic = "tedge/health/tedge-agent";
let messages = converter.convert(&message).unwrap();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].topic.name, expected_topic);

let topic = converter.mqtt_schema.topic_for(
&"device/child001/service/tedge-agent".parse().unwrap(),
&Channel::Health,
);
let message = MqttMessage::new(&topic, "");
let expected_topic = "tedge/health/child001/tedge-agent";
let messages = converter.convert(&message).unwrap();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].topic.name, expected_topic);
}
}
84 changes: 84 additions & 0 deletions crates/core/tedge_api/src/health.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,96 @@
use std::process;
use std::sync::Arc;

use crate::mqtt_topics::ServiceTopicId;
use mqtt_channel::Message;
use mqtt_channel::PubChannel;
use mqtt_channel::Topic;
use mqtt_channel::TopicFilter;
use serde_json::json;
use time::OffsetDateTime;

/// Encodes a valid health topic.
///
/// Health topics are topics on which messages about health status of services are published. To be
/// able to send health messages, a health topic needs to be constructed for a given entity.
// TODO: replace `Arc<str>` with `ServiceTopicId` after we're done with transition to new topics
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServiceHealthTopic(Arc<str>);

impl ServiceHealthTopic {
pub fn new(service: ServiceTopicId) -> Self {
ServiceHealthTopic(Arc::from(format!("te/{}/status/health", service.as_str())))
}

pub fn from_old_topic(topic: String) -> Result<Self, HealthTopicError> {
match topic.split('/').collect::<Vec<&str>>()[..] {
["tedge", "health", _service_name] => {}
["tedge", "health", _child_id, _service_name] => {}
_ => return Err(HealthTopicError),
}

Ok(Self(Arc::from(topic)))
}

pub fn is_health_topic(topic: &str) -> bool {
matches!(
topic.split('/').collect::<Vec<&str>>()[..],
["te", _, _, _, _, "status", "health"]
)
}

pub fn as_str(&self) -> &str {
&self.0
}

pub async fn send_health_status(&self, responses: &mut impl PubChannel) {
let response_topic_health = Topic::new_unchecked(self.as_str());

let health_status = json!({
"status": "up",
"pid": process::id(),
"time": OffsetDateTime::now_utc().unix_timestamp(),
})
.to_string();

let health_message = Message::new(&response_topic_health, health_status).with_retain();
let _ = responses.send(health_message).await;
}

pub fn down_message(&self) -> Message {
Message {
topic: Topic::new_unchecked(self.as_str()),
payload: json!({
"status": "down",
"pid": process::id()})
.to_string()
.into(),
qos: mqtt_channel::QoS::AtLeastOnce,
retain: true,
}
}

pub fn up_message(&self) -> Message {
let response_topic_health = Topic::new_unchecked(self.as_str());

let health_status = json!({
"status": "up",
"pid": process::id(),
"time": OffsetDateTime::now_utc().unix_timestamp(),
})
.to_string();

Message::new(&response_topic_health, health_status)
.with_qos(mqtt_channel::QoS::AtLeastOnce)
.with_retain()
}
}

#[derive(Debug)]
pub struct HealthTopicError;

// TODO: remove below functions once components moved to new health topics

pub fn health_check_topics(daemon_name: &str) -> TopicFilter {
vec![
"tedge/health-check".into(),
Expand Down
Loading