Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

enable the supervisor to handle longer service outages #1098

Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions src/agent/onefuzz-supervisor/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct Agent {
worker_runner: Box<dyn IWorkerRunner>,
heartbeat: Option<AgentHeartbeatClient>,
previous_state: NodeState,
last_poll_command: Option<PollCommandResult>,
}

impl Agent {
Expand All @@ -39,6 +40,7 @@ impl Agent {
) -> Self {
let scheduler = Some(scheduler);
let previous_state = NodeState::Init;
let last_poll_command = None;

Self {
coordinator,
Expand All @@ -49,6 +51,7 @@ impl Agent {
worker_runner,
heartbeat,
previous_state,
last_poll_command,
}
}

Expand Down Expand Up @@ -277,13 +280,30 @@ impl Agent {
}

async fn execute_pending_commands(&mut self) -> Result<()> {
let cmd = self.coordinator.poll_commands().await?;
let result = self.coordinator.poll_commands().await?;

if let Some(cmd) = cmd {
info!("agent received node command: {:?}", cmd);
self.scheduler()?.execute_command(cmd).await?;
match &result {
PollCommandResult::None => {}
PollCommandResult::Command(cmd) => {
info!("agent received node command: {:?}", cmd);
self.scheduler()?.execute_command(cmd).await?;
}
PollCommandResult::RequestFailed(err) => {
error!("error polling the service for commands: {:?}", err);
}
PollCommandResult::ClaimFailed(err) => {
if matches!(
self.last_poll_command,
Some(PollCommandResult::ClaimFailed(..))
) {
bail!("repeated command claim attempt failures: {:?}", err);
}
error!("error polling the service for commands: {:?}", err);
}
}

self.last_poll_command = Some(result);

Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-supervisor/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ pub async fn add_ssh_key(key_info: SshKeyInfo) -> Result<()> {
}

#[cfg(target_family = "unix")]
pub async fn add_ssh_key(key_info: SshKeyInfo) -> Result<()> {
pub async fn add_ssh_key(key_info: &SshKeyInfo) -> Result<()> {
if get_scaleset_name().await?.is_none() {
warn!("adding ssh keys only supported on managed nodes");
return Ok(());
Expand Down
42 changes: 31 additions & 11 deletions src/agent/onefuzz-supervisor/src/coordinator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

use anyhow::{Context, Result};
use anyhow::{Context, Error, Result};
use downcast_rs::Downcast;
use onefuzz::{auth::AccessToken, http::ResponseExt, process::Output};
use reqwest::{Client, RequestBuilder, Response, StatusCode};
Expand Down Expand Up @@ -148,7 +148,7 @@ pub struct TaskInfo {

#[async_trait]
pub trait ICoordinator: Downcast {
async fn poll_commands(&mut self) -> Result<Option<NodeCommand>>;
async fn poll_commands(&mut self) -> Result<PollCommandResult>;

async fn emit_event(&mut self, event: NodeEvent) -> Result<()>;

Expand All @@ -159,7 +159,7 @@ impl_downcast!(ICoordinator);

#[async_trait]
impl ICoordinator for Coordinator {
async fn poll_commands(&mut self) -> Result<Option<NodeCommand>> {
async fn poll_commands(&mut self) -> Result<PollCommandResult> {
self.poll_commands().await
}

Expand All @@ -172,6 +172,13 @@ impl ICoordinator for Coordinator {
}
}

pub enum PollCommandResult {
None,
Command(NodeCommand),
bmc-msft marked this conversation as resolved.
Show resolved Hide resolved
RequestFailed(Error),
ClaimFailed(Error),
}

pub struct Coordinator {
client: Client,
registration: Registration,
Expand All @@ -194,18 +201,25 @@ impl Coordinator {
///
/// If the request fails due to an expired access token, we will retry once
/// with a fresh one.
pub async fn poll_commands(&mut self) -> Result<Option<NodeCommand>> {
pub async fn poll_commands(&mut self) -> Result<PollCommandResult> {
let request = PollCommandsRequest {
machine_id: self.registration.machine_id,
};

let url = self.registration.dynamic_config.commands_url.clone();
let request = self.client.get(url).json(&request);

let pending: PendingNodeCommand = self
.send_request(request)
.await
.context("PollCommands")?
let response = self.send_request(request).await.context("PollCommands");

// Treat communication issues with the service as `no commands
// available`. This adds resiliency to the supervisor during longer
// outages. Given poll_commands runs on a 10 second cycle, this should
// provide eventual recovery.
if let Err(response) = response {
return Ok(PollCommandResult::RequestFailed(response));
}

let pending: PendingNodeCommand = response?
.json()
.await
.context("parsing PollCommands response")?;
Expand All @@ -219,11 +233,17 @@ impl Coordinator {
let url = self.registration.dynamic_config.commands_url.clone();
let request = self.client.delete(url).json(&request);

self.send_request(request).await.context("ClaimCommand")?;
let response = self.send_request(request).await.context("ClaimCommand");

// Similar to polling for available commands, this treats issues claiming
bmc-msft marked this conversation as resolved.
Show resolved Hide resolved
// commands as `no commands available`
if let Err(response) = response {
return Ok(PollCommandResult::ClaimFailed(response));
bmc-msft marked this conversation as resolved.
Show resolved Hide resolved
}

Ok(Some(envelope.command))
Ok(PollCommandResult::Command(envelope.command))
} else {
Ok(None)
Ok(PollCommandResult::None)
}
}

Expand Down
7 changes: 5 additions & 2 deletions src/agent/onefuzz-supervisor/src/coordinator/double.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ pub struct CoordinatorDouble {

#[async_trait]
impl ICoordinator for CoordinatorDouble {
async fn poll_commands(&mut self) -> Result<Option<NodeCommand>> {
Ok(self.commands.pop())
async fn poll_commands(&mut self) -> Result<PollCommandResult> {
match self.commands.pop() {
Some(x) => Ok(PollCommandResult::Command(x)),
None => Ok(PollCommandResult::None),
}
}

async fn emit_event(&mut self, event: NodeEvent) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-supervisor/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl Scheduler {
Self::default()
}

pub async fn execute_command(&mut self, cmd: NodeCommand) -> Result<()> {
pub async fn execute_command(&mut self, cmd: &NodeCommand) -> Result<()> {
match cmd {
NodeCommand::AddSshKey(ssh_key_info) => {
add_ssh_key(ssh_key_info).await?;
Expand Down