From 1656c791af35bb0500bb6dd3c60344a0ceb12520 Mon Sep 17 00:00:00 2001 From: Graham King Date: Mon, 22 Jan 2024 12:47:51 -0500 Subject: [PATCH] fix(subscriber): Don't save poll_ops if no-one is receiving them (#501) Do not record poll_ops if there are no current connected clients (watchers). Without this `Aggregator::poll_ops` would grow forever. Follow up to https://github.com/tokio-rs/console/pull/311 and fix for these two: - https://github.com/tokio-rs/console/issues/184 - https://github.com/tokio-rs/console/pull/500 Fixes #184 Co-authored-by: Graham King Co-authored-by: Hayden Stainsby --- console-subscriber/src/aggregator/mod.rs | 32 +++++++++++++----------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index 88d89ca1f..e3a1eb62a 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -1,12 +1,3 @@ -use super::{Command, Event, Shared, Watch}; -use crate::{ - stats::{self, Unsent}, - ToProto, WatchRequest, -}; -use console_api as proto; -use proto::resources::resource; -use tokio::sync::{mpsc, Notify}; - use std::{ sync::{ atomic::{AtomicBool, Ordering::*}, @@ -14,8 +5,18 @@ use std::{ }, time::{Duration, Instant}, }; + +use console_api as proto; +use proto::resources::resource; +use tokio::sync::{mpsc, Notify}; use tracing_core::{span::Id, Metadata}; +use super::{Command, Event, Shared, Watch}; +use crate::{ + stats::{self, Unsent}, + ToProto, WatchRequest, +}; + mod id_data; mod shrink; use self::id_data::{IdData, Include}; @@ -269,6 +270,9 @@ impl Aggregator { .drop_closed(&mut self.resource_stats, now, self.retention, has_watchers); self.async_ops .drop_closed(&mut self.async_op_stats, now, self.retention, has_watchers); + if !has_watchers { + self.poll_ops.clear(); + } } /// Add the task subscription to the watchers after sending the first update @@ -305,14 +309,10 @@ impl Aggregator { } fn resource_update(&mut self, include: Include) -> proto::resources::ResourceUpdate { - let new_poll_ops = match include { - Include::All => self.poll_ops.clone(), - Include::UpdatedOnly => std::mem::take(&mut self.poll_ops), - }; proto::resources::ResourceUpdate { new_resources: self.resources.as_proto_list(include, &self.base_time), stats_update: self.resource_stats.as_proto(include, &self.base_time), - new_poll_ops, + new_poll_ops: std::mem::take(&mut self.poll_ops), dropped_events: self.shared.dropped_resources.swap(0, AcqRel) as u64, } } @@ -472,6 +472,10 @@ impl Aggregator { task_id, is_ready, } => { + // CLI doesn't show historical poll ops, so don't save them if no-one is watching + if self.watchers.is_empty() { + return; + } let poll_op = proto::resources::PollOp { metadata: Some(metadata.into()), resource_id: Some(resource_id.into()),