Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Drop the global event sender on close #2125

Merged
merged 7 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/agent/onefuzz-task/src/local/tui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl TerminalUi {
ui_event_tx: Sender<TerminalEvent>,
mut cancellation_rx: broadcast::Receiver<()>,
) -> Result<()> {
let mut rx = onefuzz_telemetry::subscribe_to_events();
let mut rx = onefuzz_telemetry::subscribe_to_events()?;

while cancellation_rx.try_recv() == Err(broadcast::error::TryRecvError::Empty) {
match rx.try_recv() {
Expand Down
4 changes: 2 additions & 2 deletions src/agent/onefuzz-task/src/managed/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {
let common = config.common().clone();
let machine_id = get_machine_id().await?;
let task_logger = if let Some(logs) = common.logs.clone() {
let rx = onefuzz_telemetry::subscribe_to_events();
let rx = onefuzz_telemetry::subscribe_to_events()?;

let logger = task_logger::TaskLogger::new(common.job_id, common.task_id, machine_id);

Expand Down Expand Up @@ -58,7 +58,7 @@ pub async fn run(args: &clap::ArgMatches<'_>) -> Result<()> {

// wait for the task logger to finish
if let Some(task_logger) = task_logger {
let _ = task_logger.flush_and_stop(Duration::from_secs(5)).await;
let _ = task_logger.flush_and_stop(Duration::from_secs(60)).await;
}

result
Expand Down
89 changes: 33 additions & 56 deletions src/agent/onefuzz-task/src/tasks/task_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use async_trait::async_trait;
use azure_core::HttpError;
use azure_storage::core::prelude::*;
use azure_storage_blobs::prelude::*;
use onefuzz_telemetry::LoggingEvent;
use onefuzz_telemetry::{LogTrace, LoggingEvent};
use reqwest::{StatusCode, Url};
use std::{path::PathBuf, sync::Arc, time::Duration};
use uuid::Uuid;

use tokio::{sync::broadcast::Receiver, time::Instant};
use tokio::sync::broadcast::{error::TryRecvError, Receiver};

const LOGS_BUFFER_SIZE: usize = 100;
const MAX_LOG_SIZE: u64 = 100000000; // 100 MB
Expand Down Expand Up @@ -124,7 +124,6 @@ impl LogWriter<BlobLogWriter> for BlobLogWriter {
let data_stream = logs
.iter()
.flat_map(|log_event| match log_event {
// LoggingEvent::Flush => format!("End of log").into_bytes(),
LoggingEvent::Event(log_event) => format!(
"[{}] {}: {}\n",
log_event.timestamp,
Expand Down Expand Up @@ -211,12 +210,12 @@ enum LoopState {
InitLog {
start: usize,
count: usize,
flush: bool,
done: bool,
},
Send {
start: usize,
count: usize,
flush: bool,
done: bool,
},
Done,
}
Expand Down Expand Up @@ -261,25 +260,17 @@ impl TaskLogger {
Ok(storage_account_client.as_container_client(container))
}

async fn event_loop<T: Send + Sized>(
self,
context: LoopContext<T>,
flush_and_close: Option<(Instant, Duration)>,
) -> Result<LoopContext<T>> {
async fn event_loop<T: Send + Sized>(self, context: LoopContext<T>) -> Result<LoopContext<T>> {
match context.state {
LoopState::Send {
start,
count,
flush,
} => {
LoopState::Send { start, count, done } => {
match context
.log_writer
.write_logs(&context.pending_logs[start..start + count])
.await?
{
WriteLogResponse::Success => {
if start + count >= context.pending_logs.len() {
if flush {
if done {
Result::<_, anyhow::Error>::Ok(LoopContext {
pending_logs: vec![],
state: LoopState::Done,
Expand All @@ -299,7 +290,7 @@ impl TaskLogger {
state: LoopState::Send {
start: new_start,
count: new_count,
flush,
done,
},
..context
})
Expand All @@ -308,11 +299,7 @@ impl TaskLogger {

WriteLogResponse::MaxSizeReached => {
Result::<_, anyhow::Error>::Ok(LoopContext {
state: LoopState::InitLog {
start,
count,
flush,
},
state: LoopState::InitLog { start, count, done },
..context
})
}
Expand All @@ -322,34 +309,26 @@ impl TaskLogger {
state: LoopState::Send {
start,
count: count / 2,
flush,
done,
},
..context
})
}
}
}
LoopState::InitLog {
start,
count,
flush,
} => {
LoopState::InitLog { start, count, done } => {
let new_writer = context.log_writer.get_next_writer().await?;
Result::<_, anyhow::Error>::Ok(LoopContext {
log_writer: new_writer,
state: LoopState::Send {
start,
count,
flush,
},
state: LoopState::Send { start, count, done },
..context
})
}
LoopState::Receive => {
let mut event = context.event;
let mut data = Vec::with_capacity(self.log_buffer_size);
let start_time = tokio::time::Instant::now();
let mut flush = false;
let mut done = false;

loop {
if data.len() >= self.log_buffer_size {
Expand All @@ -358,18 +337,30 @@ impl TaskLogger {

let now = tokio::time::Instant::now();

flush = flush_and_close
.map(|(instant, duration)| now - instant > duration)
.unwrap_or_default();

if now - start_time > self.logging_interval {
break;
}
match event.try_recv() {
Ok(v) => {
data.push(v);
}
Err(_) => {
Err(TryRecvError::Closed) => {
done = true;
break;
}
Err(TryRecvError::Lagged(skipped_messages_count)) => {
let skipped_message_trace = LogTrace {
timestamp: chrono::Utc::now(),
level: log::Level::Info,
message: format!(
"onefuzz task logger: Skipped {} traces/events",
skipped_messages_count
),
};

data.push(LoggingEvent::Trace(skipped_message_trace));
}
Err(TryRecvError::Empty) => {
tokio::time::sleep(self.polling_interval).await;
}
}
Expand All @@ -380,7 +371,7 @@ impl TaskLogger {
state: LoopState::Send {
start: 0,
count: data.len(),
flush,
done,
},
pending_logs: data,
event,
Expand Down Expand Up @@ -411,11 +402,7 @@ impl TaskLogger {
event: Receiver<LoggingEvent>,
log_writer: Box<dyn LogWriter<T>>,
) -> Result<SpawnedLogger> {
let (flush_and_close_sender, mut flush_and_close_receiver) =
tokio::sync::oneshot::channel::<Duration>();

let this = *self;

let logger_handle = tokio::spawn(async move {
let initial_state = LoopContext {
log_writer,
Expand All @@ -427,12 +414,7 @@ impl TaskLogger {
let mut context = initial_state;

loop {
let flush_and_close = flush_and_close_receiver
.try_recv()
.ok()
.map(|d| (tokio::time::Instant::now(), d));

context = match this.event_loop(context, flush_and_close).await {
context = match this.event_loop(context).await {
Ok(LoopContext {
log_writer: _,
pending_logs: _,
Expand All @@ -449,21 +431,16 @@ impl TaskLogger {
Ok(())
});

Ok(SpawnedLogger {
logger_handle,
flush_and_close_sender,
})
Ok(SpawnedLogger { logger_handle })
}
}

pub struct SpawnedLogger {
logger_handle: tokio::task::JoinHandle<Result<()>>,
flush_and_close_sender: tokio::sync::oneshot::Sender<Duration>,
}

impl SpawnedLogger {
pub async fn flush_and_stop(self, timeout: Duration) -> Result<()> {
let _ = self.flush_and_close_sender.send(timeout);
let _ = tokio::time::timeout(timeout, self.logger_handle).await;
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions src/agent/onefuzz-telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ intel_instructions = ["iced-x86"]
# not the feature has not been released yet. This is pinned to the most
# recent appinsights-rs git hash as of 2021-06-30. Once released, this should be
# reverted to use released versions.
anyhow = "1.0"
appinsights = { git = "https://github.com/dmolokanov/appinsights-rs", rev = "0af6ec83bad1c050160f5258ab08e9834596ce20", features=["rustls"], default-features = false }
log = "0.4"
uuid = { version = "0.8", features = ["serde", "v4"] }
Expand Down
57 changes: 37 additions & 20 deletions src/agent/onefuzz-telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use z3_sys::ErrorCode as Z3ErrorCode;

pub use chrono::Utc;

use anyhow::{bail, Result};
pub use appinsights::telemetry::SeverityLevel::{Critical, Error, Information, Verbose, Warning};
use tokio::sync::broadcast::{self, Receiver};
#[macro_use]
Expand Down Expand Up @@ -378,9 +379,9 @@ mod global {
};

lazy_static! {
pub static ref EVENT_SOURCE: Sender<LoggingEvent> = {
pub static ref EVENT_SOURCE: RwLock<Option<Sender<LoggingEvent>>> = {
let (telemetry_event_source, _) = broadcast::channel::<_>(100);
telemetry_event_source
RwLock::new(Some(telemetry_event_source))
};
}

Expand Down Expand Up @@ -487,6 +488,9 @@ pub fn try_flush_and_close() {
client.flush_channel();
client.close_channel();
}

// Drop the global broadcast sender so that subscribers see the channel as
// closed once they have drained all pending events.
let _global_event_source = global::EVENT_SOURCE.write().unwrap().take();
}

pub fn client(client_type: ClientType) -> Option<RwLockReadGuard<'static, TelemetryClient>> {
Expand Down Expand Up @@ -554,31 +558,44 @@ pub fn format_events(events: &[EventData]) -> String {
fn try_broadcast_event(timestamp: DateTime<Utc>, event: &Event, properties: &[EventData]) -> bool {
// We ignore any send errors here because they indicate that
// there are no receivers on the other end.
let (event, properties) = (event.clone(), properties.to_vec());
global::EVENT_SOURCE
.send(LoggingEvent::Event(LogEvent {
timestamp,
event,
data: properties,
}))
.is_ok()

if let Some(ev) = global::EVENT_SOURCE.read().ok().and_then(|f| f.clone()) {
let (event, properties) = (event.clone(), properties.to_vec());

return ev
.send(LoggingEvent::Event(LogEvent {
timestamp,
event,
data: properties,
}))
.is_ok();
}

false
}

pub fn try_broadcast_trace(timestamp: DateTime<Utc>, msg: String, level: log::Level) -> bool {
// We ignore any send errors here because they indicate that
// there are no receivers on the other end.

global::EVENT_SOURCE
.send(LoggingEvent::Trace(LogTrace {
timestamp,
level,
message: msg,
}))
.is_ok()
if let Some(ev) = global::EVENT_SOURCE.read().ok().and_then(|f| f.clone()) {
return ev
.send(LoggingEvent::Trace(LogTrace {
timestamp,
level,
message: msg,
}))
.is_ok();
}
false
}

pub fn subscribe_to_events() -> Receiver<LoggingEvent> {
global::EVENT_SOURCE.subscribe()
/// Subscribes to the global broadcast channel of telemetry events and traces.
///
/// # Errors
///
/// Returns an error if the global event source slot holds `None` (for example
/// after the sender has been taken out on close) or if the lock guarding it
/// is poisoned.
pub fn subscribe_to_events() -> Result<Receiver<LoggingEvent>> {
    // Propagate a poisoned lock as an error instead of panicking; the
    // broadcast helpers in this module tolerate poisoning as well.
    let global_event_source = match global::EVENT_SOURCE.read() {
        Ok(guard) => guard,
        Err(_) => bail!("Event source lock poisoned"),
    };

    // Borrow the sender rather than cloning it just to call `subscribe`.
    match global_event_source.as_ref() {
        Some(evs) => Ok(evs.subscribe()),
        None => bail!("Event source not initialized"),
    }
}

pub fn track_event(event: &Event, properties: &[EventData]) {
Expand Down