Skip to content

Commit

Permalink
Add skip_status_update option to custom operation handler
Browse files Browse the repository at this point in the history
If this key is set to `true` (default `false`), c8y-mapper will execute
the command, but it'll not send operation status update messages to c8y

Signed-off-by: Rina Fujino <rina.fujino.23@gmail.com>
  • Loading branch information
rina23q committed Oct 2, 2024
1 parent b5d2296 commit af45393
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 50 deletions.
6 changes: 6 additions & 0 deletions crates/core/c8y_api/src/smartrest/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub struct OnMessageExec {
on_fragment: Option<String>,
topic: Option<String>,
user: Option<String>,
#[serde(default)]
skip_status_update: bool,
#[serde(default, deserialize_with = "to_result_format")]
result_format: ResultFormat,
#[serde(rename = "timeout")]
Expand Down Expand Up @@ -104,6 +106,10 @@ impl Operation {
self.exec().and_then(|exec| exec.on_fragment.clone())
}

pub fn skip_status_update(&self) -> bool {
self.exec().unwrap().skip_status_update
}

pub fn result_format(&self) -> ResultFormat {
self.exec()
.map(|exec| exec.result_format.clone())
Expand Down
112 changes: 62 additions & 50 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,7 @@ impl CumulocityConverter {
custom_handler.forceful_timeout(),
custom_handler.name.clone(),
Some(operation_id.into()),
custom_handler.skip_status_update(),
)
.await?;

Expand Down Expand Up @@ -920,8 +921,9 @@ impl CumulocityConverter {
operation.result_format(),
operation.graceful_timeout(),
operation.forceful_timeout(),
operation.name,
operation.name.clone(),
None,
operation.skip_status_update(),
)
.await?;
}
Expand All @@ -940,6 +942,7 @@ impl CumulocityConverter {
forceful_timeout: Duration,
operation_name: String,
operation_id: Option<String>,
skip_status_update: bool,
) -> Result<(), CumulocityMapperError> {
let command = command.to_owned();
let payload = payload.to_string();
Expand Down Expand Up @@ -984,21 +987,23 @@ impl CumulocityConverter {

tokio::spawn(async move {
let op_name = op_name.as_str();

// mqtt client publishes executing
let topic = C8yTopic::SmartRestResponse.to_topic(&c8y_prefix).unwrap();
let executing_str = if use_id {
set_operation_executing_with_id(&op_id)
} else {
set_operation_executing_with_name(op_name)
};

mqtt_publisher
.send(MqttMessage::new(&topic, executing_str.as_str()))
.await
.unwrap_or_else(|err| {
error!("Failed to publish a message: {executing_str}. Error: {err}")
});
if !skip_status_update {
// mqtt client publishes executing
let executing_str = if use_id {
set_operation_executing_with_id(&op_id)
} else {
set_operation_executing_with_name(op_name)
};

mqtt_publisher
.send(MqttMessage::new(&topic, executing_str.as_str()))
.await
.unwrap_or_else(|err| {
error!("Failed to publish a message: {executing_str}. Error: {err}")
});
}

// execute the command and wait until it finishes
// mqtt client publishes failed or successful depending on the exit code
Expand All @@ -1020,48 +1025,53 @@ impl CumulocityConverter {
ResultFormat::Text => TextOrCsv::Text(sanitized_stdout),
ResultFormat::Csv => EmbeddedCsv::new(sanitized_stdout).into(),
};
let success_message = if use_id {
succeed_operation_with_id(&op_id, result)
} else {
succeed_operation_with_name(op_name, result)
};
match success_message {
Ok(message) => mqtt_publisher.send(MqttMessage::new(&topic, message.as_str())).await
.unwrap_or_else(|err| {
error!("Failed to publish a message: {message}. Error: {err}")
}),
Err(e) => {
let reason = format!("{:?}", anyhow::Error::from(e).context("Custom operation process exited successfully, but couldn't convert output to valid SmartREST message"));
let fail_message = if use_id {
fail_operation_with_id(&op_id, &reason)
} else {
fail_operation_with_name(op_name, &reason)
};
mqtt_publisher.send(MqttMessage::new(&topic, fail_message.as_str())).await.unwrap_or_else(|err| {
error!("Failed to publish a message: {fail_message}. Error: {err}")
})

if !skip_status_update {
let success_message = if use_id {
succeed_operation_with_id(&op_id, result)
} else {
succeed_operation_with_name(op_name, result)
};
match success_message {
Ok(message) => mqtt_publisher.send(MqttMessage::new(&topic, message.as_str())).await
.unwrap_or_else(|err| {
error!("Failed to publish a message: {message}. Error: {err}")
}),
Err(e) => {
let reason = format!("{:?}", anyhow::Error::from(e).context("Custom operation process exited successfully, but couldn't convert output to valid SmartREST message"));
let fail_message = if use_id {
fail_operation_with_id(&op_id, &reason)
} else {
fail_operation_with_name(op_name, &reason)
};
mqtt_publisher.send(MqttMessage::new(&topic, fail_message.as_str())).await.unwrap_or_else(|err| {
error!("Failed to publish a message: {fail_message}. Error: {err}")
})
}
}
}
}
_ => {
let failure_reason = get_failure_reason_for_smartrest(
&output.stderr,
MAX_PAYLOAD_LIMIT_IN_BYTES,
);
let payload = if use_id {
fail_operation_with_id(&op_id, &failure_reason)
} else {
fail_operation_with_name(op_name, &failure_reason)
};

mqtt_publisher
.send(MqttMessage::new(&topic, payload.as_bytes()))
.await
.unwrap_or_else(|err| {
error!(
if !skip_status_update {
let failure_reason = get_failure_reason_for_smartrest(
&output.stderr,
MAX_PAYLOAD_LIMIT_IN_BYTES,
);
let payload = if use_id {
fail_operation_with_id(&op_id, &failure_reason)
} else {
fail_operation_with_name(op_name, &failure_reason)
};

mqtt_publisher
.send(MqttMessage::new(&topic, payload.as_bytes()))
.await
.unwrap_or_else(|err| {
error!(
"Failed to publish a message: {payload}. Error: {err}"
)
})
})
}
}
}
}
Expand Down Expand Up @@ -2664,6 +2674,7 @@ pub(crate) mod tests {
tokio::time::Duration::from_secs(1),
"sleep_ten".to_owned(),
None,
false,
)
.await
.unwrap();
Expand All @@ -2676,6 +2687,7 @@ pub(crate) mod tests {
tokio::time::Duration::from_secs(1),
"sleep_twenty".to_owned(),
None,
false,
)
.await
.unwrap();
Expand Down

0 comments on commit af45393

Please sign in to comment.