Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
Add timestamp to agent log (#1972)
Browse files Browse the repository at this point in the history
* Add timestamp to agent log

* Add timestamp and test for events too

* Move timestamp into task_logger to avoid sending it to app insights

* Update Cargo.toml

* Update lib.rs

* Update cargo.lock

* Update LogEvent to contain timestamp

* Uncomment ignore

* fmt

* PR comment
  • Loading branch information
tevoinea authored May 26, 2022
1 parent 69f31ec commit 7fde79e
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 47 deletions.
2 changes: 2 additions & 0 deletions src/agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/agent/onefuzz-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ tokio-stream = "0.1"
tui = { version = "0.16", default-features = false, features = ['crossterm'] }
url = { version = "2.2", features = ["serde"] }
uuid = { version = "0.8", features = ["serde", "v4"] }
chrono = "0.4"

azure_core = { version = "0.1.1", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage = { version = "0.1.0", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage_blobs = { version = "0.1.0", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage_blobs = { version = "0.1.0", default-features = false, features = ["enable_reqwest_rustls"] }
7 changes: 4 additions & 3 deletions src/agent/onefuzz-agent/src/local/tui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crossterm::{
use futures::{StreamExt, TryStreamExt};
use log::Level;
use onefuzz::utils::try_wait_all_join_handles;
use onefuzz_telemetry::{self, EventData, LogEvent};
use onefuzz_telemetry::{self, EventData, LoggingEvent};
use std::{
collections::HashMap,
io::{self, Stdout},
Expand Down Expand Up @@ -233,8 +233,9 @@ impl TerminalUi {

while cancellation_rx.try_recv() == Err(broadcast::error::TryRecvError::Empty) {
match rx.try_recv() {
Ok(LogEvent::Event((_event, data))) => {
let data = data
Ok(LoggingEvent::Event(log_event)) => {
let data = log_event
.data
.into_iter()
.filter(Self::filter_event)
.collect::<Vec<_>>();
Expand Down
115 changes: 85 additions & 30 deletions src/agent/onefuzz-agent/src/tasks/task_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use azure_core::HttpError;
use azure_storage::core::prelude::*;
use azure_storage_blobs::prelude::*;
use futures::{StreamExt, TryStreamExt};
use onefuzz_telemetry::LogEvent;
use onefuzz_telemetry::LoggingEvent;
use reqwest::{StatusCode, Url};
use std::{path::PathBuf, sync::Arc, time::Duration};
use uuid::Uuid;
Expand Down Expand Up @@ -39,7 +39,7 @@ enum WriteLogResponse {
/// Abstracts the operation needed to write logs
#[async_trait]
trait LogWriter<T>: Send + Sync {
async fn write_logs(&self, logs: &[LogEvent]) -> Result<WriteLogResponse>;
async fn write_logs(&self, logs: &[LoggingEvent]) -> Result<WriteLogResponse>;
/// creates a new blob file and returns the logWriter associated with it
async fn get_next_writer(&self) -> Result<Box<dyn LogWriter<T>>>;
}
Expand Down Expand Up @@ -119,26 +119,33 @@ impl BlobLogWriter {

#[async_trait]
impl LogWriter<BlobLogWriter> for BlobLogWriter {
async fn write_logs(&self, logs: &[LogEvent]) -> Result<WriteLogResponse> {
async fn write_logs(&self, logs: &[LoggingEvent]) -> Result<WriteLogResponse> {
let blob_name = self.get_blob_name();
print!("{}", blob_name);
let blob_client = self.container_client.as_blob_client(blob_name);
let data_stream = logs
.iter()
.flat_map(|log_event| match log_event {
LogEvent::Event((ev, data)) => format!(
"{}: {}\n",
ev.as_str(),
data.iter()
LoggingEvent::Event(log_event) => format!(
"[{}] {}: {}\n",
log_event.timestamp,
log_event.event.as_str(),
log_event
.data
.iter()
.map(|p| p.as_values())
.map(|(name, val)| format!("{} {}", name, val))
.collect::<Vec<_>>()
.join(", ")
)
.into_bytes(),
LogEvent::Trace((level, msg)) => {
format!("{}: {}\n", level.as_str(), msg).into_bytes()
}
LoggingEvent::Trace(log_trace) => format!(
"[{}] {}: {}\n",
log_trace.timestamp,
log_trace.level.as_str(),
log_trace.message
)
.into_bytes(),
})
.collect::<Vec<_>>();

Expand Down Expand Up @@ -208,9 +215,9 @@ enum LoopState {

struct LoopContext<T: Sized> {
pub log_writer: Box<dyn LogWriter<T>>,
pub pending_logs: Vec<LogEvent>,
pub pending_logs: Vec<LoggingEvent>,
pub state: LoopState,
pub event: Receiver<LogEvent>,
pub event: Receiver<LoggingEvent>,
}

impl TaskLogger {
Expand Down Expand Up @@ -249,7 +256,7 @@ impl TaskLogger {
async fn event_loop<T: Send + Sized>(
self,
log_writer: Box<dyn LogWriter<T>>,
event: Receiver<LogEvent>,
event: Receiver<LoggingEvent>,
) -> Result<()> {
let initial_state = LoopContext {
log_writer,
Expand Down Expand Up @@ -355,7 +362,7 @@ impl TaskLogger {
Ok(())
}

pub async fn start(&self, event: Receiver<LogEvent>, log_container: Url) -> Result<()> {
pub async fn start(&self, event: Receiver<LoggingEvent>, log_container: Url) -> Result<()> {
let blob_writer =
BlobLogWriter::create(self.task_id, self.machine_id, log_container, MAX_LOG_SIZE)
.await?;
Expand All @@ -365,7 +372,7 @@ impl TaskLogger {

async fn _start<T: 'static + Send>(
&self,
event: Receiver<LogEvent>,
event: Receiver<LoggingEvent>,
log_writer: Box<dyn LogWriter<T>>,
) -> Result<()> {
self.clone().event_loop(log_writer, event).await?;
Expand All @@ -378,8 +385,17 @@ mod tests {
use std::{collections::HashMap, sync::RwLock};

use super::*;
use onefuzz_telemetry::LogTrace;
use reqwest::Url;

/// Test helper: wraps `level` and `message` into a `LogTrace`
/// stamped with the current UTC time.
fn create_log_trace(level: log::Level, message: String) -> LogTrace {
    // Capture the timestamp once, then build the struct via
    // field-init shorthand.
    let timestamp = chrono::Utc::now();
    LogTrace {
        timestamp,
        level,
        message,
    }
}

#[tokio::test]
#[ignore]
async fn test_get_blob() -> Result<()> {
Expand Down Expand Up @@ -410,21 +426,24 @@ mod tests {

let (tx, rx) = tokio::sync::broadcast::channel(16);

tx.send(LogEvent::Trace((log::Level::Info, "test".into())))?;
tx.send(LoggingEvent::Trace(create_log_trace(
log::Level::Info,
"test".into(),
)))?;

blob_logger.start(rx, log_container).await?;
Ok(())
}

pub struct TestLogWriter {
events: Arc<RwLock<HashMap<usize, Vec<LogEvent>>>>,
events: Arc<RwLock<HashMap<usize, Vec<LoggingEvent>>>>,
id: usize,
max_size: usize,
}

#[async_trait]
impl LogWriter<TestLogWriter> for TestLogWriter {
async fn write_logs(&self, logs: &[LogEvent]) -> Result<WriteLogResponse> {
async fn write_logs(&self, logs: &[LoggingEvent]) -> Result<WriteLogResponse> {
let mut events = self.events.write().unwrap();
let entry = &mut *events.entry(self.id).or_insert(Vec::new());
if entry.len() >= self.max_size {
Expand Down Expand Up @@ -466,11 +485,26 @@ mod tests {
};

let (tx, rx) = tokio::sync::broadcast::channel(16);
tx.send(LogEvent::Trace((log::Level::Info, "test1".into())))?;
tx.send(LogEvent::Trace((log::Level::Info, "test2".into())))?;
tx.send(LogEvent::Trace((log::Level::Info, "test3".into())))?;
tx.send(LogEvent::Trace((log::Level::Info, "test4".into())))?;
tx.send(LogEvent::Trace((log::Level::Info, "test5".into())))?;
tx.send(LoggingEvent::Trace(create_log_trace(
log::Level::Info,
"test1".into(),
)))?;
tx.send(LoggingEvent::Trace(create_log_trace(
log::Level::Info,
"test2".into(),
)))?;
tx.send(LoggingEvent::Trace(create_log_trace(
log::Level::Info,
"test3".into(),
)))?;
tx.send(LoggingEvent::Trace(create_log_trace(
log::Level::Info,
"test4".into(),
)))?;
tx.send(LoggingEvent::Trace(create_log_trace(
log::Level::Info,
"test5".into(),
)))?;

let _res =
tokio::time::timeout(Duration::from_secs(5), blob_logger._start(rx, log_writer)).await;
Expand Down Expand Up @@ -507,11 +541,26 @@ mod tests {
};

let (tx, rx) = tokio::sync::broadcast::channel(16);
tx.send(LogEvent::Trace((log::Level::Info, "test1".into())))?;
tx.send(LogEvent::Trace((log::Level::Info, "test2".into())))?;
tx.send(LogEvent::Trace((log::Level::Info, "test3".into())))?;
tx.send(LogEvent::Trace((log::Level::Info, "test4".into())))?;
tx.send(LogEvent::Trace((log::Level::Info, "test5".into())))?;
tx.send(LoggingEvent::Trace(create_log_trace(
log::Level::Info,
"test1".into(),
)))?;
tx.send(LoggingEvent::Trace(create_log_trace(
log::Level::Info,
"test2".into(),
)))?;
tx.send(LoggingEvent::Trace(create_log_trace(
log::Level::Info,
"test3".into(),
)))?;
tx.send(LoggingEvent::Trace(create_log_trace(
log::Level::Info,
"test4".into(),
)))?;
tx.send(LoggingEvent::Trace(create_log_trace(
log::Level::Info,
"test5".into(),
)))?;

let _res =
tokio::time::timeout(Duration::from_secs(15), blob_logger._start(rx, log_writer)).await;
Expand Down Expand Up @@ -558,15 +607,21 @@ mod tests {
);
println!("logging test event");
let result = blob_writer
.write_logs(&[LogEvent::Trace((log::Level::Info, "test".into()))])
.write_logs(&[LoggingEvent::Trace(create_log_trace(
log::Level::Info,
"test".into(),
))])
.await
.map_err(|e| anyhow!(e.to_string()))?;

assert_eq!(result, WriteLogResponse::Success, "expected success");

// testing that we return MaxSizeReached when the size is exceeded
let result = blob_writer
.write_logs(&[LogEvent::Trace((log::Level::Info, "test".into()))])
.write_logs(&[LoggingEvent::Trace(create_log_trace(
log::Level::Info,
"test".into(),
))])
.await
.map_err(|e| anyhow!(e.to_string()))?;

Expand Down
3 changes: 1 addition & 2 deletions src/agent/onefuzz-telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,4 @@ z3-sys = { version = "0.6", optional = true}
iced-x86 = { version = "1.15", optional = true}
tokio = { version = "1.15", features = ["full"] }
lazy_static = "1.4"


chrono = "0.4"
47 changes: 36 additions & 11 deletions src/agent/onefuzz-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

use chrono::DateTime;
#[cfg(feature = "intel_instructions")]
use iced_x86::{Code as IntelInstructionCode, Mnemonic as IntelInstructionMnemonic};
use serde::{Deserialize, Serialize};
Expand All @@ -10,6 +11,8 @@ use uuid::Uuid;
#[cfg(feature = "z3")]
use z3_sys::ErrorCode as Z3ErrorCode;

pub use chrono::Utc;

pub use appinsights::telemetry::SeverityLevel::{Critical, Error, Information, Verbose, Warning};
use tokio::sync::broadcast::{self, Receiver};
#[macro_use]
Expand Down Expand Up @@ -334,9 +337,23 @@ impl EventData {
}

#[derive(Clone, Debug)]
pub enum LogEvent {
Trace((log::Level, String)),
Event((Event, Vec<EventData>)),
pub enum LoggingEvent {
Trace(LogTrace),
Event(LogEvent),
}

/// A single free-form trace message, timestamped at creation.
#[derive(Clone, Debug)]
pub struct LogTrace {
    /// UTC time at which the trace was produced.
    pub timestamp: DateTime<Utc>,
    /// Severity of the message (the `log` crate's level type).
    pub level: log::Level,
    /// The formatted message text.
    pub message: String,
}

/// A structured telemetry event, timestamped at creation.
#[derive(Clone, Debug)]
pub struct LogEvent {
    /// UTC time at which the event was recorded.
    pub timestamp: DateTime<Utc>,
    /// Which event is being reported.
    pub event: Event,
    /// Key/value payload attached to the event.
    pub data: Vec<EventData>,
}

mod global {
Expand All @@ -361,7 +378,7 @@ mod global {
};

lazy_static! {
pub static ref EVENT_SOURCE: Sender<LogEvent> = {
pub static ref EVENT_SOURCE: Sender<LoggingEvent> = {
let (telemetry_event_source, _) = broadcast::channel::<_>(100);
telemetry_event_source
};
Expand Down Expand Up @@ -534,25 +551,33 @@ pub fn format_events(events: &[EventData]) -> String {
.join(" ")
}

fn try_broadcast_event(event: &Event, properties: &[EventData]) -> bool {
fn try_broadcast_event(timestamp: DateTime<Utc>, event: &Event, properties: &[EventData]) -> bool {
// we ignore any send error here because they indicate that
// there are no receivers on the other end
let (event, properties) = (event.clone(), properties.to_vec());
global::EVENT_SOURCE
.send(LogEvent::Event((event, properties)))
.send(LoggingEvent::Event(LogEvent {
timestamp,
event,
data: properties,
}))
.is_ok()
}

pub fn try_broadcast_trace(msg: String, level: log::Level) -> bool {
pub fn try_broadcast_trace(timestamp: DateTime<Utc>, msg: String, level: log::Level) -> bool {
// we ignore any send error here because they indicate that
// there are no receivers on the other end

global::EVENT_SOURCE
.send(LogEvent::Trace((level, msg)))
.send(LoggingEvent::Trace(LogTrace {
timestamp,
level,
message: msg,
}))
.is_ok()
}

pub fn subscribe_to_events() -> Receiver<LogEvent> {
pub fn subscribe_to_events() -> Receiver<LoggingEvent> {
global::EVENT_SOURCE.subscribe()
}

Expand Down Expand Up @@ -581,7 +606,7 @@ pub fn track_event(event: &Event, properties: &[EventData]) {
}
client.track(evt);
}
try_broadcast_event(event, properties);
try_broadcast_event(chrono::Utc::now(), event, properties);
}

pub fn to_log_level(level: &appinsights::telemetry::SeverityLevel) -> log::Level {
Expand Down Expand Up @@ -633,7 +658,7 @@ macro_rules! log {
if log_level <= log::max_level() {
let msg = format!("{}", format_args!($($arg)+));
log::log!(log_level, "{}", msg);
onefuzz_telemetry::try_broadcast_trace(msg.to_string(), log_level);
onefuzz_telemetry::try_broadcast_trace(onefuzz_telemetry::Utc::now(), msg.to_string(), log_level);
onefuzz_telemetry::log_message($level, msg.to_string());
}
}};
Expand Down

0 comments on commit 7fde79e

Please sign in to comment.