Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Fix remaining test failures from built-in bridge #2879

Merged
merged 14 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions crates/core/c8y_api/src/smartrest/inventory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,21 @@ pub fn service_creation_message(
ancestors: &[String],
prefix: &TopicPrefix,
) -> Result<MqttMessage, InvalidValueError> {
Ok(MqttMessage::new(
&publish_topic_from_ancestors(ancestors, prefix),
service_creation_message_payload(service_id, service_name, service_type, service_status)?,
))
}

/// Create a SmartREST message for creating a service on device.
/// The provided ancestors list must contain all the parents of the given service
/// starting from its immediate parent device.
pub fn service_creation_message_payload(
service_id: &str,
service_name: &str,
service_type: &str,
service_status: &str,
) -> Result<String, InvalidValueError> {
// TODO: most of this noise can be eliminated by implementing `Serialize`/`Deserialize` for smartrest format
if service_id.is_empty() {
return Err(InvalidValueError {
Expand All @@ -94,16 +109,13 @@ pub fn service_creation_message(
});
}

Ok(MqttMessage::new(
&publish_topic_from_ancestors(ancestors, prefix),
fields_to_csv_string(&[
"102",
service_id,
service_type,
service_name,
service_status,
]),
))
Ok(fields_to_csv_string(&[
"102",
service_id,
service_type,
service_name,
service_status,
]))
}

/// Create a SmartREST message for updating service status.
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge/src/cli/common.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use tedge_config::system_services::SystemService;

#[derive(Copy, Clone, Debug, strum_macros::Display, strum_macros::IntoStaticStr)]
#[derive(Copy, Clone, Debug, strum_macros::Display, strum_macros::IntoStaticStr, PartialEq, Eq)]
pub enum Cloud {
#[strum(serialize = "Cumulocity")]
C8y,
Expand Down
74 changes: 55 additions & 19 deletions crates/core/tedge/src/cli/connect/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,19 +502,25 @@ fn new_bridge(
}
}

if let Err(err) =
write_generic_mosquitto_config_to_file(config_location, common_mosquitto_config)
{
// We want to preserve previous errors and therefore discard result of this function.
let _ = clean_up(config_location, bridge_config);
return Err(err);
}

if bridge_config.bridge_location == BridgeLocation::Mosquitto {
println!("Saving configuration for requested bridge.\n");

if let Err(err) =
write_bridge_config_to_file(config_location, bridge_config, common_mosquitto_config)
{
if let Err(err) = write_bridge_config_to_file(config_location, bridge_config) {
// We want to preserve previous errors and therefore discard result of this function.
let _ = clean_up(config_location, bridge_config);
return Err(err);
}
} else {
println!("Deleting mosquitto bridge configuration in favour of built-in bridge\n");
clean_up(config_location, bridge_config)?;
use_built_in_bridge(config_location, bridge_config)?;
}

if let Err(err) = service_manager_result {
Expand Down Expand Up @@ -543,18 +549,7 @@ fn new_bridge(
Ok(())
}

fn restart_mosquitto(
bridge_config: &BridgeConfig,
service_manager: &dyn SystemServiceManager,
config_location: &TEdgeConfigLocation,
) -> Result<(), ConnectError> {
println!("Restarting mosquitto service.\n");

if let Err(err) = service_manager.stop_service(SystemService::Mosquitto) {
clean_up(config_location, bridge_config)?;
return Err(err.into());
}

pub fn chown_certificate_and_key(bridge_config: &BridgeConfig) {
let (user, group) = match bridge_config.bridge_location {
BridgeLocation::BuiltIn => ("tedge", "tedge"),
BridgeLocation::Mosquitto => (crate::BROKER_USER, crate::BROKER_GROUP),
Expand All @@ -570,6 +565,21 @@ fn restart_mosquitto(
warn!("Failed to change ownership of {path} to {user}:{group}: {err}");
}
}
}

fn restart_mosquitto(
bridge_config: &BridgeConfig,
service_manager: &dyn SystemServiceManager,
config_location: &TEdgeConfigLocation,
) -> Result<(), ConnectError> {
println!("Restarting mosquitto service.\n");

if let Err(err) = service_manager.stop_service(SystemService::Mosquitto) {
clean_up(config_location, bridge_config)?;
return Err(err.into());
}

chown_certificate_and_key(bridge_config);

if let Err(err) = service_manager.restart_service(SystemService::Mosquitto) {
clean_up(config_location, bridge_config)?;
Expand Down Expand Up @@ -597,7 +607,7 @@ fn enable_software_management(

// To preserve error chain and not discard other errors we need to ignore error here
// (don't use '?' with the call to this function to preserve original error).
fn clean_up(
pub fn clean_up(
config_location: &TEdgeConfigLocation,
bridge_config: &BridgeConfig,
) -> Result<(), ConnectError> {
Expand All @@ -606,6 +616,19 @@ fn clean_up(
Ok(())
}

pub fn use_built_in_bridge(
config_location: &TEdgeConfigLocation,
bridge_config: &BridgeConfig,
) -> Result<(), ConnectError> {
let path = get_bridge_config_file_path(config_location, bridge_config);
std::fs::write(
path,
"# This file is left empty as the built-in bridge is enabled",
)
.or_else(ok_if_not_found)?;
Ok(())
}

fn bridge_config_exists(
config_location: &TEdgeConfigLocation,
bridge_config: &BridgeConfig,
Expand All @@ -619,9 +642,8 @@ fn bridge_config_exists(
Ok(())
}

fn write_bridge_config_to_file(
fn write_generic_mosquitto_config_to_file(
config_location: &TEdgeConfigLocation,
bridge_config: &BridgeConfig,
common_mosquitto_config: &CommonMosquittoConfig,
) -> Result<(), ConnectError> {
let dir_path = config_location
Expand All @@ -637,6 +659,20 @@ fn write_bridge_config_to_file(
common_mosquitto_config.serialize(&mut common_draft)?;
common_draft.persist()?;

Ok(())
}

fn write_bridge_config_to_file(
config_location: &TEdgeConfigLocation,
bridge_config: &BridgeConfig,
) -> Result<(), ConnectError> {
let dir_path = config_location
.tedge_config_root_path
.join(TEDGE_BRIDGE_CONF_DIR_PATH);

// This will forcefully create directory structure if it doesn't exist, we should find better way to do it, maybe config should deal with it?
create_directories(dir_path)?;
Comment on lines +673 to +674
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. The directories should be created by tedge init and tedge connect should fail if given a path with unknown parents.


let config_path = get_bridge_config_file_path(config_location, bridge_config);
let mut config_draft = DraftFile::new(config_path)?.with_mode(0o644);
bridge_config.serialize(&mut config_draft)?;
Expand Down
8 changes: 2 additions & 6 deletions crates/core/tedge/src/cli/disconnect/disconnect_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,8 @@ impl DisconnectBridgeCommand {
// If bridge config file was not found we assume that the bridge doesn't exist,
// We finish early returning exit code 0.
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
if self.built_in_bridge {
Ok(())
} else {
println!("Bridge doesn't exist. Operation finished!");
Err(DisconnectBridgeError::BridgeFileDoesNotExist)
}
println!("Bridge doesn't exist. Operation finished!");
Err(DisconnectBridgeError::BridgeFileDoesNotExist)
}

Err(e) => Err(DisconnectBridgeError::FileOperationFailed(
Expand Down
30 changes: 25 additions & 5 deletions crates/core/tedge/src/cli/refresh_bridges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tedge_config::TEdgeConfigLocation;
use super::common::Cloud;
use super::connect::ConnectError;
use crate::bridge::BridgeConfig;
use crate::bridge::BridgeLocation;
use crate::bridge::CommonMosquittoConfig;
use crate::bridge::TEDGE_BRIDGE_CONF_DIR_PATH;
use crate::command::BuildContext;
Expand All @@ -28,19 +29,38 @@ impl Command for RefreshBridgesCmd {
fn execute(&self) -> anyhow::Result<()> {
let clouds = established_bridges(&self.config_location);

if clouds.is_empty() {
if clouds.is_empty() && !self.config.mqtt.bridge.built_in {
println!("No bridges to refresh.");
return Ok(());
}

let common_mosquitto_config = CommonMosquittoConfig::from_tedge_config(&self.config);
common_mosquitto_config.save(&self.config_location)?;

for cloud in clouds {
println!("Refreshing bridge {cloud}");
if !self.config.mqtt.bridge.built_in {
for cloud in &clouds {
println!("Refreshing bridge {cloud}");

let bridge_config = super::connect::bridge_config(&self.config, cloud)?;
refresh_bridge(&bridge_config, &self.config_location)?;
let bridge_config = super::connect::bridge_config(&self.config, *cloud)?;
refresh_bridge(&bridge_config, &self.config_location)?;
}
}

for cloud in [Cloud::Aws, Cloud::Azure, Cloud::C8y] {
// (attempt to) reassert ownership of the certificate and key
// This is necessary when upgrading from the mosquitto bridge to the built-in bridge
Comment on lines +49 to +51
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will have to find a more dynamic way to handle certificate ownership. But that's okay for now.

if let Ok(bridge_config) = super::connect::bridge_config(&self.config, cloud) {
super::connect::chown_certificate_and_key(&bridge_config);

if bridge_config.bridge_location == BridgeLocation::BuiltIn
&& clouds.contains(&cloud)
{
println!(
"Deleting mosquitto bridge configuration for {cloud} in favour of built-in bridge"
);
super::connect::use_built_in_bridge(&self.config_location, &bridge_config)?;
}
}
}

println!("Restarting mosquitto service.\n");
Expand Down
103 changes: 94 additions & 9 deletions crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ use tedge_config::TEdgeConfig;
use tedge_downloader_ext::DownloaderActor;
use tedge_file_system_ext::FsWatchActorBuilder;
use tedge_http_ext::HttpActor;
use tedge_mqtt_bridge::rumqttc::LastWill;
use tedge_mqtt_bridge::use_key_and_cert;
use tedge_mqtt_bridge::BridgeConfig;
use tedge_mqtt_bridge::MqttBridgeActorBuilder;
use tedge_mqtt_bridge::QoS;
use tedge_mqtt_ext::MqttActorBuilder;
use tedge_timer_ext::TimerActor;
use tedge_uploader_ext::UploaderActor;
Expand Down Expand Up @@ -72,7 +74,36 @@ impl TEdgeComponent for CumulocityMapper {
tc.forward_from_remote(topic, local_prefix.clone(), "")?;
}

tc.forward_from_local("#", local_prefix, "")?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this was genuinely problematic as Cumulocity doesn't send us our own messages back, but I've changed the config to match the current bridge config in any case

// Templates
tc.forward_from_local("s/ut/#", local_prefix.clone(), "")?;

// Static templates
tc.forward_from_local("s/us", local_prefix.clone(), "")?;
tc.forward_from_local("s/us/#", local_prefix.clone(), "")?;
tc.forward_from_local("t/us/#", local_prefix.clone(), "")?;
tc.forward_from_local("q/us/#", local_prefix.clone(), "")?;
tc.forward_from_local("c/us/#", local_prefix.clone(), "")?;

// SmartREST2
tc.forward_from_local("s/uc/#", local_prefix.clone(), "")?;
tc.forward_from_local("t/uc/#", local_prefix.clone(), "")?;
tc.forward_from_local("q/uc/#", local_prefix.clone(), "")?;
tc.forward_from_local("c/uc/#", local_prefix.clone(), "")?;

// c8y JSON
tc.forward_from_local(
"inventory/managedObjects/update/#",
local_prefix.clone(),
"",
)?;
tc.forward_from_local(
"measurement/measurements/create/#",
local_prefix.clone(),
"",
)?;
tc.forward_from_local("event/events/create/#", local_prefix.clone(), "")?;
tc.forward_from_local("alarm/alarms/create/#", local_prefix.clone(), "")?;
tc.forward_from_local("s/uat", local_prefix.clone(), "")?;

let c8y = tedge_config.c8y.mqtt.or_config_not_set()?;
let mut cloud_config = tedge_mqtt_bridge::MqttOptions::new(
Expand All @@ -89,14 +120,68 @@ impl TEdgeComponent for CumulocityMapper {
&tedge_config,
)?;

let bridge_actor = MqttBridgeActorBuilder::new(
&tedge_config,
c8y_mapper_config.bridge_service_name(),
tc,
cloud_config,
)
.await;
runtime.spawn(bridge_actor).await?;
let main_device_xid: EntityExternalId =
tedge_config.device.id.try_read(&tedge_config)?.into();
let service_type = &tedge_config.service.ty;
let service_type = if service_type.is_empty() {
"service".to_string()
} else {
service_type.to_string()
};

// FIXME: this will not work if `mqtt.device_topic_id` is not in default scheme

// there is one mapper instance per cloud per thin-edge instance, perhaps we should use some
// predefined topic id instead of trying to derive it from current device?
let entity_topic_id: EntityTopicId = tedge_config
.mqtt
.device_topic_id
.clone()
.parse()
.context("Invalid device_topic_id")?;

let mapper_service_topic_id = entity_topic_id
.default_service_for_device(CUMULOCITY_MAPPER_NAME)
.context("Can't derive service name if device topic id not in default scheme")?;

let mapper_service_external_id = CumulocityConverter::map_to_c8y_external_id(
&mapper_service_topic_id,
&main_device_xid,
);

let last_will_message_mapper =
c8y_api::smartrest::inventory::service_creation_message_payload(
mapper_service_external_id.as_ref(),
CUMULOCITY_MAPPER_NAME,
service_type.as_str(),
"down",
)?;
let last_will_message_bridge =
c8y_api::smartrest::inventory::service_creation_message_payload(
mapper_service_external_id.as_ref(),
&c8y_mapper_config.bridge_service_name(),
service_type.as_str(),
"down",
)?;

cloud_config.set_last_will(LastWill {
topic: "s/us".into(),
qos: QoS::AtLeastOnce,
message: format!("{last_will_message_bridge}\n{last_will_message_mapper}").into(),
retain: false,
});

runtime
.spawn(
MqttBridgeActorBuilder::new(
&tedge_config,
c8y_mapper_config.bridge_service_name(),
tc,
cloud_config,
)
.await,
)
.await?;
}
let mut jwt_actor = C8YJwtRetriever::builder(
mqtt_config.clone(),
Expand Down
Loading