Skip to content

Commit

Permalink
chore(subscriber): trace events drain loop stats (#465)
Browse files Browse the repository at this point in the history
The number of each variant of event encountered while draining the mpsc
channel of events sent from the `ConsoleLayer` to the `Aggregator` is
useful debugging information.

The change adds a `DEBUG` level event which is recorded at the end of
each loop draining the channel. It contains the number of each variant
that has been received.

I've written this same code three separate times looking into different
issues, so I decided it was probably worth the small overhead of
aggregating the counts, even when the tracing event isn't output
anywhere.

Co-authored-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hds and hawkw authored Oct 10, 2023
1 parent e1b1e16 commit 0bb14fe
Showing 1 changed file with 48 additions and 0 deletions.
48 changes: 48 additions & 0 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,11 @@ impl Aggregator {
// to be woken when the flush interval has elapsed, or when the
// channel is almost full.
let mut drained = false;
let mut counts = EventCounts::new();
while let Some(event) = recv_now_or_never(&mut self.events) {
match event {
Some(event) => {
counts.update(&event);
self.update_state(event);
drained = true;
}
Expand All @@ -234,6 +236,15 @@ impl Aggregator {
}
};
}
tracing::debug!(
async_resource_ops = counts.async_resource_op,
metadatas = counts.metadata,
poll_ops = counts.poll_op,
resources = counts.resource,
spawns = counts.spawn,
total = counts.total(),
"event channel drain loop",
);

// flush data to clients, if there are any currently subscribed
// watchers and we should send a new update.
Expand Down Expand Up @@ -509,6 +520,43 @@ fn recv_now_or_never<T>(receiver: &mut mpsc::Receiver<T>) -> Option<Option<T>> {
}
}

/// Count of events received in each aggregator drain cycle.
struct EventCounts {
async_resource_op: usize,
metadata: usize,
poll_op: usize,
resource: usize,
spawn: usize,
}

impl EventCounts {
fn new() -> Self {
Self {
async_resource_op: 0,
metadata: 0,
poll_op: 0,
resource: 0,
spawn: 0,
}
}

/// Count the event based on its variant.
fn update(&mut self, event: &Event) {
match event {
Event::AsyncResourceOp { .. } => self.async_resource_op += 1,
Event::Metadata(_) => self.metadata += 1,
Event::PollOp { .. } => self.poll_op += 1,
Event::Resource { .. } => self.resource += 1,
Event::Spawn { .. } => self.spawn += 1,
}
}

/// Total number of events recorded.
fn total(&self) -> usize {
self.async_resource_op + self.metadata + self.poll_op + self.resource + self.spawn
}
}

// ==== impl Flush ===

impl Flush {
Expand Down

0 comments on commit 0bb14fe

Please sign in to comment.