From d50c5e0d09f715b118f0d55e11942ad293463c69 Mon Sep 17 00:00:00 2001 From: bmc-msft <41130664+bmc-msft@users.noreply.github.com> Date: Wed, 21 Jul 2021 17:03:06 -0400 Subject: [PATCH] enable the supervisor to handle longer service outages (#1098) --- src/agent/onefuzz-supervisor/src/agent.rs | 38 +++++++++++++++++-- src/agent/onefuzz-supervisor/src/commands.rs | 4 +- .../onefuzz-supervisor/src/coordinator.rs | 25 ++++++++---- .../src/coordinator/double.rs | 2 +- src/agent/onefuzz-supervisor/src/scheduler.rs | 2 +- 5 files changed, 56 insertions(+), 15 deletions(-) diff --git a/src/agent/onefuzz-supervisor/src/agent.rs b/src/agent/onefuzz-supervisor/src/agent.rs index 9239132cba..7bc3de4410 100644 --- a/src/agent/onefuzz-supervisor/src/agent.rs +++ b/src/agent/onefuzz-supervisor/src/agent.rs @@ -25,6 +25,7 @@ pub struct Agent { worker_runner: Box, heartbeat: Option, previous_state: NodeState, + last_poll_command: Result, PollCommandError>, } impl Agent { @@ -39,6 +40,7 @@ impl Agent { ) -> Self { let scheduler = Some(scheduler); let previous_state = NodeState::Init; + let last_poll_command = Ok(None); Self { coordinator, @@ -49,6 +51,7 @@ impl Agent { worker_runner, heartbeat, previous_state, + last_poll_command, } } @@ -277,13 +280,40 @@ impl Agent { } async fn execute_pending_commands(&mut self) -> Result<()> { - let cmd = self.coordinator.poll_commands().await?; + let result = self.coordinator.poll_commands().await; - if let Some(cmd) = cmd { - info!("agent received node command: {:?}", cmd); - self.scheduler()?.execute_command(cmd).await?; + match &result { + Ok(None) => {} + Ok(Some(cmd)) => { + info!("agent received node command: {:?}", cmd); + self.scheduler()?.execute_command(&cmd).await?; + } + Err(PollCommandError::RequestFailed(err)) => { + // If we failed to request commands, this could be the service + // could be down. Log it, but keep going. + error!("error polling the service for commands: {:?}", err); + } + Err(PollCommandError::RequestParseFailed(err)) => { + bail!("poll commands failed: {:?}", err); + } + Err(PollCommandError::ClaimFailed(err)) => { + // If we failed to claim two commands in a row, it means the + // service is up (since we received the commands we're trying to + // claim), but something else is going wrong, consistently. This + // is suspicious, and less likely to be a transient service or + // networking error, so bail. + if matches!( + self.last_poll_command, + Err(PollCommandError::ClaimFailed(..)) + ) { + bail!("repeated command claim attempt failures: {:?}", err); + } + error!("error claiming command from the service: {:?}", err); + } } + self.last_poll_command = result; + Ok(()) } diff --git a/src/agent/onefuzz-supervisor/src/commands.rs b/src/agent/onefuzz-supervisor/src/commands.rs index 81e992a614..049ccc595d 100644 --- a/src/agent/onefuzz-supervisor/src/commands.rs +++ b/src/agent/onefuzz-supervisor/src/commands.rs @@ -31,7 +31,7 @@ pub struct SshKeyInfo { } #[cfg(target_family = "windows")] -pub async fn add_ssh_key(key_info: SshKeyInfo) -> Result<()> { +pub async fn add_ssh_key(key_info: &SshKeyInfo) -> Result<()> { if get_scaleset_name().await?.is_none() { warn!("adding ssh keys only supported on managed nodes"); return Ok(()); @@ -159,7 +159,7 @@ pub async fn add_ssh_key(key_info: SshKeyInfo) -> Result<()> { } #[cfg(target_family = "unix")] -pub async fn add_ssh_key(key_info: SshKeyInfo) -> Result<()> { +pub async fn add_ssh_key(key_info: &SshKeyInfo) -> Result<()> { if get_scaleset_name().await?.is_none() { warn!("adding ssh keys only supported on managed nodes"); return Ok(()); diff --git a/src/agent/onefuzz-supervisor/src/coordinator.rs b/src/agent/onefuzz-supervisor/src/coordinator.rs index 08f99ca55c..278f9d84c8 100644 --- a/src/agent/onefuzz-supervisor/src/coordinator.rs +++ b/src/agent/onefuzz-supervisor/src/coordinator.rs @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -use anyhow::{Context, Result}; +use anyhow::{Context, Error, Result}; use downcast_rs::Downcast; use onefuzz::{auth::AccessToken, http::ResponseExt, process::Output}; use reqwest::{Client, RequestBuilder, Response, StatusCode}; @@ -148,7 +148,7 @@ pub struct TaskInfo { #[async_trait] pub trait ICoordinator: Downcast { - async fn poll_commands(&mut self) -> Result>; + async fn poll_commands(&mut self) -> Result, PollCommandError>; async fn emit_event(&mut self, event: NodeEvent) -> Result<()>; @@ -159,7 +159,7 @@ impl_downcast!(ICoordinator); #[async_trait] impl ICoordinator for Coordinator { - async fn poll_commands(&mut self) -> Result> { + async fn poll_commands(&mut self) -> Result, PollCommandError> { self.poll_commands().await } @@ -172,6 +172,12 @@ impl ICoordinator for Coordinator { } } +pub enum PollCommandError { + RequestFailed(Error), + RequestParseFailed(Error), + ClaimFailed(Error), +} + pub struct Coordinator { client: Client, registration: Registration, @@ -194,7 +200,7 @@ impl Coordinator { /// /// If the request fails due to an expired access token, we will retry once /// with a fresh one. - pub async fn poll_commands(&mut self) -> Result> { + pub async fn poll_commands(&mut self) -> Result, PollCommandError> { let request = PollCommandsRequest { machine_id: self.registration.machine_id, }; @@ -205,10 +211,12 @@ impl Coordinator { let pending: PendingNodeCommand = self .send_request(request) .await - .context("PollCommands")? + .context("PollCommands") + .map_err(PollCommandError::RequestFailed)? .json() .await - .context("parsing PollCommands response")?; + .context("parsing PollCommands response") + .map_err(PollCommandError::RequestParseFailed)?; if let Some(envelope) = pending.envelope { let request = ClaimNodeCommandRequest { @@ -219,7 +227,10 @@ impl Coordinator { let url = self.registration.dynamic_config.commands_url.clone(); let request = self.client.delete(url).json(&request); - self.send_request(request).await.context("ClaimCommand")?; + self.send_request(request) + .await + .context("ClaimCommand") + .map_err(PollCommandError::ClaimFailed)?; Ok(Some(envelope.command)) } else { diff --git a/src/agent/onefuzz-supervisor/src/coordinator/double.rs b/src/agent/onefuzz-supervisor/src/coordinator/double.rs index 7d723b2d92..9e4ac9f644 100644 --- a/src/agent/onefuzz-supervisor/src/coordinator/double.rs +++ b/src/agent/onefuzz-supervisor/src/coordinator/double.rs @@ -11,7 +11,7 @@ pub struct CoordinatorDouble { #[async_trait] impl ICoordinator for CoordinatorDouble { - async fn poll_commands(&mut self) -> Result> { + async fn poll_commands(&mut self) -> Result, PollCommandError> { Ok(self.commands.pop()) } diff --git a/src/agent/onefuzz-supervisor/src/scheduler.rs b/src/agent/onefuzz-supervisor/src/scheduler.rs index a3824410d9..77bf38936a 100644 --- a/src/agent/onefuzz-supervisor/src/scheduler.rs +++ b/src/agent/onefuzz-supervisor/src/scheduler.rs @@ -54,7 +54,7 @@ impl Scheduler { Self::default() } - pub async fn execute_command(&mut self, cmd: NodeCommand) -> Result<()> { + pub async fn execute_command(&mut self, cmd: &NodeCommand) -> Result<()> { match cmd { NodeCommand::AddSshKey(ssh_key_info) => { add_ssh_key(ssh_key_info).await?;