From 7fde79ee9772a08e0940b8f7ef6299abbb5b941a Mon Sep 17 00:00:00 2001 From: Teo Voinea <58236992+tevoinea@users.noreply.github.com> Date: Thu, 26 May 2022 08:34:41 -0400 Subject: [PATCH] Add timestamp to agent log (#1972) * Add timestamp to agent log * Add timestamp and test for events too * Move timestamp into task_logger to avoid sending it to app insights * Update Cargo.toml * Update lib.rs * Update cargo.lock * Update LogEvent to contain timestamp * Uncomment ignore * fmt * PR comment --- src/agent/Cargo.lock | 2 + src/agent/onefuzz-agent/Cargo.toml | 3 +- src/agent/onefuzz-agent/src/local/tui.rs | 7 +- .../onefuzz-agent/src/tasks/task_logger.rs | 115 +++++++++++++----- src/agent/onefuzz-telemetry/Cargo.toml | 3 +- src/agent/onefuzz-telemetry/src/lib.rs | 47 +++++-- 6 files changed, 130 insertions(+), 47 deletions(-) diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index 83b7628dad..c773e0d080 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -1899,6 +1899,7 @@ dependencies = [ "azure_storage", "azure_storage_blobs", "backoff", + "chrono", "clap", "coverage", "crossterm 0.22.1", @@ -1964,6 +1965,7 @@ name = "onefuzz-telemetry" version = "0.1.0" dependencies = [ "appinsights", + "chrono", "iced-x86", "lazy_static", "log", diff --git a/src/agent/onefuzz-agent/Cargo.toml b/src/agent/onefuzz-agent/Cargo.toml index 4a2f89b9b3..8847d14922 100644 --- a/src/agent/onefuzz-agent/Cargo.toml +++ b/src/agent/onefuzz-agent/Cargo.toml @@ -45,7 +45,8 @@ tokio-stream = "0.1" tui = { version = "0.16", default-features = false, features = ['crossterm'] } url = { version = "2.2", features = ["serde"] } uuid = { version = "0.8", features = ["serde", "v4"] } +chrono = "0.4" azure_core = { version = "0.1.1", default-features = false, features = ["enable_reqwest_rustls"] } azure_storage = { version = "0.1.0", default-features = false, features = ["enable_reqwest_rustls"] } -azure_storage_blobs = { version = "0.1.0", default-features = false, features = ["enable_reqwest_rustls"] } \ No newline at end of file +azure_storage_blobs = { version = "0.1.0", default-features = false, features = ["enable_reqwest_rustls"] } diff --git a/src/agent/onefuzz-agent/src/local/tui.rs b/src/agent/onefuzz-agent/src/local/tui.rs index fc76507f84..9319ea07e4 100644 --- a/src/agent/onefuzz-agent/src/local/tui.rs +++ b/src/agent/onefuzz-agent/src/local/tui.rs @@ -11,7 +11,7 @@ use crossterm::{ use futures::{StreamExt, TryStreamExt}; use log::Level; use onefuzz::utils::try_wait_all_join_handles; -use onefuzz_telemetry::{self, EventData, LogEvent}; +use onefuzz_telemetry::{self, EventData, LoggingEvent}; use std::{ collections::HashMap, io::{self, Stdout}, @@ -233,8 +233,9 @@ impl TerminalUi { while cancellation_rx.try_recv() == Err(broadcast::error::TryRecvError::Empty) { match rx.try_recv() { - Ok(LogEvent::Event((_event, data))) => { - let data = data + Ok(LoggingEvent::Event(log_event)) => { + let data = log_event + .data .into_iter() .filter(Self::filter_event) .collect::>(); diff --git a/src/agent/onefuzz-agent/src/tasks/task_logger.rs b/src/agent/onefuzz-agent/src/tasks/task_logger.rs index 6a31426755..259f60e272 100644 --- a/src/agent/onefuzz-agent/src/tasks/task_logger.rs +++ b/src/agent/onefuzz-agent/src/tasks/task_logger.rs @@ -7,7 +7,7 @@ use azure_core::HttpError; use azure_storage::core::prelude::*; use azure_storage_blobs::prelude::*; use futures::{StreamExt, TryStreamExt}; -use onefuzz_telemetry::LogEvent; +use onefuzz_telemetry::LoggingEvent; use reqwest::{StatusCode, Url}; use std::{path::PathBuf, sync::Arc, time::Duration}; use uuid::Uuid; @@ -39,7 +39,7 @@ enum WriteLogResponse { /// Abstracts the operation needed to write logs #[async_trait] trait LogWriter: Send + Sync { - async fn write_logs(&self, logs: &[LogEvent]) -> Result; + async fn write_logs(&self, logs: &[LoggingEvent]) -> Result; /// creates a new blob file and returns the logWriter associated with it async fn get_next_writer(&self) -> Result>>; } @@ -119,26 +119,33 @@ impl BlobLogWriter { #[async_trait] impl LogWriter for BlobLogWriter { - async fn write_logs(&self, logs: &[LogEvent]) -> Result { + async fn write_logs(&self, logs: &[LoggingEvent]) -> Result { let blob_name = self.get_blob_name(); print!("{}", blob_name); let blob_client = self.container_client.as_blob_client(blob_name); let data_stream = logs .iter() .flat_map(|log_event| match log_event { - LogEvent::Event((ev, data)) => format!( - "{}: {}\n", - ev.as_str(), - data.iter() + LoggingEvent::Event(log_event) => format!( + "[{}] {}: {}\n", + log_event.timestamp, + log_event.event.as_str(), + log_event + .data + .iter() .map(|p| p.as_values()) .map(|(name, val)| format!("{} {}", name, val)) .collect::>() .join(", ") ) .into_bytes(), - LogEvent::Trace((level, msg)) => { - format!("{}: {}\n", level.as_str(), msg).into_bytes() - } + LoggingEvent::Trace(log_trace) => format!( + "[{}] {}: {}\n", + log_trace.timestamp, + log_trace.level.as_str(), + log_trace.message + ) + .into_bytes(), }) .collect::>(); @@ -208,9 +215,9 @@ enum LoopState { struct LoopContext { pub log_writer: Box>, - pub pending_logs: Vec, + pub pending_logs: Vec, pub state: LoopState, - pub event: Receiver, + pub event: Receiver, } impl TaskLogger { @@ -249,7 +256,7 @@ impl TaskLogger { async fn event_loop( self, log_writer: Box>, - event: Receiver, + event: Receiver, ) -> Result<()> { let initial_state = LoopContext { log_writer, @@ -355,7 +362,7 @@ impl TaskLogger { Ok(()) } - pub async fn start(&self, event: Receiver, log_container: Url) -> Result<()> { + pub async fn start(&self, event: Receiver, log_container: Url) -> Result<()> { let blob_writer = BlobLogWriter::create(self.task_id, self.machine_id, log_container, MAX_LOG_SIZE) .await?; @@ -365,7 +372,7 @@ impl TaskLogger { async fn _start( &self, - event: Receiver, + event: Receiver, log_writer: Box>, ) -> Result<()> { self.clone().event_loop(log_writer, event).await?; @@ -378,8 +385,17 @@ mod tests { use std::{collections::HashMap, sync::RwLock}; use super::*; + use onefuzz_telemetry::LogTrace; use reqwest::Url; + fn create_log_trace(level: log::Level, message: String) -> LogTrace { + LogTrace { + timestamp: chrono::Utc::now(), + level, + message, + } + } + #[tokio::test] #[ignore] async fn test_get_blob() -> Result<()> { @@ -410,21 +426,24 @@ mod tests { let (tx, rx) = tokio::sync::broadcast::channel(16); - tx.send(LogEvent::Trace((log::Level::Info, "test".into())))?; + tx.send(LoggingEvent::Trace(create_log_trace( + log::Level::Info, + "test".into(), + )))?; blob_logger.start(rx, log_container).await?; Ok(()) } pub struct TestLogWriter { - events: Arc>>>, + events: Arc>>>, id: usize, max_size: usize, } #[async_trait] impl LogWriter for TestLogWriter { - async fn write_logs(&self, logs: &[LogEvent]) -> Result { + async fn write_logs(&self, logs: &[LoggingEvent]) -> Result { let mut events = self.events.write().unwrap(); let entry = &mut *events.entry(self.id).or_insert(Vec::new()); if entry.len() >= self.max_size { @@ -466,11 +485,26 @@ mod tests { }; let (tx, rx) = tokio::sync::broadcast::channel(16); - tx.send(LogEvent::Trace((log::Level::Info, "test1".into())))?; - tx.send(LogEvent::Trace((log::Level::Info, "test2".into())))?; - tx.send(LogEvent::Trace((log::Level::Info, "test3".into())))?; - tx.send(LogEvent::Trace((log::Level::Info, "test4".into())))?; - tx.send(LogEvent::Trace((log::Level::Info, "test5".into())))?; + tx.send(LoggingEvent::Trace(create_log_trace( + log::Level::Info, + "test1".into(), + )))?; + tx.send(LoggingEvent::Trace(create_log_trace( + log::Level::Info, + "test2".into(), + )))?; + tx.send(LoggingEvent::Trace(create_log_trace( + log::Level::Info, + "test3".into(), + )))?; + tx.send(LoggingEvent::Trace(create_log_trace( + log::Level::Info, + "test4".into(), + )))?; + tx.send(LoggingEvent::Trace(create_log_trace( + log::Level::Info, + "test5".into(), + )))?; let _res = tokio::time::timeout(Duration::from_secs(5), blob_logger._start(rx, log_writer)).await; @@ -507,11 +541,26 @@ mod tests { }; let (tx, rx) = tokio::sync::broadcast::channel(16); - tx.send(LogEvent::Trace((log::Level::Info, "test1".into())))?; - tx.send(LogEvent::Trace((log::Level::Info, "test2".into())))?; - tx.send(LogEvent::Trace((log::Level::Info, "test3".into())))?; - tx.send(LogEvent::Trace((log::Level::Info, "test4".into())))?; - tx.send(LogEvent::Trace((log::Level::Info, "test5".into())))?; + tx.send(LoggingEvent::Trace(create_log_trace( + log::Level::Info, + "test1".into(), + )))?; + tx.send(LoggingEvent::Trace(create_log_trace( + log::Level::Info, + "test2".into(), + )))?; + tx.send(LoggingEvent::Trace(create_log_trace( + log::Level::Info, + "test3".into(), + )))?; + tx.send(LoggingEvent::Trace(create_log_trace( + log::Level::Info, + "test4".into(), + )))?; + tx.send(LoggingEvent::Trace(create_log_trace( + log::Level::Info, + "test5".into(), + )))?; let _res = tokio::time::timeout(Duration::from_secs(15), blob_logger._start(rx, log_writer)).await; @@ -558,7 +607,10 @@ mod tests { ); println!("logging test event"); let result = blob_writer - .write_logs(&[LogEvent::Trace((log::Level::Info, "test".into()))]) + .write_logs(&[LoggingEvent::Trace(create_log_trace( + log::Level::Info, + "test".into(), + ))]) .await .map_err(|e| anyhow!(e.to_string()))?; @@ -566,7 +618,10 @@ mod tests { // testing that we return MaxSizeReached when the size is exceeded let result = blob_writer - .write_logs(&[LogEvent::Trace((log::Level::Info, "test".into()))]) + .write_logs(&[LoggingEvent::Trace(create_log_trace( + log::Level::Info, + "test".into(), + ))]) .await .map_err(|e| anyhow!(e.to_string()))?; diff --git a/src/agent/onefuzz-telemetry/Cargo.toml b/src/agent/onefuzz-telemetry/Cargo.toml index 7f1eb01453..bc50898e2a 100644 --- a/src/agent/onefuzz-telemetry/Cargo.toml +++ b/src/agent/onefuzz-telemetry/Cargo.toml @@ -23,5 +23,4 @@ z3-sys = { version = "0.6", optional = true} iced-x86 = { version = "1.15", optional = true} tokio = { version = "1.15", features = ["full"] } lazy_static = "1.4" - - +chrono = "0.4" diff --git a/src/agent/onefuzz-telemetry/src/lib.rs b/src/agent/onefuzz-telemetry/src/lib.rs index adaa4aac14..c81999d317 100644 --- a/src/agent/onefuzz-telemetry/src/lib.rs +++ b/src/agent/onefuzz-telemetry/src/lib.rs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +use chrono::DateTime; #[cfg(feature = "intel_instructions")] use iced_x86::{Code as IntelInstructionCode, Mnemonic as IntelInstructionMnemonic}; use serde::{Deserialize, Serialize}; @@ -10,6 +11,8 @@ use uuid::Uuid; #[cfg(feature = "z3")] use z3_sys::ErrorCode as Z3ErrorCode; +pub use chrono::Utc; + pub use appinsights::telemetry::SeverityLevel::{Critical, Error, Information, Verbose, Warning}; use tokio::sync::broadcast::{self, Receiver}; #[macro_use] @@ -334,9 +337,23 @@ impl EventData { } #[derive(Clone, Debug)] -pub enum LogEvent { - Trace((log::Level, String)), - Event((Event, Vec)), +pub enum LoggingEvent { + Trace(LogTrace), + Event(LogEvent), +} + +#[derive(Clone, Debug)] +pub struct LogTrace { + pub timestamp: DateTime, + pub level: log::Level, + pub message: String, +} + +#[derive(Clone, Debug)] +pub struct LogEvent { + pub timestamp: DateTime, + pub event: Event, + pub data: Vec, } mod global { @@ -361,7 +378,7 @@ mod global { }; lazy_static! { - pub static ref EVENT_SOURCE: Sender = { + pub static ref EVENT_SOURCE: Sender = { let (telemetry_event_source, _) = broadcast::channel::<_>(100); telemetry_event_source }; @@ -534,25 +551,33 @@ pub fn format_events(events: &[EventData]) -> String { .join(" ") } -fn try_broadcast_event(event: &Event, properties: &[EventData]) -> bool { +fn try_broadcast_event(timestamp: DateTime, event: &Event, properties: &[EventData]) -> bool { // we ignore any send error here because they indicate that // there are no receivers on the other end let (event, properties) = (event.clone(), properties.to_vec()); global::EVENT_SOURCE - .send(LogEvent::Event((event, properties))) + .send(LoggingEvent::Event(LogEvent { + timestamp, + event, + data: properties, + })) .is_ok() } -pub fn try_broadcast_trace(msg: String, level: log::Level) -> bool { +pub fn try_broadcast_trace(timestamp: DateTime, msg: String, level: log::Level) -> bool { // we ignore any send error here because they indicate that // there are no receivers on the other end global::EVENT_SOURCE - .send(LogEvent::Trace((level, msg))) + .send(LoggingEvent::Trace(LogTrace { + timestamp, + level, + message: msg, + })) .is_ok() } -pub fn subscribe_to_events() -> Receiver { +pub fn subscribe_to_events() -> Receiver { global::EVENT_SOURCE.subscribe() } @@ -581,7 +606,7 @@ pub fn track_event(event: &Event, properties: &[EventData]) { } client.track(evt); } - try_broadcast_event(event, properties); + try_broadcast_event(chrono::Utc::now(), event, properties); } pub fn to_log_level(level: &appinsights::telemetry::SeverityLevel) -> log::Level { @@ -633,7 +658,7 @@ macro_rules! log { if log_level <= log::max_level() { let msg = format!("{}", format_args!($($arg)+)); log::log!(log_level, "{}", msg); - onefuzz_telemetry::try_broadcast_trace(msg.to_string(), log_level); + onefuzz_telemetry::try_broadcast_trace(onefuzz_telemetry::Utc::now(), msg.to_string(), log_level); onefuzz_telemetry::log_message($level, msg.to_string()); } }};