Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(subscriber): unset subscriber in aggregator #169

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@ members = [
"console-subscriber",
"console-api"
]
resolver = "2"
resolver = "2"

[patch.crates-io]
tokio = { git = "https://github.com/zaharidichev/tokio/", branch = "zd/semaphore-instrument" }
5 changes: 5 additions & 0 deletions console-api/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ thread_local = "1.1.3"
console-api = { path = "../console-api", features = ["transport"] }
tonic = { version = "0.5", features = ["transport"] }
tracing-core = "0.1.18"
tracing = "0.1.26"
tracing-subscriber = { version = "0.3.0", default-features = false, features = ["fmt", "registry"] }
tracing = "0.1.29"
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "registry"] }
futures = { version = "0.3", default-features = false }
hdrhistogram = { version = "7.3.0", default-features = false, features = ["serialization"] }
serde = { version = "1", features = ["derive"] }
Expand Down
23 changes: 3 additions & 20 deletions console-subscriber/src/aggregator/id_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,31 +80,14 @@ impl<T> IdData<T> {
has_watchers: bool,
ids: &mut Ids,
) {
let _span = tracing::debug_span!(
"drop_closed",
entity = %std::any::type_name::<T>(),
stats = %std::any::type_name::<R>(),
)
.entered();

// drop closed entities
tracing::trace!(?retention, has_watchers, "dropping closed");

let mut dropped_ids = HashSet::new();
stats.data.retain_and_shrink(|id, (stats, dirty)| {
if let Some(dropped_at) = stats.dropped_at() {
let dropped_for = now.duration_since(dropped_at).unwrap_or_default();
let should_drop =
// if there are any clients watching, retain all dirty tasks regardless of age
(*dirty && has_watchers)
|| dropped_for > retention;
tracing::trace!(
stats.id = ?id,
stats.dropped_at = ?dropped_at,
stats.dropped_for = ?dropped_for,
stats.dirty = *dirty,
should_drop,
);
// if there are any clients watching, retain all dirty tasks regardless of age
(*dirty && has_watchers)
|| dropped_for > retention;

if should_drop {
dropped_ids.insert(*id);
Expand Down
31 changes: 13 additions & 18 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,6 @@ impl Aggregator {
// triggered when the event buffer is approaching capacity
_ = self.flush_capacity.should_flush.notified() => {
self.flush_capacity.triggered.store(false, Release);
tracing::debug!("approaching capacity; draining buffer");
false
}

Expand All @@ -360,10 +359,7 @@ impl Aggregator {
Some(Command::Resume) => {
self.temporality = Temporality::Live;
}
None => {
tracing::debug!("rpc channel closed, terminating");
return;
}
None => return,
};

false
Expand All @@ -390,10 +386,7 @@ impl Aggregator {
}
// The channel closed, no more events will be emitted...time
// to stop aggregating.
None => {
tracing::debug!("event channel closed; terminating");
return;
}
None => return,
};
}

Expand Down Expand Up @@ -775,7 +768,10 @@ impl Aggregator {
let field_name = match update.field.name.clone() {
Some(name) => name,
None => {
tracing::warn!(?update.field, "field missing name, skipping...");
eprintln!(
"warning: field {:?} missing name, skipping...",
update.field
);
return;
}
};
Expand Down Expand Up @@ -1008,8 +1004,8 @@ fn update_attribute(attribute: &mut Attribute, update: AttributeUpdate) {

Some(AttributeUpdateOp::Override) => *v = upd,

None => tracing::warn!(
"numeric attribute update {:?} needs to have an op field",
None => eprintln!(
"warning: numeric attribute update {:?} needs to have an op field",
update_name
),
},
Expand All @@ -1021,17 +1017,16 @@ fn update_attribute(attribute: &mut Attribute, update: AttributeUpdate) {

Some(AttributeUpdateOp::Override) => *v = upd,

None => tracing::warn!(
"numeric attribute update {:?} needs to have an op field",
None => eprintln!(
"warning: numeric attribute update {:?} needs to have an op field",
update_name
),
},

(val, update) => {
tracing::warn!(
"attribute {:?} cannot be updated by update {:?}",
val,
update
eprintln!(
"warning: attribute {:?} cannot be updated by update {:?}",
val, update
);
}
}
Expand Down
41 changes: 0 additions & 41 deletions console-subscriber/src/aggregator/shrink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,6 @@ where
self.retain(f);

if self.len() < len0 {
tracing::debug!(
len = self.len(),
dropped = len0.saturating_sub(self.len()),
data.key = %type_name::<K>(),
data.val = %type_name::<V>(),
"dropped unused entries"
);
self.try_shrink();
}
}
Expand Down Expand Up @@ -108,12 +101,6 @@ impl<T> ShrinkVec<T> {
self.retain(f);

if self.len() < len0 {
tracing::debug!(
len = self.len(),
dropped = len0.saturating_sub(self.len()),
data = %type_name::<T>(),
"dropped unused data"
);
self.try_shrink();
}
}
Expand Down Expand Up @@ -171,14 +158,6 @@ impl Shrink {
// Has the required interval elapsed since the last shrink?
self.since_shrink = self.since_shrink.saturating_add(1);
if self.since_shrink < self.shrink_every {
tracing::trace!(
self.since_shrink,
self.shrink_every,
capacity_bytes = capacity * mem::size_of::<T>(),
used_bytes = len * mem::size_of::<T>(),
data = %type_name::<T>(),
"should_shrink: shrink interval has not elapsed"
);
return false;
}

Expand All @@ -187,31 +166,11 @@ impl Shrink {
let used_bytes = len * mem::size_of::<T>();
let diff = capacity_bytes.saturating_sub(used_bytes);
if diff < self.min_bytes {
tracing::trace!(
self.since_shrink,
self.shrink_every,
self.min_bytes,
freed_bytes = diff,
capacity_bytes,
used_bytes,
data = %type_name::<T>(),
"should_shrink: would not free sufficient bytes"
);
return false;
}

// Reset the clock! time to shrink!
self.since_shrink = 0;
tracing::debug!(
self.since_shrink,
self.shrink_every,
self.min_bytes,
freed_bytes = diff,
capacity_bytes,
used_bytes,
data = %type_name::<T>(),
"should_shrink: shrinking!"
);
true
}
}
Expand Down
2 changes: 1 addition & 1 deletion console-subscriber/src/callsites.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
fmt, ptr,
sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
};
use tracing_core::{callsite, Metadata};
use tracing::{callsite, Metadata};

pub(crate) struct Callsites<const MAX_CALLSITES: usize> {
ptrs: [AtomicPtr<Metadata<'static>>; MAX_CALLSITES],
Expand Down
9 changes: 6 additions & 3 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use std::{
};
use thread_local::ThreadLocal;
use tokio::sync::{mpsc, oneshot};
use tracing_core::{
use tracing::{
dispatcher::{self, Dispatch},
instrument::WithSubscriber,
span,
subscriber::{self, NoSubscriber, Subscriber},
Metadata,
Expand Down Expand Up @@ -561,8 +562,10 @@ impl Server {
let aggregate = self
.aggregator
.take()
.expect("cannot start server multiple times");
let aggregate = spawn_named(aggregate.run(), "console::aggregate");
.expect("cannot start server multiple times")
.run()
.with_subscriber(tracing::subscriber::NoSubscriber::default());
let aggregate = spawn_named(aggregate, "console::aggregate");
let addr = self.addr;
let serve = builder
.add_service(proto::instrument::instrument_server::InstrumentServer::new(
Expand Down