diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index f777cd1f02..71a37bb38c 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "RustyXML" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b5ace29ee3216de37c0546865ad08edef58b0f9e76838ed8959a84a990e58c5" + [[package]] name = "addr2line" version = "0.17.0" @@ -125,6 +131,84 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "azure_core" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c61455ab776eedabfc7e166dda27c6c6bc2a882c043c35817501f1bd7440158" +dependencies = [ + "async-trait", + "base64", + "bytes 1.1.0", + "chrono", + "dyn-clone", + "futures", + "getrandom 0.2.3", + "http", + "log", + "oauth2", + "rand 0.8.4", + "reqwest", + "rustc_version", + "serde", + "serde_derive", + "serde_json", + "thiserror", + "url", + "uuid", +] + +[[package]] +name = "azure_storage" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22c413e8459badf86c9e6e0c84f5894609663bcc8fa5eb1e49bfb985273dac58" +dependencies = [ + "RustyXML", + "async-trait", + "azure_core", + "base64", + "bytes 1.1.0", + "chrono", + "futures", + "http", + "log", + "once_cell", + "ring", + "serde", + "serde-xml-rs", + "serde_derive", + "serde_json", + "thiserror", + "url", + "uuid", +] + +[[package]] +name = "azure_storage_blobs" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a70ec6fab8a2cae5d774098267870c0f3fbef1cb63cac12afab38b8c17cc8d97" +dependencies = [ + "RustyXML", + "azure_core", + "azure_storage", + "base64", + "bytes 1.1.0", + "chrono", + "futures", + "http", + "log", + "md5", + "serde", + "serde-xml-rs", + 
"serde_derive", + "serde_json", + "thiserror", + "url", + "uuid", +] + [[package]] name = "backoff" version = "0.4.0" @@ -210,6 +294,15 @@ dependencies = [ "wyz", ] +[[package]] +name = "block-buffer" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" +dependencies = [ + "generic-array", +] + [[package]] name = "block-buffer" version = "0.10.1" @@ -324,6 +417,7 @@ dependencies = [ "libc", "num-integer", "num-traits", + "serde", "time", "winapi 0.3.9", ] @@ -374,6 +468,22 @@ dependencies = [ "serde", ] +[[package]] +name = "core-foundation" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" + [[package]] name = "coverage" version = "0.1.0" @@ -621,13 +731,22 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e25ea47919b1560c4e3b7fe0aaab9becf5b84a10325ddf7db0f0ba5e1026499" +[[package]] +name = "digest" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" +dependencies = [ + "generic-array", +] + [[package]] name = "digest" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8549e6bfdecd113b7e221fe60b433087f6957387a20f8118ebca9b12af19143d" dependencies = [ - "block-buffer", + "block-buffer 0.10.1", "crypto-common", "generic-array", ] @@ -656,6 +775,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"453440c271cf5577fd2a40e4942540cb7d0d2f85e27c8d07dd0023c925a67541" +[[package]] +name = "dyn-clone" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21e50f3adc76d6a43f5ed73b698a87d0760ca74617f60f7c3b879003536fdd28" + [[package]] name = "dynamic-library" version = "0.1.0" @@ -812,6 +937,15 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared 0.1.1", +] + [[package]] name = "foreign-types" version = "0.5.0" @@ -819,7 +953,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965" dependencies = [ "foreign-types-macros", - "foreign-types-shared", + "foreign-types-shared 0.3.0", ] [[package]] @@ -833,6 +967,12 @@ dependencies = [ "syn 1.0.76", ] +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "foreign-types-shared" version = "0.3.0" @@ -1230,6 +1370,19 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes 1.1.0", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "iced-x86" version = "1.15.0" @@ -1312,7 +1465,7 @@ dependencies = [ "log", "num_cpus", "rayon", - "sha2", + "sha2 0.10.1", "win-util", "winapi 0.3.9", ] @@ -1411,9 +1564,9 @@ dependencies = [ [[package]] name = 
"libc" -version = "0.2.103" +version = "0.2.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8f7255a17a627354f321ef0055d63b898c6fb27eff628af4d1b66b7331edf6" +checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f" [[package]] name = "libclusterfuzz" @@ -1483,6 +1636,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "md5" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" + [[package]] name = "memchr" version = "2.4.1" @@ -1622,6 +1781,24 @@ dependencies = [ "getrandom 0.2.3", ] +[[package]] +name = "native-tls" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09bf6f32a3afefd0b587ee42ed19acd945c6d1f3b5424040f50b2f24ab16be77" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "net2" version = "0.2.37" @@ -1755,6 +1932,26 @@ dependencies = [ "libc", ] +[[package]] +name = "oauth2" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e47cfc4c0a1a519d9a025ebfbac3a2439d1b5cdf397d72dcb79b11d9920dab" +dependencies = [ + "base64", + "chrono", + "getrandom 0.2.3", + "http", + "rand 0.8.4", + "reqwest", + "serde", + "serde_json", + "serde_path_to_error", + "sha2 0.9.9", + "thiserror", + "url", +] + [[package]] name = "object" version = "0.27.0" @@ -1803,7 +2000,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "sha2", + "sha2 0.10.1", "stacktrace-parser", "storage-queue", "structopt", @@ -1829,6 +2026,9 @@ dependencies = [ "arraydeque", "async-trait", "atexit", + "azure_core", + "azure_storage", + 
"azure_storage_blobs", "backoff", "clap", "coverage", @@ -1846,6 +2046,7 @@ dependencies = [ "reqwest", "reqwest-retry", "serde", + "serde-xml-rs", "serde_json", "stacktrace-parser", "storage-queue", @@ -1903,6 +2104,45 @@ dependencies = [ "z3-sys", ] +[[package]] +name = "opaque-debug" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" + +[[package]] +name = "openssl" +version = "0.10.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7ae222234c30df141154f159066c5093ff73b63204dcda7121eb082fc56a95" +dependencies = [ + "bitflags", + "cfg-if 1.0.0", + "foreign-types 0.3.2", + "libc", + "once_cell", + "openssl-sys", +] + +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + +[[package]] +name = "openssl-sys" +version = "0.9.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb" +dependencies = [ + "autocfg", + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "os_pipe" version = "1.0.0" @@ -2372,11 +2612,13 @@ dependencies = [ "http-body", "hyper", "hyper-rustls", + "hyper-tls", "ipnet", "js-sys", "lazy_static", "log", "mime", + "native-tls", "percent-encoding", "pin-project-lite", "rustls", @@ -2385,6 +2627,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "tokio", + "tokio-native-tls", "tokio-rustls", "tokio-util", "url", @@ -2448,6 +2691,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustls" version = "0.20.2" @@ -2490,6 +2742,16 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" +dependencies = [ + "lazy_static", + "winapi 0.3.9", +] + [[package]] name = "scopeguard" version = "1.1.0" @@ -2526,6 +2788,35 @@ dependencies = [ "untrusted", ] +[[package]] +name = "security-framework" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dc14f172faf8a0194a3aded622712b0de276821addc574fa54fc0a1167e10dc" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0160a13a177a45bfb43ce71c01580998474f556ad854dcbca936dd2841a5c556" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "semver" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d65bd28f48be7196d222d95b9243287f48d27aca604e08497513019ff0502cc4" + [[package]] name = "serde" version = "1.0.130" @@ -2570,6 +2861,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7868ad3b8196a8a0aea99a8220b124278ee5320a55e4fde97794b6f85b1a377" +dependencies = [ + "serde", +] + [[package]] name = "serde_qs" version = "0.8.4" @@ -2593,6 +2893,19 @@ dependencies = [ "serde", ] +[[package]] +name = "sha2" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" 
+dependencies = [ + "block-buffer 0.9.0", + "cfg-if 1.0.0", + "cpufeatures", + "digest 0.9.0", + "opaque-debug", +] + [[package]] name = "sha2" version = "0.10.1" @@ -2601,7 +2914,7 @@ checksum = "99c3bd8169c58782adad9290a9af5939994036b76187f7b4f0e6de91dbbfc0ec" dependencies = [ "cfg-if 1.0.0", "cpufeatures", - "digest", + "digest 0.10.0", ] [[package]] @@ -2756,7 +3069,7 @@ dependencies = [ "regex", "serde", "serde_json", - "sha2", + "sha2 0.10.1", ] [[package]] @@ -3067,6 +3380,16 @@ dependencies = [ "syn 1.0.76", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.23.1" @@ -3208,7 +3531,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b737c38ba258c25916dd4002b12e631b180b6ea63528147a94d6364f68d886df" dependencies = [ - "foreign-types", + "foreign-types 0.5.0", "libc", "unwind-sys", ] @@ -3272,6 +3595,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vec_map" version = "0.8.2" diff --git a/src/agent/onefuzz-agent/Cargo.toml b/src/agent/onefuzz-agent/Cargo.toml index cbbaa04312..70e976adf2 100644 --- a/src/agent/onefuzz-agent/Cargo.toml +++ b/src/agent/onefuzz-agent/Cargo.toml @@ -28,6 +28,7 @@ num_cpus = "1.13" reqwest = { version = "0.11", features = ["json", "stream", "rustls-tls"], default-features=false } serde = "1.0" serde_json = "1.0" +serde-xml-rs = "0.5.1" onefuzz = { path = "../onefuzz" } onefuzz-telemetry = { path = "../onefuzz-telemetry" } path-absolutize = "3.0" @@ -44,3 +45,7 @@ tokio-stream = "0.1" tui = { version = "0.16", default-features = false, 
features = ['crossterm'] } url = { version = "2.2", features = ["serde"] } uuid = { version = "0.8", features = ["serde", "v4"] } + +azure_core = "0.1.1" +azure_storage = "0.1.0" +azure_storage_blobs = "0.1.0" \ No newline at end of file diff --git a/src/agent/onefuzz-agent/src/local/tui.rs b/src/agent/onefuzz-agent/src/local/tui.rs index 7662d4f222..fc76507f84 100644 --- a/src/agent/onefuzz-agent/src/local/tui.rs +++ b/src/agent/onefuzz-agent/src/local/tui.rs @@ -11,7 +11,7 @@ use crossterm::{ use futures::{StreamExt, TryStreamExt}; use log::Level; use onefuzz::utils::try_wait_all_join_handles; -use onefuzz_telemetry::{self, EventData}; +use onefuzz_telemetry::{self, EventData, LogEvent}; use std::{ collections::HashMap, io::{self, Stdout}, @@ -233,13 +233,14 @@ impl TerminalUi { while cancellation_rx.try_recv() == Err(broadcast::error::TryRecvError::Empty) { match rx.try_recv() { - Ok((_event, data)) => { + Ok(LogEvent::Event((_event, data))) => { let data = data .into_iter() .filter(Self::filter_event) .collect::>(); let _ = ui_event_tx.send(TerminalEvent::Telemetry(data)); } + Ok(_) => continue, Err(TryRecvError::Empty) => sleep(EVENT_POLLING_PERIOD).await, Err(TryRecvError::Lagged(_)) => continue, Err(TryRecvError::Closed) => break, diff --git a/src/agent/onefuzz-agent/src/tasks/config.rs b/src/agent/onefuzz-agent/src/tasks/config.rs index fbd6bb65ac..c6db3f2293 100644 --- a/src/agent/onefuzz-agent/src/tasks/config.rs +++ b/src/agent/onefuzz-agent/src/tasks/config.rs @@ -7,7 +7,7 @@ use crate::tasks::coverage; use crate::tasks::{ analysis, fuzz, heartbeat::{init_task_heartbeat, TaskHeartbeatClient}, - merge, regression, report, + merge, regression, report, task_logger, }; use anyhow::Result; use onefuzz::machine_id::{get_machine_id, get_scaleset_name}; @@ -46,6 +46,8 @@ pub struct CommonConfig { pub microsoft_telemetry_key: Option, + pub logs: Option, + #[serde(default)] pub setup_dir: PathBuf, @@ -204,6 +206,7 @@ impl Config { 
telemetry::set_property(EventData::Version(env!("ONEFUZZ_VERSION").to_string())); telemetry::set_property(EventData::InstanceId(self.common().instance_id)); telemetry::set_property(EventData::Role(Role::Agent)); + let scaleset = get_scaleset_name().await?; if let Some(scaleset_name) = &scaleset { telemetry::set_property(EventData::ScalesetId(scaleset_name.to_string())); @@ -212,6 +215,20 @@ impl Config { info!("agent ready, dispatching task"); self.report_event(); + let common = self.common().clone(); + if let Some(logs) = common.logs.clone() { + let rx = onefuzz_telemetry::subscribe_to_events(); + + let _logging = tokio::spawn(async move { + let logger = task_logger::TaskLogger::new( + common.job_id, + common.task_id, + get_machine_id().await?, + ); + logger.start(rx, logs).await + }); + } + match self { #[cfg(any(target_os = "linux", target_os = "windows"))] Config::Coverage(config) => coverage::generic::CoverageTask::new(config).run().await, diff --git a/src/agent/onefuzz-agent/src/tasks/mod.rs b/src/agent/onefuzz-agent/src/tasks/mod.rs index 8abca308e8..c8fe200906 100644 --- a/src/agent/onefuzz-agent/src/tasks/mod.rs +++ b/src/agent/onefuzz-agent/src/tasks/mod.rs @@ -12,4 +12,5 @@ pub mod merge; pub mod regression; pub mod report; pub mod stats; +pub mod task_logger; pub mod utils; diff --git a/src/agent/onefuzz-agent/src/tasks/task_logger.rs b/src/agent/onefuzz-agent/src/tasks/task_logger.rs new file mode 100644 index 0000000000..6a31426755 --- /dev/null +++ b/src/agent/onefuzz-agent/src/tasks/task_logger.rs @@ -0,0 +1,604 @@ +#![allow(clippy::if_same_then_else)] +#![allow(dead_code)] + +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use azure_core::HttpError; +use azure_storage::core::prelude::*; +use azure_storage_blobs::prelude::*; +use futures::{StreamExt, TryStreamExt}; +use onefuzz_telemetry::LogEvent; +use reqwest::{StatusCode, Url}; +use std::{path::PathBuf, sync::Arc, time::Duration}; +use uuid::Uuid; + +use 
tokio::sync::broadcast::Receiver;
+
+/// Default number of events collected before a batch is handed to the writer.
+const LOGS_BUFFER_SIZE: usize = 100;
+/// Size limit handed to `BlobLogWriter::create` by `TaskLogger::start`.
+const MAX_LOG_SIZE: u64 = 100_000_000; // 100 MB
+/// Default maximum time spent collecting events before a batch is sent.
+const DEFAULT_LOGGING_INTERVAL: Duration = Duration::from_secs(60);
+/// Default pause between polls while the event channel is empty.
+const DEFAULT_POLLING_INTERVAL: Duration = Duration::from_secs(5);
+
+/// Error payload deserialized from PascalCase `Code` / `Message` fields.
+// NOTE(review): not referenced by the code in this file; presumably the error body returned
+// by the storage service - confirm before relying on it.
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+#[serde(rename = "Error")]
+struct RequestError {
+    code: String,
+    message: String,
+}
+
+/// Outcome of a single `LogWriter::write_logs` call.
+#[derive(PartialEq, Debug)]
+enum WriteLogResponse {
+    /// The batch was written.
+    Success,
+    /// The message needs to be split into multiple parts.
+    MessageTooLarge,
+    /// The log file is full; a new file is needed.
+    MaxSizeReached,
+}
+
+/// Abstracts the operations needed to write logs.
+#[async_trait]
+trait LogWriter<T>: Send + Sync {
+    /// Writes `logs` as one batch and reports whether the caller has to split the batch
+    /// or switch to a new log file.
+    async fn write_logs(&self, logs: &[LogEvent]) -> Result<WriteLogResponse>;
+    /// Creates a new log file and returns the writer associated with it.
+    async fn get_next_writer(&self) -> Result<Box<dyn LogWriter<T>>>;
+}
+
+/// Writes logs to Azure append blobs named `{task_id}/{machine_id}/{blob_id}.log`.
+pub struct BlobLogWriter {
+    container_client: Arc<ContainerClient>,
+    task_id: Uuid,
+    machine_id: Uuid,
+    /// Numeric id of the blob currently written to.
+    blob_id: usize,
+    /// Value passed to the max-size condition of every append.
+    max_log_size: u64,
+}
+
+impl BlobLogWriter {
+    /// Name of the blob this writer appends to.
+    fn get_blob_name(&self) -> String {
+        format!("{}/{}/{}.log", self.task_id, self.machine_id, self.blob_id)
+    }
+
+    /// Builds a writer for the container addressed by `log_container`.
+    ///
+    /// Blobs under `{task_id}/{machine_id}` are listed and the writer continues with the
+    /// highest numeric `<id>.log` found. When there is none, `1.log` is created.
+    ///
+    /// # Errors
+    /// Fails when the container URL cannot be parsed or a storage request fails.
+    pub async fn create(
+        task_id: Uuid,
+        machine_id: Uuid,
+        log_container: Url,
+        max_log_size: u64,
+    ) -> Result<Self> {
+        let container_client = TaskLogger::create_container_client(&log_container)?;
+        let prefix = format!("{}/{}", task_id, machine_id);
+        let blob_list = container_client
+            .list_blobs()
+            .prefix(prefix.as_str())
+            .execute()
+            .await
+            .map_err(|e| anyhow!(e.to_string()))?;
+        let last_blob_id = blob_list
+            .blobs
+            .blobs
+            .iter()
+            .filter_map(|b| {
+                let file_name = PathBuf::from(b.name.strip_prefix(&prefix)?);
+                if file_name.extension().and_then(|ext| ext.to_str()) != Some("log") {
+                    return None;
+                }
+                // `strip_prefix` leaves a leading `/` (the prefix has no trailing slash), so
+                // the id has to come from the file stem: parsing the whole remaining path
+                // ("/1") as a number never succeeds.
+                file_name.file_stem()?.to_str()?.parse::<usize>().ok()
+            })
+            .max();
+
+        let blob_id = match last_blob_id {
+            Some(id) => id,
+            None => {
+                let blob_client = container_client.as_blob_client(format!("{}/1.log", prefix));
+                blob_client
+                    .put_append_blob()
+                    .execute()
+                    .await
+                    .map_err(|e| anyhow!(e.to_string()))?;
+                1
+            }
+        };
+
+        Ok(Self {
+            container_client,
+            task_id,
+            machine_id,
+            blob_id,
+            max_log_size,
+        })
+    }
+}
+
+#[async_trait]
+impl LogWriter<BlobLogWriter> for BlobLogWriter {
+    /// Appends `logs` to the current blob as a single block, one text line per event.
+    ///
+    /// Events are rendered as `<event>: <name> <value>, ...` and traces as
+    /// `<level>: <message>`. HTTP 412 with `MaxBlobSizeConditionNotMet` and HTTP 409 with
+    /// `BlockCountExceedsLimit` map to `MaxSizeReached`, HTTP 413 maps to
+    /// `MessageTooLarge`. Every other failure is returned as an error.
+    async fn write_logs(&self, logs: &[LogEvent]) -> Result<WriteLogResponse> {
+        let blob_name = self.get_blob_name();
+        let blob_client = self.container_client.as_blob_client(blob_name);
+        let data_stream = logs
+            .iter()
+            .flat_map(|log_event| match log_event {
+                LogEvent::Event((ev, data)) => format!(
+                    "{}: {}\n",
+                    ev.as_str(),
+                    data.iter()
+                        .map(|p| p.as_values())
+                        .map(|(name, val)| format!("{} {}", name, val))
+                        .collect::<Vec<_>>()
+                        .join(", ")
+                )
+                .into_bytes(),
+                LogEvent::Trace((level, msg)) => {
+                    format!("{}: {}\n", level.as_str(), msg).into_bytes()
+                }
+            })
+            .collect::<Vec<_>>();
+
+        let result = blob_client
+            .append_block(data_stream)
+            .condition_max_size(self.max_log_size)
+            .execute()
+            .await;
+
+        match result {
+            Ok(_) => Ok(WriteLogResponse::Success),
+            Err(e) => match e.downcast_ref::<HttpError>() {
+                Some(HttpError::StatusCode { status: s, body: b }) => {
+                    if s == &StatusCode::PRECONDITION_FAILED
+                        && b.contains("MaxBlobSizeConditionNotMet")
+                    {
+                        Ok(WriteLogResponse::MaxSizeReached)
+                    } else if s == &StatusCode::CONFLICT && b.contains("BlockCountExceedsLimit") {
+                        Ok(WriteLogResponse::MaxSizeReached)
+                    } else if s == &StatusCode::PAYLOAD_TOO_LARGE {
+                        Ok(WriteLogResponse::MessageTooLarge)
+                    } else {
+                        Err(anyhow!(e.to_string()))
+                    }
+                }
+                _ => Err(anyhow!(e.to_string())),
+            },
+        }
+    }
+
+    /// Creates the append blob with the next id and returns a writer bound to it.
+    async fn get_next_writer(&self) -> Result<Box<dyn LogWriter<BlobLogWriter>>> {
+        let new_writer = Self {
+            blob_id: self.blob_id + 1,
+            container_client: self.container_client.clone(),
+            task_id: self.task_id,
+            machine_id: self.machine_id,
+            max_log_size: self.max_log_size,
+        };
+
+        let blob_client = self
+            .container_client
+            .as_blob_client(new_writer.get_blob_name());
+        blob_client
+            .put_append_blob()
+            .execute()
+            .await
+            .map_err(|e| anyhow!(e.to_string()))?;
+
+        Ok(Box::new(new_writer))
+    }
+}
+
+/// Forwards telemetry events and traces received on a broadcast channel to a `LogWriter`.
+#[derive(Debug, Clone)]
+pub struct TaskLogger {
+    job_id: Uuid,
+    task_id: Uuid,
+    machine_id: Uuid,
+    /// Maximum time spent collecting events before a batch is sent.
+    logging_interval: Duration,
+    /// Number of collected events that forces a batch to be sent.
+    log_buffer_size: usize,
+    /// Pause between polls while the channel is empty.
+    polling_interval: Duration,
+}
+
+/// States of the logging loop.
+enum LoopState {
+    /// Collect events from the channel.
+    Receive,
+    /// A new log file is needed before `pending_logs[start..start + count]` can be sent.
+    InitLog { start: usize, count: usize },
+    /// Send `pending_logs[start..start + count]`.
+    Send { start: usize, count: usize },
+}
+
+/// Bundles the writer, the collected events, the current state and the event receiver of a
+/// logging loop.
+struct LoopContext<T> {
+    pub log_writer: Box<dyn LogWriter<T>>,
+    pub pending_logs: Vec<LogEvent>,
+    pub state: LoopState,
+    pub event: Receiver<LogEvent>,
+}
+
+impl TaskLogger {
+    /// Creates a logger that uses the default buffer size and intervals.
+    pub fn new(job_id: Uuid, task_id: Uuid, machine_id: Uuid) -> Self {
+        Self {
+            job_id,
+            task_id,
+            machine_id,
+            logging_interval: DEFAULT_LOGGING_INTERVAL,
+            log_buffer_size: LOGS_BUFFER_SIZE,
+            polling_interval: DEFAULT_POLLING_INTERVAL,
+        }
+    }
+
+    /// Builds a container client from a container URL: the account is the first label of
+    /// the host name, the container is the first path segment and the query string is used
+    /// as the SAS token.
+    fn create_container_client(log_container: &Url) -> Result<Arc<ContainerClient>> {
+        let account = log_container
+            .domain()
+            .and_then(|d| d.split('.').next())
+            .ok_or(anyhow!("Invalid log container"))?
+            .to_owned();
+        let container = log_container
+            .path_segments()
+            .and_then(|mut ps| ps.next())
+            .ok_or(anyhow!("Invalid log container"))?
+            .to_owned();
+        let sas_token = log_container
+            .query()
+            .ok_or(anyhow!("Invalid log container"))?;
+
+        let http_client = azure_core::new_http_client();
+        let storage_account_client =
+            StorageAccountClient::new_sas_token(http_client, account, sas_token)?;
+        Ok(storage_account_client.as_container_client(container))
+    }
+
+    /// State that follows once every pending event before index `next` has been handled:
+    /// back to `Receive` (with the buffer emptied) when nothing is left, otherwise `Send`
+    /// for the remaining events.
+    fn state_after(pending_logs: &mut Vec<LogEvent>, next: usize) -> LoopState {
+        if next >= pending_logs.len() {
+            pending_logs.clear();
+            LoopState::Receive
+        } else {
+            LoopState::Send {
+                start: next,
+                count: pending_logs.len() - next,
+            }
+        }
+    }
+
+    /// Runs the logging state machine until the event channel is closed or a write fails.
+    ///
+    /// * `Receive` - collects events until `log_buffer_size` events are pending or
+    ///   `logging_interval` has elapsed, sleeping `polling_interval` whenever the channel
+    ///   is empty.
+    /// * `Send` - writes `pending_logs[start..start + count]`. `count` is halved when the
+    ///   writer answers `MessageTooLarge`; after a success the remaining events are sent.
+    /// * `InitLog` - replaces the writer with its successor after `MaxSizeReached`, then
+    ///   retries the same range.
+    ///
+    /// # Errors
+    /// Returns the first error reported by `write_logs` or `get_next_writer`.
+    async fn event_loop<T: Send + Sized>(
+        self,
+        log_writer: Box<dyn LogWriter<T>>,
+        event: Receiver<LogEvent>,
+    ) -> Result<()> {
+        use tokio::sync::broadcast::error::TryRecvError;
+
+        let mut context = LoopContext {
+            log_writer,
+            pending_logs: Vec::with_capacity(self.log_buffer_size),
+            state: LoopState::Receive,
+            event,
+        };
+
+        loop {
+            context.state = match context.state {
+                LoopState::Send { start, count } => {
+                    let response = context
+                        .log_writer
+                        .write_logs(&context.pending_logs[start..start + count])
+                        .await?;
+                    match response {
+                        // Resume right after the range that was just written; resuming at
+                        // `start + 1` would send part of a split batch a second time.
+                        WriteLogResponse::Success => {
+                            Self::state_after(&mut context.pending_logs, start + count)
+                        }
+                        WriteLogResponse::MaxSizeReached => LoopState::InitLog { start, count },
+                        // split the logs here
+                        WriteLogResponse::MessageTooLarge if count > 1 => LoopState::Send {
+                            start,
+                            count: count / 2,
+                        },
+                        // A single event that is rejected on its own can never be written:
+                        // skip it instead of retrying with an empty range.
+                        WriteLogResponse::MessageTooLarge => {
+                            Self::state_after(&mut context.pending_logs, start + 1)
+                        }
+                    }
+                }
+                LoopState::InitLog { start, count } => {
+                    let next_writer = context.log_writer.get_next_writer().await?;
+                    context.log_writer = next_writer;
+                    LoopState::Send { start, count }
+                }
+                LoopState::Receive => {
+                    let started = tokio::time::Instant::now();
+                    let mut closed = false;
+                    while context.pending_logs.len() < self.log_buffer_size
+                        && started.elapsed() <= self.logging_interval
+                    {
+                        match context.event.try_recv() {
+                            Ok(log_event) => context.pending_logs.push(log_event),
+                            Err(TryRecvError::Empty) => {
+                                tokio::time::sleep(self.polling_interval).await
+                            }
+                            // Some events were missed, but newer ones can be read right away.
+                            Err(TryRecvError::Lagged(_)) => continue,
+                            Err(TryRecvError::Closed) => {
+                                closed = true;
+                                break;
+                            }
+                        }
+                    }
+
+                    if !context.pending_logs.is_empty() {
+                        LoopState::Send {
+                            start: 0,
+                            count: context.pending_logs.len(),
+                        }
+                    } else if closed {
+                        // Every sender is gone and nothing is left to write.
+                        return Ok(());
+                    } else {
+                        LoopState::Receive
+                    }
+                }
+            };
+        }
+    }
+
+    /// Creates the blob writer for `log_container` and runs the logging loop on `event`.
+    pub async fn start(&self, event: Receiver<LogEvent>, log_container: Url) -> Result<()> {
+        let blob_writer =
+            BlobLogWriter::create(self.task_id, self.machine_id, log_container, MAX_LOG_SIZE)
+                .await?;
+
+        self._start(event, Box::new(blob_writer)).await
+    }
+
+    /// Runs the logging loop with an arbitrary writer.
+    async fn _start<T: 'static + Send>(
+        &self,
+        event: Receiver<LogEvent>,
+        log_writer: Box<dyn LogWriter<T>>,
+    ) -> Result<()> {
+        self.clone().event_loop(log_writer, event).await?;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{collections::HashMap, sync::RwLock};
+
+    use super::*;
+    use reqwest::Url;
+
+    // Ignored by default: reads the container URL from `test_blob_logger_container`.
+    #[tokio::test]
+    #[ignore]
+    async fn test_get_blob() -> Result<()> {
+        let url = std::env::var("test_blob_logger_container")?;
+        let log_container = Url::parse(&url)?;
+        let client = TaskLogger::create_container_client(&log_container)?;
+
+        let response = client
+            .list_blobs()
+            .prefix("job1/tak1/1")
+            .execute()
+            .await
+            .map_err(|e| anyhow!(e.to_string()))?;
+
+        println!("blob prefix {:?}", response.blobs.blob_prefix);
+        for blob in response.blobs.blobs {
+            println!("{}", blob.name);
+        }
+        Ok(())
+    }
+
+    // Ignored by default: reads the container URL from `test_blob_logger_container`.
+    #[tokio::test]
+    #[ignore]
+    async fn test_write_log() -> Result<()> {
+        let url = std::env::var("test_blob_logger_container")?;
+        let log_container = Url::parse(&url)?;
+        let blob_logger = TaskLogger::new(Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
+
+        let (tx, rx) = tokio::sync::broadcast::channel(16);
+
+        tx.send(LogEvent::Trace((log::Level::Info, "test".into())))?;
+
+        blob_logger.start(rx, log_container).await?;
+        Ok(())
+    }
+
+    /// In-memory writer: each writer id owns one group of events in the shared map. A group
+    /// holds at most `max_size` events and only single-event batches are accepted.
+    pub struct TestLogWriter {
+        events: Arc<RwLock<HashMap<usize, Vec<LogEvent>>>>,
+        id: usize,
+        max_size: usize,
+    }
+
+    #[async_trait]
+    impl LogWriter<TestLogWriter> for TestLogWriter {
+        async fn write_logs(&self, logs: &[LogEvent]) -> Result<WriteLogResponse> {
+            let mut events = self.events.write().unwrap();
+            let entry = events.entry(self.id).or_default();
+            if entry.len() >= self.max_size {
+                Ok(WriteLogResponse::MaxSizeReached)
+            } else if logs.len() > 1 {
+                Ok(WriteLogResponse::MessageTooLarge)
+            } else {
+                for v in logs {
+                    entry.push(v.clone());
+                }
+                Ok(WriteLogResponse::Success)
+            }
+        }
+        async fn get_next_writer(&self) -> Result<Box<dyn LogWriter<TestLogWriter>>> {
+            Ok(Box::new(Self {
+                events: self.events.clone(),
+                id: self.id + 1,
+                ..*self
+            }))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_task_logger_normal_messages() -> Result<()> {
+        let events = Arc::new(RwLock::new(HashMap::new()));
+        let log_writer = Box::new(TestLogWriter {
+            id: 0,
+            events: events.clone(),
+            max_size: 1,
+        });
+
+        let blob_logger = TaskLogger {
+            job_id: Uuid::new_v4(),
+            task_id: Uuid::new_v4(),
+            machine_id: Uuid::new_v4(),
+            logging_interval: Duration::from_secs(1),
+            log_buffer_size: 1,
+            polling_interval: Duration::from_secs(1),
+        };
+
+        let (tx, rx) = tokio::sync::broadcast::channel(16);
+        tx.send(LogEvent::Trace((log::Level::Info, "test1".into())))?;
+        tx.send(LogEvent::Trace((log::Level::Info, "test2".into())))?;
+        tx.send(LogEvent::Trace((log::Level::Info, "test3".into())))?;
+        tx.send(LogEvent::Trace((log::Level::Info, "test4".into())))?;
+        tx.send(LogEvent::Trace((log::Level::Info, "test5".into())))?;
+
+        // The loop only ends when the channel closes, so it is cut off by the timeout.
+        let _res =
+            tokio::time::timeout(Duration::from_secs(5), blob_logger._start(rx, log_writer)).await;
+
+        let x = events.read().unwrap();
+
+        for (k, values) in x.iter() {
+            println!("{}", k);
+            for v in values {
+                println!(" {:?}", v);
+            }
+        }
+
+        assert_eq!(x.keys().len(), 5, "expected 5 groups of messages");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_task_logger_big_messages() -> Result<()> {
+        let events = Arc::new(RwLock::new(HashMap::new()));
+        let log_writer = Box::new(TestLogWriter {
+            id: 0,
+            events: events.clone(),
+            max_size: 2,
+        });
+
+        let blob_logger = TaskLogger {
+            job_id: Uuid::new_v4(),
+            task_id: Uuid::new_v4(),
+            machine_id: Uuid::new_v4(),
+            logging_interval: Duration::from_secs(3),
+            log_buffer_size: 2,
+            polling_interval: Duration::from_secs(1),
+        };
+
+        let (tx, rx) = tokio::sync::broadcast::channel(16);
+        tx.send(LogEvent::Trace((log::Level::Info, "test1".into())))?;
+        tx.send(LogEvent::Trace((log::Level::Info, "test2".into())))?;
+        tx.send(LogEvent::Trace((log::Level::Info, "test3".into())))?;
+        tx.send(LogEvent::Trace((log::Level::Info, "test4".into())))?;
+        tx.send(LogEvent::Trace((log::Level::Info, "test5".into())))?;
+
+        // The loop only ends when the channel closes, so it is cut off by the timeout.
+        let _res =
+            tokio::time::timeout(Duration::from_secs(15), blob_logger._start(rx, log_writer)).await;
+
+        let x = events.read().unwrap();
+
+        for (k, values) in x.iter() {
+            println!("{}", k);
+            for v in values {
+                println!(" {:?}", v);
+            }
+        }
+
+        assert_eq!(x.keys().len(), 3, "expected 3 groups of messages");
+        Ok(())
+    }
+
+    // Ignored by default: reads the container URL from `test_blob_logger_container`.
+    #[tokio::test]
+    #[ignore]
+    async fn test_blob_writer_create() -> Result<()> {
+        let url = std::env::var("test_blob_logger_container")?;
+        let blob_writer =
+            BlobLogWriter::create(Uuid::new_v4(), Uuid::new_v4(), Url::parse(&url)?, 15).await?;
+
+        let blob_prefix = format!("{}/{}", blob_writer.task_id, blob_writer.machine_id);
+
+        print!("blob prefix {}", &blob_prefix);
+
+        let container_client = blob_writer.container_client.clone();
+
+        let blobs = container_client
+            .list_blobs()
+            .prefix(blob_prefix.clone())
+            .execute()
+            .await
+            .map_err(|e| anyhow!(e.to_string()))?;
+
+        // test initial blob creation
+        assert_eq!(blobs.blobs.blobs.len(), 1, "expected exactly one blob");
+        assert_eq!(
+            blobs.blobs.blobs[0].name,
+            format!("{}/1.log", &blob_prefix),
+            "Wrong file name"
+        );
+        println!("logging test event");
+        let result = blob_writer
+            .write_logs(&[LogEvent::Trace((log::Level::Info, "test".into()))])
+            .await
+            .map_err(|e| anyhow!(e.to_string()))?;
+
+        assert_eq!(result, WriteLogResponse::Success, "expected success");
+
+        // testing that we return MaxSizeReached when the size is exceeded
+        let result = blob_writer
+            .write_logs(&[LogEvent::Trace((log::Level::Info, "test".into()))])
+            .await
+            .map_err(|e| anyhow!(e.to_string()))?;
+
+        assert_eq!(
+            result,
+            WriteLogResponse::MaxSizeReached,
+            "expected MaxSizeReached"
+        );
+
+        // testing the creation of new blob when we call get_next_writer()
+        let _blob_writer = blob_writer.get_next_writer().await?;
+
+        let blobs = container_client
+            .list_blobs()
+            .prefix(blob_prefix.clone())
+            .execute()
+            .await
+            .map_err(|e| anyhow!(e.to_string()))?;
+
+        assert_eq!(blobs.blobs.blobs.len(), 2, "expected exactly 2 blob");
+        let blob_names = blobs
+            .blobs
+            .blobs
+            .iter()
+            .map(|b| b.name.clone())
+            .collect::<Vec<_>>();
+
+        assert!(
+            blob_names.contains(&format!("{}/2.log", &blob_prefix)),
+            "expected 2.log"
+        );
+
+        Ok(())
+    }
+}
diff --git a/src/agent/onefuzz-telemetry/src/lib.rs b/src/agent/onefuzz-telemetry/src/lib.rs
index 95f3735ce7..adaa4aac14 100644
--- a/src/agent/onefuzz-telemetry/src/lib.rs
+++ b/src/agent/onefuzz-telemetry/src/lib.rs
@@ -333,6 +333,12 @@ impl EventData {
     }
 }
 
+#[derive(Clone, Debug)]
+pub enum LogEvent {
+    Trace((log::Level, String)),
+    Event((Event, Vec<EventData>)),
+}
+
 mod global {
     use std::sync::{
         atomic::{AtomicUsize, Ordering},
@@ -355,7 +361,7 @@ mod global {
     };
 
     lazy_static!
{
-        pub static ref EVENT_SOURCE: Sender<(Event, Vec<EventData>)> = {
+        pub static ref EVENT_SOURCE: Sender<LogEvent> = {
             let (telemetry_event_source, _) = broadcast::channel::<_>(100);
             telemetry_event_source
         };
@@ -532,10 +538,21 @@ fn try_broadcast_event(event: &Event, properties: &[EventData]) -> bool {
     // we ignore any send error here because they indicate that
     // there are no receivers on the other end
     let (event, properties) = (event.clone(), properties.to_vec());
-    global::EVENT_SOURCE.send((event, properties)).is_ok()
+    global::EVENT_SOURCE
+        .send(LogEvent::Event((event, properties)))
+        .is_ok()
+}
+
+/// Sends `msg` as a `LogEvent::Trace` on the global event source; returns whether the send succeeded.
+pub fn try_broadcast_trace(msg: String, level: log::Level) -> bool {
+    // we ignore any send error here because they indicate that
+    // there are no receivers on the other end
+    global::EVENT_SOURCE
+        .send(LogEvent::Trace((level, msg)))
+        .is_ok()
 }
 
-pub fn subscribe_to_events() -> Receiver<(Event, Vec<EventData>)> {
+pub fn subscribe_to_events() -> Receiver<LogEvent> {
     global::EVENT_SOURCE.subscribe()
 }
 
@@ -616,6 +633,7 @@ macro_rules! log {
         if log_level <= log::max_level() {
             let msg = format!("{}", format_args!($($arg)+));
             log::log!(log_level, "{}", msg);
+            onefuzz_telemetry::try_broadcast_trace(msg.to_string(), log_level);
             onefuzz_telemetry::log_message($level, msg.to_string());
         }
     }};