Merge #141
141: Stats agg r=davidkohn88 a=davidkohn88



Co-authored-by: davidkohn88 <david@timescale.com>
bors[bot] and davidkohn88 authored Jun 17, 2021
2 parents d992183 + 70b9f3a commit 6d89805
Showing 17 changed files with 1,574 additions and 142 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -14,6 +14,7 @@ members = [
    "crates/asap",
    "crates/counter-agg",
    "crates/time-series",
+   "crates/stats-agg",
]

[profile.dev]
2 changes: 1 addition & 1 deletion crates/counter-agg/Cargo.toml
@@ -7,8 +7,8 @@ edition = "2018"

[dependencies]
serde = { version = "1.0", features = ["derive"] }
+stats_agg = {path="../stats-agg"}
time_series = {path="../time-series"}


[dev-dependencies]
approx = "0.4.0"
21 changes: 11 additions & 10 deletions crates/counter-agg/src/lib.rs
@@ -1,8 +1,9 @@

use time_series::TSPoint;
+use stats_agg::{XYPair, stats2d::StatsSummary2D};
use serde::{Deserialize, Serialize};
-use regression::{XYPair, RegressionSummary};

-pub mod regression;
pub mod range;
mod tests;

@@ -21,7 +22,7 @@ pub struct CounterSummary {
pub reset_sum: f64,
pub num_resets: u64,
pub num_changes: u64,
-pub regress: RegressionSummary,
+pub stats: StatsSummary2D,
pub bounds: Option<range::I64Range>,
}
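
For reference, a minimal sketch of the replacement type's shape, assuming only the fields that the tests later in this commit exercise (stats.n, .sx, .sxx, .sy, .syy, .sxy); the actual definition lives in crates/stats-agg/src/stats2d.rs:

// Sketch only: fields inferred from the assertions in crates/counter-agg/src/tests.rs.
#[derive(Debug, Clone, PartialEq)]
pub struct StatsSummary2D {
    pub n: u64,    // number of accumulated points
    pub sx: f64,   // sum(x)
    pub sxx: f64,  // sum of squared deviations of x from its running mean
    pub sy: f64,   // sum(y)
    pub syy: f64,  // sum of squared deviations of y from its running mean
    pub sxy: f64,  // sum of cross-deviations of x and y
}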

@@ -54,10 +55,10 @@ impl CounterSummary {
reset_sum: 0.0,
num_resets: 0,
num_changes: 0,
-regress: RegressionSummary::new(),
+stats: StatsSummary2D::new(),
bounds,
};
-n.regress.accum(ts_to_xy(*pt)).unwrap();
+n.stats.accum(ts_to_xy(*pt)).unwrap();
n
}

@@ -88,7 +89,7 @@ impl CounterSummary {
self.last = *incoming;
let mut incoming_xy = ts_to_xy(*incoming);
incoming_xy.y += self.reset_sum;
-self.regress.accum(incoming_xy).unwrap();
+self.stats.accum(incoming_xy).unwrap();
Ok(())
}

@@ -119,15 +120,15 @@ impl CounterSummary {
if self.single_value() {
self.second = incoming.first;
}
-let mut regress = incoming.regress.clone();
+let mut stats = incoming.stats.clone();
// have to offset based on our reset_sum, including the amount we added based on any resets that happened at the boundary (but before we add in the incoming reset_sum)
-regress.offset(XYPair{x:0.0, y: self.reset_sum}).unwrap();
+stats.offset(XYPair{x:0.0, y: self.reset_sum}).unwrap();
self.last = incoming.last;
self.reset_sum += incoming.reset_sum;
self.num_resets += incoming.num_resets;
self.num_changes += incoming.num_changes;

-self.regress = self.regress.combine(regress).unwrap();
+self.stats = self.stats.combine(stats).unwrap();
self.bounds_extend(incoming.bounds);
Ok(())
}
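
A hedged sketch of this merge step, using only the call shapes visible in the diff (clone, offset, combine); the exact signatures and ownership of the stats_agg methods are assumed:

use stats_agg::{XYPair, stats2d::StatsSummary2D};

// `later` was accumulated without seeing the resets recorded in `earlier`,
// so every one of its y values sits too low by `reset_sum`; shift them up
// before merging so the combined sums describe one continuous counter.
fn merge_segments(earlier: StatsSummary2D, later: &StatsSummary2D, reset_sum: f64) -> StatsSummary2D {
    let mut stats = later.clone();
    stats.offset(XYPair { x: 0.0, y: reset_sum }).unwrap();
    earlier.combine(stats).unwrap()
}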
@@ -217,7 +218,7 @@ impl CounterSummary {
let mut duration_to_start = to_seconds((self.first.ts - self.bounds.unwrap().left.unwrap()) as f64);
let duration_to_end = to_seconds((self.bounds.unwrap().right.unwrap() - self.last.ts) as f64);
let sampled_interval = self.time_delta();
-let avg_duration_between_samples = sampled_interval / (self.regress.n64() - 1.0); // don't have to worry about divide by zero because we know we have at least 2 values from the above.
+let avg_duration_between_samples = sampled_interval / (self.stats.n - 1) as f64; // don't have to worry about divide by zero because we know we have at least 2 values from the above.

// we don't want to extrapolate to negative counter values, so we calculate the duration to the zero point of the counter (based on what we know here) and set that as duration_to_start if it's smaller than duration_to_start
if result_val > 0.0 && self.first.val >= 0.0 {
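As a worked instance of the new spacing line above (numbers invented for illustration): six samples spanning 50 seconds give an average gap of 10 seconds, and the caller has already guaranteed at least two values, so the division is safe:

let n: u64 = 6;                  // summary.stats.n
let sampled_interval = 50.0_f64; // seconds covered by the sampled points
let avg_duration_between_samples = sampled_interval / (n - 1) as f64;
assert_eq!(avg_duration_between_samples, 10.0); // one sample every 10 seconds
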
24 changes: 12 additions & 12 deletions crates/counter-agg/src/tests.rs
@@ -15,12 +15,12 @@ pub mod tests {
assert_eq!(p1.last, p2.last, "last");
assert_eq!(p1.num_changes, p2.num_changes, "num_changes");
assert_eq!(p1.num_resets, p2.num_resets, "num_resets");
-assert_eq!(p1.regress.n, p2.regress.n, "n");
-assert_relative_eq!(p1.regress.sx, p2.regress.sx);
-assert_relative_eq!(p1.regress.sxx, p2.regress.sxx);
-assert_relative_eq!(p1.regress.sy, p2.regress.sy);
-assert_relative_eq!(p1.regress.syy, p2.regress.syy);
-assert_relative_eq!(p1.regress.sxy, p2.regress.sxy);
+assert_eq!(p1.stats.n, p2.stats.n, "n");
+assert_relative_eq!(p1.stats.sx, p2.stats.sx);
+assert_relative_eq!(p1.stats.sxx, p2.stats.sxx);
+assert_relative_eq!(p1.stats.sy, p2.stats.sy);
+assert_relative_eq!(p1.stats.syy, p2.stats.syy);
+assert_relative_eq!(p1.stats.sxy, p2.stats.sxy);
}
#[test]
fn create() {
@@ -68,10 +68,10 @@ pub mod tests {
assert_relative_eq!(summary.reset_sum, 50.0);
assert_eq!(summary.num_resets, 1);
assert_eq!(summary.num_changes, 4);
-assert_eq!(summary.regress.count(), 6);
-assert_relative_eq!(summary.regress.sum().unwrap().x, to_seconds(75.0));
+assert_eq!(summary.stats.count(), 6);
+assert_relative_eq!(summary.stats.sum().unwrap().x, to_seconds(75.0));
// non obvious one here, sumy should be the sum of all values including the resets at the time.
-assert_relative_eq!(summary.regress.sum().unwrap().y, 0.0 + 10.0 + 20.0 + 20.0 + 50.0 + 60.0);
+assert_relative_eq!(summary.stats.sum().unwrap().y, 0.0 + 10.0 + 20.0 + 20.0 + 50.0 + 60.0);
}


@@ -170,10 +170,10 @@ pub mod tests {
assert_relative_eq!(summary.reset_sum, 60.0);
assert_eq!(summary.num_resets, 2);
assert_eq!(summary.num_changes, 6);
-assert_eq!(summary.regress.count(), 7);
-assert_relative_eq!(summary.regress.sum().unwrap().x, to_seconds(105.0));
+assert_eq!(summary.stats.count(), 7);
+assert_relative_eq!(summary.stats.sum().unwrap().x, to_seconds(105.0));
// non obvious one here, sy should be the sum of all values including the resets at the time they were added.
-assert_relative_eq!(summary.regress.sum().unwrap().y, 0.0 + 10.0 + 20.0 + 30.0 + 60.0 + 80.0 + 100.0);
+assert_relative_eq!(summary.stats.sum().unwrap().y, 0.0 + 10.0 + 20.0 + 30.0 + 60.0 + 80.0 + 100.0);

let mut part1 = CounterSummary::new(&TSPoint{ts: 0, val:0.0}, None);
part1.add_point(&TSPoint{ts: 5, val:10.0}).unwrap();
13 changes: 13 additions & 0 deletions crates/stats-agg/Cargo.toml
@@ -0,0 +1,13 @@
[package]
name = "stats_agg"
version = "0.1.0"
authors = ["davidkohn88 <david@timescale.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
serde = { version = "1.0", features = ["derive"] }

[dev-dependencies]
approx = "0.4.0"
35 changes: 35 additions & 0 deletions crates/stats-agg/src/lib.rs
@@ -0,0 +1,35 @@
// stats is a small statistical regression lib that implements the Youngs-Cramer algorithm, based on the Postgres
// implementation: float8_accum and friends in float.c for the 1D analysis, and for the 2D regression analysis:
// https://github.com/postgres/postgres/blob/472e518a44eacd9caac7d618f1b6451672ca4481/src/backend/utils/adt/float.c#L3260
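
For intuition, a minimal 1D version of the Youngs-Cramer update described above, transcribed from the shape of Postgres's float8_accum rather than from this crate's actual stats1d API:

// n: count, sx: sum of values, sxx: sum of squared deviations from the mean.
// Youngs-Cramer accumulates sxx in a single pass without storing the mean.
struct Accum { n: u64, sx: f64, sxx: f64 }

impl Accum {
    fn accum(&mut self, x: f64) {
        self.n += 1;
        self.sx += x;
        if self.n > 1 {
            let n = self.n as f64;
            let tmp = x * n - self.sx;
            self.sxx += tmp * tmp / (n * (n - 1.0));
        }
        // when n == 1, sxx stays 0.0: a single point has no deviation
    }
}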

#[derive(Debug, PartialEq)]
pub enum StatsError {
DoubleOverflow,
}

#[derive(Debug, PartialEq)]
pub struct XYPair {
pub x: f64,
pub y: f64,
}

// The threshold at which we should re-calculate when we're doing the inverse transition in a windowed aggregate.
// Essentially, if removing a value from the aggregate shifts the data by enough, we can end up with extra
// floating point error: in real arithmetic x = x + C - C, but in floating point arithmetic, if C is large
// compared to x, we can accumulate significant error. In our case, because C is added in the normal transition
// or combine function and then removed later in the inverse function, we have x + C and C, and we test:
// C / (x + C) > INV_FLOATING_ERROR_THRESHOLD
// Because of the way that Postgres performs inverse functions, if we return a NULL value the only thing that
// happens is that the partial gets re-calculated from scratch from the values in the window frame, so providing
// the inverse function is purely an optimization. There are several cases where C / (x + C) is likely to exceed
// our threshold, but we don't care too much: with only one or two values this happens frequently, but then the
// cost of recalculation is low, compared to when there are many values in a rolling calculation. So we test
// early in the function for whether we need to recalculate and return NULL quickly, to avoid penalizing those
// cases too heavily.
const INV_FLOATING_ERROR_THRESHOLD : f64 = 0.99;
pub mod stats2d;
pub mod stats1d;
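
A hedged sketch of how this threshold could gate an inverse transition; the function name and the Option-for-NULL convention are hypothetical, since the real gate lives in the extension's aggregate glue:

// Removing `c` from a running sum that currently equals x + c loses precision
// when c dominates the total. In that case return None (NULL to Postgres),
// which makes Postgres rebuild the partial from scratch instead.
fn inverse_or_recalculate(running_sum: f64, c: f64) -> Option<f64> {
    if c / running_sum > INV_FLOATING_ERROR_THRESHOLD {
        None // too much cancellation: cheaper to recalculate than to drift
    } else {
        Some(running_sum - c)
    }
}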
