Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for built-in bridge to tedge-mapper-c8y #2716

Merged
merged 24 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
59a057f
Create PoC for bridge connection in tedge-mapper-c8y
jarhodes314 Feb 15, 2024
169f41e
Add support for the mapper to control topic subscriptions in the bridge
jarhodes314 Feb 26, 2024
8946fd3
Control bridge prefix
jarhodes314 Feb 26, 2024
66c10bc
Skip re-logging already observed error messages
jarhodes314 Feb 27, 2024
466e179
Make topic prefix configurable in mapper
jarhodes314 Feb 27, 2024
7fa43a7
Merge branch 'main' into 2592-mapper-bridge
jarhodes314 Feb 28, 2024
556a661
Add configuration to enable mapper bridge
jarhodes314 Mar 7, 2024
392cfcf
Fix unit tests following mapper bridge changes
jarhodes314 Mar 7, 2024
04890b6
Merge branch 'main' into 2592-mapper-bridge
jarhodes314 Mar 7, 2024
618a4f8
Run formatter
jarhodes314 Mar 7, 2024
ce44a03
Remove unused deps
jarhodes314 Mar 8, 2024
397dac9
Apply review suggestions
jarhodes314 Mar 8, 2024
335ac91
Finish doc comment
jarhodes314 Mar 8, 2024
a6db6bc
Apply more review suggestions
jarhodes314 Mar 11, 2024
30e2434
Only log error when message changes
jarhodes314 Mar 12, 2024
172caab
Allow `tedge connect c8y --test` to work with internal bridge
jarhodes314 Mar 12, 2024
a36f0b2
Log connected only on connack
jarhodes314 Mar 12, 2024
3c91d10
Add (some) mapper-bridge support for authenticated MQTT connections t…
jarhodes314 Mar 15, 2024
c471aa8
Ensure `tedge reconnect c8y` migrates configuration successfully for
jarhodes314 Mar 15, 2024
8c6a13e
Allow events topic to vary based on c8y prefix
jarhodes314 Mar 15, 2024
cbbafc1
Merge branch 'main' into 2592-mapper-bridge
jarhodes314 Mar 15, 2024
eaad4bd
Restore formatting of test case following accidental clobbering in 46…
jarhodes314 Mar 15, 2024
dcd4329
Add operation test and connectivity check in telemetry test
jarhodes314 Mar 15, 2024
9203378
Merge branch 'main' into 2592-mapper-bridge
jarhodes314 Mar 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ tedge_file_system_ext = { path = "crates/extensions/tedge_file_system_ext" }
tedge_health_ext = { path = "crates/extensions/tedge_health_ext" }
tedge_http_ext = { path = "crates/extensions/tedge_http_ext" }
tedge_log_manager = { path = "crates/extensions/tedge_log_manager" }
tedge_mqtt_bridge = { path = "crates/extensions/tedge_mqtt_bridge" }
tedge_mqtt_ext = { path = "crates/extensions/tedge_mqtt_ext" }
tedge_script_ext = { path = "crates/extensions/tedge_script_ext" }
tedge_signal_ext = { path = "crates/extensions/tedge_signal_ext" }
Expand Down
75 changes: 73 additions & 2 deletions crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,20 @@ use camino::Utf8PathBuf;
use certificate::CertificateError;
use certificate::PemCertificate;
use doku::Document;
use doku::Type;
use once_cell::sync::Lazy;
use serde::Deserialize;
use std::borrow::Cow;
use std::convert::Infallible;
use std::fmt;
use std::fmt::Formatter;
use std::io::Read;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::num::NonZeroU16;
use std::ops::Deref;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use tedge_config_macros::all_or_nothing;
use tedge_config_macros::define_tedge_config;
Expand All @@ -42,6 +49,7 @@ impl<T> OptionalConfigError<T> for OptionalConfig<T> {
}
}

#[derive(Clone)]
pub struct TEdgeConfig(TEdgeConfigReader);

impl std::ops::Deref for TEdgeConfig {
Expand Down Expand Up @@ -443,7 +451,11 @@ define_tedge_config! {
#[tedge_config(note = "If set to 'auto', this cleans the local session accordingly the detected version of mosquitto.")]
#[tedge_config(example = "auto", default(variable = "AutoFlag::Auto"))]
local_cleansession: AutoFlag,
}
},

// TODO validation
#[tedge_config(example = "c8y", default(value = "c8y"))]
topic_prefix: TopicPrefix,
},

entity_store: {
Expand Down Expand Up @@ -797,6 +809,64 @@ define_tedge_config! {

}

// TODO doc comment
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize, serde::Serialize)]
#[serde(from = "String", into = "Arc<str>")]
pub struct TopicPrefix(Arc<str>);

impl Document for TopicPrefix {
fn ty() -> Type {
String::ty()
}
}

// TODO actual validation
// TODO make sure we don't allow c8y-internal either, or az, or aws as those are all used
impl From<String> for TopicPrefix {
fn from(value: String) -> Self {
Self(value.into())
}
}

impl From<&str> for TopicPrefix {
fn from(value: &str) -> Self {
Self(value.into())
}
}

impl FromStr for TopicPrefix {
type Err = Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(s.into())
}
}

impl From<TopicPrefix> for Arc<str> {
fn from(value: TopicPrefix) -> Self {
value.0
}
}

// TODO is deref actually right here
impl Deref for TopicPrefix {
type Target = str;
fn deref(&self) -> &Self::Target {
&self.0
}
}

impl TopicPrefix {
pub fn as_str(&self) -> &str {
&self.0
}
}

impl fmt::Display for TopicPrefix {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}

fn default_http_bind_address(dto: &TEdgeConfigDto) -> IpAddr {
let external_address = dto.mqtt.external.bind.address;
external_address
Expand All @@ -816,7 +886,8 @@ fn device_id(reader: &TEdgeConfigReader) -> Result<String, ReadError> {
fn cert_error_into_config_error(key: &'static str, err: CertificateError) -> ReadError {
match &err {
CertificateError::IoError(io_err) => match io_err.kind() {
std::io::ErrorKind::NotFound => ReadError::ReadOnlyNotFound { key,
std::io::ErrorKind::NotFound => ReadError::ReadOnlyNotFound {
key,
message: concat!(
"The device id is read from the device certificate.\n",
"To set 'device.id' to some <id>, you can use `tedge cert create --device-id <id>`.",
Expand Down
20 changes: 14 additions & 6 deletions crates/core/c8y_api/src/http_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::collections::HashMap;
use std::time::Duration;
use tedge_config::mqtt_config::MqttConfigBuildError;
use tedge_config::TEdgeConfig;
use tedge_config::TopicPrefix;
use tracing::error;
use tracing::info;

Expand Down Expand Up @@ -112,33 +113,41 @@ impl C8yEndPoint {

pub struct C8yMqttJwtTokenRetriever {
mqtt_config: mqtt_channel::Config,
topic_prefix: TopicPrefix,
}

impl C8yMqttJwtTokenRetriever {
pub fn from_tedge_config(tedge_config: &TEdgeConfig) -> Result<Self, MqttConfigBuildError> {
let mqtt_config = tedge_config.mqtt_config()?;

Ok(Self::new(mqtt_config))
Ok(Self::new(
mqtt_config,
tedge_config.c8y.bridge.topic_prefix.clone(),
))
}

pub fn new(mqtt_config: mqtt_channel::Config) -> Self {
let topic = TopicFilter::new_unchecked("c8y/s/dat");
pub fn new(mqtt_config: mqtt_channel::Config, topic_prefix: TopicPrefix) -> Self {
let topic = TopicFilter::new_unchecked(&format!("{topic_prefix}/s/dat"));
let mqtt_config = mqtt_config
.with_no_session() // Ignore any already published tokens, possibly stale.
.with_subscriptions(topic);

C8yMqttJwtTokenRetriever { mqtt_config }
C8yMqttJwtTokenRetriever {
mqtt_config,
topic_prefix,
}
}

pub async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, JwtError> {
let mut mqtt_con = Connection::new(&self.mqtt_config).await?;
let pub_topic = format!("{}/s/uat", self.topic_prefix);

tokio::time::sleep(Duration::from_millis(20)).await;
for _ in 0..3 {
mqtt_con
.published
.publish(
mqtt_channel::Message::new(&Topic::new_unchecked("c8y/s/uat"), "".to_string())
mqtt_channel::Message::new(&Topic::new_unchecked(&pub_topic), "".to_string())
.with_qos(mqtt_channel::QoS::AtMostOnce),
)
.await?;
Expand Down Expand Up @@ -184,7 +193,6 @@ pub enum JwtError {

#[cfg(test)]
mod tests {

use super::*;
use test_case::test_case;

Expand Down
18 changes: 12 additions & 6 deletions crates/core/c8y_api/src/json_c8y_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,22 @@ use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::SoftwareModule;
use tedge_api::SoftwareModuleUpdate;
use tedge_api::SoftwareUpdateCommand;
use tedge_config::TopicPrefix;
use time::OffsetDateTime;

pub struct C8yDeviceControlTopic;

impl C8yDeviceControlTopic {
pub fn topic() -> Topic {
Topic::new_unchecked(Self::name())
pub fn topic(prefix: &TopicPrefix) -> Topic {
Topic::new_unchecked(&Self::name(prefix))
}

pub fn accept(topic: &Topic) -> bool {
topic.name.starts_with(Self::name())
pub fn accept(topic: &Topic, prefix: &TopicPrefix) -> bool {
topic.name.starts_with(&Self::name(prefix))
}

pub fn name() -> &'static str {
"c8y/devicecontrol/notifications"
pub fn name(prefix: &TopicPrefix) -> String {
format!("{prefix}/devicecontrol/notifications")
}
}

Expand Down Expand Up @@ -422,10 +423,15 @@ pub trait C8yDeviceControlOperationHelper {
}

impl C8yDeviceControlOperationHelper for C8yRestart {}

impl C8yDeviceControlOperationHelper for C8ySoftwareUpdate {}

impl C8yDeviceControlOperationHelper for C8yLogfileRequest {}

impl C8yDeviceControlOperationHelper for C8yUploadConfigFile {}

impl C8yDeviceControlOperationHelper for C8yDownloadConfigFile {}

impl C8yDeviceControlOperationHelper for C8yFirmware {}

#[derive(thiserror::Error, Debug)]
Expand Down
10 changes: 7 additions & 3 deletions crates/core/c8y_api/src/smartrest/inventory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use crate::smartrest::csv::fields_to_csv_string;
use crate::smartrest::topic::publish_topic_from_ancestors;
use mqtt_channel::Message;
use tedge_config::TopicPrefix;

use super::message::sanitize_for_smartrest;

Expand All @@ -23,6 +24,7 @@ pub fn child_device_creation_message(
device_name: Option<&str>,
device_type: Option<&str>,
ancestors: &[String],
prefix: &TopicPrefix,
) -> Result<Message, InvalidValueError> {
if child_id.is_empty() {
return Err(InvalidValueError {
Expand All @@ -44,7 +46,7 @@ pub fn child_device_creation_message(
}

Ok(Message::new(
&publish_topic_from_ancestors(ancestors),
&publish_topic_from_ancestors(ancestors, prefix),
// XXX: if any arguments contain commas, output will be wrong
format!(
"101,{},{},{}",
Expand All @@ -64,6 +66,7 @@ pub fn service_creation_message(
service_type: &str,
service_status: &str,
ancestors: &[String],
prefix: &TopicPrefix,
) -> Result<Message, InvalidValueError> {
// TODO: most of this noise can be eliminated by implementing `Serialize`/`Deserialize` for smartrest format
if service_id.is_empty() {
Expand Down Expand Up @@ -92,7 +95,7 @@ pub fn service_creation_message(
}

Ok(Message::new(
&publish_topic_from_ancestors(ancestors),
&publish_topic_from_ancestors(ancestors, prefix),
fields_to_csv_string(&[
"102",
service_id,
Expand All @@ -116,8 +119,9 @@ pub fn service_creation_message(
pub fn service_status_update_message(
external_ids: &[impl AsRef<str>],
service_status: &str,
prefix: &TopicPrefix,
) -> Message {
let topic = publish_topic_from_ancestors(external_ids);
let topic = publish_topic_from_ancestors(external_ids, prefix);

let service_status =
sanitize_for_smartrest(service_status, super::message::MAX_PAYLOAD_LIMIT_IN_BYTES);
Expand Down
17 changes: 9 additions & 8 deletions crates/core/c8y_api/src/smartrest/smartrest_serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use serde::ser::SerializeSeq;
use serde::Deserialize;
use serde::Serialize;
use serde::Serializer;
use tedge_config::TopicPrefix;
use tracing::warn;

pub type SmartRest = String;
Expand Down Expand Up @@ -204,20 +205,20 @@ where

/// Helper to generate a SmartREST operation status message
pub trait OperationStatusMessage {
fn executing() -> Message {
Self::create_message(Self::status_executing())
fn executing(prefix: &TopicPrefix) -> Message {
Self::create_message(Self::status_executing(), prefix)
}
didier-wenzek marked this conversation as resolved.
Show resolved Hide resolved

fn successful(parameter: Option<&str>) -> Message {
Self::create_message(Self::status_successful(parameter))
fn successful(parameter: Option<&str>, prefix: &TopicPrefix) -> Message {
Self::create_message(Self::status_successful(parameter), prefix)
}

fn failed(failure_reason: &str) -> Message {
Self::create_message(Self::status_failed(failure_reason))
fn failed(failure_reason: &str, prefix: &TopicPrefix) -> Message {
Self::create_message(Self::status_failed(failure_reason), prefix)
}

fn create_message(payload: SmartRest) -> Message {
let topic = C8yTopic::SmartRestResponse.to_topic().unwrap(); // never fail
fn create_message(payload: SmartRest, prefix: &TopicPrefix) -> Message {
let topic = C8yTopic::SmartRestResponse.to_topic(prefix).unwrap(); // never fail
Message::new(&topic, payload)
}

Expand Down
Loading
Loading