Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/observers #50

Merged
merged 18 commits into from
Apr 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 65 additions & 19 deletions HANDBOOK.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,48 +57,94 @@ Timer's internal precision is microseconds but can be scaled down on output.
extern crate dipstick;
use dipstick::*;
fn main() {
let app_metrics = Stream::to_stdout().metrics();
let timer = app_metrics.timer("my_timer");
time!(timer, {/* slow code here */} );
timer.time(|| {/* slow code here */} );
let metrics = Stream::to_stdout().metrics();
let timer = metrics.timer("my_timer");

let start = timer.start();
/* slow code here */
timer.stop(start);
// using macro
time!(timer, {/* timed code here ... */} );

// using closure
timer.time(|| {/* timed code here ... */} );

// using start/stop
let handle = timer.start();
/* timed code here ... */
timer.stop(handle);

// directly reporting microseconds
timer.interval_us(123_456);
}
```

### Level
Relative quantity counter. Accepts positive and negative cumulative values.
If aggregated, observed minimum and maximum track the sum of values rather than individual values as for `Counter`.
A relative, cumulative quantity counter. Accepts positive and negative values.
If aggregated, observed minimum and maximum track the _sum_ of values (as opposed to `Counter` min and max _individual_ values).

### Gauge
An instant observation of a resource's value (positive or negative, but non-cumulative).
The observation of Gauges is not automatic and must be performed programmatically as with other metric types.
The observation of Gauges can be performed programmatically as with other metric types or
it can be triggered automatically, either on schedule or upon flushing the scope:

````rust
extern crate dipstick;
use dipstick::*;
use std::time::Duration;

fn main() {
let metrics = Stream::to_stdout().metrics();
let uptime = metrics.gauge("uptime");

// report gauge value programmatically
uptime.value(2);

// observe a constant value before each flush
let uptime = metrics.gauge("uptime");
metrics.observe(uptime, || 6).on_flush();

// observe a function-provided value periodically
metrics
.observe(metrics.gauge("threads"), thread_count)
.every(Duration::from_secs(1));
}

fn thread_count() -> MetricValue {
6
}
````

### Names
Each metric must be given a name upon creation.
Names are opaque to the application and are used only to identify the metrics upon output.

Names may be prepended with a namespace by each configured backend.
Aggregated statistics may also append identifiers to the metric's name.

Names should exclude characters that can interfere with namespaces, separator and output protocols.
A good convention is to stick with lowercase alphanumeric identifiers of less than 12 characters.
Names may be prepended with a application-namespace shared across all backends.

```rust
extern crate dipstick;
use dipstick::*;
fn main() {
let app_metrics = Stream::to_stdout().metrics();
let db_metrics = app_metrics.named("database");
let _db_timer = db_metrics.timer("db_timer");
let _db_counter = db_metrics.counter("db_counter");
let metrics = Stream::to_stdout().metrics();

// plainly name "timer"
let _timer = metrics.timer("timer");

// prepend namespace
let db_metrics = metrics.named("database");

// qualified name will be "database.counter"
let _db_counter = db_metrics.counter("counter");
}
```

Names may be prepended with a namespace by each configured backend.
For example, the same metric `request.success` could appear under different qualified names:
- logging as `app_module.request.success`
- statsd as `environment.hostname.pid.module.request.success`

Aggregation statistics may also append identifiers to the metric's name, such as `counter_mean` or `marker_rate`.

Names should exclude characters that can interfere with namespaces, separator and output protocols.
A good convention is to stick with lowercase alphanumeric identifiers of less than 12 characters.


### Labels

Expand Down
42 changes: 42 additions & 0 deletions examples/observer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//!
//! A sample application to demonstrate flush-triggered and scheduled observation of gauge values.
//!
//! This is the expected output:
//!
//! ```
//! cargo run --example observer
//! process.threads 4
//! process.uptime 6
//! process.threads 4
//! process.uptime 6
//! ...
//! ```
//!

extern crate dipstick;

use std::time::Duration;

use dipstick::*;

fn main() {
let metrics = AtomicBucket::new().named("process");
metrics.drain(Stream::to_stdout());
metrics.flush_every(Duration::from_secs(3));

let uptime = metrics.gauge("uptime");
metrics.observe(uptime, || 6).on_flush();

metrics
.observe(metrics.gauge("threads"), thread_count)
.every(Duration::from_secs(1));

loop {
std::thread::sleep(Duration::from_millis(40));
}
}

/// Query number of running threads in this process using Linux's /proc filesystem.
fn thread_count() -> MetricValue {
4
}
15 changes: 8 additions & 7 deletions src/bucket/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use bucket::ScoreType::*;
use bucket::{stats_summary, ScoreType};
use core::attributes::{Attributes, Prefixed, WithAttributes};
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
use core::clock::TimeHandle;
use core::error;
use core::input::{InputKind, InputMetric, InputScope};
Expand Down Expand Up @@ -83,7 +83,7 @@ lazy_static! {
}

impl InnerAtomicBucket {
pub fn flush(&mut self) -> error::Result<()> {
fn flush(&mut self) -> error::Result<()> {
let pub_scope = match self.drain {
Some(ref out) => out.output_dyn(),
None => read_lock!(DEFAULT_AGGREGATE_OUTPUT).output_dyn(),
Expand All @@ -110,7 +110,7 @@ impl InnerAtomicBucket {
/// Take a snapshot of aggregated values and reset them.
/// Compute stats on captured values using assigned or default stats function.
/// Write stats to assigned or default output.
pub fn flush_to(&mut self, target: &OutputScope) -> error::Result<()> {
fn flush_to(&mut self, target: &OutputScope) -> error::Result<()> {
let now = TimeHandle::now();
let duration_seconds = self.period_start.elapsed_us() as f64 / 1_000_000.0;
self.period_start = now;
Expand Down Expand Up @@ -234,7 +234,7 @@ impl AtomicBucket {
}

/// Revert this bucket's statistics generator to the default stats.
pub fn unset_stat(&self) {
pub fn unset_stats<F>(&self) {
write_lock!(self.inner).stats = None
}

Expand Down Expand Up @@ -277,6 +277,7 @@ impl Flush for AtomicBucket {
/// Collect and reset aggregated data.
/// Publish statistics
fn flush(&self) -> error::Result<()> {
self.notify_flush_listeners();
let mut inner = write_lock!(self.inner);
inner.flush()
}
Expand Down Expand Up @@ -327,7 +328,7 @@ impl AtomicScores {
}

/// Update scores with new value
pub fn update(&self, value: MetricValue) {
pub fn update(&self, value: MetricValue) -> () {
// TODO detect & report any concurrent updates / resets for measurement of contention
// Count is tracked for all metrics
self.scores[HIT].fetch_add(1, Relaxed);
Expand Down Expand Up @@ -494,7 +495,7 @@ mod test {
use bucket::{stats_all, stats_average, stats_summary};

use core::clock::{mock_clock_advance, mock_clock_reset};
use output::map::StatsMap;
use output::map::StatsMapScope;

use std::collections::BTreeMap;
use std::time::Duration;
Expand Down Expand Up @@ -535,7 +536,7 @@ mod test {

mock_clock_advance(Duration::from_secs(3));

let map = StatsMap::default();
let map = StatsMapScope::default();
metrics.flush_to(&map).unwrap();
map.into()
}
Expand Down
3 changes: 2 additions & 1 deletion src/cache/cache_in.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Metric input scope caching.

use cache::lru_cache as lru;
use core::attributes::{Attributes, Prefixed, WithAttributes};
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
use core::error;
use core::input::{Input, InputDyn, InputKind, InputMetric, InputScope};
use core::name::MetricName;
Expand Down Expand Up @@ -102,6 +102,7 @@ impl InputScope for InputScopeCache {

impl Flush for InputScopeCache {
fn flush(&self) -> error::Result<()> {
self.notify_flush_listeners();
self.target.flush()
}
}
3 changes: 2 additions & 1 deletion src/cache/cache_out.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Metric output scope caching.

use cache::lru_cache as lru;
use core::attributes::{Attributes, Prefixed, WithAttributes};
use core::attributes::{Attributes, OnFlush, Prefixed, WithAttributes};
use core::error;
use core::input::InputKind;
use core::name::MetricName;
Expand Down Expand Up @@ -105,6 +105,7 @@ impl OutputScope for OutputScopeCache {

impl Flush for OutputScopeCache {
fn flush(&self) -> error::Result<()> {
self.notify_flush_listeners();
self.target.flush()
}
}
Loading