Skip to content

Commit

Permalink
Make c8y plugins use client address/port settings
Browse files Browse the repository at this point in the history
c8y_configuration_plugin, c8y_firmware_plugin, and c8y_log_plugin
didnt's use any MQTT address/host config settings, but were hardcoded to
default to localhost. Now they use MqttClientAddressSetting and
MqttClientPortSetting settings.

Signed-off-by: Marcel Guzik <marcel.guzik@inetum.com>
  • Loading branch information
Bravo555 committed Mar 6, 2023
1 parent c978bb9 commit adfcc49
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 4 deletions.
9 changes: 7 additions & 2 deletions plugins/c8y_configuration_plugin/src/config_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub enum ActiveOperationState {
impl ConfigManager {
pub async fn new(
tedge_device_id: impl ToString,
mqtt_host: String,
mqtt_port: u16,
http_client: Arc<Mutex<dyn C8YHttpProxy>>,
local_http_host: impl ToString,
Expand All @@ -76,7 +77,7 @@ impl ConfigManager {
let plugin_config =
PluginConfig::new(&config_file_dir.join(DEFAULT_PLUGIN_CONFIG_FILE_NAME));

let mqtt_client = Self::create_mqtt_client(mqtt_port).await?;
let mqtt_client = Self::create_mqtt_client(mqtt_host, mqtt_port).await?;

let c8y_request_topics: TopicFilter = C8yTopic::SmartRestRequest.into();
let health_check_topics = health_check_topics(DEFAULT_PLUGIN_CONFIG_TYPE);
Expand Down Expand Up @@ -203,7 +204,10 @@ impl ConfigManager {
}
}

async fn create_mqtt_client(mqtt_port: u16) -> Result<mqtt_channel::Connection, anyhow::Error> {
async fn create_mqtt_client(
mqtt_host: String,
mqtt_port: u16,
) -> Result<mqtt_channel::Connection, anyhow::Error> {
let mut topic_filter =
mqtt_channel::TopicFilter::new_unchecked(&C8yTopic::SmartRestRequest.to_string());
topic_filter.add_all(health_check_topics(DEFAULT_PLUGIN_CONFIG_TYPE));
Expand All @@ -213,6 +217,7 @@ impl ConfigManager {

let mqtt_config = mqtt_channel::Config::default()
.with_session_name(DEFAULT_PLUGIN_CONFIG_TYPE)
.with_host(mqtt_host)
.with_port(mqtt_port)
.with_subscriptions(topic_filter)
.with_last_will_message(health_status_down_message(DEFAULT_PLUGIN_CONFIG_TYPE));
Expand Down
3 changes: 3 additions & 0 deletions plugins/c8y_configuration_plugin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use tedge_config::ConfigSettingAccessor;
use tedge_config::DeviceIdSetting;
use tedge_config::HttpBindAddressSetting;
use tedge_config::HttpPortSetting;
use tedge_config::MqttClientHostSetting;
use tedge_config::MqttClientPortSetting;
use tedge_config::TEdgeConfig;
use tedge_config::TmpPathSetting;
Expand Down Expand Up @@ -106,6 +107,7 @@ async fn main() -> Result<(), anyhow::Error> {

let tedge_device_id = tedge_config.query(DeviceIdSetting)?;

let mqtt_host = tedge_config.query(MqttClientHostSetting)?;
let mqtt_port = tedge_config.query(MqttClientPortSetting)?.into();
let http_client = create_http_client(&tedge_config).await?;
let http_client: Arc<Mutex<dyn C8YHttpProxy>> = Arc::new(Mutex::new(http_client));
Expand All @@ -117,6 +119,7 @@ async fn main() -> Result<(), anyhow::Error> {

let mut config_manager = ConfigManager::new(
tedge_device_id,
mqtt_host,
mqtt_port,
http_client,
local_http_host,
Expand Down
14 changes: 14 additions & 0 deletions plugins/c8y_configuration_plugin/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ async fn test_handle_config_upload_request_tedge_device() -> anyhow::Result<()>

let mut config_manager = ConfigManager::new(
tedge_device_id,
"localhost".to_string(),
broker.port,
Arc::new(Mutex::new(c8y_http_client)),
mockito::server_address().to_string(),
Expand Down Expand Up @@ -144,6 +145,7 @@ async fn test_handle_config_upload_request_child_device() -> anyhow::Result<()>

let mut config_manager = ConfigManager::new(
tedge_device_id,
"localhost".to_string(),
broker.port,
Arc::new(Mutex::new(c8y_http_client)),
mockito::server_address().to_string(),
Expand Down Expand Up @@ -215,6 +217,7 @@ async fn test_handle_config_upload_executing_response_child_device() -> anyhow::

let mut config_manager = ConfigManager::new(
tedge_device_id,
"localhost".to_string(),
broker.port,
Arc::new(Mutex::new(c8y_http_client)),
mockito::server_address().to_string(),
Expand Down Expand Up @@ -282,6 +285,7 @@ async fn test_handle_config_upload_failed_response_child_device() -> anyhow::Res

let mut config_manager = ConfigManager::new(
tedge_device_id,
"localhost".to_string(),
broker.port,
Arc::new(Mutex::new(c8y_http_client)),
mockito::server_address().to_string(),
Expand Down Expand Up @@ -339,6 +343,7 @@ async fn test_invalid_config_snapshot_response_child_device() -> anyhow::Result<

let mut config_manager = ConfigManager::new(
tedge_device_id,
"localhost".to_string(),
broker.port,
Arc::new(Mutex::new(c8y_http_client)),
mockito::server_address().to_string(),
Expand Down Expand Up @@ -403,6 +408,7 @@ async fn test_no_config_snapshot_response_child_device() -> anyhow::Result<()> {

let mut config_manager = ConfigManager::new(
tedge_device_id,
"localhost".to_string(),
broker.port,
Arc::new(Mutex::new(c8y_http_client)),
mockito::server_address().to_string(),
Expand Down Expand Up @@ -485,6 +491,7 @@ async fn test_handle_config_upload_successful_response_child_device() -> anyhow:

let mut config_manager = ConfigManager::new(
tedge_device_id,
"localhost".to_string(),
broker.port,
Arc::new(Mutex::new(c8y_http_client)),
mockito::server_address().to_string(),
Expand Down Expand Up @@ -559,6 +566,7 @@ async fn test_child_config_upload_successful_response_mapped_to_failed_without_u

let mut config_manager = ConfigManager::new(
tedge_device_id,
"localhost".to_string(),
broker.port,
Arc::new(Mutex::new(c8y_http_client)),
mockito::server_address().to_string(),
Expand Down Expand Up @@ -644,6 +652,7 @@ async fn test_handle_config_update_request_tedge_device() -> anyhow::Result<()>

let mut config_manager = ConfigManager::new(
tedge_device_id,
"localhost".to_string(),
broker.port,
Arc::new(Mutex::new(c8y_http_client)),
mockito::server_address().to_string(),
Expand Down Expand Up @@ -732,6 +741,7 @@ async fn test_handle_config_update_request_child_device() -> anyhow::Result<()>

let mut config_manager = ConfigManager::new(
tedge_device_id,
"localhost".to_string(),
broker.port,
Arc::new(Mutex::new(c8y_http_client)),
mockito::server_address().to_string(),
Expand Down Expand Up @@ -817,6 +827,7 @@ async fn test_c8y_config_download_child_device_fail_on_broken_url() -> anyhow::R

let mut config_manager = ConfigManager::new(
tedge_device_id,
"localhost".to_string(),
broker.port,
Arc::new(Mutex::new(c8y_http_client)),
mockito::server_address().to_string(),
Expand Down Expand Up @@ -881,6 +892,7 @@ async fn test_handle_config_update_successful_response_child_device() -> anyhow:

let mut config_manager = ConfigManager::new(
tedge_device_id,
"localhost".to_string(),
broker.port,
Arc::new(Mutex::new(c8y_http_client)),
mockito::server_address().to_string(),
Expand Down Expand Up @@ -938,6 +950,7 @@ async fn test_invalid_config_update_response_child_device() -> anyhow::Result<()

let mut config_manager = ConfigManager::new(
tedge_device_id,
"localhost".to_string(),
broker.port,
Arc::new(Mutex::new(c8y_http_client)),
mockito::server_address().to_string(),
Expand Down Expand Up @@ -998,6 +1011,7 @@ async fn test_handle_multiline_config_upload_requests() -> anyhow::Result<()> {

let mut config_manager = ConfigManager::new(
tedge_device_id,
"localhost".to_string(),
broker.port,
Arc::new(Mutex::new(c8y_http_client)),
mockito::server_address().to_string(),
Expand Down
11 changes: 9 additions & 2 deletions plugins/c8y_firmware_plugin/src/firmware_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,19 @@ pub struct FirmwareManager {
}

impl FirmwareManager {
// TODO: merge some of the function arguments
#[allow(clippy::too_many_arguments)]
pub async fn new(
tedge_device_id: String,
mqtt_host: String,
mqtt_port: u16,
http_client: Box<dyn C8YHttpProxy>,
local_http_host: String,
persistent_dir: PathBuf,
tmp_dir: PathBuf,
timeout_sec: Duration,
) -> Result<Self, FirmwareManagementError> {
let mqtt_client = Self::create_mqtt_client(mqtt_port).await?;
let mqtt_client = Self::create_mqtt_client(mqtt_host, mqtt_port).await?;

let c8y_request_topics = C8yTopic::SmartRestRequest.into();
let health_check_topics = health_check_topics(PLUGIN_SERVICE_NAME);
Expand Down Expand Up @@ -426,13 +429,17 @@ impl FirmwareManager {
Ok(())
}

async fn create_mqtt_client(mqtt_port: u16) -> Result<Connection, MqttError> {
async fn create_mqtt_client(
mqtt_host: String,
mqtt_port: u16,
) -> Result<Connection, MqttError> {
let mut topic_filter = TopicFilter::new_unchecked(&C8yTopic::SmartRestRequest.to_string());
topic_filter.add_all(health_check_topics(PLUGIN_SERVICE_NAME));
topic_filter.add_all(TopicFilter::new_unchecked(FIRMWARE_UPDATE_RESPONSE_TOPICS));

let mqtt_config = mqtt_channel::Config::default()
.with_session_name(PLUGIN_SERVICE_NAME)
.with_host(mqtt_host)
.with_port(mqtt_port)
.with_subscriptions(topic_filter)
.with_last_will_message(health_status_down_message(PLUGIN_SERVICE_NAME));
Expand Down
3 changes: 3 additions & 0 deletions plugins/c8y_firmware_plugin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tedge_config::DeviceIdSetting;
use tedge_config::FirmwareChildUpdateTimeoutSetting;
use tedge_config::HttpBindAddressSetting;
use tedge_config::HttpPortSetting;
use tedge_config::MqttClientHostSetting;
use tedge_config::MqttClientPortSetting;
use tedge_config::TEdgeConfig;
use tedge_config::TmpPathSetting;
Expand Down Expand Up @@ -95,6 +96,7 @@ async fn main() -> Result<(), FirmwareManagementError> {
let tedge_config = config_repository.load()?;

let tedge_device_id = tedge_config.query(DeviceIdSetting)?;
let mqtt_host = tedge_config.query(MqttClientHostSetting)?;
let mqtt_port = tedge_config.query(MqttClientPortSetting)?.into();

let http_client = create_http_client(&tedge_config).await?;
Expand All @@ -113,6 +115,7 @@ async fn main() -> Result<(), FirmwareManagementError> {

let mut firmware_manager = FirmwareManager::new(
tedge_device_id,
mqtt_host,
mqtt_port,
http_client,
local_http_host,
Expand Down
1 change: 1 addition & 0 deletions plugins/c8y_firmware_plugin/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ async fn start_firmware_manager(
) -> anyhow::Result<()> {
let mut firmware_manager = FirmwareManager::new(
"tedge_device_id".to_string(),
"localhost".to_string(),
port,
Box::new(http_client),
mockito::server_address().to_string(),
Expand Down
3 changes: 3 additions & 0 deletions plugins/c8y_log_plugin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use tedge_config::ConfigRepository;
use tedge_config::ConfigSettingAccessor;
use tedge_config::DeviceIdSetting;
use tedge_config::LogPathSetting;
use tedge_config::MqttClientHostSetting;
use tedge_config::MqttClientPortSetting;
use tedge_config::TEdgeConfig;
use tedge_config::DEFAULT_TEDGE_CONFIG_PATH;
Expand Down Expand Up @@ -80,6 +81,7 @@ pub struct LogfileRequestPluginOpt {
async fn create_mqtt_client(
tedge_config: &TEdgeConfig,
) -> Result<mqtt_channel::Connection, anyhow::Error> {
let mqtt_host = tedge_config.query(MqttClientHostSetting)?;
let mqtt_port = tedge_config.query(MqttClientPortSetting)?.into();
let mut topics: TopicFilter = health_check_topics(C8Y_LOG_PLUGIN);

Expand All @@ -89,6 +91,7 @@ async fn create_mqtt_client(

let mqtt_config = mqtt_channel::Config::default()
.with_session_name(C8Y_LOG_PLUGIN)
.with_host(mqtt_host)
.with_port(mqtt_port)
.with_subscriptions(topics)
.with_last_will_message(health_status_down_message(C8Y_LOG_PLUGIN));
Expand Down

0 comments on commit adfcc49

Please sign in to comment.