From c19a4020e32440fda8c92d818894d58baa97d3f0 Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Thu, 16 Dec 2021 12:33:40 -0800
Subject: [PATCH] feat(subscriber): count dropped events due to buffer cap

See also #209

Currently, we don't have any way of surfacing potential data loss due to
the event buffer capacity limit, so if events are dropped, the user may
not be *aware* that they're seeing an incomplete picture of their
application. It would be better if we had a way to surface this in the
UI.

This branch adds support for counting the number of dropped events in
the `ConsoleLayer`. These counts are now included in the
`ResourceUpdate`, `TaskUpdate`, and `AsyncOpUpdate` messages. We track a
separate counter for dropped events of each type, to make it easier to
determine what data may be missing.

The console UI doesn't currently *display* these counts; that can be
added in a separate PR. We may want to use the warnings interface to
display this information.
---
 console-api/proto/async_ops.proto        |  11 ++
 console-api/proto/resources.proto        |  12 ++
 console-api/proto/tasks.proto            |  11 ++
 console-subscriber/src/aggregator/mod.rs |  34 ++--
 console-subscriber/src/lib.rs            | 233 +++++++++++++++---------
 5 files changed, 205 insertions(+), 96 deletions(-)

diff --git a/console-api/proto/async_ops.proto b/console-api/proto/async_ops.proto
index 254f09223..bcb1e03ea 100644
--- a/console-api/proto/async_ops.proto
+++ b/console-api/proto/async_ops.proto
@@ -19,6 +19,17 @@ message AsyncOpUpdate {
    repeated AsyncOp new_async_ops = 1;
    // Any async op stats that have changed since the last update.
    map<uint64, Stats> stats_update = 2;
+    // A count of how many async op events (e.g. polls, creation, etc) were not
+    // recorded because the application's event buffer was at capacity.
+    //
+    // If everything is working normally, this should be 0. If it is greater
+    // than 0, that may indicate that some data is missing from this update, and
+    // it may be necessary to increase the number of events buffered by the
+    // application to ensure that data loss is avoided.
+    //
+    // If the application's instrumentation ensures reliable delivery of events,
+    // this will always be 0.
+    uint64 dropped_events = 3;
}

// An async operation.
diff --git a/console-api/proto/resources.proto b/console-api/proto/resources.proto
index 5f851d557..c477a1a35 100644
--- a/console-api/proto/resources.proto
+++ b/console-api/proto/resources.proto
@@ -23,6 +23,18 @@ message ResourceUpdate {

    // A list of all new poll ops that have been invoked on resources since the last update.
    repeated PollOp new_poll_ops = 3;
+
+    // A count of how many resource events (e.g. polls, creation, etc) were not
+    // recorded because the application's event buffer was at capacity.
+    //
+    // If everything is working normally, this should be 0. If it is greater
+    // than 0, that may indicate that some data is missing from this update, and
+    // it may be necessary to increase the number of events buffered by the
+    // application to ensure that data loss is avoided.
+    //
+    // If the application's instrumentation ensures reliable delivery of events,
+    // this will always be 0.
+    uint64 dropped_events = 4;
}

// Static data recorded when a new resource is created.
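The `dropped_events` fields added to these messages are what a client would
read to detect data loss. The sketch below is not part of this patch; it only
illustrates how a consumer of the instrument stream might surface the counts,
assuming the prost-generated `console_api::instrument::Update` message with the
optional `task_update`, `resource_update`, and `async_op_update` sub-messages;
the `warn_on_drops` helper is a hypothetical name.

    // Illustrative only; not included in this change.
    fn warn_on_drops(update: &console_api::instrument::Update) {
        let tasks = update
            .task_update
            .as_ref()
            .map(|u| u.dropped_events)
            .unwrap_or(0);
        let resources = update
            .resource_update
            .as_ref()
            .map(|u| u.dropped_events)
            .unwrap_or(0);
        let async_ops = update
            .async_op_update
            .as_ref()
            .map(|u| u.dropped_events)
            .unwrap_or(0);
        if tasks + resources + async_ops > 0 {
            eprintln!(
                "warning: the instrumented application's event buffer was at \
                 capacity; dropped {} task, {} resource, and {} async op \
                 events since the last update",
                tasks, resources, async_ops
            );
        }
    }

If the counts are nonzero, increasing the subscriber's event buffer capacity is
the remedy suggested by the field comments above.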
diff --git a/console-api/proto/tasks.proto b/console-api/proto/tasks.proto
index 9e07acbee..b537f7914 100644
--- a/console-api/proto/tasks.proto
+++ b/console-api/proto/tasks.proto
@@ -26,6 +26,17 @@ message TaskUpdate {
    // *is* included in this map, the corresponding value represents a complete
    // snapshot of that task's stats at in the current time window.
    map<uint64, Stats> stats_update = 3;
+    // A count of how many task events (e.g. polls, spawns, etc) were not
+    // recorded because the application's event buffer was at capacity.
+    //
+    // If everything is working normally, this should be 0. If it is greater
+    // than 0, that may indicate that some data is missing from this update, and
+    // it may be necessary to increase the number of events buffered by the
+    // application to ensure that data loss is avoided.
+    //
+    // If the application's instrumentation ensures reliable delivery of events,
+    // this will always be 0.
+    uint64 dropped_events = 4;
}

// A task details update
diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs
index e9876f284..ecf3b630d 100644
--- a/console-subscriber/src/aggregator/mod.rs
+++ b/console-subscriber/src/aggregator/mod.rs
@@ -1,4 +1,6 @@
-use super::{AttributeUpdate, AttributeUpdateOp, Command, Event, UpdateType, WakeOp, Watch};
+use super::{
+    AttributeUpdate, AttributeUpdateOp, Command, Event, Shared, UpdateType, WakeOp, Watch,
+};
use crate::{record::Recorder, WatchRequest};
use console_api as proto;
use proto::resources::resource;
@@ -42,8 +44,9 @@ pub(crate) struct Aggregator {
    /// How long to keep task data after a task has completed.
    retention: Duration,

-    /// Triggers a flush when the event buffer is approaching capacity.
-    flush_capacity: Arc<Flush>,
+    /// Shared state, including a `Notify` that triggers a flush when the event
+    /// buffer is approaching capacity.
+    shared: Arc<Shared>,

    /// Currently active RPCs streaming task events.
    watchers: ShrinkVec<Watch<proto::instrument::Update>>,
@@ -99,7 +102,7 @@ pub(crate) struct Aggregator {
    temporality: Temporality,
}

-#[derive(Debug)]
+#[derive(Debug, Default)]
pub(crate) struct Flush {
    pub(crate) should_flush: Notify,
    triggered: AtomicBool,
@@ -285,12 +288,10 @@ impl Aggregator {
        events: mpsc::Receiver<Event>,
        rpcs: mpsc::Receiver<Command>,
        builder: &crate::Builder,
+        shared: Arc<Shared>,
    ) -> Self {
        Self {
-            flush_capacity: Arc::new(Flush {
-                should_flush: Notify::new(),
-                triggered: AtomicBool::new(false),
-            }),
+            shared,
            rpcs,
            publish_interval: builder.publish_interval,
            retention: builder.retention,
@@ -316,10 +317,6 @@ impl Aggregator {
        }
    }

-    pub(crate) fn flush(&self) -> &Arc<Flush> {
-        &self.flush_capacity
-    }
-
    pub(crate) async fn run(mut self) {
        let mut publish = tokio::time::interval(self.publish_interval);
        loop {
@@ -333,7 +330,7 @@ impl Aggregator {
                }

                // triggered when the event buffer is approaching capacity
-                _ = self.flush_capacity.should_flush.notified() => {
+                _ = self.shared.flush.should_flush.notified() => {
                    tracing::debug!("approaching capacity; draining buffer");
                    false
                }
@@ -399,7 +396,7 @@ impl Aggregator {
            }
            self.cleanup_closed();
            if drained {
-                self.flush_capacity.has_flushed();
+                self.shared.flush.has_flushed();
            }
        }
    }
@@ -445,6 +442,7 @@ impl Aggregator {
                    .map(|(_, value)| value.to_proto())
                    .collect(),
                stats_update: self.task_stats.as_proto(Include::All),
+                dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
            }),
            resource_update: Some(proto::resources::ResourceUpdate {
                new_resources: self
@@ -454,6 +452,7 @@ impl Aggregator {
                    .collect(),
                stats_update: self.resource_stats.as_proto(Include::All),
                new_poll_ops: (*self.all_poll_ops).clone(),
+                dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
            }),
            async_op_update: Some(proto::async_ops::AsyncOpUpdate {
                new_async_ops: self
@@ -462,6 +461,7 @@ impl Aggregator {
                    .map(|(_, value)| value.to_proto())
                    .collect(),
                stats_update: self.async_op_stats.as_proto(Include::All),
+                dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
            }),
            now: Some(now.into()),
            new_metadata: Some(proto::RegisterMetadata {
@@ -534,6 +534,8 @@ impl Aggregator {
                    .map(|(_, value)| value.to_proto())
                    .collect(),
                stats_update: self.task_stats.as_proto(Include::UpdatedOnly),
+
+                dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
            }),
            resource_update: Some(proto::resources::ResourceUpdate {
                new_resources: self
@@ -543,6 +545,8 @@ impl Aggregator {
                    .collect(),
                stats_update: self.resource_stats.as_proto(Include::UpdatedOnly),
                new_poll_ops,
+
+                dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
            }),
            async_op_update: Some(proto::async_ops::AsyncOpUpdate {
                new_async_ops: self
@@ -551,6 +555,8 @@ impl Aggregator {
                    .map(|(_, value)| value.to_proto())
                    .collect(),
                stats_update: self.async_op_stats.as_proto(Include::UpdatedOnly),
+
+                dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
            }),
        };

diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs
index 85dd31666..807eeb3a1 100644
--- a/console-subscriber/src/lib.rs
+++ b/console-subscriber/src/lib.rs
@@ -6,7 +6,10 @@ use std::{
    cell::RefCell,
    fmt,
    net::{IpAddr, Ipv4Addr, SocketAddr},
-    sync::Arc,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
    time::{Duration, SystemTime},
};
use thread_local::ThreadLocal;
@@ -51,7 +54,7 @@ use crate::visitors::{PollOpVisitor, StateUpdateVisitor};
pub struct ConsoleLayer {
    current_spans: ThreadLocal<RefCell<SpanStack>>,
    tx: mpsc::Sender<Event>,
-    flush: Arc<aggregator::Flush>,
+    shared: Arc<Shared>,
    /// When the channel capacity goes under this number, a flush in the aggregator
    /// will be triggered.
    flush_under_capacity: usize,
@@ -122,6 +125,26 @@ pub struct Server {
    client_buffer: usize,
}

+/// State shared between the `ConsoleLayer` and the `Aggregator` task.
+#[derive(Debug, Default)]
+struct Shared {
+    /// Used to notify the aggregator task when the event buffer should be
+    /// flushed.
+    flush: aggregator::Flush,
+
+    /// A counter of how many task events were dropped because the event buffer
+    /// was at capacity.
+    dropped_tasks: AtomicUsize,
+
+    /// A counter of how many async op events were dropped because the event buffer
+    /// was at capacity.
+    dropped_async_ops: AtomicUsize,
+
+    /// A counter of how many resource events were dropped because the event buffer
+    /// was at capacity.
+    dropped_resources: AtomicUsize,
+}
+
struct Watch<T>(mpsc::Sender<Result<T, tonic::Status>>);

enum Command {
@@ -269,10 +292,8 @@ impl ConsoleLayer {

        let (tx, events) = mpsc::channel(config.event_buffer_capacity);
        let (subscribe, rpcs) = mpsc::channel(256);
-
-        let aggregator = Aggregator::new(events, rpcs, &config);
-        let flush = aggregator.flush().clone();
-
+        let shared = Arc::new(Shared::default());
+        let aggregator = Aggregator::new(events, rpcs, &config, shared.clone());
        // Conservatively, start to trigger a flush when half the channel is full.
        // This tries to reduce the chance of losing events to a full channel.
        let flush_under_capacity = config.event_buffer_capacity / 2;
@@ -286,7 +307,7 @@ impl ConsoleLayer {
        let layer = Self {
            current_spans: ThreadLocal::new(),
            tx,
-            flush,
+            shared,
            flush_under_capacity,
            spawn_callsites: Callsites::default(),
            waker_callsites: Callsites::default(),
@@ -419,7 +440,7 @@ impl ConsoleLayer {
            .cloned()
    }

-    fn send(&self, event: Event) {
+    fn send(&self, dropped: &AtomicUsize, event: Event) {
        use mpsc::error::TrySendError;

        match self.tx.try_reserve() {
@@ -433,12 +454,13 @@ impl ConsoleLayer {
                // approaching the high water line...but if the executor wait
                // time is very high, maybe the aggregator task hasn't been
                // polled yet. so... eek?!
+                dropped.fetch_add(1, Ordering::Release);
            }
        }

        let capacity = self.tx.capacity();
        if capacity <= self.flush_under_capacity {
-            self.flush.trigger();
+            self.shared.flush.trigger();
        }
    }
}
@@ -448,23 +470,43 @@ where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    fn register_callsite(&self, meta: &'static Metadata<'static>) -> subscriber::Interest {
-        match (meta.name(), meta.target()) {
-            ("runtime.spawn", _) | ("task", "tokio::task") => self.spawn_callsites.insert(meta),
-            (_, "runtime::waker") | (_, "tokio::task::waker") => self.waker_callsites.insert(meta),
-            (ResourceVisitor::RES_SPAN_NAME, _) => self.resource_callsites.insert(meta),
-            (AsyncOpVisitor::ASYNC_OP_SPAN_NAME, _) => self.async_op_callsites.insert(meta),
-            ("runtime.resource.async_op.poll", _) => self.async_op_poll_callsites.insert(meta),
-            (_, PollOpVisitor::POLL_OP_EVENT_TARGET) => self.poll_op_callsites.insert(meta),
+        let dropped = match (meta.name(), meta.target()) {
+            ("runtime.spawn", _) | ("task", "tokio::task") => {
+                self.spawn_callsites.insert(meta);
+                &self.shared.dropped_tasks
+            }
+            (_, "runtime::waker") | (_, "tokio::task::waker") => {
+                self.waker_callsites.insert(meta);
+                &self.shared.dropped_tasks
+            }
+            (ResourceVisitor::RES_SPAN_NAME, _) => {
+                self.resource_callsites.insert(meta);
+                &self.shared.dropped_resources
+            }
+            (AsyncOpVisitor::ASYNC_OP_SPAN_NAME, _) => {
+                self.async_op_callsites.insert(meta);
+                &self.shared.dropped_async_ops
+            }
+            ("runtime.resource.async_op.poll", _) => {
+                self.async_op_poll_callsites.insert(meta);
+                &self.shared.dropped_async_ops
+            }
+            (_, PollOpVisitor::POLL_OP_EVENT_TARGET) => {
+                self.poll_op_callsites.insert(meta);
+                &self.shared.dropped_async_ops
+            }
            (_, StateUpdateVisitor::RE_STATE_UPDATE_EVENT_TARGET) => {
-                self.resource_state_update_callsites.insert(meta)
+                self.resource_state_update_callsites.insert(meta);
+                &self.shared.dropped_resources
            }
            (_, StateUpdateVisitor::AO_STATE_UPDATE_EVENT_TARGET) => {
-                self.async_op_state_update_callsites.insert(meta)
+                self.async_op_state_update_callsites.insert(meta);
+                &self.shared.dropped_async_ops
            }
-            (_, _) => {}
-        }
+            (_, _) => &self.shared.dropped_tasks,
+        };

-        self.send(Event::Metadata(meta));
+        self.send(dropped, Event::Metadata(meta));

        subscriber::Interest::always()
    }
@@ -475,13 +517,16 @@ where
            let mut task_visitor = TaskVisitor::new(metadata.into());
            attrs.record(&mut task_visitor);
            let (fields, location) = task_visitor.result();
-            self.send(Event::Spawn {
-                id: id.clone(),
-                at,
-                metadata,
-                fields,
-                location,
-            });
+            self.send(
+                &self.shared.dropped_tasks,
+                Event::Spawn {
+                    id: id.clone(),
+                    at,
+                    metadata,
+                    fields,
+                    location,
+                },
+            );
            return;
        }

@@ -500,17 +545,20 @@ where
                let parent_id = self.current_spans.get().and_then(|stack| {
                    self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
                });
-                self.send(Event::Resource {
-                    id: id.clone(),
-                    parent_id,
-                    metadata,
-                    at,
-                    concrete_type,
-                    kind,
-                    location,
-                    is_internal,
-                    inherit_child_attrs,
-                });
+                self.send(
+                    &self.shared.dropped_resources,
+                    Event::Resource {
+                        id: id.clone(),
+                        parent_id,
+                        metadata,
+                        at,
+                        concrete_type,
+                        kind,
+                        location,
+                        is_internal,
+                        inherit_child_attrs,
+                    },
+                );
            } // else unknown resource span format
            return;
        }
@@ -529,15 +577,18 @@ where
            });

            if let Some(resource_id) = resource_id {
-                self.send(Event::AsyncResourceOp {
-                    id: id.clone(),
-                    parent_id,
-                    resource_id,
-                    at,
-                    metadata,
-                    source,
-                    inherit_child_attrs,
-                });
+                self.send(
+                    &self.shared.dropped_async_ops,
+                    Event::AsyncResourceOp {
+                        id: id.clone(),
+                        parent_id,
+                        resource_id,
+                        at,
+                        metadata,
+                        source,
+                        inherit_child_attrs,
+                    },
+                );
            }
        } // else async op span needs to have a source field
@@ -561,7 +612,7 @@ where
                        .unwrap_or(false);
                    op = op.self_wake(self_wake);
                }
-                self.send(Event::Waker { id, op, at });
+                self.send(&self.shared.dropped_tasks, Event::Waker { id, op, at });
            } // else unknown waker event... what to do? can't trace it from here...
            return;
        }
@@ -587,14 +638,17 @@ where

                // poll op event should be emitted in the context of an async op and task spans
                if let Some((task_id, async_op_id)) = task_and_async_op_ids {
-                    self.send(Event::PollOp {
-                        metadata,
-                        op_name,
-                        resource_id,
-                        async_op_id,
-                        task_id,
-                        is_ready,
-                    });
+                    self.send(
+                        &self.shared.dropped_async_ops,
+                        Event::PollOp {
+                            metadata,
+                            op_name,
+                            resource_id,
+                            async_op_id,
+                            task_id,
+                            is_ready,
+                        },
+                    );
                }
            }
        }
@@ -612,11 +666,14 @@ where
                let mut state_update_visitor = StateUpdateVisitor::new(meta_id);
                event.record(&mut state_update_visitor);
                if let Some(update) = state_update_visitor.result() {
-                    self.send(Event::StateUpdate {
-                        update_id: resource_id,
-                        update_type: UpdateType::Resource,
-                        update,
-                    })
+                    self.send(
+                        &self.shared.dropped_resources,
+                        Event::StateUpdate {
+                            update_id: resource_id,
+                            update_type: UpdateType::Resource,
+                            update,
+                        },
+                    )
                }
            }
            return;
        }
@@ -631,11 +688,14 @@ where
                let mut state_update_visitor = StateUpdateVisitor::new(meta_id);
                event.record(&mut state_update_visitor);
                if let Some(update) = state_update_visitor.result() {
-                    self.send(Event::StateUpdate {
-                        update_id: async_op_id,
-                        update_type: UpdateType::AsyncOp,
-                        update,
-                    });
+                    self.send(
+                        &self.shared.dropped_async_ops,
+                        Event::StateUpdate {
+                            update_id: async_op_id,
+                            update_type: UpdateType::AsyncOp,
+                            update,
+                        },
+                    );
                }
            }
        }
    }
@@ -652,11 +712,14 @@ where
            .push(id.clone());

        let parent_id = cx.span(id).and_then(|s| s.parent().map(|p| p.id()));
-        self.send(Event::Enter {
-            at: SystemTime::now(),
-            id: id.clone(),
-            parent_id,
-        });
+        self.send(
+            &self.shared.dropped_tasks,
+            Event::Enter {
+                at: SystemTime::now(),
+                id: id.clone(),
+                parent_id,
+            },
+        );
    }

    fn on_exit(&self, id: &span::Id, cx: Context<'_, S>) {
@@ -671,11 +734,14 @@ where

        let parent_id = cx.span(id).and_then(|s| s.parent().map(|p| p.id()));

-        self.send(Event::Exit {
-            id: id.clone(),
-            parent_id,
-            at: SystemTime::now(),
-        });
+        self.send(
+            &self.shared.dropped_tasks,
+            Event::Exit {
+                id: id.clone(),
+                parent_id,
+                at: SystemTime::now(),
+            },
+        );
    }

    fn on_close(&self, id: span::Id, cx: Context<'_, S>) {
@@ -684,10 +750,13 @@ where
        }

        let _default = dispatcher::set_default(&self.no_dispatch);
-        self.send(Event::Close {
-            at: SystemTime::now(),
-            id,
-        });
+        self.send(
+            &self.shared.dropped_tasks,
+            Event::Close {
+                at: SystemTime::now(),
+                id,
+            },
+        );
    }
}
@@ -697,7 +766,7 @@ impl fmt::Debug for ConsoleLayer {
            // mpsc::Sender debug impl is not very useful
            .field("tx", &format_args!("<...>"))
            .field("tx.capacity", &self.tx.capacity())
-            .field("flush", &self.flush)
+            .field("shared", &self.shared)
            .field("spawn_callsites", &self.spawn_callsites)
            .field("waker_callsites", &self.waker_callsites)
            .finish()
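Taken together, the subscriber side of this change reduces to a simple pattern:
a send attempt that fails because the event buffer is full bumps an atomic
counter instead of blocking the instrumented application, and the aggregator
drains that counter with `swap(0, AcqRel)` so each update reports only the
events dropped since the previous one. The standalone sketch below is not
console-subscriber code; the `Counters`, `send_or_count`, and `drain_dropped`
names are hypothetical, and it assumes tokio's mpsc channel and std atomics.

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use tokio::sync::mpsc;

    // Hypothetical stand-in for console-subscriber's internal `Event` type.
    enum Event {
        Task,
    }

    #[derive(Default)]
    struct Counters {
        dropped_tasks: AtomicUsize,
    }

    // Mirrors the idea in `ConsoleLayer::send`: never block the instrumented
    // application; if the buffer is full (or closed), count the drop instead.
    fn send_or_count(tx: &mpsc::Sender<Event>, counters: &Counters, event: Event) {
        if tx.try_send(event).is_err() {
            counters.dropped_tasks.fetch_add(1, Ordering::Release);
        }
    }

    // Mirrors the aggregator side: take the current count and reset it to zero,
    // so each update reports only the events dropped since the last update.
    fn drain_dropped(counters: &Counters) -> u64 {
        counters.dropped_tasks.swap(0, Ordering::AcqRel) as u64
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel(1);
        let counters = Arc::new(Counters::default());

        send_or_count(&tx, &counters, Event::Task); // fits in the buffer
        send_or_count(&tx, &counters, Event::Task); // buffer full: counted as dropped

        let _ = rx.recv().await;
        println!("dropped since last update: {}", drain_dropped(&counters));
    }

The `Release` increment and `AcqRel` swap follow the orderings used in the diff.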