diff --git a/Cargo.lock b/Cargo.lock index 5edb0dd70..a7e307948 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -221,6 +221,7 @@ name = "console-subscriber" version = "0.1.0" dependencies = [ "console-api", + "crossbeam-channel", "futures", "hdrhistogram", "humantime", @@ -245,6 +246,26 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" +dependencies = [ + "cfg-if", + "lazy_static", +] + [[package]] name = "crossterm" version = "0.20.0" diff --git a/console-subscriber/Cargo.toml b/console-subscriber/Cargo.toml index bc7efe55a..2a5bf961e 100644 --- a/console-subscriber/Cargo.toml +++ b/console-subscriber/Cargo.toml @@ -40,13 +40,16 @@ tracing = "0.1.26" tracing-subscriber = { version = "0.3.0", default-features = false, features = ["fmt", "registry"] } futures = { version = "0.3", default-features = false } hdrhistogram = { version = "7.3.0", default-features = false, features = ["serialization"] } -serde = { version = "1", features = ["derive"] } -serde_json = "1" # The parking_lot dependency is renamed, because we want our `parking_lot` # feature to also enable `tracing-subscriber`'s parking_lot feature flag. parking_lot_crate = { package = "parking_lot", version = "0.11", optional = true } humantime = "2.1.0" +# Required for recording: +serde = { version = "1", features = ["derive"] } +serde_json = "1" +crossbeam-channel = "0.5" + [dev-dependencies] tokio = { version = "^1.7", features = ["full", "rt-multi-thread"] } futures = "0.3" diff --git a/console-subscriber/src/aggregator/id_data.rs b/console-subscriber/src/aggregator/id_data.rs index 92417a584..ebb783c8c 100644 --- a/console-subscriber/src/aggregator/id_data.rs +++ b/console-subscriber/src/aggregator/id_data.rs @@ -1,14 +1,12 @@ -use super::{shrink::ShrinkMap, DroppedAt, Id, ToProto}; +use super::{shrink::ShrinkMap, Id, ToProto}; +use crate::stats::{DroppedAt, Unsent}; use std::collections::HashMap; -use std::ops::{Deref, DerefMut}; use std::time::{Duration, SystemTime}; pub(crate) struct IdData { - data: ShrinkMap, + data: ShrinkMap, } -pub(crate) struct Updating<'a, T>(&'a mut (T, bool)); - pub(crate) enum Include { All, UpdatedOnly, @@ -19,31 +17,19 @@ pub(crate) enum Include { impl Default for IdData { fn default() -> Self { IdData { - data: ShrinkMap::::new(), + data: ShrinkMap::::new(), } } } -impl IdData { - pub(crate) fn update_or_default(&mut self, id: Id) -> Updating<'_, T> - where - T: Default, - { - Updating(self.data.entry(id).or_default()) - } - - pub(crate) fn update(&mut self, id: &Id) -> Option> { - self.data.get_mut(id).map(Updating) - } - +impl IdData { pub(crate) fn insert(&mut self, id: Id, data: T) { - self.data.insert(id, (data, true)); + self.data.insert(id, data); } pub(crate) fn since_last_update(&mut self) -> impl Iterator { - self.data.iter_mut().filter_map(|(id, (data, dirty))| { - if *dirty { - *dirty = false; + self.data.iter_mut().filter_map(|(id, data)| { + if data.take_unsent() { Some((id, data)) } else { None @@ -52,11 +38,11 @@ impl IdData { } pub(crate) fn all(&self) -> impl Iterator { - self.data.iter().map(|(id, (data, _))| (id, data)) + self.data.iter() } pub(crate) fn get(&self, id: &Id) -> Option<&T> { - self.data.get(id).map(|(data, _)| data) + self.data.get(id) } pub(crate) fn as_proto(&mut self, include: Include) -> HashMap @@ -75,7 +61,7 @@ impl IdData { } } - pub(crate) fn drop_closed( + pub(crate) fn drop_closed( &mut self, stats: &mut IdData, now: SystemTime, @@ -92,18 +78,19 @@ impl IdData { // drop closed entities tracing::trace!(?retention, has_watchers, "dropping closed"); - stats.data.retain_and_shrink(|id, (stats, dirty)| { + stats.data.retain_and_shrink(|id, stats| { if let Some(dropped_at) = stats.dropped_at() { let dropped_for = now.duration_since(dropped_at).unwrap_or_default(); + let dirty = stats.is_unsent(); let should_drop = // if there are any clients watching, retain all dirty tasks regardless of age - (*dirty && has_watchers) + (dirty && has_watchers) || dropped_for > retention; tracing::trace!( stats.id = ?id, stats.dropped_at = ?dropped_at, stats.dropped_for = ?dropped_for, - stats.dirty = *dirty, + stats.dirty = dirty, should_drop, ); return !should_drop; @@ -114,27 +101,6 @@ impl IdData { // drop closed entities which no longer have stats. self.data - .retain_and_shrink(|id, (_, _)| stats.data.contains_key(id)); - } -} - -// === impl Updating === - -impl<'a, T> Deref for Updating<'a, T> { - type Target = T; - fn deref(&self) -> &Self::Target { - &self.0 .0 - } -} - -impl<'a, T> DerefMut for Updating<'a, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 .0 - } -} - -impl<'a, T> Drop for Updating<'a, T> { - fn drop(&mut self) { - self.0 .1 = true; + .retain_and_shrink(|id, _| stats.data.contains_key(id)); } } diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index eb9794328..14d806f8e 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -1,16 +1,14 @@ -use super::{ - AttributeUpdate, AttributeUpdateOp, Command, Event, Shared, UpdateType, WakeOp, Watch, +use super::{Command, Event, Shared, Watch}; +use crate::{ + stats::{self, Unsent}, + ToProto, WatchRequest, }; -use crate::{record::Recorder, WatchRequest}; use console_api as proto; use proto::resources::resource; -use proto::Attribute; use tokio::sync::{mpsc, Notify}; use futures::FutureExt; use std::{ - collections::hash_map::{Entry, HashMap}, - convert::TryInto, sync::{ atomic::{AtomicBool, Ordering::*}, Arc, @@ -19,11 +17,6 @@ use std::{ }; use tracing_core::{span::Id, Metadata}; -use hdrhistogram::{ - serialization::{Serializer, V2SerializeError, V2Serializer}, - Histogram, -}; - mod id_data; mod shrink; use self::id_data::{IdData, Include}; @@ -66,19 +59,19 @@ pub(crate) struct Aggregator { tasks: IdData, /// Map of task IDs to task stats. - task_stats: IdData, + task_stats: IdData>, /// Map of resource IDs to resource static data. resources: IdData, /// Map of resource IDs to resource stats. - resource_stats: IdData, + resource_stats: IdData>, /// Map of AsyncOp IDs to AsyncOp static data. async_ops: IdData, /// Map of AsyncOp IDs to AsyncOp stats. - async_op_stats: IdData, + async_op_stats: IdData>, /// *All* PollOp events for AsyncOps on Resources. /// @@ -91,9 +84,6 @@ pub(crate) struct Aggregator { /// This is emptied on every state update. new_poll_ops: Vec, - /// A sink to record all events to a file. - recorder: Option, - /// The time "state" of the aggregator, such as paused or live. temporality: Temporality, } @@ -104,170 +94,39 @@ pub(crate) struct Flush { triggered: AtomicBool, } -// An entity (e.g Task, Resource) that at some point in -// time can be dropped. This generally refers to spans that -// have been closed indicating that a task, async op or a -// resource is not in use anymore -pub(crate) trait DroppedAt { - fn dropped_at(&self) -> Option; -} - -pub(crate) trait ToProto { - type Output; - fn to_proto(&self) -> Self::Output; -} - #[derive(Debug)] enum Temporality { Live, Paused, } - -#[derive(Default)] -struct PollStats { - /// The number of polls in progress - current_polls: u64, - /// The total number of polls - polls: u64, - first_poll: Option, - last_poll_started: Option, - last_poll_ended: Option, - busy_time: Duration, -} - // Represent static data for resources struct Resource { id: Id, + is_dirty: AtomicBool, parent_id: Option, metadata: &'static Metadata<'static>, concrete_type: String, kind: resource::Kind, location: Option, is_internal: bool, - inherit_child_attrs: bool, -} - -/// Represents a key for a `proto::field::Name`. Because the -/// proto::field::Name might not be unique we also include the -/// resource id in this key -#[derive(Hash, PartialEq, Eq)] -struct FieldKey { - update_id: Id, - field_name: proto::field::Name, -} - -#[derive(Default)] -struct ResourceStats { - created_at: Option, - dropped_at: Option, - attributes: HashMap, } /// Represents static data for tasks struct Task { id: Id, + is_dirty: AtomicBool, metadata: &'static Metadata<'static>, fields: Vec, location: Option, } -struct TaskStats { - // task stats - created_at: Option, - dropped_at: Option, - - // waker stats - wakes: u64, - waker_clones: u64, - waker_drops: u64, - self_wakes: u64, - last_wake: Option, - - poll_times_histogram: Histogram, - poll_stats: PollStats, -} - struct AsyncOp { id: Id, + is_dirty: AtomicBool, parent_id: Option, resource_id: Id, metadata: &'static Metadata<'static>, source: String, - inherit_child_attrs: bool, -} - -#[derive(Default)] -struct AsyncOpStats { - created_at: Option, - dropped_at: Option, - task_id: Option, - poll_stats: PollStats, - attributes: HashMap, -} - -impl DroppedAt for ResourceStats { - fn dropped_at(&self) -> Option { - self.dropped_at - } -} - -impl DroppedAt for TaskStats { - fn dropped_at(&self) -> Option { - self.dropped_at - } -} - -impl DroppedAt for AsyncOpStats { - fn dropped_at(&self) -> Option { - self.dropped_at - } -} - -impl PollStats { - fn update_on_span_enter(&mut self, timestamp: SystemTime) { - if self.current_polls == 0 { - self.last_poll_started = Some(timestamp); - if self.first_poll == None { - self.first_poll = Some(timestamp); - } - self.polls += 1; - } - self.current_polls += 1; - } - - fn update_on_span_exit(&mut self, timestamp: SystemTime) { - self.current_polls -= 1; - if self.current_polls == 0 { - if let Some(last_poll_started) = self.last_poll_started { - let elapsed = timestamp.duration_since(last_poll_started).unwrap(); - self.last_poll_ended = Some(timestamp); - self.busy_time += elapsed; - } - } - } - - fn since_last_poll(&self, timestamp: SystemTime) -> Option { - self.last_poll_started - .map(|lps| timestamp.duration_since(lps).unwrap()) - } -} - -impl Default for TaskStats { - fn default() -> Self { - TaskStats { - created_at: None, - dropped_at: None, - wakes: 0, - waker_clones: 0, - waker_drops: 0, - self_wakes: 0, - last_wake: None, - // significant figures should be in the [0-5] range and memory usage - // grows exponentially with higher a sigfig - poll_times_histogram: Histogram::::new(2).unwrap(), - poll_stats: PollStats::default(), - } - } } impl Aggregator { @@ -295,10 +154,6 @@ impl Aggregator { async_op_stats: IdData::default(), all_poll_ops: Default::default(), new_poll_ops: Default::default(), - recorder: builder - .recording_path - .as_ref() - .map(|path| Recorder::new(path).expect("creating recorder")), temporality: Temporality::Live, } } @@ -359,10 +214,6 @@ impl Aggregator { while let Some(event) = self.events.recv().now_or_never() { match event { Some(event) => { - // always be recording... - if let Some(ref recorder) = self.recorder { - recorder.record(&event); - } self.update_state(event); drained = true; } @@ -467,7 +318,7 @@ impl Aggregator { && subscription.update(&proto::tasks::TaskDetails { task_id: Some(id.clone().into()), now: Some(now.into()), - poll_times_histogram: serialize_histogram(&stats.poll_times_histogram).ok(), + poll_times_histogram: stats.serialize_histogram(), }) { self.details_watchers @@ -542,8 +393,7 @@ impl Aggregator { let details = proto::tasks::TaskDetails { task_id: Some(id.clone().into()), now: Some(now.into()), - poll_times_histogram: serialize_histogram(&task_stats.poll_times_histogram) - .ok(), + poll_times_histogram: task_stats.serialize_histogram(), }; watchers.retain(|watch| watch.update(&details)); !watchers.is_empty() @@ -565,7 +415,7 @@ impl Aggregator { Event::Spawn { id, metadata, - at, + stats, fields, location, } => { @@ -573,6 +423,7 @@ impl Aggregator { id.clone(), Task { id: id.clone(), + is_dirty: AtomicBool::new(true), metadata, fields, location, @@ -580,104 +431,10 @@ impl Aggregator { }, ); - self.task_stats.insert( - id, - TaskStats { - created_at: Some(at), - ..Default::default() - }, - ); - } - - Event::Enter { id, parent_id, at } => { - if let Some(mut task_stats) = self.task_stats.update(&id) { - task_stats.poll_stats.update_on_span_enter(at); - return; - } - - if let Some(mut async_op_stats) = - parent_id.and_then(|parent_id| self.async_op_stats.update(&parent_id)) - { - async_op_stats.poll_stats.update_on_span_enter(at); - } - } - - Event::Exit { id, parent_id, at } => { - if let Some(mut task_stats) = self.task_stats.update(&id) { - task_stats.poll_stats.update_on_span_exit(at); - if let Some(since_last_poll) = task_stats.poll_stats.since_last_poll(at) { - task_stats - .poll_times_histogram - .record(since_last_poll.as_nanos().try_into().unwrap_or(u64::MAX)) - .unwrap(); - } - return; - } - - if let Some(mut async_op_stats) = - parent_id.and_then(|parent_id| self.async_op_stats.update(&parent_id)) - { - async_op_stats.poll_stats.update_on_span_exit(at); - } - } - - Event::Close { id, at } => { - if let Some(mut task_stats) = self.task_stats.update(&id) { - task_stats.dropped_at = Some(at); - } - - if let Some(mut resource_stats) = self.resource_stats.update(&id) { - resource_stats.dropped_at = Some(at); - } - - if let Some(mut async_op_stats) = self.async_op_stats.update(&id) { - async_op_stats.dropped_at = Some(at); - } - } - - Event::Waker { id, op, at } => { - // It's possible for wakers to exist long after a task has - // finished. We don't want those cases to create a "new" - // task that isn't closed, just to insert some waker stats. - // - // It may be useful to eventually be able to report about - // "wasted" waker ops, but we'll leave that for another time. - if let Some(mut task_stats) = self.task_stats.update(&id) { - match op { - WakeOp::Wake { self_wake } | WakeOp::WakeByRef { self_wake } => { - task_stats.wakes += 1; - task_stats.last_wake = Some(at); - - // If the task has woken itself, increment the - // self-wake count. - if self_wake { - task_stats.self_wakes += 1; - } - - // Note: `Waker::wake` does *not* call the `drop` - // implementation, so waking by value doesn't - // trigger a drop event. so, count this as a `drop` - // to ensure the task's number of wakers can be - // calculated as `clones` - `drops`. - // - // see - // https://github.com/rust-lang/rust/blob/673d0db5e393e9c64897005b470bfeb6d5aec61b/library/core/src/task/wake.rs#L211-L212 - if let WakeOp::Wake { .. } = op { - task_stats.waker_drops += 1; - } - } - WakeOp::Clone => { - task_stats.waker_clones += 1; - } - WakeOp::Drop => { - task_stats.waker_drops += 1; - } - } - } + self.task_stats.insert(id, stats); } Event::Resource { - at, id, parent_id, metadata, @@ -685,30 +442,23 @@ impl Aggregator { concrete_type, location, is_internal, - inherit_child_attrs, - .. + stats, } => { self.resources.insert( id.clone(), Resource { id: id.clone(), + is_dirty: AtomicBool::new(true), parent_id, kind, metadata, concrete_type, location, is_internal, - inherit_child_attrs, }, ); - self.resource_stats.insert( - id, - ResourceStats { - created_at: Some(at), - ..Default::default() - }, - ); + self.resource_stats.insert(id, stats); } Event::PollOp { @@ -719,9 +469,6 @@ impl Aggregator { task_id, is_ready, } => { - let mut async_op_stats = self.async_op_stats.update_or_default(async_op_id.clone()); - async_op_stats.task_id.get_or_insert(task_id.clone()); - let poll_op = proto::resources::PollOp { metadata: Some(metadata.into()), resource_id: Some(resource_id.into()), @@ -735,104 +482,27 @@ impl Aggregator { self.new_poll_ops.push(poll_op); } - Event::StateUpdate { - update_id, - update_type, - update, - .. - } => { - let mut to_update = vec![(update_id.clone(), update_type.clone())]; - - fn update_entry(e: Entry<'_, FieldKey, Attribute>, upd: &AttributeUpdate) { - e.and_modify(|attr| update_attribute(attr, upd)) - .or_insert_with(|| upd.clone().into()); - } - - match update_type { - UpdateType::Resource => { - if let Some(parent) = self - .resources - .get(&update_id) - .and_then(|r| self.resources.get(r.parent_id.as_ref()?)) - .filter(|parent| parent.inherit_child_attrs) - { - to_update.push((parent.id.clone(), UpdateType::Resource)); - } - } - UpdateType::AsyncOp => { - if let Some(parent) = self - .async_ops - .get(&update_id) - .and_then(|r| self.async_ops.get(r.parent_id.as_ref()?)) - .filter(|parent| parent.inherit_child_attrs) - { - to_update.push((parent.id.clone(), UpdateType::AsyncOp)); - } - } - } - - for (update_id, update_type) in to_update { - let field_name = match update.field.name.as_ref() { - Some(name) => name.clone(), - None => { - tracing::warn!(?update.field, "field missing name, skipping..."); - return; - } - }; - - let upd_key = FieldKey { - update_id: update_id.clone(), - field_name, - }; - - match update_type { - UpdateType::Resource => { - let mut stats = self.resource_stats.update(&update_id); - let entry = stats.as_mut().map(|s| s.attributes.entry(upd_key)); - if let Some(entry) = entry { - update_entry(entry, &update); - } - } - UpdateType::AsyncOp => { - let mut stats = self.async_op_stats.update(&update_id); - let entry = stats.as_mut().map(|s| s.attributes.entry(upd_key)); - if let Some(entry) = entry { - update_entry(entry, &update); - } - } - }; - } - } - Event::AsyncResourceOp { - at, id, source, resource_id, metadata, parent_id, - inherit_child_attrs, - .. + stats, } => { self.async_ops.insert( id.clone(), AsyncOp { id: id.clone(), + is_dirty: AtomicBool::new(true), resource_id, metadata, source, parent_id, - inherit_child_attrs, }, ); - self.async_op_stats.insert( - id, - AsyncOpStats { - created_at: Some(at), - ..Default::default() - }, - ); + self.async_op_stats.insert(id, stats); } } } @@ -872,20 +542,6 @@ impl Watch { } } -impl ToProto for PollStats { - type Output = proto::PollStats; - - fn to_proto(&self) -> Self::Output { - proto::PollStats { - polls: self.polls, - first_poll: self.first_poll.map(Into::into), - last_poll_started: self.last_poll_started.map(Into::into), - last_poll_ended: self.last_poll_ended.map(Into::into), - busy_time: Some(self.busy_time.into()), - } - } -} - impl ToProto for Task { type Output = proto::tasks::Task; @@ -902,20 +558,13 @@ impl ToProto for Task { } } -impl ToProto for TaskStats { - type Output = proto::tasks::Stats; +impl Unsent for Task { + fn take_unsent(&self) -> bool { + self.is_dirty.swap(false, AcqRel) + } - fn to_proto(&self) -> Self::Output { - proto::tasks::Stats { - poll_stats: Some(self.poll_stats.to_proto()), - created_at: self.created_at.map(Into::into), - dropped_at: self.dropped_at.map(Into::into), - wakes: self.wakes, - waker_clones: self.waker_clones, - self_wakes: self.self_wakes, - waker_drops: self.waker_drops, - last_wake: self.last_wake.map(Into::into), - } + fn is_unsent(&self) -> bool { + self.is_dirty.load(Acquire) } } @@ -935,16 +584,13 @@ impl ToProto for Resource { } } -impl ToProto for ResourceStats { - type Output = proto::resources::Stats; +impl Unsent for Resource { + fn take_unsent(&self) -> bool { + self.is_dirty.swap(false, AcqRel) + } - fn to_proto(&self) -> Self::Output { - let attributes = self.attributes.values().cloned().collect(); - proto::resources::Stats { - created_at: self.created_at.map(Into::into), - dropped_at: self.dropped_at.map(Into::into), - attributes, - } + fn is_unsent(&self) -> bool { + self.is_dirty.load(Acquire) } } @@ -962,81 +608,12 @@ impl ToProto for AsyncOp { } } -impl ToProto for AsyncOpStats { - type Output = proto::async_ops::Stats; - - fn to_proto(&self) -> Self::Output { - let attributes = self.attributes.values().cloned().collect(); - proto::async_ops::Stats { - poll_stats: Some(self.poll_stats.to_proto()), - created_at: self.created_at.map(Into::into), - dropped_at: self.dropped_at.map(Into::into), - task_id: self.task_id.clone().map(Into::into), - attributes, - } +impl Unsent for AsyncOp { + fn take_unsent(&self) -> bool { + self.is_dirty.swap(false, AcqRel) } -} -impl From for Attribute { - fn from(upd: AttributeUpdate) -> Self { - Attribute { - field: Some(upd.field), - unit: upd.unit, - } - } -} - -fn serialize_histogram(histogram: &Histogram) -> Result, V2SerializeError> { - let mut serializer = V2Serializer::new(); - let mut buf = Vec::new(); - serializer.serialize(histogram, &mut buf)?; - Ok(buf) -} - -fn update_attribute(attribute: &mut Attribute, update: &AttributeUpdate) { - use proto::field::Value::*; - let attribute_val = attribute.field.as_mut().and_then(|a| a.value.as_mut()); - let update_val = update.field.value.clone(); - let update_name = update.field.name.clone(); - match (attribute_val, update_val) { - (Some(BoolVal(v)), Some(BoolVal(upd))) => *v = upd, - - (Some(StrVal(v)), Some(StrVal(upd))) => *v = upd, - - (Some(DebugVal(v)), Some(DebugVal(upd))) => *v = upd, - - (Some(U64Val(v)), Some(U64Val(upd))) => match update.op { - Some(AttributeUpdateOp::Add) => *v = v.saturating_add(upd), - - Some(AttributeUpdateOp::Sub) => *v = v.saturating_sub(upd), - - Some(AttributeUpdateOp::Override) => *v = upd, - - None => tracing::warn!( - "numeric attribute update {:?} needs to have an op field", - update_name - ), - }, - - (Some(I64Val(v)), Some(I64Val(upd))) => match update.op { - Some(AttributeUpdateOp::Add) => *v = v.saturating_add(upd), - - Some(AttributeUpdateOp::Sub) => *v = v.saturating_sub(upd), - - Some(AttributeUpdateOp::Override) => *v = upd, - - None => tracing::warn!( - "numeric attribute update {:?} needs to have an op field", - update_name - ), - }, - - (val, update) => { - tracing::warn!( - "attribute {:?} cannot be updated by update {:?}", - val, - update - ); - } + fn is_unsent(&self) -> bool { + self.is_dirty.load(Acquire) } } diff --git a/console-subscriber/src/attribute.rs b/console-subscriber/src/attribute.rs new file mode 100644 index 000000000..ec71496ed --- /dev/null +++ b/console-subscriber/src/attribute.rs @@ -0,0 +1,116 @@ +use console_api as proto; +use std::collections::HashMap; +use tracing::span::Id; + +#[derive(Debug, Default)] +pub(crate) struct Attributes { + attributes: HashMap, +} + +#[derive(Debug, Clone)] +pub(crate) struct Update { + pub(crate) field: proto::Field, + pub(crate) op: Option, + pub(crate) unit: Option, +} + +#[derive(Debug, Clone)] +pub(crate) enum UpdateOp { + Add, + Override, + Sub, +} + +/// Represents a key for a `proto::field::Name`. Because the +/// proto::field::Name might not be unique we also include the +/// resource id in this key +#[derive(Debug, Hash, PartialEq, Eq)] +struct FieldKey { + update_id: Id, + field_name: proto::field::Name, +} + +// === impl Attributes === + +impl Attributes { + pub(crate) fn values(&self) -> impl Iterator { + self.attributes.values() + } + + pub(crate) fn update(&mut self, id: &Id, update: &Update) { + let field_name = match update.field.name.as_ref() { + Some(name) => name.clone(), + None => { + tracing::warn!(?update.field, "field missing name, skipping..."); + return; + } + }; + let update_id = id.clone(); + let key = FieldKey { + update_id, + field_name, + }; + + self.attributes + .entry(key) + .and_modify(|attr| update_attribute(attr, update)) + .or_insert_with(|| update.clone().into()); + } +} + +fn update_attribute(attribute: &mut proto::Attribute, update: &Update) { + use proto::field::Value::*; + let attribute_val = attribute.field.as_mut().and_then(|a| a.value.as_mut()); + let update_val = update.field.value.clone(); + let update_name = update.field.name.clone(); + match (attribute_val, update_val) { + (Some(BoolVal(v)), Some(BoolVal(upd))) => *v = upd, + + (Some(StrVal(v)), Some(StrVal(upd))) => *v = upd, + + (Some(DebugVal(v)), Some(DebugVal(upd))) => *v = upd, + + (Some(U64Val(v)), Some(U64Val(upd))) => match update.op { + Some(UpdateOp::Add) => *v = v.saturating_add(upd), + + Some(UpdateOp::Sub) => *v = v.saturating_sub(upd), + + Some(UpdateOp::Override) => *v = upd, + + None => tracing::warn!( + "numeric attribute update {:?} needs to have an op field", + update_name + ), + }, + + (Some(I64Val(v)), Some(I64Val(upd))) => match update.op { + Some(UpdateOp::Add) => *v = v.saturating_add(upd), + + Some(UpdateOp::Sub) => *v = v.saturating_sub(upd), + + Some(UpdateOp::Override) => *v = upd, + + None => tracing::warn!( + "numeric attribute update {:?} needs to have an op field", + update_name + ), + }, + + (val, update) => { + tracing::warn!( + "attribute {:?} cannot be updated by update {:?}", + val, + update + ); + } + } +} + +impl From for proto::Attribute { + fn from(upd: Update) -> Self { + proto::Attribute { + field: Some(upd.field), + unit: upd.unit, + } + } +} diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index eb528e0ce..1145352b1 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -15,24 +15,30 @@ use std::{ use thread_local::ThreadLocal; use tokio::sync::{mpsc, oneshot}; use tracing_core::{ - dispatcher::{self, Dispatch}, span::{self, Id}, - subscriber::{self, NoSubscriber, Subscriber}, + subscriber::{self, Subscriber}, Metadata, }; -use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer}; +use tracing_subscriber::{ + layer::Context, + registry::{Extensions, LookupSpan, SpanRef}, + Layer, +}; mod aggregator; +mod attribute; mod builder; mod callsites; mod record; mod stack; +mod stats; pub(crate) mod sync; mod visitors; use aggregator::Aggregator; pub use builder::Builder; use callsites::Callsites; +use record::Recorder; use stack::SpanStack; use visitors::{AsyncOpVisitor, ResourceVisitor, ResourceVisitorResult, TaskVisitor, WakerVisitor}; @@ -102,8 +108,8 @@ pub struct ConsoleLayer { /// TODO: Take some time to determine more reasonable numbers async_op_state_update_callsites: Callsites<32>, - /// Used for unsetting the default dispatcher inside of span callbacks. - no_dispatch: Dispatch, + /// A sink to record all events to a file. + recorder: Option, } /// A gRPC [`Server`] that implements the [`tokio-console` wire format][wire]. @@ -124,6 +130,11 @@ pub struct Server { client_buffer: usize, } +pub(crate) trait ToProto { + type Output; + fn to_proto(&self) -> Self::Output; +} + /// State shared between the `ConsoleLayer` and the `Aggregator` task. #[derive(Debug, Default)] struct Shared { @@ -165,39 +176,19 @@ enum Event { Spawn { id: span::Id, metadata: &'static Metadata<'static>, - at: SystemTime, + stats: Arc, fields: Vec, location: Option, }, - Enter { - id: span::Id, - parent_id: Option, - at: SystemTime, - }, - Exit { - id: span::Id, - parent_id: Option, - at: SystemTime, - }, - Close { - id: span::Id, - at: SystemTime, - }, - Waker { - id: span::Id, - op: WakeOp, - at: SystemTime, - }, Resource { id: span::Id, parent_id: Option, metadata: &'static Metadata<'static>, - at: SystemTime, concrete_type: String, kind: resource::Kind, location: Option, is_internal: bool, - inherit_child_attrs: bool, + stats: Arc, }, PollOp { metadata: &'static Metadata<'static>, @@ -207,40 +198,15 @@ enum Event { task_id: span::Id, is_ready: bool, }, - StateUpdate { - update_id: span::Id, - update_type: UpdateType, - update: AttributeUpdate, - }, AsyncResourceOp { id: span::Id, parent_id: Option, resource_id: span::Id, metadata: &'static Metadata<'static>, - at: SystemTime, source: String, - inherit_child_attrs: bool, - }, -} - -#[derive(Debug, Clone)] -enum UpdateType { - Resource, - AsyncOp, -} -#[derive(Debug, Clone)] -struct AttributeUpdate { - field: proto::Field, - op: Option, - unit: Option, -} - -#[derive(Debug, Clone)] -enum AttributeUpdateOp { - Add, - Override, - Sub, + stats: Arc, + }, } #[derive(Clone, Debug, Copy, Serialize)] @@ -300,7 +266,10 @@ impl ConsoleLayer { // Conservatively, start to trigger a flush when half the channel is full. // This tries to reduce the chance of losing events to a full channel. let flush_under_capacity = config.event_buffer_capacity / 2; - + let recorder = config + .recording_path + .as_ref() + .map(|path| Recorder::new(path).expect("creating recorder")); let server = Server { aggregator: Some(aggregator), addr: config.server_addr, @@ -320,7 +289,7 @@ impl ConsoleLayer { poll_op_callsites: Callsites::default(), resource_state_update_callsites: Callsites::default(), async_op_state_update_callsites: Callsites::default(), - no_dispatch: Dispatch::new(NoSubscriber::default()), + recorder, }; (layer, server) } @@ -407,15 +376,6 @@ impl ConsoleLayer { .unwrap_or(false) } - fn is_id_tracked(&self, id: &span::Id, cx: &Context<'_, S>) -> bool - where - S: Subscriber + for<'a> LookupSpan<'a>, - { - cx.span(id) - .map(|span| span.extensions().get::().is_some()) - .unwrap_or(false) - } - fn first_entered

(&self, stack: &SpanStack, p: P) -> Option where P: Fn(&span::Id) -> bool, @@ -429,19 +389,28 @@ impl ConsoleLayer { .cloned() } - fn send(&self, dropped: &AtomicUsize, event: Event) -> bool { + fn send_metadata(&self, dropped: &AtomicUsize, event: Event) -> bool { + self.send_stats(dropped, move || (event, ())).is_some() + } + + fn send_stats( + &self, + dropped: &AtomicUsize, + mk_event: impl FnOnce() -> (Event, S), + ) -> Option { use mpsc::error::TrySendError; // Return whether or not we actually sent the event. let sent = match self.tx.try_reserve() { Ok(permit) => { + let (event, stats) = mk_event(); permit.send(event); - true + Some(stats) } Err(TrySendError::Closed(_)) => { // we should warn here eventually, but nop for now because we // can't trigger tracing events... - false + None } Err(TrySendError::Full(_)) => { // this shouldn't happen, since we trigger a flush when @@ -449,7 +418,7 @@ impl ConsoleLayer { // time is very high, maybe the aggregator task hasn't been // polled yet. so... eek?! dropped.fetch_add(1, Ordering::Release); - false + None } }; @@ -460,6 +429,56 @@ impl ConsoleLayer { sent } + + fn record(&self, event: impl FnOnce() -> record::Event) { + if let Some(ref recorder) = self.recorder { + recorder.record(event()); + } + } + + fn state_update( + &self, + id: &Id, + event: &tracing::Event<'_>, + ctx: &Context<'_, S>, + get_stats: impl for<'a> Fn(&'a Extensions) -> Option<&'a stats::ResourceStats>, + ) where + S: Subscriber + for<'a> LookupSpan<'a>, + { + let meta_id = event.metadata().into(); + let mut state_update_visitor = StateUpdateVisitor::new(meta_id); + event.record(&mut state_update_visitor); + + let update = match state_update_visitor.result() { + Some(update) => update, + None => return, + }; + + let span = match ctx.span(id) { + Some(span) => span, + // XXX(eliza): no span exists for a resource ID, we should maybe + // record an error here... + None => return, + }; + + let exts = span.extensions(); + let stats = match get_stats(&exts) { + Some(stats) => stats, + // XXX(eliza): a resource span was not a resource??? this is a bug + None => return, + }; + + stats.update_attribute(id, &update); + + if let Some(parent) = stats.parent_id.as_ref().and_then(|parent| ctx.span(parent)) { + let exts = parent.extensions(); + if let Some(stats) = get_stats(&exts) { + if stats.inherit_child_attributes { + stats.update_attribute(id, &update); + } + } + } + } } impl Layer for ConsoleLayer @@ -503,28 +522,40 @@ where (_, _) => &self.shared.dropped_tasks, }; - self.send(dropped, Event::Metadata(meta)); + self.send_metadata(dropped, Event::Metadata(meta)); subscriber::Interest::always() } fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) { let metadata = attrs.metadata(); - let sent = if self.is_spawn(metadata) { + if self.is_spawn(metadata) { let at = SystemTime::now(); let mut task_visitor = TaskVisitor::new(metadata.into()); attrs.record(&mut task_visitor); let (fields, location) = task_visitor.result(); - self.send( - &self.shared.dropped_tasks, - Event::Spawn { + self.record(|| record::Event::Spawn { + id: id.into_u64(), + at, + fields: record::SerializeFields(fields.clone()), + }); + if let Some(stats) = self.send_stats(&self.shared.dropped_tasks, move || { + let stats = Arc::new(stats::TaskStats::new(at)); + let event = Event::Spawn { id: id.clone(), - at, + stats: stats.clone(), metadata, fields, location, - }, - ) - } else if self.is_resource(metadata) { + }; + (event, stats) + }) { + ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats); + } + return; + } + + if self.is_resource(metadata) { + let at = SystemTime::now(); let mut resource_visitor = ResourceVisitor::default(); attrs.record(&mut resource_visitor); if let Some(result) = resource_visitor.result() { @@ -535,33 +566,38 @@ where is_internal, inherit_child_attrs, } = result; - let at = SystemTime::now(); let parent_id = self.current_spans.get().and_then(|stack| { self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx)) }); - self.send( - &self.shared.dropped_resources, - Event::Resource { + if let Some(stats) = self.send_stats(&self.shared.dropped_resources, move || { + let stats = Arc::new(stats::ResourceStats::new( + at, + inherit_child_attrs, + parent_id.clone(), + )); + let event = Event::Resource { id: id.clone(), parent_id, metadata, - at, concrete_type, kind, location, is_internal, - inherit_child_attrs, - }, - ) - } else { - // else unknown resource span format - false + stats: stats.clone(), + }; + (event, stats) + }) { + ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats); + } } - } else if self.is_async_op(metadata) { + return; + } + + if self.is_async_op(metadata) { + let at = SystemTime::now(); let mut async_op_visitor = AsyncOpVisitor::default(); attrs.record(&mut async_op_visitor); if let Some((source, inherit_child_attrs)) = async_op_visitor.result() { - let at = SystemTime::now(); let resource_id = self.current_spans.get().and_then(|stack| { self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx)) }); @@ -571,40 +607,27 @@ where }); if let Some(resource_id) = resource_id { - self.send( - &self.shared.dropped_async_ops, - Event::AsyncResourceOp { - id: id.clone(), - parent_id, - resource_id, - at, - metadata, - source, - inherit_child_attrs, - }, - ) - } else { - false + if let Some(stats) = + self.send_stats(&self.shared.dropped_async_ops, move || { + let stats = Arc::new(stats::AsyncOpStats::new( + at, + inherit_child_attrs, + parent_id.clone(), + )); + let event = Event::AsyncResourceOp { + id: id.clone(), + parent_id, + resource_id, + metadata, + source, + stats: stats.clone(), + }; + (event, stats) + }) + { + ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats); + } } - } else { - // else async op span needs to have a source field - false - } - } else { - false - }; - - // If we were able to record the span, add a marker extension indicating - // that it's tracked by the console. - if sent { - if let Some(span) = ctx.span(id) { - span.extensions_mut().insert(Tracked {}); - } else { - debug_assert!( - false, - "span should exist if `on_new_span` was called for its ID ({:?})", - id - ); } } } @@ -615,20 +638,32 @@ where let at = SystemTime::now(); let mut visitor = WakerVisitor::default(); event.record(&mut visitor); + // XXX (eliza): ew... if let Some((id, mut op)) = visitor.result() { - if op.is_wake() { - // Are we currently inside the task's span? If so, the task - // has woken itself. - let self_wake = self - .current_spans - .get() - .map(|spans| spans.borrow().iter().any(|span| span == &id)) - .unwrap_or(false); - op = op.self_wake(self_wake); + if let Some(span) = ctx.span(&id) { + let exts = span.extensions(); + if let Some(stats) = exts.get::>() { + if op.is_wake() { + // Are we currently inside the task's span? If so, the task + // has woken itself. + + let self_wake = self + .current_spans + .get() + .map(|spans| spans.borrow().iter().any(|span| span == &id)) + .unwrap_or(false); + op = op.self_wake(self_wake); + } + + stats.record_wake_op(op, at); + self.record(|| record::Event::Waker { + id: id.into_u64(), + at, + op, + }); + } } - self.send(&self.shared.dropped_tasks, Event::Waker { id, op, at }); } - // else unknown waker event... what to do? can't trace it from here... return; } @@ -649,20 +684,28 @@ where self.first_entered(&stack, |id| self.is_id_async_op(id, &ctx))?; Some((task_id, async_op_id)) }); - // poll op event should be emitted in the context of an async op and task spans if let Some((task_id, async_op_id)) = task_and_async_op_ids { - self.send( - &self.shared.dropped_async_ops, - Event::PollOp { + if let Some(span) = ctx.span(&async_op_id) { + let exts = span.extensions(); + if let Some(stats) = exts.get::>() { + stats.set_task_id(&task_id); + } + } + + self.send_stats(&self.shared.dropped_async_ops, || { + let event = Event::PollOp { metadata, op_name, resource_id, async_op_id, task_id, is_ready, - }, - ); + }; + (event, ()) + }); + + // TODO: JSON recorder doesn't care about poll ops. } } } @@ -674,22 +717,13 @@ where let resource_id = self.current_spans.get().and_then(|stack| { self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx)) }); - - if let Some(resource_id) = resource_id { - let meta_id = event.metadata().into(); - let mut state_update_visitor = StateUpdateVisitor::new(meta_id); - event.record(&mut state_update_visitor); - if let Some(update) = state_update_visitor.result() { - self.send( - &self.shared.dropped_resources, - Event::StateUpdate { - update_id: resource_id, - update_type: UpdateType::Resource, - update, - }, - ); - } + if let Some(id) = resource_id { + self.state_update(&id, event, &ctx, |exts| { + exts.get::>() + .map( as std::ops::Deref>::deref) + }); } + return; } @@ -697,89 +731,116 @@ where let async_op_id = self.current_spans.get().and_then(|stack| { self.first_entered(&stack.borrow(), |id| self.is_id_async_op(id, &ctx)) }); - if let Some(async_op_id) = async_op_id { - let meta_id = event.metadata().into(); - let mut state_update_visitor = StateUpdateVisitor::new(meta_id); - event.record(&mut state_update_visitor); - if let Some(update) = state_update_visitor.result() { - self.send( - &self.shared.dropped_async_ops, - Event::StateUpdate { - update_id: async_op_id, - update_type: UpdateType::AsyncOp, - update, - }, - ); - } + if let Some(id) = async_op_id { + self.state_update(&id, event, &ctx, |exts| { + let async_op = exts.get::>()?; + Some(&async_op.stats) + }); } } } fn on_enter(&self, id: &span::Id, cx: Context<'_, S>) { - if !self.is_id_tracked(id, &cx) { - return; + fn update LookupSpan<'a>>( + span: &SpanRef, + at: Option, + ) -> Option { + let exts = span.extensions(); + // if the span we are entering is a task or async op, record the + // poll stats. + if let Some(stats) = exts.get::>() { + let at = at.unwrap_or_else(SystemTime::now); + stats.start_poll(at); + Some(at) + } else if let Some(stats) = exts.get::>() { + let at = at.unwrap_or_else(SystemTime::now); + stats.start_poll(at); + Some(at) + // otherwise, is the span a resource? in that case, we also want + // to enter it, although we don't care about recording poll + // stats. + } else if exts.get::>().is_some() { + Some(at.unwrap_or_else(SystemTime::now)) + } else { + None + } } - let _default = dispatcher::set_default(&self.no_dispatch); - let parent_id = cx.span(id).and_then(|s| s.parent().map(|p| p.id())); - let sent = self.send( - &self.shared.dropped_tasks, - Event::Enter { - at: SystemTime::now(), - id: id.clone(), - parent_id, - }, - ); - // if we were able to record the send successfully, track entering the - // span. if not, ignore the enter, to avoid inconsistent data. - if sent { - self.current_spans - .get_or_default() - .borrow_mut() - .push(id.clone()); + if let Some(span) = cx.span(id) { + if let Some(now) = update(&span, None) { + if let Some(parent) = span.parent() { + update(&parent, Some(now)); + } + self.current_spans + .get_or_default() + .borrow_mut() + .push(id.clone()); + + self.record(|| record::Event::Enter { + id: id.into_u64(), + at: now, + }); + } } } fn on_exit(&self, id: &span::Id, cx: Context<'_, S>) { - if !self.is_id_tracked(id, &cx) { - return; - } - - let _default = dispatcher::set_default(&self.no_dispatch); - if let Some(spans) = self.current_spans.get() { - if !spans.borrow_mut().pop(id) { - // we did not actually pop the span --- entering it may not have - // been successfully recorded. in this case, ignore the exit, - // since the aggregator was never informed of the entry. - return; + fn update LookupSpan<'a>>( + span: &SpanRef, + at: Option, + ) -> Option { + let exts = span.extensions(); + // if the span we are entering is a task or async op, record the + // poll stats. + if let Some(stats) = exts.get::>() { + let at = at.unwrap_or_else(SystemTime::now); + stats.end_poll(at); + Some(at) + } else if let Some(stats) = exts.get::>() { + let at = at.unwrap_or_else(SystemTime::now); + stats.end_poll(at); + Some(at) + // otherwise, is the span a resource? in that case, we also want + // to enter it, although we don't care about recording poll + // stats. + } else if exts.get::>().is_some() { + Some(at.unwrap_or_else(SystemTime::now)) + } else { + None } } - let parent_id = cx.span(id).and_then(|s| s.parent().map(|p| p.id())); + if let Some(span) = cx.span(id) { + if let Some(now) = update(&span, None) { + if let Some(parent) = span.parent() { + update(&parent, Some(now)); + } + self.current_spans.get_or_default().borrow_mut().pop(id); - self.send( - &self.shared.dropped_tasks, - Event::Exit { - id: id.clone(), - parent_id, - at: SystemTime::now(), - }, - ); + self.record(|| record::Event::Exit { + id: id.into_u64(), + at: now, + }); + } + } } fn on_close(&self, id: span::Id, cx: Context<'_, S>) { - if !self.is_id_tracked(&id, &cx) { - return; + if let Some(span) = cx.span(&id) { + let now = SystemTime::now(); + let exts = span.extensions(); + if let Some(stats) = exts.get::>() { + stats.drop_task(now); + } else if let Some(stats) = exts.get::>() { + stats.drop_async_op(now); + } else if let Some(stats) = exts.get::>() { + stats.drop_resource(now); + } + self.record(|| record::Event::Close { + id: id.into_u64(), + at: now, + }); } - - let _default = dispatcher::set_default(&self.no_dispatch); - self.send( - &self.shared.dropped_tasks, - Event::Close { - at: SystemTime::now(), - id, - }, - ); } } diff --git a/console-subscriber/src/record.rs b/console-subscriber/src/record.rs index dc7b5708f..4c16fcc69 100644 --- a/console-subscriber/src/record.rs +++ b/console-subscriber/src/record.rs @@ -1,16 +1,10 @@ +use console_api as proto; +use crossbeam_channel::{Receiver, Sender}; use serde::{ ser::{SerializeSeq, SerializeStruct}, Serialize, }; -use std::{ - fs::File, - io, - path::Path, - sync::{Arc, Mutex}, - time::SystemTime, -}; - -use console_api as proto; +use std::{fs::File, io, path::Path, time::SystemTime}; /// This marks the currently understood version of the recording format. This /// should be increased whenever the format has a breaking change that we @@ -21,23 +15,9 @@ use console_api as proto; const DATA_FORMAT_VERSION: u8 = 1; pub(crate) struct Recorder { - buf: Arc>, - - worker: std::thread::JoinHandle<()>, -} - -struct Io { - buf: Arc>, - file: File, -} - -struct RecordBuf { - /// The current buffer to serialize events into. - bytes: Vec, - /// The "next" buffer that should be used when the IO thread takes the - /// current buffer. After flushing, the IO thread will put the buffer - /// back in this slot, so the allocation can be reused. - next: Vec, + tx: Sender, + // TODO(eliza): terminate and flush when dropping... + _worker: std::thread::JoinHandle<()>, } #[derive(Serialize)] @@ -46,11 +26,11 @@ struct Header { } #[derive(Serialize)] -enum Event<'a> { +pub(crate) enum Event { Spawn { id: u64, at: SystemTime, - fields: SerializeFields<'a>, + fields: SerializeFields, }, Enter { id: u64, @@ -71,120 +51,73 @@ enum Event<'a> { }, } -struct SerializeFields<'a>(&'a [proto::Field]); +pub(crate) struct SerializeFields(pub(crate) Vec); struct SerializeField<'a>(&'a proto::Field); impl Recorder { pub(crate) fn new(path: &Path) -> io::Result { - let buf = Arc::new(Mutex::new(RecordBuf::new())); - let buf2 = buf.clone(); let file = std::fs::File::create(path)?; - - let worker = std::thread::Builder::new() + let (tx, rx) = crossbeam_channel::bounded(4096); + let _worker = std::thread::Builder::new() .name("console/subscriber/recorder/io".into()) - .spawn(move || { - record_io(Io { buf: buf2, file }); + .spawn(move || match record_io(file, rx) { + Err(e) => eprintln!("event recorder failed: {}", e), + Ok(()) => {} })?; - let recorder = Recorder { buf, worker }; - - recorder.write(&Header { - v: DATA_FORMAT_VERSION, - }); + let recorder = Recorder { tx, _worker }; Ok(recorder) } - pub(crate) fn record(&self, event: &crate::Event) { - let event = match event { - crate::Event::Spawn { id, at, fields, .. } => Event::Spawn { - id: id.into_u64(), - at: *at, - fields: SerializeFields(fields), - }, - crate::Event::Enter { id, at, .. } => Event::Enter { - id: id.into_u64(), - at: *at, - }, - crate::Event::Exit { id, at, .. } => Event::Exit { - id: id.into_u64(), - at: *at, - }, - crate::Event::Close { id, at } => Event::Close { - id: id.into_u64(), - at: *at, - }, - crate::Event::Waker { id, op, at } => Event::Waker { - id: id.into_u64(), - at: *at, - op: *op, - }, - _ => return, - }; - - self.write(&event); - } - - fn write(&self, val: &T) { - let mut buf = self.buf.lock().unwrap(); - serde_json::to_writer(&mut buf.bytes, val).expect("json"); - buf.bytes.push(b'\n'); - drop(buf); - self.worker.thread().unpark(); + pub(crate) fn record(&self, event: Event) { + if self.tx.send(event).is_err() { + eprintln!("event recorder thread has terminated!"); + } } } -impl RecordBuf { - fn new() -> Self { - Self { - bytes: Vec::new(), - next: Vec::new(), - } - } +fn record_io(file: File, rx: Receiver) -> io::Result<()> { + use std::io::{BufWriter, Write}; - /// Takes the existing bytes to be written, and resets self so that - /// it may continue to buffer events. - fn take(&mut self) -> Vec { - let next = std::mem::take(&mut self.next); - std::mem::replace(&mut self.bytes, next) + fn write(mut file: &mut BufWriter, val: &T) -> io::Result<()> { + serde_json::to_writer(&mut file, val)?; + file.write_all(b"\n") } - fn put(&mut self, mut next: Vec) { - debug_assert_eq!(self.next.capacity(), 0); - next.clear(); - self.next = next; - } -} - -fn record_io(mut dst: Io) { - use std::io::Write; + let mut file = BufWriter::new(file); + write( + &mut file, + &Header { + v: DATA_FORMAT_VERSION, + }, + )?; - loop { - std::thread::park(); + // wait to recieve an event... + while let Ok(event) = rx.recv() { + // TODO: what to do if file error? + write(&mut file, &event)?; - // Only lock the mutex to take the bytes out. The file write could - // take a relatively long time, and we don't want to be blocking - // the serialization end holding this lock. - let bytes = dst.buf.lock().unwrap().take(); - match dst.file.write_all(&bytes) { - Ok(()) => { - dst.buf.lock().unwrap().put(bytes); - } - Err(_e) => { - // TODO: what to do if file error? - } + // drain any additional events that are ready now + while let Ok(event) = rx.try_recv() { + write(&mut file, &event)?; } + + file.flush()?; } + + tracing::debug!("event stream ended; flushing file"); + file.flush() } -impl serde::Serialize for SerializeFields<'_> { +impl serde::Serialize for SerializeFields { fn serialize(&self, serializer: S) -> Result where S: serde::Serializer, { let mut seq = serializer.serialize_seq(Some(self.0.len()))?; - for element in self.0 { + for element in &self.0 { seq.serialize_element(&SerializeField(element))?; } seq.end() diff --git a/console-subscriber/src/stats.rs b/console-subscriber/src/stats.rs new file mode 100644 index 000000000..ad2b67f67 --- /dev/null +++ b/console-subscriber/src/stats.rs @@ -0,0 +1,503 @@ +use crate::{attribute, sync::Mutex, ToProto}; +use hdrhistogram::{ + serialization::{Serializer, V2Serializer}, + Histogram, +}; +use std::cmp; +use std::sync::{ + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering::*}, + Arc, +}; +use std::time::{Duration, SystemTime}; +use tracing::span::Id; + +use console_api as proto; + +/// A type which records whether it has unsent updates. +/// +/// If something implementing this trait has been changed since the last time +/// data was sent to a client, it will indicate that it is "dirty". If it has +/// not been changed, it does not have to be included in the current update. +pub(crate) trait Unsent { + /// Returns `true` if this type has unsent updates, and if it does, clears + /// the flag indicating there are unsent updates. + /// + /// This is called when filtering which stats need to be included in the + /// current update. If this returns `true`, it will be included, so it + /// becomes no longer dirty. + fn take_unsent(&self) -> bool; + + /// Returns `true` if this type has unsent updates, without changing the + /// flag. + fn is_unsent(&self) -> bool; +} + +// An entity (e.g Task, Resource) that at some point in +// time can be dropped. This generally refers to spans that +// have been closed indicating that a task, async op or a +// resource is not in use anymore +pub(crate) trait DroppedAt { + fn dropped_at(&self) -> Option; +} + +/// Stats associated with a task. +#[derive(Debug)] +pub(crate) struct TaskStats { + is_dirty: AtomicBool, + is_dropped: AtomicBool, + // task stats + pub(crate) created_at: SystemTime, + timestamps: Mutex, + + // waker stats + wakes: AtomicUsize, + waker_clones: AtomicUsize, + waker_drops: AtomicUsize, + self_wakes: AtomicUsize, + + /// Poll durations and other stats. + poll_stats: PollStats, +} + +/// Stats associated with an async operation. +/// +/// This shares all of the same fields as [`ResourceStats]`, with the addition +/// of [`PollStats`] tracking when the async operation is polled, and the task +/// ID of the last task to poll the async op. +#[derive(Debug)] +pub(crate) struct AsyncOpStats { + /// The task ID of the last task to poll this async op. + /// + /// This is set every time the async op is polled, in case a future is + /// passed between tasks. + task_id: AtomicU64, + + /// Fields shared with `ResourceStats`. + pub(crate) stats: ResourceStats, + + /// Poll durations and other stats. + poll_stats: PollStats, +} + +/// Stats associated with a resource. +#[derive(Debug)] +pub(crate) struct ResourceStats { + is_dirty: AtomicBool, + is_dropped: AtomicBool, + created_at: SystemTime, + dropped_at: Mutex>, + attributes: Mutex, + pub(crate) inherit_child_attributes: bool, + pub(crate) parent_id: Option, +} + +#[derive(Debug, Default)] +struct TaskTimestamps { + dropped_at: Option, + last_wake: Option, +} + +#[derive(Debug, Default)] +struct PollStats { + /// The number of polls in progress + current_polls: AtomicUsize, + /// The total number of polls + polls: AtomicUsize, + timestamps: Mutex, +} + +#[derive(Debug, Default)] +struct PollTimestamps { + first_poll: Option, + last_poll_started: Option, + last_poll_ended: Option, + busy_time: Duration, + histogram: Option>, +} + +impl TaskStats { + pub(crate) fn new(created_at: SystemTime) -> Self { + // significant figures should be in the [0-5] range and memory usage + // grows exponentially with higher a sigfig + let poll_times_histogram = Histogram::::new(2).unwrap(); + Self { + is_dirty: AtomicBool::new(true), + is_dropped: AtomicBool::new(false), + created_at, + timestamps: Mutex::new(TaskTimestamps::default()), + poll_stats: PollStats { + timestamps: Mutex::new(PollTimestamps { + histogram: Some(poll_times_histogram), + ..Default::default() + }), + ..Default::default() + }, + wakes: AtomicUsize::new(0), + waker_clones: AtomicUsize::new(0), + waker_drops: AtomicUsize::new(0), + self_wakes: AtomicUsize::new(0), + } + } + + pub(crate) fn record_wake_op(&self, op: crate::WakeOp, at: SystemTime) { + use crate::WakeOp; + match op { + WakeOp::Clone => { + self.waker_clones.fetch_add(1, Release); + } + WakeOp::Drop => { + self.waker_drops.fetch_add(1, Release); + } + WakeOp::WakeByRef { self_wake } => self.wake(at, self_wake), + WakeOp::Wake { self_wake } => { + // Note: `Waker::wake` does *not* call the `drop` + // implementation, so waking by value doesn't + // trigger a drop event. so, count this as a `drop` + // to ensure the task's number of wakers can be + // calculated as `clones` - `drops`. + // + // see + // https://github.com/rust-lang/rust/blob/673d0db5e393e9c64897005b470bfeb6d5aec61b/library/core/src/task/wake.rs#L211-L212 + self.waker_drops.fetch_add(1, Release); + + self.wake(at, self_wake) + } + } + self.make_dirty(); + } + + fn wake(&self, at: SystemTime, self_wake: bool) { + let mut timestamps = self.timestamps.lock(); + timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at)); + self.wakes.fetch_add(1, Release); + + if self_wake { + self.wakes.fetch_add(1, Release); + } + } + + pub(crate) fn start_poll(&self, at: SystemTime) { + self.poll_stats.start_poll(at); + self.make_dirty(); + } + + pub(crate) fn end_poll(&self, at: SystemTime) { + self.poll_stats.end_poll(at); + self.make_dirty(); + } + + pub(crate) fn drop_task(&self, dropped_at: SystemTime) { + if self.is_dropped.swap(true, AcqRel) { + // The task was already dropped. + // TODO(eliza): this could maybe panic in debug mode... + return; + } + + let mut timestamps = self.timestamps.lock(); + let _prev = timestamps.dropped_at.replace(dropped_at); + debug_assert_eq!(_prev, None, "tried to drop a task twice; this is a bug!"); + self.make_dirty(); + } + + #[inline] + fn make_dirty(&self) { + self.is_dirty.swap(true, AcqRel); + } + + pub(crate) fn serialize_histogram(&self) -> Option> { + let poll_timestamps = self.poll_stats.timestamps.lock(); + let histogram = poll_timestamps.histogram.as_ref()?; + let mut serializer = V2Serializer::new(); + let mut buf = Vec::new(); + serializer.serialize(histogram, &mut buf).ok()?; + Some(buf) + } +} + +impl ToProto for TaskStats { + type Output = proto::tasks::Stats; + + fn to_proto(&self) -> Self::Output { + let poll_stats = Some(self.poll_stats.to_proto()); + let timestamps = self.timestamps.lock(); + proto::tasks::Stats { + poll_stats, + created_at: Some(self.created_at.into()), + dropped_at: timestamps.dropped_at.map(Into::into), + wakes: self.wakes.load(Acquire) as u64, + waker_clones: self.waker_clones.load(Acquire) as u64, + self_wakes: self.self_wakes.load(Acquire) as u64, + waker_drops: self.waker_drops.load(Acquire) as u64, + last_wake: timestamps.last_wake.map(Into::into), + } + } +} + +impl Unsent for TaskStats { + #[inline] + fn take_unsent(&self) -> bool { + self.is_dirty.swap(false, AcqRel) + } + + fn is_unsent(&self) -> bool { + self.is_dirty.load(Acquire) + } +} + +impl DroppedAt for TaskStats { + fn dropped_at(&self) -> Option { + // avoid acquiring the lock if we know we haven't tried to drop this + // thing yet + if self.is_dropped.load(Acquire) { + return self.timestamps.lock().dropped_at; + } + + None + } +} + +// === impl AsyncOpStats === + +impl AsyncOpStats { + pub(crate) fn new( + created_at: SystemTime, + inherit_child_attributes: bool, + parent_id: Option, + ) -> Self { + Self { + task_id: AtomicU64::new(0), + stats: ResourceStats::new(created_at, inherit_child_attributes, parent_id), + poll_stats: PollStats::default(), + } + } + + pub(crate) fn task_id(&self) -> Option { + let id = self.task_id.load(Acquire); + if id > 0 { + Some(id as u64) + } else { + None + } + } + + pub(crate) fn set_task_id(&self, id: &tracing::span::Id) { + self.task_id.store(id.into_u64(), Release); + self.make_dirty(); + } + + pub(crate) fn drop_async_op(&self, dropped_at: SystemTime) { + self.stats.drop_resource(dropped_at) + } + + pub(crate) fn start_poll(&self, at: SystemTime) { + self.poll_stats.start_poll(at); + self.make_dirty(); + } + + pub(crate) fn end_poll(&self, at: SystemTime) { + self.poll_stats.end_poll(at); + self.make_dirty(); + } + + #[inline] + fn make_dirty(&self) { + self.stats.make_dirty() + } +} + +impl Unsent for AsyncOpStats { + #[inline] + fn take_unsent(&self) -> bool { + self.stats.take_unsent() + } + + #[inline] + fn is_unsent(&self) -> bool { + self.stats.is_unsent() + } +} + +impl DroppedAt for AsyncOpStats { + fn dropped_at(&self) -> Option { + self.stats.dropped_at() + } +} + +impl ToProto for AsyncOpStats { + type Output = proto::async_ops::Stats; + + fn to_proto(&self) -> Self::Output { + let attributes = self.stats.attributes.lock().values().cloned().collect(); + proto::async_ops::Stats { + poll_stats: Some(self.poll_stats.to_proto()), + created_at: Some(self.stats.created_at.into()), + dropped_at: self.stats.dropped_at.lock().map(Into::into), + task_id: self.task_id().map(Into::into), + attributes, + } + } +} + +// === impl ResourceStats === + +impl ResourceStats { + pub(crate) fn new( + created_at: SystemTime, + inherit_child_attributes: bool, + parent_id: Option, + ) -> Self { + Self { + is_dirty: AtomicBool::new(true), + is_dropped: AtomicBool::new(false), + created_at, + dropped_at: Mutex::new(None), + attributes: Default::default(), + inherit_child_attributes, + parent_id, + } + } + + pub(crate) fn update_attribute(&self, id: &Id, update: &attribute::Update) { + self.attributes.lock().update(id, update); + self.make_dirty(); + } + + #[inline] + pub(crate) fn drop_resource(&self, dropped_at: SystemTime) { + if self.is_dropped.swap(true, AcqRel) { + // The task was already dropped. + // TODO(eliza): this could maybe panic in debug mode... + return; + } + + let mut timestamp = self.dropped_at.lock(); + let _prev = timestamp.replace(dropped_at); + debug_assert_eq!( + _prev, None, + "tried to drop a resource/async op twice; this is a bug!" + ); + self.make_dirty(); + } + + #[inline] + fn make_dirty(&self) { + self.is_dirty.swap(true, AcqRel); + } +} + +impl Unsent for ResourceStats { + #[inline] + fn take_unsent(&self) -> bool { + self.is_dirty.swap(false, AcqRel) + } + + fn is_unsent(&self) -> bool { + self.is_dirty.load(Acquire) + } +} + +impl DroppedAt for ResourceStats { + fn dropped_at(&self) -> Option { + // avoid acquiring the lock if we know we haven't tried to drop this + // thing yet + if self.is_dropped.load(Acquire) { + return *self.dropped_at.lock(); + } + + None + } +} + +impl ToProto for ResourceStats { + type Output = proto::resources::Stats; + + fn to_proto(&self) -> Self::Output { + let attributes = self.attributes.lock().values().cloned().collect(); + proto::resources::Stats { + created_at: Some(self.created_at.into()), + dropped_at: self.dropped_at.lock().map(Into::into), + attributes, + } + } +} + +// === impl PollStats === + +impl PollStats { + fn start_poll(&self, at: SystemTime) { + if self.current_polls.fetch_add(1, AcqRel) == 0 { + // We are starting the first poll + let mut timestamps = self.timestamps.lock(); + if timestamps.first_poll.is_none() { + timestamps.first_poll = Some(at); + } + + timestamps.last_poll_started = Some(at); + + self.polls.fetch_add(1, Release); + } + } + + fn end_poll(&self, at: SystemTime) { + if self.current_polls.fetch_sub(1, AcqRel) == 1 { + // We are ending the last current poll + let mut timestamps = self.timestamps.lock(); + let last_poll_started = timestamps.last_poll_started; + debug_assert!(last_poll_started.is_some(), "must have started a poll before ending a poll; this is a `console-subscriber` bug!"); + timestamps.last_poll_ended = Some(at); + let elapsed = last_poll_started.and_then(|started| at.duration_since(started).ok()); + debug_assert!(elapsed.is_some(), "the current poll must have started before it ended; this is a `console-subscriber` bug!"); + if let Some(elapsed) = elapsed { + // if we have a poll time histogram, add the timestamp + if let Some(ref mut histogram) = timestamps.histogram { + let elapsed_ns = elapsed.as_nanos().try_into().unwrap_or(u64::MAX); + histogram + .record(elapsed_ns) + .expect("failed to record histogram for some kind of reason"); + } + + timestamps.busy_time += elapsed; + } + } + } +} + +impl ToProto for PollStats { + type Output = proto::PollStats; + + fn to_proto(&self) -> Self::Output { + let timestamps = self.timestamps.lock(); + proto::PollStats { + polls: self.polls.load(Acquire) as u64, + first_poll: timestamps.first_poll.map(Into::into), + last_poll_started: timestamps.last_poll_started.map(Into::into), + last_poll_ended: timestamps.last_poll_ended.map(Into::into), + busy_time: Some(timestamps.busy_time.into()), + } + } +} + +// === impl Arc === + +impl DroppedAt for Arc { + fn dropped_at(&self) -> Option { + T::dropped_at(self) + } +} + +impl Unsent for Arc { + fn take_unsent(&self) -> bool { + T::take_unsent(self) + } + + fn is_unsent(&self) -> bool { + T::is_unsent(self) + } +} + +impl ToProto for Arc { + type Output = T::Output; + fn to_proto(&self) -> T::Output { + T::to_proto(self) + } +} diff --git a/console-subscriber/src/sync.rs b/console-subscriber/src/sync.rs index cff615aaf..c2907e325 100644 --- a/console-subscriber/src/sync.rs +++ b/console-subscriber/src/sync.rs @@ -2,7 +2,7 @@ #![allow(dead_code, unused_imports)] #[cfg(feature = "parking_lot")] -pub(crate) use parking_lot_crate::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +pub(crate) use parking_lot_crate::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; #[cfg(not(feature = "parking_lot"))] pub(crate) use self::std_impl::*; @@ -10,7 +10,22 @@ pub(crate) use self::std_impl::*; #[cfg(not(feature = "parking_lot"))] mod std_impl { use std::sync::{self, PoisonError, TryLockError}; - pub use std::sync::{RwLockReadGuard, RwLockWriteGuard}; + pub use std::sync::{MutexGuard, RwLockReadGuard, RwLockWriteGuard}; + + #[derive(Debug, Default)] + pub(crate) struct Mutex(sync::Mutex); + + impl Mutex { + pub(crate) fn new(data: T) -> Self { + Self(sync::Mutex::new(data)) + } + } + + impl Mutex { + pub(crate) fn lock(&self) -> MutexGuard<'_, T> { + self.0.lock().unwrap_or_else(PoisonError::into_inner) + } + } #[derive(Debug, Default)] pub(crate) struct RwLock(sync::RwLock); diff --git a/console-subscriber/src/visitors.rs b/console-subscriber/src/visitors.rs index f0027094b..20458e6b0 100644 --- a/console-subscriber/src/visitors.rs +++ b/console-subscriber/src/visitors.rs @@ -2,7 +2,7 @@ //! fields from tracing metadata and producing the parts //! needed to construct `Event` instances. -use super::{AttributeUpdate, AttributeUpdateOp, WakeOp}; +use super::{attribute, WakeOp}; use console_api as proto; use proto::resources::resource; use tracing_core::{ @@ -162,7 +162,7 @@ pub(crate) struct StateUpdateVisitor { meta_id: proto::MetaId, field: Option, unit: Option, - op: Option, + op: Option, } impl ResourceVisitor { @@ -456,8 +456,8 @@ impl StateUpdateVisitor { } } - pub(crate) fn result(self) -> Option { - Some(AttributeUpdate { + pub(crate) fn result(self) -> Option { + Some(attribute::Update { field: self.field?, op: self.op, unit: self.unit, @@ -517,9 +517,9 @@ impl Visit for StateUpdateVisitor { fn record_str(&mut self, field: &field::Field, value: &str) { if field.name().ends_with(Self::STATE_OP_SUFFIX) { match value { - Self::OP_ADD => self.op = Some(AttributeUpdateOp::Add), - Self::OP_SUB => self.op = Some(AttributeUpdateOp::Sub), - Self::OP_OVERRIDE => self.op = Some(AttributeUpdateOp::Override), + Self::OP_ADD => self.op = Some(attribute::UpdateOp::Add), + Self::OP_SUB => self.op = Some(attribute::UpdateOp::Sub), + Self::OP_OVERRIDE => self.op = Some(attribute::UpdateOp::Override), _ => {} }; } else if field.name().ends_with(Self::STATE_UNIT_SUFFIX) {