Skip to content

Commit

Permalink
Ergonomize listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
fralalonde committed Apr 9, 2019
1 parent 5c5ec50 commit f59d403
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 128 deletions.
26 changes: 10 additions & 16 deletions examples/observer.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,37 @@
//!
//! A sample application to demonstrate observing of a value.
//! A sample application to demonstrate flush-triggered and scheduled observation of gauge values.
//!
//! This is the expected output:
//!
//! ```
//! cargo run --example observer
//! process.threads 4
//! process.uptime 6
//! process.threads 4
//! process.uptime 6
//! ...
//! Press Enter key to exit
//! process.threads 2
//! process.uptime 1000
//! process.threads 2
//! process.uptime 2001
//! process.threads 2
//! process.uptime 3002
//! ```
//!

extern crate dipstick;

use std::time::{Duration};
use std::sync::atomic::AtomicUsize;

use dipstick::{AtomicBucket, InputScope, MetricValue, Prefixed, ScheduleFlush, Stream, OnFlush, Observe};
use dipstick::*;

fn main() {
let mut metrics = AtomicBucket::new().named("process");
metrics.drain(Stream::to_stdout());

metrics.flush_every(Duration::from_secs(3));

let uptime = metrics.gauge("uptime");
metrics.on_flush(move || uptime.value(6));
metrics.observe(uptime, || 6).on_flush();

let threads = metrics.gauge("threads");
metrics.observe(threads, Duration::from_secs(1), thread_count);
metrics.observe(threads, thread_count).every(Duration::from_secs(1));

metrics.flush_every(Duration::from_secs(3));

loop {
metrics.counter("counter_a").count(123);
metrics.timer("timer_a").interval_us(2000000);
std::thread::sleep(Duration::from_millis(40));
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/bucket/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ mod test {
use bucket::{stats_all, stats_average, stats_summary};

use core::clock::{mock_clock_advance, mock_clock_reset};
use output::map::StatsMap;
use output::map::StatsMapScope;

use std::time::Duration;
use std::collections::BTreeMap;
Expand Down Expand Up @@ -506,7 +506,7 @@ mod test {

mock_clock_advance(Duration::from_secs(3));

let map = StatsMap::default();
let map = StatsMapScope::default();
metrics.flush_to(&map).unwrap();
map.into()
}
Expand Down
95 changes: 58 additions & 37 deletions src/core/attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,48 +89,69 @@ pub trait WithAttributes: Clone {

/// Register and notify scope-flush listeners
pub trait OnFlush {
/// Register a new flush listener
fn on_flush<F: Fn() -> () + Send + Sync + 'static>(&mut self, listener: F);

/// Notify registered listeners of an impending flush.
fn notify_flush_listeners(&self);
}

impl <T> OnFlush for T where T: Flush + WithAttributes {
fn on_flush<F: Fn() -> () + Send + Sync + 'static>(&mut self, listener: F) {
self.mut_attributes().flush_listeners.push(Arc::new(listener));
}

fn notify_flush_listeners(&self) {
for listener in self.get_attributes().flush_listeners.iter() {
for listener in &self.get_attributes().flush_listeners {
(listener)()
}
}
}

pub struct ObserveWhen<'a, T, F> {
target: &'a mut T,
gauge: Gauge,
operation: Arc<F>,
}

impl<'a, T, F> ObserveWhen<'a, T, F>
where F: Fn() -> MetricValue + Send + Sync + 'static,
T: InputScope + WithAttributes + Send + Sync,
{
pub fn on_flush(self) {
let gauge = self.gauge;
let op = self.operation;
self.target.mut_attributes().flush_listeners.push(Arc::new(move || gauge.value(op())));
}

pub fn every(self, period: Duration,) -> CancelHandle {
let gauge = self.gauge;
let op = self.operation;
let handle = SCHEDULER.schedule(period, move || gauge.value(op()));
self.target.mut_attributes().tasks.push(handle.clone());
handle
}
}

/// Schedule a recurring task
pub trait Observe {

/// Schedule a recurring task.
/// The returned handle can be used to cancel the task.
fn observe<F>(&mut self, gauge: Gauge, every: Duration, operation: F) -> CancelHandle
where F: Fn() -> MetricValue + Send + Sync + 'static;
fn observe<F>(&mut self, gauge: Gauge, operation: F) -> ObserveWhen<Self, F>
where F: Fn() -> MetricValue + Send + Sync + 'static, Self: Sized;

}

impl<T: InputScope + WithAttributes> Observe for T {
fn observe<F>(&mut self, gauge: Gauge, period: Duration, operation: F) -> CancelHandle
where F: Fn() -> MetricValue + Send + Sync + 'static
fn observe<F>(&mut self, gauge: Gauge, operation: F) -> ObserveWhen<Self, F>
where F: Fn() -> MetricValue + Send + Sync + 'static, Self: Sized
{
let handle = SCHEDULER.schedule(period, move || gauge.value(operation())) ;
self.mut_attributes().tasks.push(handle.clone());
handle
ObserveWhen {
target: self,
gauge,
operation: Arc::new(operation),
}
}
}

impl Drop for Attributes {
fn drop(&mut self) {
for t in self.tasks.drain(..) {
t.cancel()
for task in self.tasks.drain(..) {
task.cancel()
}
}
}
Expand Down Expand Up @@ -182,12 +203,10 @@ impl<T: WithAttributes> Prefixed for T {
&self.get_attributes().naming
}

/// Replace any existing names with a single name.
/// Return a clone of the component with the new name.
/// If multiple names are required, `add_name` may also be used.
fn named<S: Into<String>>(&self, name: S) -> Self {
let parts = NameParts::from(name);
self.with_attributes(|new_attr| new_attr.naming = parts.clone())
/// Append a name to the existing names.
/// Return a clone of the component with the updated names.
fn add_prefix<S: Into<String>>(&self, name: S) -> Self {
self.add_name(name)
}

/// Append a name to the existing names.
Expand All @@ -197,10 +216,12 @@ impl<T: WithAttributes> Prefixed for T {
self.with_attributes(|new_attr| new_attr.naming.push_back(name.clone()))
}

/// Append a name to the existing names.
/// Return a clone of the component with the updated names.
fn add_prefix<S: Into<String>>(&self, name: S) -> Self {
self.add_name(name)
/// Replace any existing names with a single name.
/// Return a clone of the component with the new name.
/// If multiple names are required, `add_name` may also be used.
fn named<S: Into<String>>(&self, name: S) -> Self {
let parts = NameParts::from(name);
self.with_attributes(|new_attr| new_attr.naming = parts.clone())
}

}
Expand Down Expand Up @@ -241,19 +262,19 @@ pub trait Buffered: WithAttributes {

#[cfg(test)]
mod test {
use StatsMap;
use core::attributes::OnFlush;
use std::sync::atomic::{AtomicBool, Ordering};
use output::map::StatsMap;
use core::attributes::*;
use core::input::*;
use core::Flush;
use std::sync::Arc;
use core::input::Input;
use StatsMapScope;

#[test]
fn on_flush() {
let flushed = Arc::new(AtomicBool::new(false));
let mut map = StatsMap::default();
let flushed1 = flushed.clone();
map.on_flush(move || flushed1.store(true, Ordering::Relaxed));
map.flush().unwrap();
assert_eq!(true, flushed.load(Ordering::Relaxed))
let mut metrics: StatsMapScope = StatsMap::default().metrics();
let gauge = metrics.gauge("my_gauge");
metrics.observe(gauge, || 4).on_flush();
metrics.flush().unwrap();
assert_eq!(Some(&4), metrics.into_map().get("my_gauge"))
}
}
67 changes: 10 additions & 57 deletions src/core/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,14 @@ use std::ops::Add;
use std::time::Duration;

use std::time::Instant;
use std::ops::Deref;

use core::MetricValue;

#[cfg(test)]
use std::sync::{Arc};

use std::cell::RefCell;

#[cfg(not(feature="parking_lot"))]
use std::sync::{RwLock};

#[cfg(feature="parking_lot")]
use parking_lot::{RwLock};


#[derive(Debug, Copy, Clone)]
/// A handle to the start time of a counter.
/// Wrapped so it may be changed safely later.
pub struct TimeHandle(Instant);

impl Deref for TimeHandle {
type Target = Instant;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl TimeHandle {
/// Get a handle on current time.
/// Used by the TimerMetric start_time() method.
Expand All @@ -53,11 +32,10 @@ impl TimeHandle {
}
}

/// Use independent mock clock per thread so that tests can run concurrently without interfering.
/// Tests covering concurrent behavior can override this and share a single clock between threads.
#[cfg(test)]
/// The mock clock is thread local so that tests can run in parallel without affecting each other.
use std::cell::RefCell;
thread_local! {
static MOCK_CLOCK: RefCell<Arc<RwLock<Instant>>> = RefCell::new(Arc::new(RwLock::new(Instant::now())));
static MOCK_CLOCK: RefCell<Instant> = RefCell::new(Instant::now());
}

/// Set the mock clock to the current time.
Expand All @@ -69,10 +47,8 @@ pub fn mock_clock_reset() {
if !cfg!(not(test)) {
warn!("Mock clock used outside of cfg[]tests has no effect")
}
// TODO mock_clock crate
MOCK_CLOCK.with(|clock_cell| {
let now = clock_cell.borrow();
*write_lock!(now) = Instant::now();
MOCK_CLOCK.with(|now| {
*now.borrow_mut() = Instant::now();
})
}

Expand All @@ -83,46 +59,23 @@ pub fn mock_clock_reset() {
/// Not feature-gated so it stays visible to outside crates but may not be used outside of tests.
#[cfg(test)]
pub fn mock_clock_advance(period: Duration) {
MOCK_CLOCK.with(|clock_cell| {
let now = clock_cell.borrow();
let mut now = write_lock!(now);
println!("advancing mock clock {:?} + {:?}", *now, period);
MOCK_CLOCK.with(|now| {
let mut now = now.borrow_mut();
*now = now.add(period);
println!("advanced mock clock {:?}", *now);
})
}

//#[cfg(test)]
//pub fn share_mock_clock() -> Arc<RwLock<Instant>> {
// // TODO mock_clock crate
// MOCK_CLOCK.with(|clock_cell| {
// let now = clock_cell.borrow();
// now.clone()
// })
//}
//
//#[cfg(test)]
//pub fn use_mock_clock(shared: Arc<RwLock<Instant>>) {
// // TODO mock_clock crate
// MOCK_CLOCK.with(|clock_cell| {
// let mut clock = clock_cell.borrow_mut();
// *clock = shared
// })
//}

#[cfg(not(test))]
fn now() -> Instant {
Instant::now()
}

#[cfg(test)]
/// Metrics mock_clock enabled!
/// thread::sleep will have no effect on metrics.
/// Use advance_time() to simulate passing time.
#[cfg(test)]
fn now() -> Instant {
MOCK_CLOCK.with(|clock_cell| {
let clock = clock_cell.borrow();
let now = read_lock!(clock);
*now
MOCK_CLOCK.with(|now| {
*now.borrow()
})
}
6 changes: 3 additions & 3 deletions src/core/label.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,10 @@ pub mod test {

use std::sync::Mutex;

/// Label tests use the globally shared AppLabels which may make them interfere as tests are run concurrently.
/// We do not want to mandate usage of `RUST_TEST_THREADS=1` which would penalize the whole test suite.
/// Instead we use a local mutex to make sure the label tests run in sequence.
lazy_static!{
/// Label tests use the globally shared AppLabels which may make them interfere as tests are run concurrently.
/// We do not want to mandate usage of `RUST_TEST_THREADS=1` which would penalize the whole test suite.
/// Instead we use a local mutex to make sure the label tests run in sequence.
static ref TEST_SEQUENCE: Mutex<()> = Mutex::new(());
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub use output::format::{LineFormat, SimpleFormat, LineOp, LabelOp, LineTemplate
pub use output::stream::{Stream, TextScope};
pub use output::graphite::{Graphite, GraphiteScope, GraphiteMetric};
pub use output::statsd::{Statsd, StatsdScope, StatsdMetric};
pub use output::map::{StatsMap};
pub use output::map::StatsMapScope;
pub use output::log::{Log, LogScope};

//#[cfg(feature="prometheus")]
Expand Down
Loading

0 comments on commit f59d403

Please sign in to comment.