Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Permalink
Update action-client (#2356)
Browse files Browse the repository at this point in the history
Update action-client once again to optionally take a uuid that can be
used to bookkeep the request in flight.

In addition, remove `invoke_rust_agent_cancelable` as it adds an extra
layer of indirection that is more useful in tools calling the client.

Replace it with a `cancel_request` fn that will take a `Uuid` ref and
attempt to cancel a remote rpc matching the uuid.

Signed-off-by: Joe Grund <jgrund@whamcloud.io>
  • Loading branch information
jgrund authored Oct 30, 2020
1 parent 0e31f13 commit 2c6fc50
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 95 deletions.
135 changes: 41 additions & 94 deletions iml-action-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@
// license that can be found in the LICENSE file.

use bytes::buf::BufExt as _;
use futures::{
future::{abortable, AbortHandle, Aborted},
Future,
};
use hyper::{client::HttpConnector, Body, Request};
use hyperlocal::{UnixClientExt as _, UnixConnector};
use iml_manager_env::{get_action_runner_http, get_action_runner_uds, running_in_docker};
use iml_wire_types::{Action, ActionId, ActionName, ActionType, Fqdn};
use iml_wire_types::{Action, ActionId, ActionName, ActionType, AgentResult, Fqdn};
use std::{ops::Deref, sync::Arc};
use thiserror::Error;
use uuid::Uuid;
Expand All @@ -21,8 +17,6 @@ pub enum ImlActionClientError {
HyperError(#[from] hyper::Error),
#[error(transparent)]
UriError(hyper::http::uri::InvalidUri),
#[error("Request Cancelled")]
CancelledRequest,
#[error(transparent)]
HttpError(#[from] hyper::http::Error),
#[error(transparent)]
Expand Down Expand Up @@ -80,111 +74,64 @@ impl Default for Client {
}

impl Client {
/// Invoke the given action plugin on the given `host`
///
/// *Note*: There is no way to cancel this fn, use `invoke_rust_agent_cancelable`
/// If you need to cancel.
pub async fn invoke_rust_agent(
&self,
host: impl Into<Fqdn>,
command: impl Into<ActionName> + Send,
action: impl Into<ActionName> + Send,
args: impl serde::Serialize + Send,
uuid: impl Into<Option<&Uuid>>,
) -> Result<serde_json::Value, ImlActionClientError> {
let request_id = Uuid::new_v4().to_hyphenated().to_string();

build_invoke_rust_agent(
Arc::clone(&self.inner),
Arc::clone(&self.uri),
host,
command,
args,
request_id,
)
.await
let id = match uuid.into() {
Some(x) => x.to_hyphenated().to_string(),
None => Uuid::new_v4().to_hyphenated().to_string(),
};

let action = Action::ActionStart {
action: action.into(),
args: serde_json::json!(args),
id: ActionId(id),
};

build_invoke_rust_agent(Arc::clone(&self.inner), Arc::clone(&self.uri), host, action).await
}
/// Invoke the given action plugin on the given `host`.
///
/// Returns an `AbortHandle`. When aborted, the
/// action plugin is cancelled.
pub fn invoke_rust_agent_cancelable(
pub async fn invoke_rust_agent_expect_result(
&self,
host: impl Into<Fqdn>,
action: impl Into<ActionName> + Send,
args: impl serde::Serialize + Send,
uuid: impl Into<Option<&Uuid>>,
) -> Result<AgentResult, ImlActionClientError> {
let x = self.invoke_rust_agent(host, action, args, uuid).await?;
let x = serde_json::from_value(x)?;

Ok(x)
}
pub async fn cancel_request(
&self,
host: impl Into<Fqdn> + Clone,
command: impl Into<ActionName>,
args: impl serde::Serialize,
) -> Result<
(
AbortHandle,
impl Future<Output = Result<serde_json::Value, ImlActionClientError>>,
),
ImlActionClientError,
> {
let request_id = Uuid::new_v4().to_hyphenated().to_string();

let host2 = host.clone();
let request_id2 = request_id.clone();

let inner = Arc::clone(&self.inner);

let uri = Arc::clone(&self.uri);

let post = build_invoke_rust_agent(
Arc::clone(&self.inner),
Arc::clone(&self.uri),
host,
command,
args,
request_id,
);

let (fut, handle) = abortable(post);

let fut = async move {
let x = fut.await;

match x {
Ok(x) => x,
Err(Aborted) => {
let cancel = ActionType::Remote((
host2.into(),
Action::ActionCancel {
id: ActionId(request_id2),
},
));

let req = Request::builder()
.method("POST")
.uri(uri.deref())
.body(Body::from(serde_json::to_string(&cancel)?));

if let Ok(req) = req {
let _rc = inner.request(req).await;
}

Err(ImlActionClientError::CancelledRequest)
}
}
uuid: &Uuid,
) -> Result<(), ImlActionClientError> {
let action = Action::ActionCancel {
id: ActionId(uuid.to_hyphenated().to_string()),
};

Ok((handle, fut))
let x =
build_invoke_rust_agent(Arc::clone(&self.inner), Arc::clone(&self.uri), host, action)
.await?;

tracing::info!("Cancelled request: {}. Resp: {:?}", uuid, x);

Ok(())
}
}

async fn build_invoke_rust_agent(
client: Arc<ClientInner>,
uri: Arc<hyper::Uri>,
host: impl Into<Fqdn>,
command: impl Into<ActionName>,
args: impl serde::Serialize,
request_id: String,
action: Action,
) -> Result<serde_json::Value, ImlActionClientError> {
let action = ActionType::Remote((
host.into(),
Action::ActionStart {
action: command.into(),
args: serde_json::json!(args),
id: ActionId(request_id.clone()),
},
));
let action = ActionType::Remote((host.into(), action));

let req = Request::builder()
.method(hyper::Method::POST)
Expand Down
5 changes: 4 additions & 1 deletion iml-task-runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ async fn send_work(
// send fids to actions runner
// action names on Agents are "action.ACTION_NAME"
for action in task.actions.iter().map(|a| format!("action.{}", a)) {
match action_client.invoke_rust_agent(fqdn, &action, &args).await {
match action_client
.invoke_rust_agent(fqdn, &action, &args, None)
.await
{
Err(e) => {
tracing::info!("Failed to send {} to {}: {:?}", &action, fqdn, e);

Expand Down

0 comments on commit 2c6fc50

Please sign in to comment.