Skip to content

Commit

Permalink
fixup! Add MQTT server authentication
Browse files Browse the repository at this point in the history
Signed-off-by: Marcel Guzik <marcel.guzik@inetum.com>
  • Loading branch information
Bravo555 committed Mar 27, 2023
1 parent 961929d commit 598b1d8
Show file tree
Hide file tree
Showing 13 changed files with 265 additions and 113 deletions.
73 changes: 45 additions & 28 deletions crates/common/certificate/src/parse_root_certificate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use rustls::RootCertStore;
use rustls_pemfile::certs;
use rustls_pemfile::pkcs8_private_keys;
use rustls_pemfile::rsa_private_keys;
use std::ffi::OsString;
use std::fs;
use std::fs::File;
use std::io::BufReader;
Expand All @@ -18,7 +19,7 @@ pub fn create_tls_config(
client_private_key: PathBuf,
client_certificate: PathBuf,
) -> Result<ClientConfig, CertificateError> {
let root_cert_store = new_root_store(root_certificates)?;
let root_cert_store = new_root_store(&root_certificates)?;
let pvt_key = read_pvt_key(client_private_key)?;
let cert_chain = read_cert_chain(client_certificate)?;

Expand All @@ -28,46 +29,65 @@ pub fn create_tls_config(
.with_single_cert(cert_chain, pvt_key)?)
}

pub fn create_tls_config_from_single_ca(
ca_file: impl AsRef<Path>,
) -> Result<ClientConfig, CertificateError> {
let mut root_store = RootCertStore::empty();
let cert_chain = read_cert_chain(ca_file)?;
for cert in &cert_chain {
pub fn add_certs_from_file(
root_store: &mut RootCertStore,
cert_file: impl AsRef<Path>,
) -> Result<(), CertificateError> {
let cert_chain = read_cert_chain(cert_file)?;
for cert in cert_chain {
root_store
.add(cert)
.add(&cert)
.map_err(|_| CertificateError::RootStoreAdd)?;
}

let config = ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(root_store)
.with_no_client_auth();
Ok(())
}

Ok(config)
pub fn add_certs_from_directory(
root_store: &mut RootCertStore,
cert_dir: impl AsRef<Path>,
) -> Result<(), CertificateError> {
let files = fs::read_dir(cert_dir)?;
let certs = files.filter_map(|f| f.ok()).filter(|file| {
file.path()
.extension()
.filter(|&extension| {
["pem", "cer", "crt"]
.map(OsString::from)
.iter()
.any(|e| e == extension)
})
.is_some()
});

for cert_file in certs {
add_certs_from_file(root_store, cert_file.path())?;
}

Ok(())
}

fn new_root_store(cert_path: PathBuf) -> Result<RootCertStore, CertificateError> {
fn new_root_store(cert_path: &Path) -> Result<RootCertStore, CertificateError> {
let mut root_store = RootCertStore::empty();
rec_add_root_cert(&mut root_store, cert_path);
Ok(root_store)
}

fn rec_add_root_cert(root_store: &mut RootCertStore, cert_path: PathBuf) {
if let Err(err) = try_rec_add_root_cert(root_store, cert_path.clone()) {
fn rec_add_root_cert(root_store: &mut RootCertStore, cert_path: &Path) {
if let Err(err) = try_rec_add_root_cert(root_store, cert_path) {
eprintln!("Ignoring certificates in {:?} due to: {}", cert_path, err)
}
}

fn try_rec_add_root_cert(
root_store: &mut RootCertStore,
cert_path: PathBuf,
cert_path: &Path,
) -> Result<(), CertificateError> {
if fs::metadata(&cert_path)?.is_dir() {
if fs::metadata(cert_path)?.is_dir() {
for file_entry in fs::read_dir(cert_path)?.flatten() {
rec_add_root_cert(root_store, file_entry.path());
rec_add_root_cert(root_store, &file_entry.path());
}
} else if let Err(err) = add_root_cert(root_store, cert_path.clone()) {
} else if let Err(err) = add_root_cert(root_store, cert_path) {
eprintln!(
"Ignoring certificates in file {:?} due to: {}",
cert_path, err
Expand All @@ -76,11 +96,8 @@ fn try_rec_add_root_cert(
Ok(())
}

fn add_root_cert(
root_store: &mut RootCertStore,
cert_path: PathBuf,
) -> Result<(), CertificateError> {
let certificates = read_cert_chain(cert_path.clone())?;
fn add_root_cert(root_store: &mut RootCertStore, cert_path: &Path) -> Result<(), CertificateError> {
let certificates = read_cert_chain(cert_path)?;
for certificate in certificates.iter() {
if let Err(err) = root_store.add(certificate) {
eprintln!(
Expand Down Expand Up @@ -187,7 +204,7 @@ mod tests {
fn an_empty_directory_contains_no_root_certificate() {
let temp_dir = TempDir::new().unwrap();

let root_certs = new_root_store(temp_dir.into_path()).unwrap();
let root_certs = new_root_store(temp_dir.path()).unwrap();
assert!(root_certs.is_empty());
}

Expand All @@ -207,7 +224,7 @@ mod tests {
.write_all(include_str!("./test_root_cert_2.txt").as_bytes())
.unwrap();

let root_certs = new_root_store(temp_dir.path().to_path_buf()).unwrap();
let root_certs = new_root_store(temp_dir.path()).unwrap();
assert_eq!(root_certs.len(), 3);
}

Expand All @@ -228,7 +245,7 @@ mod tests {
.write_all(include_str!("./test_root_cert_2.txt").as_bytes())
.unwrap();

let root_certs = new_root_store(temp_dir.path().to_path_buf()).unwrap();
let root_certs = new_root_store(temp_dir.path()).unwrap();
assert_eq!(root_certs.len(), 3);
}
}
48 changes: 36 additions & 12 deletions crates/common/mqtt_channel/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::Message;
use crate::TopicFilter;
use certificate::parse_root_certificate;
use rumqttc::tokio_rustls::rustls::ClientConfig;
use rumqttc::tokio_rustls::rustls;
use rumqttc::LastWill;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::path::PathBuf;
use std::path::Path;
use std::sync::Arc;

/// Configuration of an MQTT connection
Expand Down Expand Up @@ -62,7 +62,7 @@ pub struct Config {
pub initial_message: Option<InitMessageFn>,

/// TLS configuration used to connect to the broker.
pub tls_config: Option<ClientConfig>,
pub cert_store: Option<rustls::RootCertStore>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -101,7 +101,7 @@ impl Default for Config {
max_packet_size: 1024 * 1024,
last_will_message: None,
initial_message: None,
tls_config: None,
cert_store: None,
}
}
}
Expand Down Expand Up @@ -187,11 +187,31 @@ impl Config {
}
}

/// Set CA cert file
pub fn with_cafile(self, ca_file: PathBuf) -> Result<Self, certificate::CertificateError> {
let tls_config = parse_root_certificate::create_tls_config_from_single_ca(ca_file)?;
/// Adds all certificates present in `ca_file` file to the trust store.
pub fn with_cafile(
self,
ca_file: impl AsRef<Path>,
) -> Result<Self, certificate::CertificateError> {
let mut cert_store = self.cert_store.unwrap_or_else(rustls::RootCertStore::empty);
parse_root_certificate::add_certs_from_file(&mut cert_store, ca_file)?;

Ok(Self {
tls_config: Some(tls_config),
cert_store: Some(cert_store),
..self
})
}

/// Adds all certificate from all files in the directory `ca_dir` to the
/// trust store.
pub fn with_cadir(
self,
ca_dir: impl AsRef<Path>,
) -> Result<Self, certificate::CertificateError> {
let mut cert_store = self.cert_store.unwrap_or_else(rustls::RootCertStore::empty);
parse_root_certificate::add_certs_from_directory(&mut cert_store, ca_dir)?;

Ok(Self {
cert_store: Some(cert_store),
..self
})
}
Expand All @@ -213,10 +233,14 @@ impl Config {
} else {
mqtt_options.set_clean_session(self.clean_session);
}
if let Some(tls_config) = self.tls_config.as_ref() {
mqtt_options.set_transport(rumqttc::Transport::tls_with_config(
tls_config.clone().into(),
));

if let Some(cert_store) = self.cert_store.as_ref() {
let tls_config = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(cert_store.clone())
.with_no_client_auth();

mqtt_options.set_transport(rumqttc::Transport::tls_with_config(tls_config.into()));
}

mqtt_options.set_max_packet_size(self.max_packet_size, self.max_packet_size);
Expand Down
4 changes: 2 additions & 2 deletions crates/common/mqtt_channel/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ impl Connection {
const INSECURE_MQTT_PORT: u16 = 1883;
const SECURE_MQTT_PORT: u16 = 8883;

if config.port == INSECURE_MQTT_PORT && config.tls_config.is_some() {
if config.port == INSECURE_MQTT_PORT && config.cert_store.is_some() {
eprintln!("WARNING: Connecting on port 1883 for insecure MQTT using a TLS connection");
}
if config.port == SECURE_MQTT_PORT && config.tls_config.is_none() {
if config.port == SECURE_MQTT_PORT && config.cert_store.is_none() {
eprintln!("WARNING: Connecting on port 8883 for secure MQTT without a CA file");
}

Expand Down
16 changes: 15 additions & 1 deletion crates/common/tedge_config/src/tedge_config_cli/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,21 @@ impl ConfigSetting for MqttClientPortSetting {
pub struct MqttClientCafileSetting;

impl ConfigSetting for MqttClientCafileSetting {
const KEY: &'static str = "mqtt.client.cafile";
const KEY: &'static str = "mqtt.client.ca_file";

const DESCRIPTION: &'static str = concat!(
"Path to the CA certificate used by MQTT clients to use when ",
"authenticating the MQTT broker."
);

type Value = FilePath;
}

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct MqttClientCapathSetting;

impl ConfigSetting for MqttClientCapathSetting {
const KEY: &'static str = "mqtt.client.ca_path";

const DESCRIPTION: &'static str = concat!(
"Path to the CA certificate used by MQTT clients to use when ",
Expand Down
36 changes: 31 additions & 5 deletions crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,24 +419,50 @@ impl ConfigSettingAccessor<MqttClientCafileSetting> for TEdgeConfig {
fn query(&self, _setting: MqttClientCafileSetting) -> ConfigSettingResult<FilePath> {
self.data
.mqtt
.client_cafile
.client_ca_file
.clone()
.ok_or(ConfigSettingError::ConfigNotSet {
key: "mqtt.client.cafile",
key: "mqtt.client.ca_file",
})
}

fn update(
&mut self,
_setting: MqttClientCafileSetting,
cafile: FilePath,
ca_file: FilePath,
) -> ConfigSettingResult<()> {
self.data.mqtt.client_cafile = Some(cafile);
self.data.mqtt.client_ca_file = Some(ca_file);
Ok(())
}

fn unset(&mut self, _setting: MqttClientCafileSetting) -> ConfigSettingResult<()> {
self.data.mqtt.client_cafile = None;
self.data.mqtt.client_ca_file = None;
Ok(())
}
}

impl ConfigSettingAccessor<MqttClientCapathSetting> for TEdgeConfig {
fn query(&self, _setting: MqttClientCapathSetting) -> ConfigSettingResult<FilePath> {
self.data
.mqtt
.client_ca_path
.clone()
.ok_or(ConfigSettingError::ConfigNotSet {
key: "mqtt.client.ca_path",
})
}

fn update(
&mut self,
_setting: MqttClientCapathSetting,
cafile: FilePath,
) -> ConfigSettingResult<()> {
self.data.mqtt.client_ca_path = Some(cafile);
Ok(())
}

fn unset(&mut self, _setting: MqttClientCapathSetting) -> ConfigSettingResult<()> {
self.data.mqtt.client_ca_path = None;
Ok(())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,13 @@ pub(crate) struct MqttConfigDto {
// `None` variant.
pub(crate) client_port: Option<NonZeroU16>,

/// Path to the CA certificate used by MQTT clients to use when
/// Path to the trusted CA certificate file used by MQTT clients when
/// authenticating the MQTT broker.
pub(crate) client_cafile: Option<FilePath>,
pub(crate) client_ca_file: Option<FilePath>,

/// Path to the directory containing trusted CA certificates used by MQTT
/// clients when authenticating the MQTT broker.
pub(crate) client_ca_path: Option<FilePath>,

pub(crate) bind_address: Option<IpAddress>,
pub(crate) external_port: Option<u16>,
Expand Down
1 change: 1 addition & 0 deletions crates/core/tedge/src/cli/config/config_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl ConfigKey {
config_key!(MqttClientHostSetting),
config_key!(MqttClientPortSetting),
config_key!(MqttClientCafileSetting),
config_key!(MqttClientCapathSetting),
config_key!(MqttPortSetting),
config_key!(HttpPortSetting),
config_key!(MqttExternalPortSetting),
Expand Down
7 changes: 7 additions & 0 deletions crates/core/tedge/src/cli/mqtt/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ impl BuildCommand for TEdgeMqttCli {
.load()?
.query(MqttClientCafileSetting)
.ok();
let ca_path = context
.config_repository
.load()?
.query(MqttClientCapathSetting)
.ok();

let cmd = {
match self {
Expand All @@ -74,6 +79,7 @@ impl BuildCommand for TEdgeMqttCli {
disconnect_timeout: DISCONNECT_TIMEOUT,
retain,
ca_file,
ca_path,
}
.into_boxed(),
TEdgeMqttCli::Sub {
Expand All @@ -88,6 +94,7 @@ impl BuildCommand for TEdgeMqttCli {
hide_topic,
client_id: format!("{}-{}", SUB_CLIENT_PREFIX, std::process::id()),
ca_file,
ca_path,
}
.into_boxed(),
}
Expand Down
Loading

0 comments on commit 598b1d8

Please sign in to comment.