Skip to content

Commit

Permalink
Add (interpolated_)integral for time_weight
Browse files Browse the repository at this point in the history
Implements #455
  • Loading branch information
syvb committed Sep 15, 2022
1 parent b743334 commit 367d3e9
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 10 deletions.
10 changes: 10 additions & 0 deletions crates/time-weighted-average/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,16 @@ impl TimeWeightSummary {
let duration = (self.last.ts - self.first.ts) as f64;
Ok(self.w_sum / duration)
}

/// Evaluate the integral in microseconds.
pub fn time_weighted_integral(&self) -> f64 {
if self.last.ts == self.first.ts {
// the integral of a duration of zero width is zero
0.0
} else {
self.w_sum
}
}
}

impl TimeWeightMethod {
Expand Down
29 changes: 29 additions & 0 deletions extension/src/accessors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,32 @@ pub fn accessor_unnest() -> AccessorUnnest<'static> {
}
}
}

#[pg_schema]
pub mod toolkit_experimental {
use super::*;

pg_type! {
#[derive(Debug)]
struct AccessorIntegral<'input> {
len: u32,
bytes: [u8; self.len],
}
}

// FIXME string IO
ron_inout_funcs!(AccessorIntegral);

#[pg_extern(immutable, parallel_safe, name = "integral")]
pub fn accessor_integral(unit: default!(&str, "'second'")) -> AccessorIntegral<'static> {
unsafe {
flatten! {
AccessorIntegral {
len: unit.len().try_into().unwrap(),
bytes: unit.as_bytes().into(),
}
}
}
}
}
pub use toolkit_experimental::*;
75 changes: 75 additions & 0 deletions extension/src/duration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//! Utilities for working with durations. Parsing of duration units is intended to match how
//! PostgreSQL parses duration units. Currently units longer than an hour are unsupported since
//! the length of days varies when in a timezone with daylight savings time.
// Canonical PostgreSQL units: https://github.com/postgres/postgres/blob/b76fb6c2a99eb7d49f96e56599fef1ffc1c134c9/src/include/utils/datetime.h#L48-L60
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum DurationUnit {
// units should be ordered smallest -> largest
Microsec,
Millisec,
Second,
Minute,
Hour,
}

impl DurationUnit {
fn microseconds(self) -> u32 {
match self {
Self::Microsec => 1,
Self::Millisec => 1000,
Self::Second => 1_000_000,
Self::Minute => 60_000_000,
Self::Hour => 3_600_000_000,
}
}

/// Convert `amount` of a unit to another unit.
pub fn convert_unit(self, amount: f64, to: Self) -> f64 {
let microseconds = amount * (self.microseconds() as f64);
microseconds / (to.microseconds() as f64)
}

/// Tries to get a duration unit from a string, returning `None` if no known unit matched.
pub fn from_str(s: &str) -> Option<Self> {
// Aliases for canonical units: https://github.com/postgres/postgres/blob/b76fb6c2a99eb7d49f96e56599fef1ffc1c134c9/src/backend/utils/adt/datetime.c#L187-L247
match s.to_lowercase().as_str() {
"usecond" | "microsecond" | "microseconds" | "microsecon" | "us" | "usec"
| "useconds" | "usecs" => Some(Self::Microsec),
"msecond" | "millisecond" | "milliseconds" | "millisecon" | "ms" | "msec"
| "mseconds" | "msecs" => Some(Self::Millisec),
"second" | "s" | "sec" | "seconds" | "secs" => Some(Self::Second),
"minute" | "m" | "min" | "mins" | "minutes" => Some(Self::Minute),
"hour" | "hours" | "h" | "hr" | "hrs" => Some(Self::Hour),
_ => None,
}
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn convert_unit() {
let load_time_secs = 75.0;
let load_time_mins =
DurationUnit::convert_unit(DurationUnit::Second, load_time_secs, DurationUnit::Minute);
assert_eq!(load_time_mins, 1.25);
}

#[test]
fn parse_unit() {
assert_eq!(
DurationUnit::from_str("usecs"),
Some(DurationUnit::Microsec)
);
assert_eq!(DurationUnit::from_str("MINUTE"), Some(DurationUnit::Minute));
assert_eq!(
DurationUnit::from_str("MiLlIsEcOn"),
Some(DurationUnit::Millisec)
);
assert_eq!(DurationUnit::from_str("pahar"), None);
assert_eq!(DurationUnit::from_str(""), None);
}
}
1 change: 1 addition & 0 deletions extension/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod utilities;

mod aggregate_utils;
mod datum_utils;
mod duration;
mod palloc;
mod pg_any_element;
mod raw;
Expand Down
143 changes: 133 additions & 10 deletions extension/src/time_weighted_average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use serde::{Deserialize, Serialize};

use crate::{
accessors::{
AccessorAverage, AccessorFirstTime, AccessorFirstVal, AccessorLastTime, AccessorLastVal,
toolkit_experimental, AccessorAverage, AccessorFirstTime, AccessorFirstVal,
AccessorLastTime, AccessorLastVal,
},
aggregate_utils::in_aggregate_context,
duration::DurationUnit,
flatten,
palloc::{Inner, Internal, InternalAsValue, ToInternal},
pg_type, ron_inout_funcs,
Expand Down Expand Up @@ -401,6 +403,18 @@ pub fn arrow_time_weighted_average_average(
time_weighted_average_average(sketch)
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
pub fn arrow_time_weighted_average_integral(
tws: Option<TimeWeightSummary>,
accessor: toolkit_experimental::AccessorIntegral,
) -> Option<f64> {
time_weighted_average_integral(
tws,
String::from_utf8_lossy(accessor.bytes.as_slice()).to_string(),
)
}

#[pg_extern(immutable, parallel_safe, name = "average")]
pub fn time_weighted_average_average(tws: Option<TimeWeightSummary>) -> Option<f64> {
match tws {
Expand All @@ -422,27 +436,75 @@ pub fn time_weighted_average_average(tws: Option<TimeWeightSummary>) -> Option<f
#[pg_extern(
immutable,
parallel_safe,
name = "interpolated_average",
name = "integral",
schema = "toolkit_experimental"
)]
pub fn time_weighted_average_interpolated_average(
pub fn time_weighted_average_integral(
tws: Option<TimeWeightSummary>,
unit: default!(String, "'second'"),
) -> Option<f64> {
let unit = match DurationUnit::from_str(&unit) {
Some(unit) => unit,
None => pgx::error!(
"Unrecognized duration unit: {}. Valid units are: usecond, msecond, second, minute, hour",
unit,
),
};
let integral_microsecs = tws?.internal().time_weighted_integral();
Some(DurationUnit::Microsec.convert_unit(integral_microsecs, unit))
}

fn interpolate<'a>(
tws: Option<TimeWeightSummary>,
start: crate::raw::TimestampTz,
interval: crate::raw::Interval,
prev: Option<TimeWeightSummary>,
next: Option<TimeWeightSummary>,
) -> Option<f64> {
let target = match tws {
) -> Option<TimeWeightSummary<'a>> {
match tws {
None => None,
Some(tws) => {
let interval = crate::datum_utils::interval_to_ms(&start, &interval);
Some(tws.interpolate(start.into(), interval, prev, next))
}
};
}
}

#[pg_extern(
immutable,
parallel_safe,
name = "interpolated_average",
schema = "toolkit_experimental"
)]
pub fn time_weighted_average_interpolated_average(
tws: Option<TimeWeightSummary>,
start: crate::raw::TimestampTz,
interval: crate::raw::Interval,
prev: Option<TimeWeightSummary>,
next: Option<TimeWeightSummary>,
) -> Option<f64> {
let target = interpolate(tws, start, interval, prev, next);
time_weighted_average_average(target)
}

#[pg_extern(
immutable,
parallel_safe,
name = "interpolated_integral",
schema = "toolkit_experimental"
)]
pub fn time_weighted_average_interpolated_integral(
tws: Option<TimeWeightSummary>,
start: crate::raw::TimestampTz,
interval: crate::raw::Interval,
prev: Option<TimeWeightSummary>,
next: Option<TimeWeightSummary>,
unit: String,
) -> Option<f64> {
let target = interpolate(tws, start, interval, prev, next);
time_weighted_average_integral(target, unit)
}

#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
mod tests {
Expand All @@ -465,8 +527,17 @@ mod tests {
"CREATE TABLE test(ts timestamptz, val DOUBLE PRECISION); SET TIME ZONE 'UTC'";
client.select(stmt, None, None);

// add a couple points
let stmt = "INSERT INTO test VALUES('2020-01-01 00:00:00+00', 10.0), ('2020-01-01 00:01:00+00', 20.0)";
// add a point
let stmt = "INSERT INTO test VALUES('2020-01-01 00:00:00+00', 10.0)";
client.select(stmt, None, None);

let stmt = "SELECT toolkit_experimental.integral(time_weight('Linear', ts, val), 'hrs') FROM test";
assert_eq!(select_one!(client, stmt, f64), 0.0);
let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val), 'msecond') FROM test";
assert_eq!(select_one!(client, stmt, f64), 0.0);

// add another point
let stmt = "INSERT INTO test VALUES('2020-01-01 00:01:00+00', 20.0)";
client.select(stmt, None, None);

// test basic with 2 points
Expand Down Expand Up @@ -506,6 +577,11 @@ mod tests {
let stmt = "SELECT average(time_weight('LOCF', ts, val)) FROM test";
assert!((select_one!(client, stmt, f64) - 15.0).abs() < f64::EPSILON);

let stmt = "SELECT toolkit_experimental.integral(time_weight('Linear', ts, val), 'mins') FROM test";
assert!((select_one!(client, stmt, f64) - 60.0).abs() < f64::EPSILON);
let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val), 'hour') FROM test";
assert!((select_one!(client, stmt, f64) - 1.0).abs() < f64::EPSILON);

//non-evenly spaced values
let stmt = "INSERT INTO test VALUES('2020-01-01 00:08:00+00', 30.0), ('2020-01-01 00:10:00+00', 10.0), ('2020-01-01 00:10:30+00', 20.0), ('2020-01-01 00:20:00+00', 30.0)";
client.select(stmt, None, None);
Expand All @@ -519,16 +595,32 @@ mod tests {
// arrow syntax should be the same
assert!((select_one!(client, stmt, f64) - 21.25).abs() < f64::EPSILON);

let stmt = "SELECT toolkit_experimental.integral(time_weight('Linear', ts, val), 'microseconds') FROM test";
assert!((select_one!(client, stmt, f64) - 25500000000.00).abs() < f64::EPSILON);
let stmt = "SELECT time_weight('Linear', ts, val) \
->toolkit_experimental.integral('microseconds') \
FROM test";
// arrow syntax should be the same
assert!((select_one!(client, stmt, f64) - 25500000000.00).abs() < f64::EPSILON);

let stmt = "SELECT average(time_weight('LOCF', ts, val)) FROM test";
// expected = (10 + 20 + 10 + 20 + 10*4 + 30*2 +10*.5 + 20*9.5) / 20 = 17.75 using last value and carrying for each point
assert!((select_one!(client, stmt, f64) - 17.75).abs() < f64::EPSILON);

let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val), 'milliseconds') FROM test";
assert!((select_one!(client, stmt, f64) - 21300000.0).abs() < f64::EPSILON);

//make sure this works with whatever ordering we throw at it
let stmt = "SELECT average(time_weight('Linear', ts, val ORDER BY random())) FROM test";
assert!((select_one!(client, stmt, f64) - 21.25).abs() < f64::EPSILON);
let stmt = "SELECT average(time_weight('LOCF', ts, val ORDER BY random())) FROM test";
assert!((select_one!(client, stmt, f64) - 17.75).abs() < f64::EPSILON);

let stmt = "SELECT toolkit_experimental.integral(time_weight('Linear', ts, val ORDER BY random()), 'seconds') FROM test";
assert!((select_one!(client, stmt, f64) - 25500.0).abs() < f64::EPSILON);
let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val ORDER BY random())) FROM test";
assert!((select_one!(client, stmt, f64) - 21300.0).abs() < f64::EPSILON);

// make sure we get the same result if we do multi-level aggregation
let stmt = "WITH t AS (SELECT date_trunc('minute', ts), time_weight('Linear', ts, val) AS tws FROM test GROUP BY 1) SELECT average(rollup(tws)) FROM t";
assert!((select_one!(client, stmt, f64) - 21.25).abs() < f64::EPSILON);
Expand Down Expand Up @@ -720,8 +812,8 @@ mod tests {
toolkit_experimental.interpolated_average(
agg,
bucket,
'1 day'::interval,
LAG(agg) OVER (ORDER BY bucket),
'1 day'::interval,
LAG(agg) OVER (ORDER BY bucket),
LEAD(agg) OVER (ORDER BY bucket)
) FROM (
SELECT bucket, time_weight('LOCF', time, value) as agg
Expand All @@ -732,23 +824,54 @@ mod tests {
None,
None,
);
let mut integrals = client.select(
r#"SELECT
toolkit_experimental.interpolated_integral(
agg,
bucket,
'1 day'::interval,
LAG(agg) OVER (ORDER BY bucket),
LEAD(agg) OVER (ORDER BY bucket),
'hours'
) FROM (
SELECT bucket, time_weight('LOCF', time, value) as agg
FROM test
GROUP BY bucket
) s
ORDER BY bucket"#,
None,
None,
);

// Day 1, 4 hours @ 10, 4 @ 40, 8 @ 20
assert_eq!(
averages.next().unwrap()[1].value(),
Some((4. * 10. + 4. * 40. + 8. * 20.) / 16.)
);
assert_eq!(
integrals.next().unwrap()[1].value(),
Some(4. * 10. + 4. * 40. + 8. * 20.)
);
// Day 2, 2 hours @ 20, 10 @ 15, 8 @ 50, 4 @ 25
assert_eq!(
averages.next().unwrap()[1].value(),
Some((2. * 20. + 10. * 15. + 8. * 50. + 4. * 25.) / 24.)
);
assert_eq!(
integrals.next().unwrap()[1].value(),
Some(2. * 20. + 10. * 15. + 8. * 50. + 4. * 25.)
);
// Day 3, 10 hours @ 25, 2 @ 30, 4 @ 0
assert_eq!(
averages.next().unwrap()[1].value(),
Some((10. * 25. + 2. * 30.) / 16.)
);
assert_eq!(
integrals.next().unwrap()[1].value(),
Some(10. * 25. + 2. * 30.)
);
assert!(averages.next().is_none());
assert!(integrals.next().is_none());
});
}
}

0 comments on commit 367d3e9

Please sign in to comment.