Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial follow up tasks for built-in bridge #2810

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -460,10 +460,6 @@ define_tedge_config! {
local_cleansession: AutoFlag,
},

#[tedge_config(default(value = false))]
#[doku(skip)] // Hide the configuration in `tedge config list --doc`
built_in: bool,

// TODO validation
/// The topic prefix that will be used for the mapper bridge MQTT topic. For instance,
/// if this is set to "c8y", then messages published to `c8y/s/us` will be
Expand Down Expand Up @@ -655,7 +651,13 @@ define_tedge_config! {
#[doku(as = "PathBuf")]
#[tedge_config(deprecated_key = "mqtt.external.keyfile")]
key_file: Utf8PathBuf,
}
},

bridge: {
#[tedge_config(default(value = false))]
#[doku(skip)] // Hide the configuration in `tedge config list --doc`
built_in: bool,
},
},

http: {
Expand Down Expand Up @@ -843,7 +845,7 @@ impl ReadableKey {
// features from being discovered.
pub fn is_printable_value(self, value: &str) -> bool {
match self {
Self::C8yBridgeBuiltIn => value != "false",
Self::MqttBridgeBuiltIn => value != "false",
Self::C8yBridgeTopicPrefix => value != "c8y",
_ => true,
}
Expand Down
37 changes: 13 additions & 24 deletions crates/core/c8y_api/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,28 @@ pub mod bridge {
use tedge_api::MQTT_BRIDGE_DOWN_PAYLOAD;
use tedge_api::MQTT_BRIDGE_UP_PAYLOAD;

pub fn is_c8y_bridge_up(message: &Message, service: &str) -> bool {
let c8y_bridge_health_topic = main_device_health_topic(service);
match message.payload_str() {
Ok(payload) => {
message.topic.name == c8y_bridge_health_topic && payload == MQTT_BRIDGE_UP_PAYLOAD
}
Err(_err) => false,
}
}

pub fn is_c8y_bridge_established(message: &Message, service: &str) -> bool {
let c8y_bridge_health_topic = main_device_health_topic(service);
match message.payload_str() {
Ok(payload) => {
message.topic.name == c8y_bridge_health_topic
&& (payload == MQTT_BRIDGE_UP_PAYLOAD || payload == MQTT_BRIDGE_DOWN_PAYLOAD)
&& (payload == MQTT_BRIDGE_UP_PAYLOAD
|| payload == MQTT_BRIDGE_DOWN_PAYLOAD
|| is_valid_status_payload(payload))
}
Err(_err) => false,
}
}

#[derive(serde::Deserialize)]
struct HealthStatus<'a> {
status: &'a str,
}

fn is_valid_status_payload(payload: &str) -> bool {
serde_json::from_str::<HealthStatus>(payload)
.map_or(false, |h| h.status == "up" || h.status == "down")
}
Comment on lines +12 to +28
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] I would move into tedge_api this logic to accept 0 | 1 | status = up | status = down.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm removing the check for bridge health, then this more complicated logic isn't required, so unless we need it for some other reason you know of, I'll undo this change

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking that the bridge has been initialized before sending registration messages to c8y is required for a very new thin-edge device. If the c8y mapper starts before the bridge and even before the c8y.url is provided (as this is the case for a generic image of thin-edge), then the mapper must wait for the bridge to be initialized - otherwise all the registration messages are lost.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this logic is required - but can be simplified by simply checking that at least one message has been received on this topic - whatever the payload.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just saw 4ef0fcd and I'm okay with it.

}

pub mod child_device {
Expand All @@ -46,23 +48,10 @@ mod tests {
use test_case::test_case;

use crate::utils::bridge::is_c8y_bridge_established;
use crate::utils::bridge::is_c8y_bridge_up;

const C8Y_BRIDGE_HEALTH_TOPIC: &str =
"te/device/main/service/tedge-mapper-bridge-c8y/status/health";

#[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "1", true)]
#[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "0", false)]
#[test_case("tedge/not/health/topic", "1", false)]
#[test_case("tedge/not/health/topic", "0", false)]
fn test_bridge_is_up(topic: &str, payload: &str, expected: bool) {
let topic = Topic::new(topic).unwrap();
let message = Message::new(&topic, payload);

let actual = is_c8y_bridge_up(&message, "tedge-mapper-bridge-c8y");
assert_eq!(actual, expected);
}

#[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "1", true)]
#[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "0", true)]
#[test_case(C8Y_BRIDGE_HEALTH_TOPIC, "bad payload", false)]
Expand Down
9 changes: 9 additions & 0 deletions crates/core/tedge/src/cli/certificate/cli.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::bridge::BridgeLocation;
use camino::Utf8PathBuf;
use tedge_config::OptionalConfigError;

Expand Down Expand Up @@ -50,6 +51,11 @@ pub enum TEdgeCertCli {
impl BuildCommand for TEdgeCertCli {
fn build_command(self, context: BuildContext) -> Result<Box<dyn Command>, ConfigError> {
let config = context.load_config()?;
let bridge_location = if config.mqtt.bridge.built_in {
BridgeLocation::BuiltIn
} else {
BridgeLocation::Mosquitto
};

let cmd = match self {
TEdgeCertCli::Create { id } => {
Expand All @@ -58,6 +64,7 @@ impl BuildCommand for TEdgeCertCli {
cert_path: config.device.cert_path.clone(),
key_path: config.device.key_path.clone(),
csr_path: None,
bridge_location,
};
cmd.into_boxed()
}
Expand All @@ -69,6 +76,7 @@ impl BuildCommand for TEdgeCertCli {
key_path: config.device.key_path.clone(),
// Use output file instead of csr_path from tedge config if provided
csr_path: output_path.unwrap_or_else(|| config.device.csr_path.clone()),
bridge_location,
};
cmd.into_boxed()
}
Expand Down Expand Up @@ -103,6 +111,7 @@ impl BuildCommand for TEdgeCertCli {
let cmd = RenewCertCmd {
cert_path: config.device.cert_path.clone(),
key_path: config.device.key_path.clone(),
bridge_location,
};
cmd.into_boxed()
}
Expand Down
19 changes: 16 additions & 3 deletions crates/core/tedge/src/cli/certificate/create.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::error::CertError;
use crate::bridge::BridgeLocation;
use crate::command::Command;
use camino::Utf8PathBuf;
use certificate::KeyCertPair;
Expand Down Expand Up @@ -26,6 +27,9 @@ pub struct CreateCertCmd {

/// The path where the device CSR file will be stored
pub csr_path: Option<Utf8PathBuf>,

/// The component that is configured to host the MQTT bridge logic
pub bridge_location: BridgeLocation,
}

impl Command for CreateCertCmd {
Expand Down Expand Up @@ -79,6 +83,11 @@ impl CreateCertCmd {
validate_parent_dir_exists(&self.cert_path).map_err(CertError::CertPathError)?;
validate_parent_dir_exists(&self.key_path).map_err(CertError::KeyPathError)?;

let (user, group) = match self.bridge_location {
BridgeLocation::BuiltIn => ("tedge", "tedge"),
BridgeLocation::Mosquitto => (crate::BROKER_USER, crate::BROKER_GROUP),
};

let cert = match &self.csr_path {
Some(csr_path) => {
validate_parent_dir_exists(csr_path).map_err(CertError::CsrPathError)?;
Expand All @@ -94,9 +103,8 @@ impl CreateCertCmd {
let cert = KeyCertPair::new_selfsigned_certificate(config, &self.id, key_kind)?;

// Creating files with permission 644 owned by the MQTT broker
let mut cert_file =
create_new_file(&self.cert_path, crate::BROKER_USER, crate::BROKER_GROUP)
.map_err(|err| err.cert_context(self.cert_path.clone()))?;
let mut cert_file = create_new_file(&self.cert_path, user, group)
.map_err(|err| err.cert_context(self.cert_path.clone()))?;

let cert_pem = cert.certificate_pem_string()?;
cert_file.write_all(cert_pem.as_bytes())?;
Expand Down Expand Up @@ -181,6 +189,7 @@ mod tests {
cert_path: cert_path.clone(),
key_path: key_path.clone(),
csr_path: None,
bridge_location: BridgeLocation::Mosquitto,
};

assert_matches!(
Expand Down Expand Up @@ -209,6 +218,7 @@ mod tests {
cert_path: cert_path.clone(),
key_path: key_path.clone(),
csr_path: None,
bridge_location: BridgeLocation::Mosquitto,
};

assert!(cmd
Expand All @@ -231,6 +241,7 @@ mod tests {
cert_path,
key_path,
csr_path: None,
bridge_location: BridgeLocation::Mosquitto,
};

let cert_error = cmd
Expand All @@ -250,6 +261,7 @@ mod tests {
cert_path,
key_path,
csr_path: None,
bridge_location: BridgeLocation::Mosquitto,
};

let cert_error = cmd
Expand All @@ -269,6 +281,7 @@ mod tests {
cert_path,
key_path,
csr_path: None,
bridge_location: BridgeLocation::Mosquitto,
};

let cert_error = cmd
Expand Down
6 changes: 6 additions & 0 deletions crates/core/tedge/src/cli/certificate/create_csr.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::create::cn_of_self_signed_certificate;
use super::error::CertError;
use crate::bridge::BridgeLocation;
use crate::command::Command;
use crate::CreateCertCmd;
use camino::Utf8PathBuf;
Expand All @@ -10,6 +11,7 @@ pub struct CreateCsrCmd {
pub cert_path: Utf8PathBuf,
pub key_path: Utf8PathBuf,
pub csr_path: Utf8PathBuf,
pub bridge_location: BridgeLocation,
}

impl Command for CreateCsrCmd {
Expand Down Expand Up @@ -41,6 +43,7 @@ impl CreateCsrCmd {
cert_path: self.cert_path.clone(),
key_path: self.key_path.clone(),
csr_path: Some(self.csr_path.clone()),
bridge_location: self.bridge_location,
};

create_cmd.create_certificate_signing_request(config)
Expand Down Expand Up @@ -70,6 +73,7 @@ mod tests {
cert_path: cert_path.clone(),
key_path: key_path.clone(),
csr_path: csr_path.clone(),
bridge_location: BridgeLocation::Mosquitto,
};

assert_matches!(
Expand All @@ -94,6 +98,7 @@ mod tests {
cert_path: cert_path.clone(),
key_path: key_path.clone(),
csr_path: None,
bridge_location: BridgeLocation::Mosquitto,
};

// create private key and public cert with standard command
Expand All @@ -112,6 +117,7 @@ mod tests {
cert_path: cert_path.clone(),
key_path: key_path.clone(),
csr_path: csr_path.clone(),
bridge_location: BridgeLocation::Mosquitto,
};

// create csr using existing private key and device_id from public cert
Expand Down
5 changes: 5 additions & 0 deletions crates/core/tedge/src/cli/certificate/renew.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::create::cn_of_self_signed_certificate;
use super::error::CertError;
use crate::bridge::BridgeLocation;
use crate::command::Command;
use crate::CreateCertCmd;
use camino::Utf8PathBuf;
Expand All @@ -8,6 +9,7 @@ use certificate::NewCertificateConfig;
pub struct RenewCertCmd {
pub cert_path: Utf8PathBuf,
pub key_path: Utf8PathBuf,
pub bridge_location: BridgeLocation,
}

impl Command for RenewCertCmd {
Expand Down Expand Up @@ -37,6 +39,7 @@ impl RenewCertCmd {
cert_path: self.cert_path.clone(),
key_path: self.key_path.clone(),
csr_path: None,
bridge_location: self.bridge_location,
};

create_cmd.renew_test_certificate(config)
Expand All @@ -62,6 +65,7 @@ mod tests {
cert_path: cert_path.clone(),
key_path: key_path.clone(),
csr_path: None,
bridge_location: BridgeLocation::Mosquitto,
};

// First create both cert and key
Expand All @@ -80,6 +84,7 @@ mod tests {
let cmd = RenewCertCmd {
cert_path: cert_path.clone(),
key_path: key_path.clone(),
bridge_location: BridgeLocation::Mosquitto,
};
cmd.renew_test_certificate(&NewCertificateConfig::default())
.unwrap();
Expand Down
35 changes: 19 additions & 16 deletions crates/core/tedge/src/cli/connect/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ pub fn bridge_config(
Ok(BridgeConfig::from(params))
}
Cloud::C8y => {
let bridge_location = match config.c8y.bridge.built_in {
let bridge_location = match config.mqtt.bridge.built_in {
true => BridgeLocation::BuiltIn,
false => BridgeLocation::Mosquitto,
};
Expand Down Expand Up @@ -459,12 +459,6 @@ fn new_bridge(
config_location: &TEdgeConfigLocation,
device_type: &str,
) -> Result<(), ConnectError> {
if bridge_config.bridge_location == BridgeLocation::BuiltIn {
println!("Deleting mosquitto bridge configuration in favour of built-in bridge");
clean_up(config_location, bridge_config)?;
restart_mosquitto(bridge_config, service_manager, config_location)?;
return Ok(());
}
println!("Checking if {} is available.\n", service_manager.name());
let service_manager_result = service_manager.check_operational();

Expand All @@ -477,8 +471,10 @@ fn new_bridge(
);
}

println!("Checking if configuration for requested bridge already exists.\n");
bridge_config_exists(config_location, bridge_config)?;
if bridge_config.bridge_location == BridgeLocation::Mosquitto {
println!("Checking if configuration for requested bridge already exists.\n");
bridge_config_exists(config_location, bridge_config)?;
}

println!("Validating the bridge certificates.\n");
bridge_config.validate()?;
Expand All @@ -488,13 +484,20 @@ fn new_bridge(
c8y_direct_connection::create_device_with_direct_connection(bridge_config, device_type)?;
}

println!("Saving configuration for requested bridge.\n");
if let Err(err) =
write_bridge_config_to_file(config_location, bridge_config, common_mosquitto_config)
{
// We want to preserve previous errors and therefore discard result of this function.
let _ = clean_up(config_location, bridge_config);
return Err(err);
if bridge_config.bridge_location == BridgeLocation::Mosquitto {
println!("Saving configuration for requested bridge.\n");

if let Err(err) =
write_bridge_config_to_file(config_location, bridge_config, common_mosquitto_config)
{
// We want to preserve previous errors and therefore discard result of this function.
let _ = clean_up(config_location, bridge_config);
return Err(err);
}
} else {
println!("Deleting mosquitto bridge configuration in favour of built-in bridge");
clean_up(config_location, bridge_config)?;
restart_mosquitto(bridge_config, service_manager, config_location)?;
}

if let Err(err) = service_manager_result {
Expand Down
6 changes: 6 additions & 0 deletions crates/core/tedge_api/src/mqtt_topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,12 @@ impl EntityTopicId {
pub fn as_str(&self) -> &str {
self.0.as_str()
}

// FIXME: can also match "device/bridge//" or "/device/main/service/my_custom_bridge"
// should match ONLY the single mapper bridge
pub fn is_bridge_health_topic(&self) -> bool {
self.as_str().contains("bridge")
}
Comment on lines +394 to +398
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR might be the opportunity to resolve this FIXME.

}

/// Contains a topic id of the service itself and the associated device.
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl TEdgeComponent for CumulocityMapper {

let mqtt_config = tedge_config.mqtt_config()?;
let c8y_mapper_config = C8yMapperConfig::from_tedge_config(cfg_dir, &tedge_config)?;
if tedge_config.c8y.bridge.built_in {
if tedge_config.mqtt.bridge.built_in {
let custom_topics = tedge_config
.c8y
.smartrest
Expand Down
Loading