feat(subscriber): count dropped events due to buffer cap
See also #209

Currently, we don't have any way of surfacing potential data loss due to
the event buffer capacity limit, so if events are dropped, the user may
not be *aware* that they're seeing an incomplete picture of their
application. It would be better if we had a way to surface this in
the UI.

This branch adds support for counting the number of dropped events in
the `ConsoleLayer`. This count is now included in the `ResourceUpdate`,
`TaskUpdate`, and `AsyncOpUpdate` messages. We track a separate counter
for dropped events of each type, to make it easier to determine what
data may be missing.

The console UI doesn't currently *display* these counts; that can be
added in a separate PR. We may want to use the warnings interface to
display this information.
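
For reference, here is a minimal sketch of how these counters might be kept in
the shared state. The counter names mirror the fields read by the aggregator in
the diff below; the layer-side increment hook shown here is an assumption, since
only the aggregator-side changes appear in the diff (and the real struct also
carries the flush `Notify`):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Sketch of the state shared between the layer and the aggregator.
#[derive(Debug, Default)]
pub(crate) struct Shared {
    pub(crate) dropped_tasks: AtomicUsize,
    pub(crate) dropped_resources: AtomicUsize,
    pub(crate) dropped_async_ops: AtomicUsize,
}

impl Shared {
    /// Assumed layer-side hook: called when a task event cannot be buffered
    /// because the event channel is at capacity.
    pub(crate) fn record_dropped_task(&self) {
        self.dropped_tasks.fetch_add(1, Ordering::Release);
    }
}
```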
hawkw committed Dec 16, 2021
1 parent 7d16ead commit c19a402
Showing 5 changed files with 205 additions and 96 deletions.
11 changes: 11 additions & 0 deletions console-api/proto/async_ops.proto
@@ -19,6 +19,17 @@ message AsyncOpUpdate {
repeated AsyncOp new_async_ops = 1;
// Any async op stats that have changed since the last update.
map<uint64, Stats> stats_update = 2;
// A count of how many async op events (e.g. polls, creation, etc) were not
// recorded because the application's event buffer was at capacity.
//
// If everything is working normally, this should be 0. If it is greater
// than 0, that may indicate that some data is missing from this update, and
// it may be necessary to increase the number of events buffered by the
// application to ensure that data loss is avoided.
//
// If the application's instrumentation ensures reliable delivery of events,
// this will always be 0.
uint64 dropped_events = 3;
}

// An async operation.
12 changes: 12 additions & 0 deletions console-api/proto/resources.proto
@@ -23,6 +23,18 @@ message ResourceUpdate {

// A list of all new poll ops that have been invoked on resources since the last update.
repeated PollOp new_poll_ops = 3;

// A count of how many resource events (e.g. polls, creation, etc) were not
// recorded because the application's event buffer was at capacity.
//
// If everything is working normally, this should be 0. If it is greater
// than 0, that may indicate that some data is missing from this update, and
// it may be necessary to increase the number of events buffered by the
// application to ensure that data loss is avoided.
//
// If the application's instrumentation ensures reliable delivery of events,
// this will always be 0.
uint64 dropped_events = 4;
}

// Static data recorded when a new resource is created.
11 changes: 11 additions & 0 deletions console-api/proto/tasks.proto
@@ -26,6 +26,17 @@ message TaskUpdate {
// *is* included in this map, the corresponding value represents a complete
// snapshot of that task's stats in the current time window.
map<uint64, Stats> stats_update = 3;
// A count of how many task events (e.g. polls, spawns, etc) were not
// recorded because the application's event buffer was at capacity.
//
// If everything is working normally, this should be 0. If it is greater
// than 0, that may indicate that some data is missing from this update, and
// it may be necessary to increase the number of events buffered by the
// application to ensure that data loss is avoided.
//
// If the application's instrumentation ensures reliable delivery of events,
// this will always be 0.
uint64 dropped_events = 4;
}

// A task details update
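The console UI doesn't read these new fields yet, but a consumer of the
instrument stream could surface them along these lines. This is a hedged
sketch: the generated module path `console_api::tasks::TaskUpdate` follows the
aggregator code below, and the warning text is illustrative:

```rust
/// Warn the user when a `TaskUpdate` reports that task events were dropped
/// because the application's event buffer was at capacity.
fn warn_on_dropped_task_events(update: &console_api::tasks::TaskUpdate) {
    if update.dropped_events > 0 {
        eprintln!(
            "warning: {} task events were dropped; the task view may be incomplete \
             (consider increasing the event buffer capacity)",
            update.dropped_events
        );
    }
}
```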
34 changes: 20 additions & 14 deletions console-subscriber/src/aggregator/mod.rs
@@ -1,4 +1,6 @@
use super::{AttributeUpdate, AttributeUpdateOp, Command, Event, UpdateType, WakeOp, Watch};
use super::{
AttributeUpdate, AttributeUpdateOp, Command, Event, Shared, UpdateType, WakeOp, Watch,
};
use crate::{record::Recorder, WatchRequest};
use console_api as proto;
use proto::resources::resource;
@@ -42,8 +44,9 @@ pub(crate) struct Aggregator {
/// How long to keep task data after a task has completed.
retention: Duration,

/// Triggers a flush when the event buffer is approaching capacity.
flush_capacity: Arc<Flush>,
/// Shared state, including a `Notify` that triggers a flush when the event
/// buffer is approaching capacity.
shared: Arc<Shared>,

/// Currently active RPCs streaming task events.
watchers: ShrinkVec<Watch<proto::instrument::Update>>,
@@ -99,7 +102,7 @@ pub(crate) struct Aggregator {
temporality: Temporality,
}

#[derive(Debug)]
#[derive(Debug, Default)]
pub(crate) struct Flush {
pub(crate) should_flush: Notify,
triggered: AtomicBool,
@@ -285,12 +288,10 @@ impl Aggregator {
events: mpsc::Receiver<Event>,
rpcs: mpsc::Receiver<Command>,
builder: &crate::Builder,
shared: Arc<crate::Shared>,
) -> Self {
Self {
flush_capacity: Arc::new(Flush {
should_flush: Notify::new(),
triggered: AtomicBool::new(false),
}),
shared,
rpcs,
publish_interval: builder.publish_interval,
retention: builder.retention,
@@ -316,10 +317,6 @@
}
}

pub(crate) fn flush(&self) -> &Arc<Flush> {
&self.flush_capacity
}

pub(crate) async fn run(mut self) {
let mut publish = tokio::time::interval(self.publish_interval);
loop {
Expand All @@ -333,7 +330,7 @@ impl Aggregator {
}

// triggered when the event buffer is approaching capacity
_ = self.flush_capacity.should_flush.notified() => {
_ = self.shared.flush.should_flush.notified() => {
tracing::debug!("approaching capacity; draining buffer");
false
}
@@ -399,7 +396,7 @@ impl Aggregator {
}
self.cleanup_closed();
if drained {
self.flush_capacity.has_flushed();
self.shared.flush.has_flushed();
}
}
}
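The flush handshake itself is unchanged by this commit (it only moves into the
new `Shared` state): the layer notifies `should_flush` when the buffer nears
capacity, the aggregator drains, and then calls `has_flushed` to re-arm the
trigger. A rough sketch of that handshake, assuming the `triggered` flag is
used to debounce repeated notifications:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Notify;

#[derive(Debug, Default)]
pub(crate) struct Flush {
    pub(crate) should_flush: Notify,
    triggered: AtomicBool,
}

impl Flush {
    /// Layer side (assumed): wake the aggregator at most once per flush cycle.
    pub(crate) fn trigger(&self) {
        if !self.triggered.swap(true, Ordering::AcqRel) {
            self.should_flush.notify_one();
        }
    }

    /// Aggregator side: re-arm the trigger after the buffer has been drained.
    pub(crate) fn has_flushed(&self) {
        self.triggered.store(false, Ordering::Release);
    }
}
```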
@@ -445,6 +442,7 @@ impl Aggregator {
.map(|(_, value)| value.to_proto())
.collect(),
stats_update: self.task_stats.as_proto(Include::All),
dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
}),
resource_update: Some(proto::resources::ResourceUpdate {
new_resources: self
@@ -454,6 +452,7 @@
.collect(),
stats_update: self.resource_stats.as_proto(Include::All),
new_poll_ops: (*self.all_poll_ops).clone(),
dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
}),
async_op_update: Some(proto::async_ops::AsyncOpUpdate {
new_async_ops: self
@@ -462,6 +461,7 @@
.map(|(_, value)| value.to_proto())
.collect(),
stats_update: self.async_op_stats.as_proto(Include::All),
dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
}),
now: Some(now.into()),
new_metadata: Some(proto::RegisterMetadata {
@@ -534,6 +534,8 @@ impl Aggregator {
.map(|(_, value)| value.to_proto())
.collect(),
stats_update: self.task_stats.as_proto(Include::UpdatedOnly),

dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
}),
resource_update: Some(proto::resources::ResourceUpdate {
new_resources: self
Expand All @@ -543,6 +545,8 @@ impl Aggregator {
.collect(),
stats_update: self.resource_stats.as_proto(Include::UpdatedOnly),
new_poll_ops,

dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
}),
async_op_update: Some(proto::async_ops::AsyncOpUpdate {
new_async_ops: self
Expand All @@ -551,6 +555,8 @@ impl Aggregator {
.map(|(_, value)| value.to_proto())
.collect(),
stats_update: self.async_op_stats.as_proto(Include::UpdatedOnly),

dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
}),
};

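The `swap(0, AcqRel)` calls above read each counter and reset it to zero in a
single atomic step, so every dropped event is reported in exactly one update
and none are double-counted. A standalone illustration of that read-and-reset
pattern (names here are illustrative):

```rust
use std::sync::atomic::{AtomicUsize, Ordering::AcqRel};

/// Drain a drop counter: return the current count and reset it to zero
/// atomically, so concurrent increments are neither lost nor re-reported.
fn take_dropped(counter: &AtomicUsize) -> u64 {
    counter.swap(0, AcqRel) as u64
}

fn main() {
    let dropped = AtomicUsize::new(3);
    assert_eq!(take_dropped(&dropped), 3);
    assert_eq!(take_dropped(&dropped), 0); // already drained
}
```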

