diff --git a/HANDBOOK.md b/HANDBOOK.md index 8b9a226..482100c 100755 --- a/HANDBOOK.md +++ b/HANDBOOK.md @@ -57,48 +57,94 @@ Timer's internal precision is microseconds but can be scaled down on output. extern crate dipstick; use dipstick::*; fn main() { - let app_metrics = Stream::to_stdout().metrics(); - let timer = app_metrics.timer("my_timer"); - time!(timer, {/* slow code here */} ); - timer.time(|| {/* slow code here */} ); + let metrics = Stream::to_stdout().metrics(); + let timer = metrics.timer("my_timer"); - let start = timer.start(); - /* slow code here */ - timer.stop(start); + // using macro + time!(timer, {/* timed code here ... */} ); + // using closure + timer.time(|| {/* timed code here ... */} ); + + // using start/stop + let handle = timer.start(); + /* timed code here ... */ + timer.stop(handle); + + // directly reporting microseconds timer.interval_us(123_456); } ``` ### Level -Relative quantity counter. Accepts positive and negative cumulative values. -If aggregated, observed minimum and maximum track the sum of values rather than individual values as for `Counter`. +A relative, cumulative quantity counter. Accepts positive and negative values. +If aggregated, observed minimum and maximum track the _sum_ of values (as opposed to `Counter` min and max _individual_ values). ### Gauge An instant observation of a resource's value (positive or negative, but non-cumulative). -The observation of Gauges is not automatic and must be performed programmatically as with other metric types. +The observation of Gauges can be performed programmatically as with other metric types or +it can be triggered automatically, either on schedule or upon flushing the scope: + +````rust +extern crate dipstick; +use dipstick::*; +use std::time::Duration; + +fn main() { + let metrics = Stream::to_stdout().metrics(); + let uptime = metrics.gauge("uptime"); + + // report gauge value programmatically + uptime.value(2); + + // observe a constant value before each flush + let uptime = metrics.gauge("uptime"); + metrics.observe(uptime, || 6).on_flush(); + + // observe a function-provided value periodically + metrics + .observe(metrics.gauge("threads"), thread_count) + .every(Duration::from_secs(1)); +} + +fn thread_count() -> MetricValue { + 6 +} +```` ### Names Each metric must be given a name upon creation. Names are opaque to the application and are used only to identify the metrics upon output. -Names may be prepended with a namespace by each configured backend. -Aggregated statistics may also append identifiers to the metric's name. - -Names should exclude characters that can interfere with namespaces, separator and output protocols. -A good convention is to stick with lowercase alphanumeric identifiers of less than 12 characters. +Names may be prepended with a application-namespace shared across all backends. ```rust extern crate dipstick; use dipstick::*; fn main() { - let app_metrics = Stream::to_stdout().metrics(); - let db_metrics = app_metrics.named("database"); - let _db_timer = db_metrics.timer("db_timer"); - let _db_counter = db_metrics.counter("db_counter"); + let metrics = Stream::to_stdout().metrics(); + + // plainly name "timer" + let _timer = metrics.timer("timer"); + + // prepend namespace + let db_metrics = metrics.named("database"); + + // qualified name will be "database.counter" + let _db_counter = db_metrics.counter("counter"); } ``` +Names may be prepended with a namespace by each configured backend. +For example, the same metric `request.success` could appear under different qualified names: +- logging as `app_module.request.success` +- statsd as `environment.hostname.pid.module.request.success` + +Aggregation statistics may also append identifiers to the metric's name, such as `counter_mean` or `marker_rate`. + +Names should exclude characters that can interfere with namespaces, separator and output protocols. +A good convention is to stick with lowercase alphanumeric identifiers of less than 12 characters. + ### Labels diff --git a/examples/observer.rs b/examples/observer.rs new file mode 100644 index 0000000..506aacf --- /dev/null +++ b/examples/observer.rs @@ -0,0 +1,42 @@ +//! +//! A sample application to demonstrate flush-triggered and scheduled observation of gauge values. +//! +//! This is the expected output: +//! +//! ``` +//! cargo run --example observer +//! process.threads 4 +//! process.uptime 6 +//! process.threads 4 +//! process.uptime 6 +//! ... +//! ``` +//! + +extern crate dipstick; + +use std::time::Duration; + +use dipstick::*; + +fn main() { + let metrics = AtomicBucket::new().named("process"); + metrics.drain(Stream::to_stdout()); + metrics.flush_every(Duration::from_secs(3)); + + let uptime = metrics.gauge("uptime"); + metrics.observe(uptime, || 6).on_flush(); + + metrics + .observe(metrics.gauge("threads"), thread_count) + .every(Duration::from_secs(1)); + + loop { + std::thread::sleep(Duration::from_millis(40)); + } +} + +/// Query number of running threads in this process using Linux's /proc filesystem. +fn thread_count() -> MetricValue { + 4 +} diff --git a/src/bucket/atomic.rs b/src/bucket/atomic.rs index 0d217f1..4bc0372 100755 --- a/src/bucket/atomic.rs +++ b/src/bucket/atomic.rs @@ -2,7 +2,7 @@ use bucket::ScoreType::*; use bucket::{stats_summary, ScoreType}; -use core::attributes::{Attributes, Prefixed, WithAttributes}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::clock::TimeHandle; use core::error; use core::input::{InputKind, InputMetric, InputScope}; @@ -83,7 +83,7 @@ lazy_static! { } impl InnerAtomicBucket { - pub fn flush(&mut self) -> error::Result<()> { + fn flush(&mut self) -> error::Result<()> { let pub_scope = match self.drain { Some(ref out) => out.output_dyn(), None => read_lock!(DEFAULT_AGGREGATE_OUTPUT).output_dyn(), @@ -110,7 +110,7 @@ impl InnerAtomicBucket { /// Take a snapshot of aggregated values and reset them. /// Compute stats on captured values using assigned or default stats function. /// Write stats to assigned or default output. - pub fn flush_to(&mut self, target: &OutputScope) -> error::Result<()> { + fn flush_to(&mut self, target: &OutputScope) -> error::Result<()> { let now = TimeHandle::now(); let duration_seconds = self.period_start.elapsed_us() as f64 / 1_000_000.0; self.period_start = now; @@ -234,7 +234,7 @@ impl AtomicBucket { } /// Revert this bucket's statistics generator to the default stats. - pub fn unset_stat(&self) { + pub fn unset_stats(&self) { write_lock!(self.inner).stats = None } @@ -277,6 +277,7 @@ impl Flush for AtomicBucket { /// Collect and reset aggregated data. /// Publish statistics fn flush(&self) -> error::Result<()> { + self.notify_flush_listeners(); let mut inner = write_lock!(self.inner); inner.flush() } @@ -327,7 +328,7 @@ impl AtomicScores { } /// Update scores with new value - pub fn update(&self, value: MetricValue) { + pub fn update(&self, value: MetricValue) -> () { // TODO detect & report any concurrent updates / resets for measurement of contention // Count is tracked for all metrics self.scores[HIT].fetch_add(1, Relaxed); @@ -494,7 +495,7 @@ mod test { use bucket::{stats_all, stats_average, stats_summary}; use core::clock::{mock_clock_advance, mock_clock_reset}; - use output::map::StatsMap; + use output::map::StatsMapScope; use std::collections::BTreeMap; use std::time::Duration; @@ -535,7 +536,7 @@ mod test { mock_clock_advance(Duration::from_secs(3)); - let map = StatsMap::default(); + let map = StatsMapScope::default(); metrics.flush_to(&map).unwrap(); map.into() } diff --git a/src/cache/cache_in.rs b/src/cache/cache_in.rs index 07641f3..2d2709d 100755 --- a/src/cache/cache_in.rs +++ b/src/cache/cache_in.rs @@ -1,7 +1,7 @@ //! Metric input scope caching. use cache::lru_cache as lru; -use core::attributes::{Attributes, Prefixed, WithAttributes}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::error; use core::input::{Input, InputDyn, InputKind, InputMetric, InputScope}; use core::name::MetricName; @@ -102,6 +102,7 @@ impl InputScope for InputScopeCache { impl Flush for InputScopeCache { fn flush(&self) -> error::Result<()> { + self.notify_flush_listeners(); self.target.flush() } } diff --git a/src/cache/cache_out.rs b/src/cache/cache_out.rs index ce7ba13..5d2d90b 100755 --- a/src/cache/cache_out.rs +++ b/src/cache/cache_out.rs @@ -1,7 +1,7 @@ //! Metric output scope caching. use cache::lru_cache as lru; -use core::attributes::{Attributes, Prefixed, WithAttributes}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::error; use core::input::InputKind; use core::name::MetricName; @@ -105,6 +105,7 @@ impl OutputScope for OutputScopeCache { impl Flush for OutputScopeCache { fn flush(&self) -> error::Result<()> { + self.notify_flush_listeners(); self.target.flush() } } diff --git a/src/core/attributes.rs b/src/core/attributes.rs index 00c06ce..927ee55 100755 --- a/src/core/attributes.rs +++ b/src/core/attributes.rs @@ -3,6 +3,18 @@ use std::default::Default; use std::sync::Arc; use core::name::{MetricName, NameParts}; +use core::scheduler::SCHEDULER; +use std::fmt; +use std::time::Duration; +use MetricValue; +use {CancelHandle, Flush}; +use {Gauge, InputScope}; + +#[cfg(not(feature = "parking_lot"))] +use std::sync::RwLock; + +#[cfg(feature = "parking_lot")] +use parking_lot::RwLock; /// The actual distribution (random, fixed-cycled, etc) depends on selected sampling method. #[derive(Debug, Clone, Copy)] @@ -45,13 +57,25 @@ impl Default for Buffering { } } +type Shared = Arc>; + /// Attributes common to metric components. /// Not all attributes used by all components. -#[derive(Debug, Clone, Default)] +#[derive(Clone, Default)] pub struct Attributes { naming: NameParts, sampling: Sampling, buffering: Buffering, + flush_listeners: Shared () + Send + Sync + 'static>>>, + tasks: Shared>, +} + +impl fmt::Debug for Attributes { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "naming: {:?}", self.naming)?; + write!(f, "sampling: {:?}", self.sampling)?; + write!(f, "buffering: {:?}", self.buffering) + } } /// This trait should not be exposed outside the crate. @@ -71,6 +95,84 @@ pub trait WithAttributes: Clone { } } +/// Register and notify scope-flush listeners +pub trait OnFlush { + /// Notify registered listeners of an impending flush. + fn notify_flush_listeners(&self); +} + +impl OnFlush for T +where + T: Flush + WithAttributes, +{ + fn notify_flush_listeners(&self) { + for listener in read_lock!(self.get_attributes().flush_listeners).iter() { + (listener)() + } + } +} + +pub struct ObserveWhen<'a, T, F> { + target: &'a T, + gauge: Gauge, + operation: Arc, +} + +impl<'a, T, F> ObserveWhen<'a, T, F> +where + F: Fn() -> MetricValue + Send + Sync + 'static, + T: InputScope + WithAttributes + Send + Sync, +{ + /// Observe the metric's value upon flushing the scope. + pub fn on_flush(self) { + let gauge = self.gauge; + let op = self.operation; + write_lock!(self.target.get_attributes().flush_listeners) + .push(Arc::new(move || gauge.value(op()))); + } + + /// Observe the metric's value periodically. + pub fn every(self, period: Duration) -> CancelHandle { + let gauge = self.gauge; + let op = self.operation; + let handle = SCHEDULER.schedule(period, move || gauge.value(op())); + write_lock!(self.target.get_attributes().tasks).push(handle.clone()); + handle + } +} + +/// Schedule a recurring task +pub trait Observe { + /// Provide a source for a metric's values. + fn observe(&self, gauge: Gauge, operation: F) -> ObserveWhen + where + F: Fn() -> MetricValue + Send + Sync + 'static, + Self: Sized; +} + +impl Observe for T { + fn observe(&self, gauge: Gauge, operation: F) -> ObserveWhen + where + F: Fn() -> MetricValue + Send + Sync + 'static, + Self: Sized, + { + ObserveWhen { + target: self, + gauge, + operation: Arc::new(operation), + } + } +} + +impl Drop for Attributes { + fn drop(&mut self) { + let mut tasks = write_lock!(self.tasks); + for task in tasks.drain(..) { + task.cancel() + } + } +} + /// Name operations support. pub trait Prefixed { /// Returns namespace of component. @@ -78,7 +180,7 @@ pub trait Prefixed { /// Append a name to the existing names. /// Return a clone of the component with the updated names. - #[deprecated(since = "0.7.2", note = "Use add_name()")] + #[deprecated(since = "0.7.2", note = "Use named() or add_name()")] fn add_prefix>(&self, name: S) -> Self; /// Append a name to the existing names. @@ -116,12 +218,10 @@ impl Prefixed for T { &self.get_attributes().naming } - /// Replace any existing names with a single name. - /// Return a clone of the component with the new name. - /// If multiple names are required, `add_name` may also be used. - fn named>(&self, name: S) -> Self { - let parts = NameParts::from(name); - self.with_attributes(|new_attr| new_attr.naming = parts.clone()) + /// Append a name to the existing names. + /// Return a clone of the component with the updated names. + fn add_prefix>(&self, name: S) -> Self { + self.add_name(name) } /// Append a name to the existing names. @@ -131,10 +231,12 @@ impl Prefixed for T { self.with_attributes(|new_attr| new_attr.naming.push_back(name.clone())) } - /// Append a name to the existing names. - /// Return a clone of the component with the updated names. - fn add_prefix>(&self, name: S) -> Self { - self.add_name(name) + /// Replace any existing names with a single name. + /// Return a clone of the component with the new name. + /// If multiple names are required, `add_name` may also be used. + fn named>(&self, name: S) -> Self { + let parts = NameParts::from(name); + self.with_attributes(|new_attr| new_attr.naming = parts.clone()) } } @@ -171,3 +273,22 @@ pub trait Buffered: WithAttributes { !(self.get_attributes().buffering == Buffering::Unbuffered) } } + +#[cfg(test)] +mod test { + use core::attributes::*; + use core::input::Input; + use core::input::*; + use core::Flush; + use output::map::StatsMap; + use StatsMapScope; + + #[test] + fn on_flush() { + let metrics: StatsMapScope = StatsMap::default().metrics(); + let gauge = metrics.gauge("my_gauge"); + metrics.observe(gauge, || 4).on_flush(); + metrics.flush().unwrap(); + assert_eq!(Some(&4), metrics.into_map().get("my_gauge")) + } +} diff --git a/src/core/input.rs b/src/core/input.rs index 28a239e..04468d8 100755 --- a/src/core/input.rs +++ b/src/core/input.rs @@ -1,269 +1,269 @@ -use core::clock::TimeHandle; -use core::label::Labels; -use core::name::MetricName; -use core::{Flush, MetricValue}; - -use std::fmt; -use std::sync::Arc; - -// TODO maybe define an 'AsValue' trait + impl for supported number types, then drop 'num' crate -pub use num::integer; -pub use num::ToPrimitive; - -/// A function trait that opens a new metric capture scope. -pub trait Input: Send + Sync + 'static + InputDyn { - /// The type of Scope returned byt this input. - type SCOPE: InputScope + Send + Sync + 'static; - - /// Open a new scope from this input. - fn metrics(&self) -> Self::SCOPE; - - /// Open a new scope from this input. - #[deprecated(since = "0.7.2", note = "Use metrics()")] - fn input(&self) -> Self::SCOPE { - self.metrics() - } -} - -/// A function trait that opens a new metric capture scope. -pub trait InputDyn: Send + Sync + 'static { - /// Open a new scope from this output. - fn input_dyn(&self) -> Arc; -} - -/// Blanket impl of dyn input trait -impl InputDyn for T { - fn input_dyn(&self) -> Arc { - Arc::new(self.metrics()) - } -} - -/// InputScope -/// Define metrics, write values and flush them. -pub trait InputScope: Flush { - /// Define a generic metric of the specified type. - /// It is preferable to use counter() / marker() / timer() / gauge() methods. - fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric; - - /// Define a Counter. - fn counter(&self, name: &str) -> Counter { - self.new_metric(name.into(), InputKind::Counter).into() - } - - /// Define a Marker. - fn marker(&self, name: &str) -> Marker { - self.new_metric(name.into(), InputKind::Marker).into() - } - - /// Define a Timer. - fn timer(&self, name: &str) -> Timer { - self.new_metric(name.into(), InputKind::Timer).into() - } - - /// Define a Gauge. - fn gauge(&self, name: &str) -> Gauge { - self.new_metric(name.into(), InputKind::Gauge).into() - } - - /// Define a Level. - fn level(&self, name: &str) -> Level { - self.new_metric(name.into(), InputKind::Level).into() - } -} - -/// A metric is actually a function that knows to write a metric value to a metric output. -#[derive(Clone)] -pub struct InputMetric { - inner: Arc, -} - -impl fmt::Debug for InputMetric { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "InputMetric") - } -} - -impl InputMetric { - /// Utility constructor - pub fn new(metric: F) -> InputMetric { - InputMetric { - inner: Arc::new(metric), - } - } - - /// Collect a new value for this metric. - #[inline] - pub fn write(&self, value: MetricValue, labels: Labels) { - (self.inner)(value, labels) - } -} - -/// Used to differentiate between metric kinds in the backend. -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] -pub enum InputKind { - /// Handling one item at a time. - Marker, - /// Handling cumulative observed quantities. - Counter, - /// Handling quantity fluctuations. - Level, - /// Reporting instant measurement of a resource at a point in time (non-cumulative). - Gauge, - /// Measuring a time interval, internal to the app or provided by an external source. - Timer, -} - -/// Used by the metrics! macro to obtain the InputKind from the stringified type. -impl<'a> From<&'a str> for InputKind { - fn from(s: &str) -> InputKind { - match s { - "Marker" => InputKind::Marker, - "Counter" => InputKind::Counter, - "Gauge" => InputKind::Gauge, - "Timer" => InputKind::Timer, - "Level" => InputKind::Level, - _ => panic!("No InputKind '{}' defined", s), - } - } -} - -/// A monotonic counter metric. -/// Since value is only ever increased by one, no value parameter is provided, -/// preventing programming errors. -#[derive(Debug, Clone)] -pub struct Marker { - inner: InputMetric, -} - -impl Marker { - /// Record a single event occurence. - pub fn mark(&self) { - self.inner.write(1, labels![]) - } -} - -/// A counter of absolute observed values (non-negative amounts). -/// Used to count things that cannot be undone: -/// - Bytes sent -/// - Records written -/// - Apples eaten -/// For relative (possibly negative) values, the `Level` counter type can be used. -/// If aggregated, minimum and maximum scores will track the collected values, not their sum. -#[derive(Debug, Clone)] -pub struct Counter { - inner: InputMetric, -} - -impl Counter { - /// Record a value count. - pub fn count(&self, count: usize) { - self.inner.write(count as isize, labels![]) - } -} - -/// A counter of fluctuating resources accepting positive and negative values. -/// Can be used as a stateful `Gauge` or as a `Counter` of possibly decreasing amounts. -/// - Size of messages in a queue -/// - Strawberries on a conveyor belt -/// If aggregated, minimum and maximum scores will track the sum of values, not the collected values themselves. -#[derive(Debug, Clone)] -pub struct Level { - inner: InputMetric, -} - -impl Level { - /// Record a positive or negative value count - pub fn adjust(&self, count: V) { - self.inner.write(count.to_isize().unwrap(), labels![]) - } -} - -/// A gauge that sends values to the metrics backend -#[derive(Debug, Clone)] -pub struct Gauge { - inner: InputMetric, -} - -impl Gauge { - /// Record a value point for this gauge. - pub fn value(&self, value: V) { - self.inner.write(value.to_isize().unwrap(), labels![]) - } -} - -/// A timer that sends values to the metrics backend -/// Timers can record time intervals in multiple ways : -/// - with the time! macrohich wraps an expression or block with start() and stop() calls. -/// - with the time(Fn) methodhich wraps a closure with start() and stop() calls. -/// - with start() and stop() methodsrapping around the operation to time -/// - with the interval_us() method, providing an externally determined microsecond interval -#[derive(Debug, Clone)] -pub struct Timer { - inner: InputMetric, -} - -impl Timer { - /// Record a microsecond interval for this timer - /// Can be used in place of start()/stop() if an external time interval source is used - pub fn interval_us(&self, interval_us: u64) -> u64 { - self.inner.write(interval_us as isize, labels![]); - interval_us - } - - /// Obtain a opaque handle to the current time. - /// The handle is passed back to the stop() method to record a time interval. - /// This is actually a convenience method to the TimeHandle::now() - /// Beware, handles obtained here are not bound to this specific timer instance - /// _for now_ but might be in the future for safety. - /// If you require safe multi-timer handles, get them through TimeType::now() - pub fn start(&self) -> TimeHandle { - TimeHandle::now() - } - - /// Record the time elapsed since the start_time handle was obtained. - /// This call can be performed multiple times using the same handle, - /// reporting distinct time intervals each time. - /// Returns the microsecond interval value that was recorded. - pub fn stop(&self, start_time: TimeHandle) -> MetricValue { - let elapsed_us = start_time.elapsed_us(); - self.interval_us(elapsed_us) as isize - } - - /// Record the time taken to execute the provided closure - pub fn time R, R>(&self, operations: F) -> R { - let start_time = self.start(); - let value: R = operations(); - self.stop(start_time); - value - } -} - -impl From for Gauge { - fn from(metric: InputMetric) -> Gauge { - Gauge { inner: metric } - } -} - -impl From for Timer { - fn from(metric: InputMetric) -> Timer { - Timer { inner: metric } - } -} - -impl From for Counter { - fn from(metric: InputMetric) -> Counter { - Counter { inner: metric } - } -} - -impl From for Marker { - fn from(metric: InputMetric) -> Marker { - Marker { inner: metric } - } -} - -impl From for Level { - fn from(metric: InputMetric) -> Level { - Level { inner: metric } - } -} +use core::clock::TimeHandle; +use core::label::Labels; +use core::name::MetricName; +use core::{Flush, MetricValue}; + +use std::fmt; +use std::sync::Arc; + +// TODO maybe define an 'AsValue' trait + impl for supported number types, then drop 'num' crate +pub use num::integer; +pub use num::ToPrimitive; + +/// A function trait that opens a new metric capture scope. +pub trait Input: Send + Sync + 'static + InputDyn { + /// The type of Scope returned byt this input. + type SCOPE: InputScope + Send + Sync + 'static; + + /// Open a new scope from this input. + fn metrics(&self) -> Self::SCOPE; + + /// Open a new scope from this input. + #[deprecated(since = "0.7.2", note = "Use metrics()")] + fn input(&self) -> Self::SCOPE { + self.metrics() + } +} + +/// A function trait that opens a new metric capture scope. +pub trait InputDyn: Send + Sync + 'static { + /// Open a new scope from this output. + fn input_dyn(&self) -> Arc; +} + +/// Blanket impl of dyn input trait +impl InputDyn for T { + fn input_dyn(&self) -> Arc { + Arc::new(self.metrics()) + } +} + +/// InputScope +/// Define metrics, write values and flush them. +pub trait InputScope: Flush { + /// Define a generic metric of the specified type. + /// It is preferable to use counter() / marker() / timer() / gauge() methods. + fn new_metric(&self, name: MetricName, kind: InputKind) -> InputMetric; + + /// Define a Counter. + fn counter(&self, name: &str) -> Counter { + self.new_metric(name.into(), InputKind::Counter).into() + } + + /// Define a Marker. + fn marker(&self, name: &str) -> Marker { + self.new_metric(name.into(), InputKind::Marker).into() + } + + /// Define a Timer. + fn timer(&self, name: &str) -> Timer { + self.new_metric(name.into(), InputKind::Timer).into() + } + + /// Define a Gauge. + fn gauge(&self, name: &str) -> Gauge { + self.new_metric(name.into(), InputKind::Gauge).into() + } + + /// Define a Level. + fn level(&self, name: &str) -> Level { + self.new_metric(name.into(), InputKind::Level).into() + } +} + +/// A metric is actually a function that knows to write a metric value to a metric output. +#[derive(Clone)] +pub struct InputMetric { + inner: Arc, +} + +impl fmt::Debug for InputMetric { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "InputMetric") + } +} + +impl InputMetric { + /// Utility constructor + pub fn new(metric: F) -> InputMetric { + InputMetric { + inner: Arc::new(metric), + } + } + + /// Collect a new value for this metric. + #[inline] + pub fn write(&self, value: MetricValue, labels: Labels) { + (self.inner)(value, labels) + } +} + +/// Used to differentiate between metric kinds in the backend. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] +pub enum InputKind { + /// Handling one item at a time. + Marker, + /// Handling cumulative observed quantities. + Counter, + /// Handling quantity fluctuations. + Level, + /// Reporting instant measurement of a resource at a point in time (non-cumulative). + Gauge, + /// Measuring a time interval, internal to the app or provided by an external source. + Timer, +} + +/// Used by the metrics! macro to obtain the InputKind from the stringified type. +impl<'a> From<&'a str> for InputKind { + fn from(s: &str) -> InputKind { + match s { + "Marker" => InputKind::Marker, + "Counter" => InputKind::Counter, + "Gauge" => InputKind::Gauge, + "Timer" => InputKind::Timer, + "Level" => InputKind::Level, + _ => panic!("No InputKind '{}' defined", s), + } + } +} + +/// A monotonic counter metric. +/// Since value is only ever increased by one, no value parameter is provided, +/// preventing programming errors. +#[derive(Debug, Clone)] +pub struct Marker { + inner: InputMetric, +} + +impl Marker { + /// Record a single event occurence. + pub fn mark(&self) { + self.inner.write(1, labels![]) + } +} + +/// A counter of absolute observed values (non-negative amounts). +/// Used to count things that cannot be undone: +/// - Bytes sent +/// - Records written +/// - Apples eaten +/// For relative (possibly negative) values, the `Level` counter type can be used. +/// If aggregated, minimum and maximum scores will track the collected values, not their sum. +#[derive(Debug, Clone)] +pub struct Counter { + inner: InputMetric, +} + +impl Counter { + /// Record a value count. + pub fn count(&self, count: usize) { + self.inner.write(count as isize, labels![]) + } +} + +/// A counter of fluctuating resources accepting positive and negative values. +/// Can be used as a stateful `Gauge` or as a `Counter` of possibly decreasing amounts. +/// - Size of messages in a queue +/// - Strawberries on a conveyor belt +/// If aggregated, minimum and maximum scores will track the sum of values, not the collected values themselves. +#[derive(Debug, Clone)] +pub struct Level { + inner: InputMetric, +} + +impl Level { + /// Record a positive or negative value count + pub fn adjust(&self, count: V) { + self.inner.write(count.to_isize().unwrap(), labels![]) + } +} + +/// A gauge that sends values to the metrics backend +#[derive(Debug, Clone)] +pub struct Gauge { + inner: InputMetric, +} + +impl Gauge { + /// Record a value point for this gauge. + pub fn value(&self, value: V) { + self.inner.write(value.to_isize().unwrap(), labels![]) + } +} + +/// A timer that sends values to the metrics backend +/// Timers can record time intervals in multiple ways : +/// - with the time! macrohich wraps an expression or block with start() and stop() calls. +/// - with the time(Fn) methodhich wraps a closure with start() and stop() calls. +/// - with start() and stop() methodsrapping around the operation to time +/// - with the interval_us() method, providing an externally determined microsecond interval +#[derive(Debug, Clone)] +pub struct Timer { + inner: InputMetric, +} + +impl Timer { + /// Record a microsecond interval for this timer + /// Can be used in place of start()/stop() if an external time interval source is used + pub fn interval_us(&self, interval_us: u64) -> u64 { + self.inner.write(interval_us as isize, labels![]); + interval_us + } + + /// Obtain a opaque handle to the current time. + /// The handle is passed back to the stop() method to record a time interval. + /// This is actually a convenience method to the TimeHandle::now() + /// Beware, handles obtained here are not bound to this specific timer instance + /// _for now_ but might be in the future for safety. + /// If you require safe multi-timer handles, get them through TimeType::now() + pub fn start(&self) -> TimeHandle { + TimeHandle::now() + } + + /// Record the time elapsed since the start_time handle was obtained. + /// This call can be performed multiple times using the same handle, + /// reporting distinct time intervals each time. + /// Returns the microsecond interval value that was recorded. + pub fn stop(&self, start_time: TimeHandle) -> MetricValue { + let elapsed_us = start_time.elapsed_us(); + self.interval_us(elapsed_us) as isize + } + + /// Record the time taken to execute the provided closure + pub fn time R, R>(&self, operations: F) -> R { + let start_time = self.start(); + let value: R = operations(); + self.stop(start_time); + value + } +} + +impl From for Gauge { + fn from(metric: InputMetric) -> Gauge { + Gauge { inner: metric } + } +} + +impl From for Timer { + fn from(metric: InputMetric) -> Timer { + Timer { inner: metric } + } +} + +impl From for Counter { + fn from(metric: InputMetric) -> Counter { + Counter { inner: metric } + } +} + +impl From for Marker { + fn from(metric: InputMetric) -> Marker { + Marker { inner: metric } + } +} + +impl From for Level { + fn from(metric: InputMetric) -> Level { + Level { inner: metric } + } +} diff --git a/src/core/label.rs b/src/core/label.rs index 458efce..a4b2d87 100644 --- a/src/core/label.rs +++ b/src/core/label.rs @@ -248,10 +248,10 @@ pub mod test { use std::sync::Mutex; - /// Label tests use the globally shared AppLabels which may make them interfere as tests are run concurrently. - /// We do not want to mandate usage of `RUST_TEST_THREADS=1` which would penalize the whole test suite. - /// Instead we use a local mutex to make sure the label tests run in sequence. lazy_static! { + /// Label tests use the globally shared AppLabels which may make them interfere as tests are run concurrently. + /// We do not want to mandate usage of `RUST_TEST_THREADS=1` which would penalize the whole test suite. + /// Instead we use a local mutex to make sure the label tests run in sequence. static ref TEST_SEQUENCE: Mutex<()> = Mutex::new(()); } diff --git a/src/core/locking.rs b/src/core/locking.rs index b269231..b5b1b9d 100755 --- a/src/core/locking.rs +++ b/src/core/locking.rs @@ -2,7 +2,7 @@ //! This makes all outputs also immediately usable as inputs. //! The alternatives are queuing or thread local. -use core::attributes::{Attributes, Prefixed, WithAttributes}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::error; use core::input::{Input, InputKind, InputMetric, InputScope}; use core::name::MetricName; @@ -49,6 +49,7 @@ impl InputScope for LockingOutput { impl Flush for LockingOutput { fn flush(&self) -> error::Result<()> { + self.notify_flush_listeners(); self.inner.lock().expect("LockingOutput").flush() } } diff --git a/src/core/mod.rs b/src/core/mod.rs index ced5902..52a057b 100755 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -18,9 +18,7 @@ pub type MetricValue = isize; /// Both InputScope and OutputScope share the ability to flush the recorded data. pub trait Flush { /// Flush does nothing by default. - fn flush(&self) -> error::Result<()> { - Ok(()) - } + fn flush(&self) -> error::Result<()>; } #[cfg(test)] diff --git a/src/core/proxy.rs b/src/core/proxy.rs index 27c2b56..96523a0 100755 --- a/src/core/proxy.rs +++ b/src/core/proxy.rs @@ -1,6 +1,6 @@ //! Decouple metric definition from configuration with trait objects. -use core::attributes::{Attributes, Prefixed, WithAttributes}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::error; use core::input::{InputKind, InputMetric, InputScope}; use core::name::{MetricName, NameParts}; @@ -260,6 +260,7 @@ impl InputScope for Proxy { impl Flush for Proxy { fn flush(&self) -> error::Result<()> { + self.notify_flush_listeners(); write_lock!(self.inner).flush(self.get_prefixes()) } } diff --git a/src/core/scheduler.rs b/src/core/scheduler.rs index eba9303..78ffc97 100644 --- a/src/core/scheduler.rs +++ b/src/core/scheduler.rs @@ -2,11 +2,13 @@ use core::input::InputScope; +use std::cmp::{max, Ordering}; +use std::collections::BinaryHeap; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::SeqCst; -use std::sync::Arc; +use std::sync::{Arc, Condvar, Mutex}; use std::thread; -use std::time::Duration; +use std::time::{Duration, Instant}; /// A handle to cancel a scheduled task if required. #[derive(Debug, Clone)] @@ -19,7 +21,9 @@ impl CancelHandle { /// Signals the task to stop. pub fn cancel(&self) { - self.0.store(true, SeqCst); + if self.0.swap(true, SeqCst) { + warn!("Scheduled task was already cancelled.") + } } fn is_cancelled(&self) -> bool { @@ -27,43 +31,17 @@ impl CancelHandle { } } -/// Schedule a task to run periodically. -/// Starts a new thread for every task. -/// -/// # Panics -/// -/// Panics if the OS fails to create a thread. -fn set_schedule(every: Duration, operation: F) -> CancelHandle -where - F: Fn() -> () + Send + 'static, -{ - let handle = CancelHandle::new(); - let inner_handle = handle.clone(); - - thread::Builder::new() - .name("dipstick-scheduler".to_string()) - .spawn(move || loop { - thread::sleep(every); - if inner_handle.is_cancelled() { - break; - } - operation(); - }) - .unwrap(); // TODO: Panic, change API to return Result? - handle -} - /// Enable background periodical publication of metrics pub trait ScheduleFlush { - /// Start a thread dedicated to flushing this scope at regular intervals. + /// Flush this scope at regular intervals. fn flush_every(&self, period: Duration) -> CancelHandle; } impl ScheduleFlush for T { - /// Start a thread dedicated to flushing this scope at regular intervals. + /// Flush this scope at regular intervals. fn flush_every(&self, period: Duration) -> CancelHandle { let scope = self.clone(); - set_schedule(period, move || { + SCHEDULER.schedule(period, move || { if let Err(err) = scope.flush() { error!("Could not flush metrics: {}", err); } @@ -71,26 +49,198 @@ impl ScheduleFlush for T { } } -//use std::net::{SocketAddr, ToSocketAddrs}; -// -//use tiny_http::{Server, StatusCode, self}; -// -//pub fn http_serve(addresses: A) -> CancelHandle { -// let handle = CancelHandle::new(); -// let inner_handle = handle.clone(); -// let server = tiny_http::Server::http("0.0.0.0:0")?; -// -// thread::spawn(move || loop { -// match server.recv_timeout(Duration::from_secs(1)) { -// Ok(Some(req)) => { -// let response = tiny_http::Response::new_empty(StatusCode::from(200)); -// if let Err(err) = req.respond(response) { -// warn!("Metrics response error: {}", err) -// } -// } -// Ok(None) => if inner_handle.is_cancelled() { break; } -// Err(err) => warn!("Metrics request error: {}", err) -// }; -// }); -// handle -//} +lazy_static! { + pub static ref SCHEDULER: Scheduler = Scheduler::new(); +} + +struct ScheduledTask { + next_time: Instant, + period: Duration, + handle: CancelHandle, + operation: Arc () + Send + Sync + 'static>, +} + +impl Ord for ScheduledTask { + fn cmp(&self, other: &ScheduledTask) -> Ordering { + other.next_time.cmp(&self.next_time) + } +} + +impl PartialOrd for ScheduledTask { + fn partial_cmp(&self, other: &ScheduledTask) -> Option { + other.next_time.partial_cmp(&self.next_time) + } +} + +impl PartialEq for ScheduledTask { + fn eq(&self, other: &ScheduledTask) -> bool { + self.next_time.eq(&other.next_time) + } +} + +impl Eq for ScheduledTask {} + +pub struct Scheduler { + next_tasks: Arc<(Mutex>, Condvar)>, +} + +pub static MIN_DELAY: Duration = Duration::from_millis(50); + +impl Scheduler { + /// Launch a new scheduler thread. + fn new() -> Self { + let sched: Arc<(Mutex>, Condvar)> = + Arc::new((Mutex::new(BinaryHeap::new()), Condvar::new())); + let sched1 = Arc::downgrade(&sched); + + thread::Builder::new() + .name("dipstick_scheduler".to_string()) + .spawn(move || { + let mut wait_for = MIN_DELAY; + while let Some(sss) = sched1.upgrade() { + let &(ref heap_mutex, ref condvar) = &*sss; + let heap = heap_mutex.lock().unwrap(); + let (mut tasks, _timed_out) = condvar.wait_timeout(heap, wait_for).unwrap(); + 'work: loop { + let now = Instant::now(); + match tasks.peek() { + Some(task) if task.next_time > now => { + // next task is not ready yet, update schedule + wait_for = max(MIN_DELAY, task.next_time - now); + break 'work; + } + None => { + // TODO no tasks left. exit thread? + break 'work; + } + _ => {} + } + if let Some(mut task) = tasks.pop() { + if task.handle.is_cancelled() { + // do not execute, do not reinsert + continue; + } + (task.operation)(); + task.next_time = now + task.period; + tasks.push(task); + } + } + } + }) + .unwrap(); + + Scheduler { next_tasks: sched } + } + + #[cfg(test)] + pub fn task_count(&self) -> usize { + self.next_tasks.0.lock().unwrap().len() + } + + /// Schedule a task to run periodically. + pub fn schedule(&self, period: Duration, operation: F) -> CancelHandle + where + F: Fn() -> () + Send + Sync + 'static, + { + let handle = CancelHandle::new(); + let new_task = ScheduledTask { + next_time: Instant::now() + period, + period, + handle: handle.clone(), + operation: Arc::new(operation), + }; + self.next_tasks.0.lock().unwrap().push(new_task); + self.next_tasks.1.notify_one(); + handle + } +} + +#[cfg(test)] +pub mod test { + use super::*; + use std::sync::atomic::AtomicUsize; + + #[test] + fn schedule_one_and_cancel() { + let trig1a = Arc::new(AtomicUsize::new(0)); + let trig1b = trig1a.clone(); + + let sched = Scheduler::new(); + + let handle1 = sched.schedule(Duration::from_millis(50), move || { + trig1b.fetch_add(1, SeqCst); + }); + assert_eq!(sched.task_count(), 1); + thread::sleep(Duration::from_millis(170)); + assert_eq!(3, trig1a.load(SeqCst)); + + handle1.cancel(); + thread::sleep(Duration::from_millis(70)); + assert_eq!(sched.task_count(), 0); + assert_eq!(3, trig1a.load(SeqCst)); + } + + #[test] + fn schedule_two_and_cancel() { + let trig1a = Arc::new(AtomicUsize::new(0)); + let trig1b = trig1a.clone(); + + let trig2a = Arc::new(AtomicUsize::new(0)); + let trig2b = trig2a.clone(); + + let sched = Scheduler::new(); + + let handle1 = sched.schedule(Duration::from_millis(50), move || { + trig1b.fetch_add(1, SeqCst); + println!("ran 1"); + }); + + let handle2 = sched.schedule(Duration::from_millis(100), move || { + trig2b.fetch_add(1, SeqCst); + println!("ran 2"); + }); + + thread::sleep(Duration::from_millis(110)); + assert_eq!(2, trig1a.load(SeqCst)); + assert_eq!(1, trig2a.load(SeqCst)); + + handle1.cancel(); + thread::sleep(Duration::from_millis(110)); + assert_eq!(2, trig1a.load(SeqCst)); + assert_eq!(2, trig2a.load(SeqCst)); + + handle2.cancel(); + thread::sleep(Duration::from_millis(160)); + assert_eq!(2, trig1a.load(SeqCst)); + assert_eq!(2, trig2a.load(SeqCst)); + } + + #[test] + fn schedule_one_and_more() { + let trig1a = Arc::new(AtomicUsize::new(0)); + let trig1b = trig1a.clone(); + + let sched = Scheduler::new(); + + let handle1 = sched.schedule(Duration::from_millis(100), move || { + trig1b.fetch_add(1, SeqCst); + }); + + thread::sleep(Duration::from_millis(110)); + assert_eq!(1, trig1a.load(SeqCst)); + + let trig2a = Arc::new(AtomicUsize::new(0)); + let trig2b = trig2a.clone(); + + let handle2 = sched.schedule(Duration::from_millis(50), move || { + trig2b.fetch_add(1, SeqCst); + }); + + thread::sleep(Duration::from_millis(110)); + assert_eq!(2, trig1a.load(SeqCst)); + assert_eq!(2, trig2a.load(SeqCst)); + + handle1.cancel(); + handle2.cancel(); + } +} diff --git a/src/core/void.rs b/src/core/void.rs index cccf87e..f0ba48d 100755 --- a/src/core/void.rs +++ b/src/core/void.rs @@ -3,6 +3,7 @@ use core::name::MetricName; use core::output::{Output, OutputMetric, OutputScope}; use core::Flush; +use std::error::Error; use std::sync::Arc; lazy_static! { @@ -48,4 +49,8 @@ impl OutputScope for VoidOutput { } } -impl Flush for VoidOutput {} +impl Flush for VoidOutput { + fn flush(&self) -> Result<(), Box> { + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 608e0a1..8686556 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,8 +30,6 @@ extern crate crossbeam_channel; #[cfg(feature = "parking_lot")] extern crate parking_lot; -//extern crate tiny_http; - #[macro_use] mod macros; pub use macros::*; @@ -65,7 +63,7 @@ macro_rules! read_lock { } mod core; -pub use core::attributes::{Buffered, Buffering, Prefixed, Sampled, Sampling}; +pub use core::attributes::{Buffered, Buffering, Observe, OnFlush, Prefixed, Sampled, Sampling}; pub use core::clock::TimeHandle; pub use core::error::Result; pub use core::input::{ @@ -88,7 +86,7 @@ mod output; pub use output::format::{Formatting, LabelOp, LineFormat, LineOp, LineTemplate, SimpleFormat}; pub use output::graphite::{Graphite, GraphiteMetric, GraphiteScope}; pub use output::log::{Log, LogScope}; -pub use output::map::StatsMap; +pub use output::map::StatsMapScope; pub use output::statsd::{Statsd, StatsdMetric, StatsdScope}; pub use output::stream::{Stream, TextScope}; diff --git a/src/multi/multi_in.rs b/src/multi/multi_in.rs index 64f2ae8..c7f520f 100755 --- a/src/multi/multi_in.rs +++ b/src/multi/multi_in.rs @@ -1,6 +1,6 @@ //! Dispatch metrics to multiple sinks. -use core::attributes::{Attributes, Prefixed, WithAttributes}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::error; use core::input::{Input, InputDyn, InputKind, InputMetric, InputScope}; use core::name::MetricName; @@ -102,6 +102,7 @@ impl InputScope for MultiInputScope { impl Flush for MultiInputScope { fn flush(&self) -> error::Result<()> { + self.notify_flush_listeners(); for w in &self.scopes { w.flush()?; } diff --git a/src/multi/multi_out.rs b/src/multi/multi_out.rs index 2de7ec8..d03e552 100755 --- a/src/multi/multi_out.rs +++ b/src/multi/multi_out.rs @@ -1,6 +1,6 @@ //! Dispatch metrics to multiple sinks. -use core::attributes::{Attributes, Prefixed, WithAttributes}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::error; use core::input::InputKind; use core::name::MetricName; @@ -104,6 +104,7 @@ impl OutputScope for MultiOutputScope { impl Flush for MultiOutputScope { fn flush(&self) -> error::Result<()> { + self.notify_flush_listeners(); for w in &self.scopes { w.flush()?; } diff --git a/src/output/graphite.rs b/src/output/graphite.rs index eabef0f..ef42159 100755 --- a/src/output/graphite.rs +++ b/src/output/graphite.rs @@ -1,7 +1,7 @@ //! Send metrics to a graphite server. use cache::cache_out; -use core::attributes::{Attributes, Buffered, Prefixed, WithAttributes}; +use core::attributes::{Attributes, Buffered, OnFlush, Prefixed, WithAttributes}; use core::error; use core::input::InputKind; use core::metrics; @@ -103,6 +103,7 @@ impl OutputScope for GraphiteScope { impl Flush for GraphiteScope { fn flush(&self) -> error::Result<()> { + self.notify_flush_listeners(); let buf = self.buffer.borrow_mut(); self.flush_inner(buf) } diff --git a/src/output/log.rs b/src/output/log.rs index 5ab8d41..be30e9d 100755 --- a/src/output/log.rs +++ b/src/output/log.rs @@ -1,5 +1,5 @@ use cache::cache_in; -use core::attributes::{Attributes, Buffered, Prefixed, WithAttributes}; +use core::attributes::{Attributes, Buffered, OnFlush, Prefixed, WithAttributes}; use core::error; use core::input::{Input, InputKind, InputMetric, InputScope}; use core::name::MetricName; @@ -150,6 +150,7 @@ impl InputScope for LogScope { impl Flush for LogScope { fn flush(&self) -> error::Result<()> { + self.notify_flush_listeners(); let mut entries = write_lock!(self.entries); if !entries.is_empty() { let mut buf: Vec = Vec::with_capacity(32 * entries.len()); diff --git a/src/output/map.rs b/src/output/map.rs index f35603b..e43b8b9 100755 --- a/src/output/map.rs +++ b/src/output/map.rs @@ -1,35 +1,99 @@ +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::input::InputKind; +use core::input::{Input, InputMetric, InputScope}; use core::name::MetricName; -use core::output::{OutputMetric, OutputScope}; use core::{Flush, MetricValue}; -use std::cell::RefCell; use std::collections::BTreeMap; -use std::rc::Rc; +use std::error::Error; -/// A HashMap wrapper to receive metrics or stats values. +use std::sync::{Arc, RwLock}; +use {OutputMetric, OutputScope}; + +/// A BTreeMap wrapper to receive metrics or stats values. /// Every received value for a metric replaces the previous one (if any). #[derive(Clone, Default)] pub struct StatsMap { - inner: Rc>>, + attributes: Attributes, +} + +impl WithAttributes for StatsMap { + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } +} + +impl Input for StatsMap { + type SCOPE = StatsMapScope; + + fn metrics(&self) -> Self::SCOPE { + StatsMapScope { + attributes: self.attributes.clone(), + inner: Arc::new(RwLock::new(BTreeMap::new())), + } + } +} + +/// A BTreeMap wrapper to receive metrics or stats values. +/// Every received value for a metric replaces the previous one (if any). +#[derive(Clone, Default)] +pub struct StatsMapScope { + attributes: Attributes, + inner: Arc>>, +} + +impl WithAttributes for StatsMapScope { + fn get_attributes(&self) -> &Attributes { + &self.attributes + } + fn mut_attributes(&mut self) -> &mut Attributes { + &mut self.attributes + } +} + +impl InputScope for StatsMapScope { + fn new_metric(&self, name: MetricName, _kind: InputKind) -> InputMetric { + let name = self.prefix_append(name); + let write_to = self.inner.clone(); + let name: String = name.join("."); + InputMetric::new(move |value, _labels| { + let _previous = write_to.write().expect("Lock").insert(name.clone(), value); + }) + } } -impl OutputScope for StatsMap { +impl OutputScope for StatsMapScope { fn new_metric(&self, name: MetricName, _kind: InputKind) -> OutputMetric { + let name = self.prefix_append(name); let write_to = self.inner.clone(); let name: String = name.join("."); OutputMetric::new(move |value, _labels| { - let _previous = write_to.borrow_mut().insert(name.clone(), value); + let _previous = write_to.write().expect("Lock").insert(name.clone(), value); }) } } -impl Flush for StatsMap {} +impl Flush for StatsMapScope { + fn flush(&self) -> Result<(), Box> { + self.notify_flush_listeners(); + Ok(()) + } +} -impl From for BTreeMap { - fn from(map: StatsMap) -> Self { - // FIXME this is is possibly a full map copy, for nothing. +impl From for BTreeMap { + fn from(map: StatsMapScope) -> Self { + // FIXME this is is possibly a full map copy, for no reason. // into_inner() is what we'd really want here but would require some `unsafe`? don't know how to do this yet. - map.inner.borrow().clone() + map.inner.read().unwrap().clone() + } +} + +impl StatsMapScope { + /// Extract the backing BTreeMap. + pub fn into_map(self) -> BTreeMap { + self.into() } } diff --git a/src/output/prometheus.rs b/src/output/prometheus.rs index 5206d14..678b7aa 100755 --- a/src/output/prometheus.rs +++ b/src/output/prometheus.rs @@ -1,7 +1,7 @@ //! Send metrics to a Prometheus server. use cache::cache_out; -use core::attributes::{Attributes, Buffered, Prefixed, WithAttributes}; +use core::attributes::{Attributes, Buffered, OnFlush, Prefixed, WithAttributes}; use core::error; use core::input::InputKind; use core::label::Labels; @@ -90,6 +90,7 @@ impl OutputScope for PrometheusScope { impl Flush for PrometheusScope { fn flush(&self) -> error::Result<()> { + self.notify_flush_listeners(); let buf = self.buffer.borrow_mut(); self.flush_inner(buf) } diff --git a/src/output/statsd.rs b/src/output/statsd.rs index 1eb1b49..daef10f 100755 --- a/src/output/statsd.rs +++ b/src/output/statsd.rs @@ -1,7 +1,9 @@ //! Send metrics to a statsd server. use cache::cache_out; -use core::attributes::{Attributes, Buffered, Prefixed, Sampled, Sampling, WithAttributes}; +use core::attributes::{ + Attributes, Buffered, OnFlush, Prefixed, Sampled, Sampling, WithAttributes, +}; use core::error; use core::input::InputKind; use core::metrics; @@ -130,6 +132,7 @@ impl OutputScope for StatsdScope { impl Flush for StatsdScope { fn flush(&self) -> error::Result<()> { + self.notify_flush_listeners(); let buf = self.buffer.borrow_mut(); self.flush_inner(buf) } diff --git a/src/output/stream.rs b/src/output/stream.rs index e0b3ac7..f452085 100755 --- a/src/output/stream.rs +++ b/src/output/stream.rs @@ -2,7 +2,7 @@ // TODO parameterize templates -use core::attributes::{Attributes, Buffered, Prefixed, WithAttributes}; +use core::attributes::{Attributes, Buffered, OnFlush, Prefixed, WithAttributes}; use core::error; use core::input::InputKind; use core::name::MetricName; @@ -67,7 +67,7 @@ impl Stream { Ok(Stream::write_to(file)) } - /// Write to a new file. + /// Write metrics to a new file. /// /// Creates a new file to dump data into. If `clobber` is set to true, it allows overwriting /// existing file, if false, the attempt will result in an error. @@ -82,7 +82,7 @@ impl Stream { } impl Stream { - /// Write metric values to stdout. + /// Write metric values to stderr. pub fn to_stderr() -> Stream { Stream::write_to(io::stderr()) } @@ -196,6 +196,7 @@ impl OutputScope for TextScope { impl Flush for TextScope { fn flush(&self) -> error::Result<()> { + self.notify_flush_listeners(); let mut entries = self.entries.borrow_mut(); if !entries.is_empty() { let mut output = write_lock!(self.output.inner); diff --git a/src/queue/queue_in.rs b/src/queue/queue_in.rs index 25e0705..79a6718 100755 --- a/src/queue/queue_in.rs +++ b/src/queue/queue_in.rs @@ -3,7 +3,7 @@ //! If queue size is exceeded, calling code reverts to blocking. use cache::cache_in::CachedInput; -use core::attributes::{Attributes, Prefixed, WithAttributes}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::error; use core::input::{Input, InputDyn, InputKind, InputMetric, InputScope}; use core::label::Labels; @@ -200,6 +200,7 @@ impl InputScope for InputQueueScope { impl Flush for InputQueueScope { fn flush(&self) -> error::Result<()> { + self.notify_flush_listeners(); if let Err(e) = self.sender.send(InputQueueCmd::Flush(self.target.clone())) { metrics::SEND_FAILED.mark(); debug!("Failed to flush async metrics: {}", e); diff --git a/src/queue/queue_out.rs b/src/queue/queue_out.rs index 93ee688..1628ec1 100755 --- a/src/queue/queue_out.rs +++ b/src/queue/queue_out.rs @@ -3,7 +3,7 @@ //! If queue size is exceeded, calling code reverts to blocking. use cache::cache_in; -use core::attributes::{Attributes, Prefixed, WithAttributes}; +use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes}; use core::error; use core::input::{Input, InputKind, InputMetric, InputScope}; use core::label::Labels; @@ -189,6 +189,7 @@ impl InputScope for OutputQueueScope { impl Flush for OutputQueueScope { fn flush(&self) -> error::Result<()> { + self.notify_flush_listeners(); if let Err(e) = self.sender.send(OutputQueueCmd::Flush(self.target.clone())) { metrics::SEND_FAILED.mark(); debug!("Failed to flush async metrics: {}", e);