feat(subscriber): count dropped events due to buffer cap #211

Merged · 2 commits · Dec 16, 2021
11 changes: 11 additions & 0 deletions console-api/proto/async_ops.proto
@@ -19,6 +19,17 @@ message AsyncOpUpdate {
   repeated AsyncOp new_async_ops = 1;
   // Any async op stats that have changed since the last update.
   map<uint64, Stats> stats_update = 2;
+  // A count of how many async op events (e.g. polls, creation, etc) were not
+  // recorded because the application's event buffer was at capacity.
+  //
+  // If everything is working normally, this should be 0. If it is greater
+  // than 0, that may indicate that some data is missing from this update, and
+  // it may be necessary to increase the number of events buffered by the
+  // application to ensure that data loss is avoided.
+  //
+  // If the application's instrumentation ensures reliable delivery of events,
+  // this will always be 0.
+  uint64 dropped_events = 3;
 }

 // An async operation.
12 changes: 12 additions & 0 deletions console-api/proto/resources.proto
@@ -23,6 +23,18 @@ message ResourceUpdate {

   // A list of all new poll ops that have been invoked on resources since the last update.
   repeated PollOp new_poll_ops = 3;
+
+  // A count of how many resource events (e.g. polls, creation, etc) were not
+  // recorded because the application's event buffer was at capacity.
+  //
+  // If everything is working normally, this should be 0. If it is greater
+  // than 0, that may indicate that some data is missing from this update, and
+  // it may be necessary to increase the number of events buffered by the
+  // application to ensure that data loss is avoided.
+  //
+  // If the application's instrumentation ensures reliable delivery of events,
+  // this will always be 0.
+  uint64 dropped_events = 4;
 }

 // Static data recorded when a new resource is created.
11 changes: 11 additions & 0 deletions console-api/proto/tasks.proto
@@ -26,6 +26,17 @@ message TaskUpdate {
   // *is* included in this map, the corresponding value represents a complete
   // snapshot of that task's stats in the current time window.
   map<uint64, Stats> stats_update = 3;
+  // A count of how many task events (e.g. polls, spawns, etc) were not
+  // recorded because the application's event buffer was at capacity.
+  //
+  // If everything is working normally, this should be 0. If it is greater
+  // than 0, that may indicate that some data is missing from this update, and
+  // it may be necessary to increase the number of events buffered by the
+  // application to ensure that data loss is avoided.
+  //
+  // If the application's instrumentation ensures reliable delivery of events,
+  // this will always be 0.
+  uint64 dropped_events = 4;
 }

 // A task details update
34 changes: 20 additions & 14 deletions console-subscriber/src/aggregator/mod.rs
@@ -1,4 +1,6 @@
-use super::{AttributeUpdate, AttributeUpdateOp, Command, Event, UpdateType, WakeOp, Watch};
+use super::{
+    AttributeUpdate, AttributeUpdateOp, Command, Event, Shared, UpdateType, WakeOp, Watch,
+};
 use crate::{record::Recorder, WatchRequest};
 use console_api as proto;
 use proto::resources::resource;
@@ -42,8 +44,9 @@ pub(crate) struct Aggregator {
     /// How long to keep task data after a task has completed.
     retention: Duration,

-    /// Triggers a flush when the event buffer is approaching capacity.
-    flush_capacity: Arc<Flush>,
+    /// Shared state, including a `Notify` that triggers a flush when the event
+    /// buffer is approaching capacity.
+    shared: Arc<Shared>,

     /// Currently active RPCs streaming task events.
     watchers: ShrinkVec<Watch<proto::instrument::Update>>,
@@ -99,7 +102,7 @@ pub(crate) struct Aggregator {
     temporality: Temporality,
 }

-#[derive(Debug)]
+#[derive(Debug, Default)]
 pub(crate) struct Flush {
     pub(crate) should_flush: Notify,
     triggered: AtomicBool,
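The Shared type that replaces flush_capacity is defined outside this file and is not shown in the diff. Judging from the fields referenced below (flush, dropped_tasks, dropped_async_ops, dropped_resources) and the swap(0, AcqRel) as u64 reads, it presumably looks roughly like the following sketch; the exact field types are an assumption:

// Inferred sketch of the shared state; `AtomicUsize` is assumed from the
// `as u64` casts applied to the swapped counter values.
use std::sync::atomic::AtomicUsize;

#[derive(Debug, Default)]
pub(crate) struct Shared {
    /// Wakes the aggregator when the event buffer nears capacity (the
    /// reason `Flush` gains a `Default` impl above).
    pub(crate) flush: Flush,
    /// Per-kind counts of events dropped because the buffer was full.
    pub(crate) dropped_tasks: AtomicUsize,
    pub(crate) dropped_async_ops: AtomicUsize,
    pub(crate) dropped_resources: AtomicUsize,
}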
@@ -285,12 +288,10 @@ impl Aggregator {
         events: mpsc::Receiver<Event>,
         rpcs: mpsc::Receiver<Command>,
         builder: &crate::Builder,
+        shared: Arc<crate::Shared>,
     ) -> Self {
         Self {
-            flush_capacity: Arc::new(Flush {
-                should_flush: Notify::new(),
-                triggered: AtomicBool::new(false),
-            }),
+            shared,
             rpcs,
             publish_interval: builder.publish_interval,
             retention: builder.retention,
@@ -316,10 +317,6 @@
         }
     }

-    pub(crate) fn flush(&self) -> &Arc<Flush> {
-        &self.flush_capacity
-    }
-
     pub(crate) async fn run(mut self) {
         let mut publish = tokio::time::interval(self.publish_interval);
         loop {
@@ -333,7 +330,7 @@
                 }

                 // triggered when the event buffer is approaching capacity
-                _ = self.flush_capacity.should_flush.notified() => {
+                _ = self.shared.flush.should_flush.notified() => {
                     tracing::debug!("approaching capacity; draining buffer");
                     false
                 }
@@ -399,7 +396,7 @@
             }
             self.cleanup_closed();
             if drained {
-                self.flush_capacity.has_flushed();
+                self.shared.flush.has_flushed();
             }
         }
     }
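Flush::has_flushed itself is not shown in this diff; from the triggered flag and the notified() call above, the trigger/acknowledge handshake presumably works along these lines (a sketch, with trigger as an assumed name for the producer-side method):

// Sketch of the flush handshake, inferred from the calls in this diff.
use std::sync::atomic::Ordering::{AcqRel, Release};

impl Flush {
    /// Producer side: request a flush when the buffer nears capacity.
    pub(crate) fn trigger(&self) {
        // Notify only once per cycle so a full buffer doesn't wake the
        // aggregator repeatedly before it has a chance to drain.
        if !self.triggered.swap(true, AcqRel) {
            self.should_flush.notify_one();
        }
    }

    /// Aggregator side: acknowledge the drain and re-arm the trigger.
    fn has_flushed(&self) {
        self.triggered.store(false, Release);
    }
}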
@@ -445,6 +442,7 @@ impl Aggregator {
                     .map(|(_, value)| value.to_proto())
                     .collect(),
                 stats_update: self.task_stats.as_proto(Include::All),
+                dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
             }),
             resource_update: Some(proto::resources::ResourceUpdate {
                 new_resources: self
@@ -454,6 +452,7 @@
                     .collect(),
                 stats_update: self.resource_stats.as_proto(Include::All),
                 new_poll_ops: (*self.all_poll_ops).clone(),
+                dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
             }),
             async_op_update: Some(proto::async_ops::AsyncOpUpdate {
                 new_async_ops: self
@@ -462,6 +461,7 @@
                     .map(|(_, value)| value.to_proto())
                     .collect(),
                 stats_update: self.async_op_stats.as_proto(Include::All),
+                dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
             }),
             now: Some(now.into()),
             new_metadata: Some(proto::RegisterMetadata {
@@ -534,6 +534,8 @@ impl Aggregator {
                     .map(|(_, value)| value.to_proto())
                     .collect(),
                 stats_update: self.task_stats.as_proto(Include::UpdatedOnly),
+
+                dropped_events: self.shared.dropped_tasks.swap(0, AcqRel) as u64,
             }),
             resource_update: Some(proto::resources::ResourceUpdate {
                 new_resources: self
@@ -543,6 +545,8 @@
                     .collect(),
                 stats_update: self.resource_stats.as_proto(Include::UpdatedOnly),
                 new_poll_ops,
+
+                dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64,
             }),
             async_op_update: Some(proto::async_ops::AsyncOpUpdate {
                 new_async_ops: self
@@ -551,6 +555,8 @@
                     .map(|(_, value)| value.to_proto())
                     .collect(),
                 stats_update: self.async_op_stats.as_proto(Include::UpdatedOnly),
+
+                dropped_events: self.shared.dropped_async_ops.swap(0, AcqRel) as u64,
             }),
         };
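The producer half, where these counters are incremented, lives in the layer code rather than this file. The swap(0, AcqRel) reads above imply the pattern: when try_send fails because the buffer is full, bump the counter for that event kind; each published update then reports and resets it, so dropped_events always means "drops since the last update". A hedged sketch (record_event is an illustrative name):

// Illustrative producer-side sketch, not part of this diff.
use std::sync::atomic::{AtomicUsize, Ordering::Release};
use tokio::sync::mpsc;

fn record_event(tx: &mpsc::Sender<Event>, dropped: &AtomicUsize, event: Event) {
    if tx.try_send(event).is_err() {
        // Buffer at capacity: count the loss instead of blocking the
        // instrumented task. `Release` pairs with the aggregator's
        // `swap(0, AcqRel)` that reads and resets the counter.
        dropped.fetch_add(1, Release);
    }
}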