Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Making machine identity a parameter of the agent config #2649

Merged
merged 4 commits into from
Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/agent/onefuzz-agent/src/agent.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

#![allow(clippy::too_many_arguments)]
use anyhow::{Error, Result};
use tokio::time;

Expand All @@ -26,6 +27,7 @@ pub struct Agent {
heartbeat: Option<AgentHeartbeatClient>,
previous_state: NodeState,
last_poll_command: Result<Option<NodeCommand>, PollCommandError>,
managed: bool,
}

impl Agent {
Expand All @@ -37,6 +39,7 @@ impl Agent {
work_queue: Box<dyn IWorkQueue>,
worker_runner: Box<dyn IWorkerRunner>,
heartbeat: Option<AgentHeartbeatClient>,
managed: bool,
) -> Self {
let scheduler = Some(scheduler);
let previous_state = NodeState::Init;
Expand All @@ -52,6 +55,7 @@ impl Agent {
heartbeat,
previous_state,
last_poll_command,
managed,
}
}

Expand Down Expand Up @@ -286,7 +290,8 @@ impl Agent {
Ok(None) => {}
Ok(Some(cmd)) => {
info!("agent received node command: {:?}", cmd);
self.scheduler()?.execute_command(cmd).await?;
let managed = self.managed;
self.scheduler()?.execute_command(cmd, managed).await?;
}
Err(PollCommandError::RequestFailed(err)) => {
// If we failed to request commands, this could be the service
Expand Down
1 change: 1 addition & 0 deletions src/agent/onefuzz-agent/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl Fixture {
work_queue,
worker_runner,
None,
true,
)
}

Expand Down
12 changes: 1 addition & 11 deletions src/agent/onefuzz-agent/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT License.

use anyhow::{Context, Result};
use onefuzz::{auth::Secret, machine_id::get_scaleset_name};
use onefuzz::auth::Secret;
use std::process::Stdio;
use tokio::{fs, io::AsyncWriteExt, process::Command};

Expand Down Expand Up @@ -32,11 +32,6 @@ pub struct SshKeyInfo {

#[cfg(target_family = "windows")]
pub async fn add_ssh_key(key_info: &SshKeyInfo) -> Result<()> {
if get_scaleset_name().await?.is_none() {
warn!("adding ssh keys only supported on managed nodes");
return Ok(());
}

let mut ssh_path =
PathBuf::from(env::var("ProgramData").unwrap_or_else(|_| "c:\\programdata".to_string()));
ssh_path.push("ssh");
Expand Down Expand Up @@ -160,11 +155,6 @@ pub async fn add_ssh_key(key_info: &SshKeyInfo) -> Result<()> {

#[cfg(target_family = "unix")]
pub async fn add_ssh_key(key_info: &SshKeyInfo) -> Result<()> {
if get_scaleset_name().await?.is_none() {
warn!("adding ssh keys only supported on managed nodes");
return Ok(());
}

let user =
get_user_by_name(ONEFUZZ_SERVICE_USER).ok_or_else(|| format_err!("unable to find user"))?;
info!("adding ssh key:{:?} to user:{:?}", key_info, user);
Expand Down
32 changes: 22 additions & 10 deletions src/agent/onefuzz-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use onefuzz::{
auth::{ClientCredentials, Credentials, ManagedIdentityCredentials},
http::{is_auth_error_code, ResponseExt},
jitter::delay_with_jitter,
machine_id::MachineIdentity,
};
use onefuzz_telemetry::{InstanceTelemetryKey, MicrosoftTelemetryKey};
use reqwest_retry::SendRetry;
Expand Down Expand Up @@ -37,6 +38,8 @@ pub struct StaticConfig {

#[serde(default = "default_as_true")]
pub managed: bool,

pub machine_identity: MachineIdentity,
}

fn default_as_true() -> bool {
Expand Down Expand Up @@ -64,10 +67,12 @@ struct RawStaticConfig {

#[serde(default = "default_as_true")]
pub managed: bool,

pub machine_identity: Option<MachineIdentity>,
}

impl StaticConfig {
pub fn new(data: &[u8]) -> Result<Self> {
pub async fn new(data: &[u8]) -> Result<Self> {
let config: RawStaticConfig = serde_json::from_slice(data)?;

let credentials = match config.client_credentials {
Expand All @@ -84,6 +89,11 @@ impl StaticConfig {
managed.into()
}
};
let machine_identity = match config.machine_identity {
Some(machine_identity) => machine_identity,
None => MachineIdentity::from_metadata().await?,
};

let config = StaticConfig {
credentials,
pool_name: config.pool_name,
Expand All @@ -94,16 +104,17 @@ impl StaticConfig {
heartbeat_queue: config.heartbeat_queue,
instance_id: config.instance_id,
managed: config.managed,
machine_identity,
};

Ok(config)
}

pub fn from_file(config_path: impl AsRef<Path>) -> Result<Self> {
pub async fn from_file(config_path: impl AsRef<Path>) -> Result<Self> {
let config_path = config_path.as_ref();
let data = std::fs::read(config_path)
.with_context(|| format!("unable to read config file: {}", config_path.display()))?;
Self::new(&data)
Self::new(&data).await
}

pub fn from_env() -> Result<Self> {
Expand All @@ -115,6 +126,7 @@ impl StaticConfig {
let onefuzz_url = Url::parse(&std::env::var("ONEFUZZ_URL")?)?;
let pool_name = std::env::var("ONEFUZZ_POOL")?;
let is_unmanaged = std::env::var("ONEFUZZ_IS_UNMANAGED").is_ok();
let machine_identity = MachineIdentity::from_env()?;

let heartbeat_queue = if let Ok(key) = std::env::var("ONEFUZZ_HEARTBEAT") {
Some(Url::parse(&key)?)
Expand Down Expand Up @@ -155,6 +167,7 @@ impl StaticConfig {
heartbeat_queue,
instance_id,
managed: !is_unmanaged,
machine_identity,
})
}

Expand Down Expand Up @@ -218,22 +231,21 @@ const REGISTRATION_RETRY_PERIOD: Duration = Duration::from_secs(60);
impl Registration {
pub async fn create(config: StaticConfig, managed: bool, timeout: Duration) -> Result<Self> {
let token = config.credentials.access_token().await?;
let machine_name = onefuzz::machine_id::get_machine_name().await?;
let machine_id = onefuzz::machine_id::get_machine_id().await?;
let machine_name = &config.machine_identity.machine_name;
let machine_id = config.machine_identity.machine_id;

let mut url = config.register_url();
url.query_pairs_mut()
.append_pair("machine_id", &machine_id.to_string())
.append_pair("machine_name", &machine_name)
.append_pair("machine_name", machine_name)
.append_pair("pool_name", &config.pool_name)
.append_pair("version", env!("ONEFUZZ_VERSION"))
.append_pair("os", std::env::consts::OS);

if managed {
let scaleset = onefuzz::machine_id::get_scaleset_name().await?;
match scaleset {
match &config.machine_identity.scaleset_name {
Some(scaleset) => {
url.query_pairs_mut().append_pair("scaleset_id", &scaleset);
url.query_pairs_mut().append_pair("scaleset_id", scaleset);
}
None => {
anyhow::bail!("managed instance without scaleset name");
Expand Down Expand Up @@ -284,7 +296,7 @@ impl Registration {

pub async fn load_existing(config: StaticConfig) -> Result<Self> {
let dynamic_config = DynamicConfig::load().await?;
let machine_id = onefuzz::machine_id::get_machine_id().await?;
let machine_id = config.machine_identity.machine_id;
let mut registration = Self {
config,
dynamic_config,
Expand Down
9 changes: 5 additions & 4 deletions src/agent/onefuzz-agent/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT License.

use crate::onefuzz::heartbeat::HeartbeatClient;
use crate::onefuzz::machine_id::{get_machine_id, get_machine_name};
use anyhow::Result;
use reqwest::Url;
use serde::{self, Deserialize, Serialize};
Expand All @@ -29,9 +28,11 @@ pub struct AgentContext {

pub type AgentHeartbeatClient = HeartbeatClient<AgentContext, HeartbeatData>;

pub async fn init_agent_heartbeat(queue_url: Url) -> Result<AgentHeartbeatClient> {
let node_id = get_machine_id().await?;
let machine_name = get_machine_name().await?;
pub async fn init_agent_heartbeat(
queue_url: Url,
node_id: Uuid,
machine_name: String,
) -> Result<AgentHeartbeatClient> {
let hb = HeartbeatClient::init_heartbeat(
AgentContext {
node_id,
Expand Down
23 changes: 14 additions & 9 deletions src/agent/onefuzz-agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ use std::process::{Command, Stdio};

use anyhow::{Context, Result};
use clap::Parser;
use onefuzz::{
machine_id::{get_machine_id, get_scaleset_name},
process::ExitStatus,
};
use onefuzz::process::ExitStatus;
use onefuzz_telemetry::{self as telemetry, EventData, Role};
use std::io::{self, Write};
use uuid::Uuid;
Expand Down Expand Up @@ -202,7 +199,7 @@ async fn load_config(opt: RunOpt) -> Result<StaticConfig> {
info!("loading supervisor agent config");

let config = match &opt.config_path {
Some(config_path) => StaticConfig::from_file(config_path)?,
Some(config_path) => StaticConfig::from_file(config_path).await?,
None => StaticConfig::from_env()?,
};

Expand Down Expand Up @@ -266,11 +263,11 @@ async fn check_existing_worksets(coordinator: &mut coordinator::Coordinator) ->

async fn run_agent(config: StaticConfig) -> Result<()> {
telemetry::set_property(EventData::InstanceId(config.instance_id));
telemetry::set_property(EventData::MachineId(get_machine_id().await?));
telemetry::set_property(EventData::MachineId(config.machine_identity.machine_id));
telemetry::set_property(EventData::Version(env!("ONEFUZZ_VERSION").to_string()));
telemetry::set_property(EventData::Role(Role::Supervisor));
let scaleset = get_scaleset_name().await?;
if let Some(scaleset_name) = &scaleset {

if let Some(scaleset_name) = &config.machine_identity.scaleset_name {
telemetry::set_property(EventData::ScalesetId(scaleset_name.to_string()));
}

Expand Down Expand Up @@ -300,7 +297,14 @@ async fn run_agent(config: StaticConfig) -> Result<()> {
let work_queue = work::WorkQueue::new(registration.clone())?;

let agent_heartbeat = match config.heartbeat_queue {
Some(url) => Some(init_agent_heartbeat(url).await?),
Some(url) => Some(
init_agent_heartbeat(
url,
config.machine_identity.machine_id,
config.machine_identity.machine_name,
)
.await?,
),
None => None,
};
let mut agent = agent::Agent::new(
Expand All @@ -311,6 +315,7 @@ async fn run_agent(config: StaticConfig) -> Result<()> {
Box::new(work_queue),
Box::new(worker::WorkerRunner),
agent_heartbeat,
config.managed,
);

info!("running agent");
Expand Down
8 changes: 6 additions & 2 deletions src/agent/onefuzz-agent/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,14 @@ impl Scheduler {
Self::default()
}

pub async fn execute_command(&mut self, cmd: &NodeCommand) -> Result<()> {
pub async fn execute_command(&mut self, cmd: &NodeCommand, managed: bool) -> Result<()> {
match cmd {
NodeCommand::AddSshKey(ssh_key_info) => {
add_ssh_key(ssh_key_info).await?;
if managed {
add_ssh_key(ssh_key_info).await?;
} else {
warn!("adding ssh keys only supported on managed nodes");
}
}
NodeCommand::StopTask(stop_task) => {
if let Scheduler::Busy(state) = self {
Expand Down
19 changes: 17 additions & 2 deletions src/agent/onefuzz-agent/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use std::{
collections::HashMap,
path::{Path, PathBuf},
process::{Child, ChildStderr, ChildStdout, Command, Stdio},
thread::{self, JoinHandle},
};

use anyhow::{format_err, Context as AnyhowContext, Result};
use downcast_rs::Downcast;
use onefuzz::process::{ExitStatus, Output};
use onefuzz::{
machine_id::MachineIdentity,
process::{ExitStatus, Output},
};
use tokio::fs;

use crate::buffer::TailBuffer;
use crate::work::*;

use serde_json::Value;

// Max length of captured output streams from worker child processes.
const MAX_TAIL_LEN: usize = 40960;

Expand Down Expand Up @@ -209,9 +215,18 @@ impl IWorkerRunner for WorkerRunner {

debug!("created worker working dir: {}", working_dir.display());

// inject the machine_identity in the config file
let work_config = work.config.expose_ref();
let mut config: HashMap<String, Value> = serde_json::from_str(work_config.as_str())?;

config.insert(
"machine_identity".to_string(),
serde_json::to_value(MachineIdentity::default())?,
);

let config_path = work.config_path()?;

fs::write(&config_path, work.config.expose_ref())
fs::write(&config_path, serde_json::to_string(&config)?.as_bytes())
.await
.with_context(|| format!("unable to save task config: {}", config_path.display()))?;

Expand Down
1 change: 1 addition & 0 deletions src/agent/onefuzz-task/src/local/libfuzzer_test_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub async fn run(args: &clap::ArgMatches<'_>, event_sender: Option<Sender<UiEven
check_retry_count,
setup_dir: &context.common_config.setup_dir,
minimized_stack_depth: None,
machine_identity: context.common_config.machine_identity,
};

let result = test_input(config).await?;
Expand Down
1 change: 1 addition & 0 deletions src/agent/onefuzz-task/src/local/test_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub async fn run(args: &clap::ArgMatches<'_>, event_sender: Option<Sender<UiEven
minimized_stack_depth: None,
check_asan_log,
check_debugger,
machine_identity: context.common_config.machine_identity.clone(),
};

let result = test_input(config).await?;
Expand Down
3 changes: 1 addition & 2 deletions src/agent/onefuzz-task/src/managed/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::path::PathBuf;

use anyhow::Result;
use clap::{App, Arg, SubCommand};
use onefuzz::machine_id::get_machine_id;
use std::time::Duration;

use crate::tasks::{
Expand All @@ -28,7 +27,7 @@ pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
let check_oom = out_of_memory(min_available_memory_bytes);

let common = config.common().clone();
let machine_id = get_machine_id().await?;
let machine_id = common.machine_identity.machine_id;
let task_logger = if let Some(logs) = common.logs.clone() {
let rx = onefuzz_telemetry::subscribe_to_events()?;

Expand Down
2 changes: 1 addition & 1 deletion src/agent/onefuzz-task/src/tasks/analysis/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ pub async fn run_tool(
let target_exe =
try_resolve_setup_relative_path(&config.common.setup_dir, &config.target_exe).await?;

let expand = Expand::new()
let expand = Expand::new(&config.common.machine_identity)
.machine_id()
.await?
.input_path(&input)
Expand Down
Loading