Skip to content

Commit

Permalink
Add address/port config settings for crates using MQTT clients (thin-…
Browse files Browse the repository at this point in the history
…edge#1789)

* Added impl TryFrom<&str> for IpAddress

Signed-off-by: Marcel Guzik <marcel.guzik@inetum.com>

* Add MQTT client address and port config options

As described in thin-edge#1773, config settings for addresses and port numbers
were made separate for MQTT clients and brokers, as often connect
and bind addresses are different. This change will help in scenarios
where the MQTT broker is located outside of the local network, e.g. when
using containers or when using a hostname.

Signed-off-by: Marcel Guzik <marcel.guzik@inetum.com>

* Make c8y plugins use client address/port settings

c8y_configuration_plugin, c8y_firmware_plugin, and c8y_log_plugin
didnt's use any MQTT address/host config settings, but were hardcoded to
default to localhost. Now they use MqttClientAddressSetting and
MqttClientPortSetting settings.

Signed-off-by: Marcel Guzik <marcel.guzik@inetum.com>

---------

Signed-off-by: Marcel Guzik <marcel.guzik@inetum.com>
  • Loading branch information
Bravo555 authored and albinsuresh committed Mar 13, 2023
1 parent e042ad1 commit a6173e8
Show file tree
Hide file tree
Showing 29 changed files with 229 additions and 76 deletions.
8 changes: 4 additions & 4 deletions crates/bin/c8y-device-management/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use tedge_actors::NoConfig;
use tedge_actors::Runtime;
use tedge_config::get_tedge_config;
use tedge_config::ConfigSettingAccessor;
use tedge_config::MqttBindAddressSetting;
use tedge_config::MqttPortSetting;
use tedge_config::MqttClientHostSetting;
use tedge_config::MqttClientPortSetting;
use tedge_config::TEdgeConfig;
use tedge_config::TEdgeConfigError;
use tedge_config::DEFAULT_TEDGE_CONFIG_PATH;
Expand Down Expand Up @@ -87,8 +87,8 @@ async fn main() -> anyhow::Result<()> {
}

fn mqtt_config(tedge_config: &TEdgeConfig) -> Result<MqttConfig, TEdgeConfigError> {
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string();
let mqtt_port = tedge_config.query(MqttClientPortSetting)?.into();
let mqtt_host = tedge_config.query(MqttClientHostSetting)?;
let config = MqttConfig::default()
.with_host(mqtt_host)
.with_port(mqtt_port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,7 @@ pub enum ConfigSettingError {
key: &'static str,
message: &'static str,
},

#[error("An error occurred: {msg}")]
Other { msg: &'static str },
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ impl Default for IpAddress {
}
}

impl TryFrom<&str> for IpAddress {
type Error = InvalidIpAddress;

fn try_from(input: &str) -> Result<Self, Self::Error> {
input
.parse::<IpAddr>()
.map_err(|_| InvalidIpAddress {
input: input.to_string(),
})
.map(IpAddress)
}
}

impl TryFrom<String> for IpAddress {
type Error = InvalidIpAddress;

Expand Down Expand Up @@ -63,22 +76,19 @@ mod tests {

#[test]
fn conversion_from_valid_ipv4_succeeds() {
let _loh: IpAddress = IpAddress::try_from("127.0.0.1".to_string()).unwrap();
let _loh: IpAddress = IpAddress::try_from("127.0.0.1").unwrap();
assert_matches!(Ipv4Addr::LOCALHOST, _loh);
}

#[test]
fn conversion_from_valid_ipv6_succeeds() {
let _loh: IpAddress = IpAddress::try_from("::1".to_string()).unwrap();
let _loh: IpAddress = IpAddress::try_from("::1").unwrap();
assert_matches!(Ipv6Addr::LOCALHOST, _loh);
}

#[test]
fn conversion_from_longer_integer_fails() {
assert_matches!(
IpAddress::try_from("66000".to_string()),
Err(InvalidIpAddress { .. })
);
assert_matches!(IpAddress::try_from("66000"), Err(InvalidIpAddress { .. }));
}

#[test]
Expand Down
28 changes: 28 additions & 0 deletions crates/common/tedge_config/src/tedge_config_cli/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,34 @@ impl ConfigSetting for AwsRootCertPathSetting {
type Value = FilePath;
}

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct MqttClientHostSetting;

impl ConfigSetting for MqttClientHostSetting {
const KEY: &'static str = "mqtt.client.host";

const DESCRIPTION: &'static str = concat!(
"Mqtt broker address, which is used by the mqtt clients to publish or subscribe.",
"Example: 127.0.0.1"
);

type Value = String;
}

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct MqttClientPortSetting;

impl ConfigSetting for MqttClientPortSetting {
const KEY: &'static str = "mqtt.client.port";

const DESCRIPTION: &'static str = concat!(
"Mqtt broker port, which is used by the mqtt clients to publish or subscribe.",
"Example: 1883"
);

type Value = Port;
}

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct MqttPortSetting;

Expand Down
51 changes: 51 additions & 0 deletions crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use certificate::CertificateError;
use certificate::PemCertificate;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::num::NonZeroU16;

/// loads tedge config from system default
pub fn get_tedge_config() -> Result<TEdgeConfig, TEdgeConfigError> {
Expand Down Expand Up @@ -364,6 +365,56 @@ impl ConfigSettingAccessor<C8yRootCertPathSetting> for TEdgeConfig {
}
}

impl ConfigSettingAccessor<MqttClientHostSetting> for TEdgeConfig {
fn query(&self, _setting: MqttClientHostSetting) -> ConfigSettingResult<String> {
Ok(self
.data
.mqtt
.client_host
.clone()
.unwrap_or(self.config_defaults.default_mqtt_client_host.clone()))
}

fn update(
&mut self,
_setting: MqttClientHostSetting,
value: String,
) -> ConfigSettingResult<()> {
self.data.mqtt.client_host = Some(value);
Ok(())
}

fn unset(&mut self, _setting: MqttClientHostSetting) -> ConfigSettingResult<()> {
self.data.mqtt.client_host = None;
Ok(())
}
}

impl ConfigSettingAccessor<MqttClientPortSetting> for TEdgeConfig {
fn query(&self, _setting: MqttClientPortSetting) -> ConfigSettingResult<Port> {
Ok(self
.data
.mqtt
.client_port
.map(|p| Port(p.into()))
.unwrap_or_else(|| self.config_defaults.default_mqtt_port))
}

fn update(&mut self, _setting: MqttClientPortSetting, value: Port) -> ConfigSettingResult<()> {
let port: u16 = value.into();
let port: NonZeroU16 = port.try_into().map_err(|_| ConfigSettingError::Other {
msg: "Can't use 0 for a client port",
})?;
self.data.mqtt.client_port = Some(port);
Ok(())
}

fn unset(&mut self, _setting: MqttClientPortSetting) -> ConfigSettingResult<()> {
self.data.mqtt.client_port = None;
Ok(())
}
}

impl ConfigSettingAccessor<MqttPortSetting> for TEdgeConfig {
fn query(&self, _setting: MqttPortSetting) -> ConfigSettingResult<Port> {
Ok(self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ pub struct TEdgeConfigDefaults {
/// Default mqtt bind address
pub default_mqtt_bind_address: IpAddress,

/// Default mqtt broker host used by mqtt clients
pub default_mqtt_client_host: String,

/// Default http bind address
pub default_http_bind_address: IpAddress,

Expand Down Expand Up @@ -108,6 +111,7 @@ impl From<&TEdgeConfigLocation> for TEdgeConfigDefaults {
default_logs_path: logs_path.into(),
default_run_path: run_path.into(),
default_device_type: DEFAULT_DEVICE_TYPE.into(),
default_mqtt_client_host: "localhost".into(),
default_mqtt_bind_address: IpAddress::default(),
default_http_bind_address: IpAddress::default(),
default_c8y_smartrest_templates: TemplatesSet::default(),
Expand Down Expand Up @@ -142,6 +146,7 @@ fn test_from_tedge_config_location() {
default_logs_path: FilePath::from("/var/log"),
default_run_path: FilePath::from("/run"),
default_device_type: DEFAULT_DEVICE_TYPE.into(),
default_mqtt_client_host: "localhost".to_string(),
default_mqtt_bind_address: IpAddress::default(),
default_http_bind_address: IpAddress::default(),
default_c8y_smartrest_templates: TemplatesSet::default(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Crate-private plain-old data-type used for serialization.
use std::num::NonZeroU16;

use crate::*;
use serde::Deserialize;
use serde::Serialize;
Expand Down Expand Up @@ -112,6 +114,17 @@ pub(crate) struct AwsConfigDto {
#[serde(deny_unknown_fields)]
pub(crate) struct MqttConfigDto {
pub(crate) port: Option<u16>,

pub(crate) client_host: Option<String>,

// When connecting to a host, port 0 is invalid. When binding, however, port 0 is accepted and
// understood by the system to dynamically assign any free port to the process. The process then
// needs to take notice of what port it received, which I'm not sure if we're doing. If we don't
// want to allow binding to port 0, then we can also use `NonZeroU16` there as well, which
// because it can never be 0, can make the `Option` completely free, because Option can use 0x0000
// value for the `None` variant.
pub(crate) client_port: Option<NonZeroU16>,

pub(crate) bind_address: Option<IpAddress>,
pub(crate) external_port: Option<u16>,
pub(crate) external_bind_address: Option<IpAddress>,
Expand Down
9 changes: 5 additions & 4 deletions crates/common/tedge_config/tests/test_tedge_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ path = "/some/value"

assert_eq!(
config.query(MqttExternalBindAddressSetting)?,
IpAddress::try_from("0.0.0.0".to_string()).unwrap()
IpAddress::try_from("0.0.0.0").unwrap()
);

assert_eq!(
Expand All @@ -106,7 +106,7 @@ path = "/some/value"

assert_eq!(
config.query(MqttBindAddressSetting)?,
IpAddress::try_from("0.0.0.0".to_string()).unwrap()
IpAddress::try_from("0.0.0.0").unwrap()
);

Ok(())
Expand Down Expand Up @@ -323,7 +323,7 @@ fn test_parse_config_with_only_device_configuration() -> Result<(), TEdgeConfigE
assert_eq!(config.query(MqttPortSetting)?, Port(1883));
assert_eq!(
config.query(MqttBindAddressSetting)?,
IpAddress::try_from("127.0.0.1".to_string()).unwrap()
IpAddress::try_from("127.0.0.1").unwrap()
);
Ok(())
}
Expand Down Expand Up @@ -477,7 +477,7 @@ bind_address = "1.2.3.4"
assert_eq!(config.query(MqttPortSetting)?, Port(2222));
assert_eq!(
config.query(MqttBindAddressSetting)?,
IpAddress::try_from("1.2.3.4".to_string()).unwrap()
IpAddress::try_from("1.2.3.4").unwrap()
);
Ok(())
}
Expand Down Expand Up @@ -882,6 +882,7 @@ fn dummy_tedge_config_defaults() -> TEdgeConfigDefaults {
default_logs_path: FilePath::from("/var/log"),
default_run_path: FilePath::from("/run"),
default_device_type: String::from("test"),
default_mqtt_client_host: "localhost".to_string(),
default_mqtt_bind_address: IpAddress(IpAddr::V4(Ipv4Addr::LOCALHOST)),
default_http_bind_address: IpAddress(IpAddr::V4(Ipv4Addr::LOCALHOST)),
default_c8y_smartrest_templates: TemplatesSet::default(),
Expand Down
8 changes: 4 additions & 4 deletions crates/core/c8y_api/src/http_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use tedge_config::C8yUrlSetting;
use tedge_config::ConfigSettingAccessor;
use tedge_config::ConfigSettingAccessorStringExt;
use tedge_config::DeviceIdSetting;
use tedge_config::MqttBindAddressSetting;
use tedge_config::MqttPortSetting;
use tedge_config::MqttClientHostSetting;
use tedge_config::MqttClientPortSetting;
use tedge_config::TEdgeConfig;
use time::OffsetDateTime;

Expand Down Expand Up @@ -186,8 +186,8 @@ pub struct C8yMqttJwtTokenRetriever {

impl C8yMqttJwtTokenRetriever {
pub async fn try_new(tedge_config: &TEdgeConfig) -> Result<Self, SMCumulocityMapperError> {
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string();
let mqtt_port = tedge_config.query(MqttClientPortSetting)?.into();
let mqtt_host = tedge_config.query(MqttClientHostSetting)?;
let topic = TopicFilter::new("c8y/s/dat")?;
let mqtt_config = mqtt_channel::Config::default()
.with_port(mqtt_port)
Expand Down
2 changes: 2 additions & 0 deletions crates/core/tedge/src/cli/config/config_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ impl ConfigKey {
config_key!(AzureMapperTimestamp),
config_key!(MqttBindAddressSetting),
config_key!(HttpBindAddressSetting),
config_key!(MqttClientHostSetting),
config_key!(MqttClientPortSetting),
config_key!(MqttPortSetting),
config_key!(HttpPortSetting),
config_key!(MqttExternalPortSetting),
Expand Down
12 changes: 6 additions & 6 deletions crates/core/tedge/src/cli/connect/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ impl Command for ConnectCommand {
if let Cloud::C8y = self.cloud {
check_connected_c8y_tenant_as_configured(
&config.query_string(C8yUrlSetting)?,
config.query(MqttPortSetting)?.into(),
config.query(MqttBindAddressSetting)?.to_string(),
config.query(MqttClientPortSetting)?.into(),
config.query(MqttClientHostSetting)?,
);
enable_software_management(&bridge_config, self.service_manager.as_ref());
}
Expand Down Expand Up @@ -210,8 +210,8 @@ impl ConnectCommand {
}

fn check_connection(&self, config: &TEdgeConfig) -> Result<DeviceStatus, ConnectError> {
let port = config.query(MqttPortSetting)?.into();
let host = config.query(MqttBindAddressSetting)?.to_string();
let port = config.query(MqttClientPortSetting)?.into();
let host = config.query(MqttClientHostSetting)?;

println!(
"Sending packets to check connection. This may take up to {} seconds.\n",
Expand Down Expand Up @@ -257,8 +257,8 @@ fn check_device_status_c8y(tedge_config: &TEdgeConfig) -> Result<DeviceStatus, C

let mut options = MqttOptions::new(
CLIENT_ID,
tedge_config.query(MqttBindAddressSetting)?.to_string(),
tedge_config.query(MqttPortSetting)?.into(),
tedge_config.query(MqttClientHostSetting)?,
tedge_config.query(MqttClientPortSetting)?.into(),
);

options.set_keep_alive(RESPONSE_TIMEOUT);
Expand Down
11 changes: 7 additions & 4 deletions crates/core/tedge/src/cli/mqtt/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ pub enum TEdgeMqttCli {

impl BuildCommand for TEdgeMqttCli {
fn build_command(self, context: BuildContext) -> Result<Box<dyn Command>, crate::ConfigError> {
let port = context.config_repository.load()?.query(MqttPortSetting)?;
let port = context
.config_repository
.load()?
.query(MqttClientPortSetting)?;
let host = context
.config_repository
.load()?
.query(MqttBindAddressSetting)?;
.query(MqttClientHostSetting)?;
let cmd = {
match self {
TEdgeMqttCli::Pub {
Expand All @@ -56,7 +59,7 @@ impl BuildCommand for TEdgeMqttCli {
qos,
retain,
} => MqttPublishCommand {
host: host.to_string(),
host,
port: port.into(),
topic,
message,
Expand All @@ -71,7 +74,7 @@ impl BuildCommand for TEdgeMqttCli {
qos,
hide_topic,
} => MqttSubscribeCommand {
host: host.to_string(),
host,
port: port.into(),
topic,
qos,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge/tests/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod tests {

fn make_config(port: u16) -> Result<tempfile::TempDir, anyhow::Error> {
let dir = tempfile::TempDir::new().unwrap();
let toml_conf = &format!("[mqtt]\nport = {port}");
let toml_conf = &format!("[mqtt]\nclient_port = {port}");

let config_location = TEdgeConfigLocation::from_custom_root(dir.path());
let mut file = std::fs::File::create(config_location.tedge_config_file_path())?;
Expand Down
Loading

0 comments on commit a6173e8

Please sign in to comment.