Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
enable the supervisor to handle longer service outages (#1098)
Browse files Browse the repository at this point in the history
  • Loading branch information
bmc-msft authored Jul 21, 2021
1 parent 75d7120 commit d50c5e0
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 15 deletions.
38 changes: 34 additions & 4 deletions src/agent/onefuzz-supervisor/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct Agent {
worker_runner: Box<dyn IWorkerRunner>,
heartbeat: Option<AgentHeartbeatClient>,
previous_state: NodeState,
last_poll_command: Result<Option<NodeCommand>, PollCommandError>,
}

impl Agent {
Expand All @@ -39,6 +40,7 @@ impl Agent {
) -> Self {
let scheduler = Some(scheduler);
let previous_state = NodeState::Init;
let last_poll_command = Ok(None);

Self {
coordinator,
Expand All @@ -49,6 +51,7 @@ impl Agent {
worker_runner,
heartbeat,
previous_state,
last_poll_command,
}
}

Expand Down Expand Up @@ -277,13 +280,40 @@ impl Agent {
}

async fn execute_pending_commands(&mut self) -> Result<()> {
let cmd = self.coordinator.poll_commands().await?;
let result = self.coordinator.poll_commands().await;

if let Some(cmd) = cmd {
info!("agent received node command: {:?}", cmd);
self.scheduler()?.execute_command(cmd).await?;
match &result {
Ok(None) => {}
Ok(Some(cmd)) => {
info!("agent received node command: {:?}", cmd);
self.scheduler()?.execute_command(&cmd).await?;
}
Err(PollCommandError::RequestFailed(err)) => {
// If we failed to request commands, this could be the service
// could be down. Log it, but keep going.
error!("error polling the service for commands: {:?}", err);
}
Err(PollCommandError::RequestParseFailed(err)) => {
bail!("poll commands failed: {:?}", err);
}
Err(PollCommandError::ClaimFailed(err)) => {
// If we failed to claim two commands in a row, it means the
// service is up (since we received the commands we're trying to
// claim), but something else is going wrong, consistently. This
// is suspicious, and less likely to be a transient service or
// networking error, so bail.
if matches!(
self.last_poll_command,
Err(PollCommandError::ClaimFailed(..))
) {
bail!("repeated command claim attempt failures: {:?}", err);
}
error!("error claiming command from the service: {:?}", err);
}
}

self.last_poll_command = result;

Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions src/agent/onefuzz-supervisor/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct SshKeyInfo {
}

#[cfg(target_family = "windows")]
pub async fn add_ssh_key(key_info: SshKeyInfo) -> Result<()> {
pub async fn add_ssh_key(key_info: &SshKeyInfo) -> Result<()> {
if get_scaleset_name().await?.is_none() {
warn!("adding ssh keys only supported on managed nodes");
return Ok(());
Expand Down Expand Up @@ -159,7 +159,7 @@ pub async fn add_ssh_key(key_info: SshKeyInfo) -> Result<()> {
}

#[cfg(target_family = "unix")]
pub async fn add_ssh_key(key_info: SshKeyInfo) -> Result<()> {
pub async fn add_ssh_key(key_info: &SshKeyInfo) -> Result<()> {
if get_scaleset_name().await?.is_none() {
warn!("adding ssh keys only supported on managed nodes");
return Ok(());
Expand Down
25 changes: 18 additions & 7 deletions src/agent/onefuzz-supervisor/src/coordinator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

use anyhow::{Context, Result};
use anyhow::{Context, Error, Result};
use downcast_rs::Downcast;
use onefuzz::{auth::AccessToken, http::ResponseExt, process::Output};
use reqwest::{Client, RequestBuilder, Response, StatusCode};
Expand Down Expand Up @@ -148,7 +148,7 @@ pub struct TaskInfo {

#[async_trait]
pub trait ICoordinator: Downcast {
async fn poll_commands(&mut self) -> Result<Option<NodeCommand>>;
async fn poll_commands(&mut self) -> Result<Option<NodeCommand>, PollCommandError>;

async fn emit_event(&mut self, event: NodeEvent) -> Result<()>;

Expand All @@ -159,7 +159,7 @@ impl_downcast!(ICoordinator);

#[async_trait]
impl ICoordinator for Coordinator {
async fn poll_commands(&mut self) -> Result<Option<NodeCommand>> {
async fn poll_commands(&mut self) -> Result<Option<NodeCommand>, PollCommandError> {
self.poll_commands().await
}

Expand All @@ -172,6 +172,12 @@ impl ICoordinator for Coordinator {
}
}

pub enum PollCommandError {
RequestFailed(Error),
RequestParseFailed(Error),
ClaimFailed(Error),
}

pub struct Coordinator {
client: Client,
registration: Registration,
Expand All @@ -194,7 +200,7 @@ impl Coordinator {
///
/// If the request fails due to an expired access token, we will retry once
/// with a fresh one.
pub async fn poll_commands(&mut self) -> Result<Option<NodeCommand>> {
pub async fn poll_commands(&mut self) -> Result<Option<NodeCommand>, PollCommandError> {
let request = PollCommandsRequest {
machine_id: self.registration.machine_id,
};
Expand All @@ -205,10 +211,12 @@ impl Coordinator {
let pending: PendingNodeCommand = self
.send_request(request)
.await
.context("PollCommands")?
.context("PollCommands")
.map_err(PollCommandError::RequestFailed)?
.json()
.await
.context("parsing PollCommands response")?;
.context("parsing PollCommands response")
.map_err(PollCommandError::RequestParseFailed)?;

if let Some(envelope) = pending.envelope {
let request = ClaimNodeCommandRequest {
Expand All @@ -219,7 +227,10 @@ impl Coordinator {
let url = self.registration.dynamic_config.commands_url.clone();
let request = self.client.delete(url).json(&request);

self.send_request(request).await.context("ClaimCommand")?;
self.send_request(request)
.await
.context("ClaimCommand")
.map_err(PollCommandError::ClaimFailed)?;

Ok(Some(envelope.command))
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-supervisor/src/coordinator/double.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct CoordinatorDouble {

#[async_trait]
impl ICoordinator for CoordinatorDouble {
async fn poll_commands(&mut self) -> Result<Option<NodeCommand>> {
async fn poll_commands(&mut self) -> Result<Option<NodeCommand>, PollCommandError> {
Ok(self.commands.pop())
}

Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-supervisor/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl Scheduler {
Self::default()
}

pub async fn execute_command(&mut self, cmd: NodeCommand) -> Result<()> {
pub async fn execute_command(&mut self, cmd: &NodeCommand) -> Result<()> {
match cmd {
NodeCommand::AddSshKey(ssh_key_info) => {
add_ssh_key(ssh_key_info).await?;
Expand Down

0 comments on commit d50c5e0

Please sign in to comment.