diff --git a/crates/time-weighted-average/src/lib.rs b/crates/time-weighted-average/src/lib.rs index 03a65b48..7a29659f 100644 --- a/crates/time-weighted-average/src/lib.rs +++ b/crates/time-weighted-average/src/lib.rs @@ -189,6 +189,16 @@ impl TimeWeightSummary { let duration = (self.last.ts - self.first.ts) as f64; Ok(self.w_sum / duration) } + + /// Evaluate the integral in microseconds. + pub fn time_weighted_integral(&self) -> f64 { + if self.last.ts == self.first.ts { + // the integral of a duration of zero width is zero + 0.0 + } else { + self.w_sum + } + } } impl TimeWeightMethod { diff --git a/extension/src/accessors.rs b/extension/src/accessors.rs index b52a86ff..87c28068 100644 --- a/extension/src/accessors.rs +++ b/extension/src/accessors.rs @@ -90,7 +90,6 @@ accessor! { last_val() } accessor! { first_time() } accessor! { last_time() } - // The rest are more complex, with String or other challenges. Leaving alone for now. pg_type! { @@ -565,3 +564,34 @@ pub fn accessor_unnest( } } } + +#[pg_schema] +pub mod toolkit_experimental { + use super::*; + + pg_type! { + #[derive(Debug)] + struct AccessorIntegral<'input> { + len: u32, + bytes: [u8; self.len], + } + } + + // FIXME string IO + ron_inout_funcs!(AccessorIntegral); + + #[pg_extern(immutable, parallel_safe, name="integral")] + pub fn accessor_integral( + unit: default!(&str, "'second'"), + ) -> AccessorIntegral<'static> { + unsafe { + flatten!{ + AccessorIntegral { + len: unit.len().try_into().unwrap(), + bytes: unit.as_bytes().into(), + } + } + } + } +} +pub use toolkit_experimental::*; diff --git a/extension/src/duration.rs b/extension/src/duration.rs new file mode 100644 index 00000000..cc252070 --- /dev/null +++ b/extension/src/duration.rs @@ -0,0 +1,75 @@ +//! Utilities for working with durations. Parsing of duration units is intended to match how +//! PostgreSQL parses duration units. Currently units longer than an hour are unsupported since +//! the length of days varies when in a timezone with daylight savings time. + +// Canonical PostgreSQL units: https://github.com/postgres/postgres/blob/b76fb6c2a99eb7d49f96e56599fef1ffc1c134c9/src/include/utils/datetime.h#L48-L60 +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub enum DurationUnit { + // units should be ordered smallest -> largest + Microsec, + Millisec, + Second, + Minute, + Hour, +} + +impl DurationUnit { + fn microseconds(self) -> u32 { + match self { + Self::Microsec => 1, + Self::Millisec => 1000, + Self::Second => 1_000_000, + Self::Minute => 60_000_000, + Self::Hour => 3_600_000_000, + } + } + + /// Convert `amount` of a unit to another unit. + pub fn convert_unit(self, amount: f64, to: Self) -> f64 { + let microseconds = amount * (self.microseconds() as f64); + microseconds / (to.microseconds() as f64) + } + + /// Tries to get a duration unit from a string, returning `None` if no known unit matched. + pub fn from_str(s: &str) -> Option { + // Aliases for canonical units: https://github.com/postgres/postgres/blob/b76fb6c2a99eb7d49f96e56599fef1ffc1c134c9/src/backend/utils/adt/datetime.c#L187-L247 + match s.to_lowercase().as_str() { + "usecond" | "microsecond" | "microseconds" | "microsecon" | "us" | "usec" + | "useconds" | "usecs" => Some(Self::Microsec), + "msecond" | "millisecond" | "milliseconds" | "millisecon" | "ms" | "msec" + | "mseconds" | "msecs" => Some(Self::Millisec), + "second" | "s" | "sec" | "seconds" | "secs" => Some(Self::Second), + "minute" | "m" | "min" | "mins" | "minutes" => Some(Self::Minute), + "hour" | "hours" | "h" | "hr" | "hrs" => Some(Self::Hour), + _ => None, + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn convert_unit() { + let load_time_secs = 75.0; + let load_time_mins = + DurationUnit::convert_unit(DurationUnit::Second, load_time_secs, DurationUnit::Minute); + assert_eq!(load_time_mins, 1.25); + } + + #[test] + fn parse_unit() { + assert_eq!( + DurationUnit::from_str("usecs"), + Some(DurationUnit::Microsec) + ); + assert_eq!(DurationUnit::from_str("MINUTE"), Some(DurationUnit::Minute)); + assert_eq!( + DurationUnit::from_str("MiLlIsEcOn"), + Some(DurationUnit::Millisec) + ); + assert_eq!(DurationUnit::from_str("pahar"), None); + assert_eq!(DurationUnit::from_str(""), None); + } +} diff --git a/extension/src/lib.rs b/extension/src/lib.rs index d5d62537..03935abc 100644 --- a/extension/src/lib.rs +++ b/extension/src/lib.rs @@ -30,6 +30,7 @@ mod stabilization_info; mod raw; mod datum_utils; mod pg_any_element; +mod duration; #[cfg(any(test, feature = "pg_test"))] mod aggregate_builder_tests; diff --git a/extension/src/time_weighted_average.rs b/extension/src/time_weighted_average.rs index 9bf2315a..ad7bff35 100644 --- a/extension/src/time_weighted_average.rs +++ b/extension/src/time_weighted_average.rs @@ -10,7 +10,7 @@ use crate::{ aggregate_utils::in_aggregate_context, flatten, palloc::{Inner, Internal, InternalAsValue, ToInternal}, - pg_type, ron_inout_funcs, + pg_type, ron_inout_funcs, duration::DurationUnit, }; use tspoint::TSPoint; @@ -417,6 +417,22 @@ pub fn arrow_time_weighted_average_average( time_weighted_average_average(sketch) } +mod toolkit_experimental { + use crate::accessors::toolkit_experimental::*; + use super::*; + + #[pg_operator(immutable, parallel_safe)] + #[opname(->)] + pub fn arrow_time_weighted_average_integral( + tws: Option, + accessor: AccessorIntegral, + ) -> Option { + time_weighted_average_integral(tws, String::from_utf8_lossy(accessor.bytes.as_slice()).to_string()) + } +} + +pub use toolkit_experimental::*; + #[pg_extern(immutable, parallel_safe, name = "average")] pub fn time_weighted_average_average( @@ -438,25 +454,63 @@ pub fn time_weighted_average_average( } } -#[pg_extern(immutable, parallel_safe, name = "interpolated_average", schema = "toolkit_experimental")] -pub fn time_weighted_average_interpolated_average( +#[pg_extern(immutable, parallel_safe, name = "integral", schema = "toolkit_experimental")] +pub fn time_weighted_average_integral( + tws: Option, + unit: default!(String, "'second'"), +) -> Option { + let unit = match DurationUnit::from_str(&unit) { + Some(unit) => unit, + None => pgx::error!( + "Unrecognized duration unit: {}. Valid units are: usecond, msecond, second, minute, hour", + unit, + ), + }; + let integral_microsecs = tws?.internal().time_weighted_integral(); + Some(DurationUnit::Microsec.convert_unit(integral_microsecs, unit)) +} + +fn interpolate<'a>( tws: Option, start: crate::raw::TimestampTz, interval: crate::raw::Interval, prev: Option, next: Option, -) -> Option { - let target = match tws { +) -> Option> { + match tws { None => None, Some(tws) => { let interval = crate::datum_utils::interval_to_ms(&start, &interval); Some(tws.interpolate(start.into(), interval, prev, next)) } - }; + } +} +#[pg_extern(immutable, parallel_safe, name = "interpolated_average", schema = "toolkit_experimental")] +pub fn time_weighted_average_interpolated_average( + tws: Option, + start: crate::raw::TimestampTz, + interval: crate::raw::Interval, + prev: Option, + next: Option, +) -> Option { + let target = interpolate(tws, start, interval, prev, next); time_weighted_average_average(target) } +#[pg_extern(immutable, parallel_safe, name = "interpolated_integral", schema = "toolkit_experimental")] +pub fn time_weighted_average_interpolated_integral( + tws: Option, + start: crate::raw::TimestampTz, + interval: crate::raw::Interval, + prev: Option, + next: Option, + unit: String, +) -> Option { + let target = interpolate(tws, start, interval, prev, next); + time_weighted_average_integral(target, unit) +} + #[cfg(any(test, feature = "pg_test"))] #[pg_schema] mod tests { @@ -478,8 +532,17 @@ mod tests { let stmt = "CREATE TABLE test(ts timestamptz, val DOUBLE PRECISION); SET TIME ZONE 'UTC'"; client.select(stmt, None, None); - // add a couple points - let stmt = "INSERT INTO test VALUES('2020-01-01 00:00:00+00', 10.0), ('2020-01-01 00:01:00+00', 20.0)"; + // add a point + let stmt = "INSERT INTO test VALUES('2020-01-01 00:00:00+00', 10.0)"; + client.select(stmt, None, None); + + let stmt = "SELECT toolkit_experimental.integral(time_weight('Linear', ts, val), 'hrs') FROM test"; + assert_eq!(select_one!(client, stmt, f64), 0.0); + let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val), 'msecond') FROM test"; + assert_eq!(select_one!(client, stmt, f64), 0.0); + + // add another point + let stmt = "INSERT INTO test VALUES('2020-01-01 00:01:00+00', 20.0)"; client.select(stmt, None, None); // test basic with 2 points @@ -519,6 +582,11 @@ mod tests { let stmt = "SELECT average(time_weight('LOCF', ts, val)) FROM test"; assert!((select_one!(client, stmt, f64) - 15.0).abs() < f64::EPSILON); + let stmt = "SELECT toolkit_experimental.integral(time_weight('Linear', ts, val), 'mins') FROM test"; + assert!((select_one!(client, stmt, f64) - 60.0).abs() < f64::EPSILON); + let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val), 'hour') FROM test"; + assert!((select_one!(client, stmt, f64) - 1.0).abs() < f64::EPSILON); + //non-evenly spaced values let stmt = "INSERT INTO test VALUES('2020-01-01 00:08:00+00', 30.0), ('2020-01-01 00:10:00+00', 10.0), ('2020-01-01 00:10:30+00', 20.0), ('2020-01-01 00:20:00+00', 30.0)"; client.select(stmt, None, None); @@ -532,9 +600,20 @@ mod tests { // arrow syntax should be the same assert!((select_one!(client, stmt, f64) - 21.25).abs() < f64::EPSILON); + let stmt = "SELECT toolkit_experimental.integral(time_weight('Linear', ts, val), 'microseconds') FROM test"; + assert!((select_one!(client, stmt, f64) - 25500000000.00).abs() < f64::EPSILON); + let stmt = "SELECT time_weight('Linear', ts, val) \ + ->toolkit_experimental.integral('microseconds') \ + FROM test"; + // arrow syntax should be the same + assert!((select_one!(client, stmt, f64) - 25500000000.00).abs() < f64::EPSILON); + let stmt = "SELECT average(time_weight('LOCF', ts, val)) FROM test"; // expected = (10 + 20 + 10 + 20 + 10*4 + 30*2 +10*.5 + 20*9.5) / 20 = 17.75 using last value and carrying for each point assert!((select_one!(client, stmt, f64) - 17.75).abs() < f64::EPSILON); + + let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val), 'milliseconds') FROM test"; + assert!((select_one!(client, stmt, f64) - 21300000.0).abs() < f64::EPSILON); //make sure this works with whatever ordering we throw at it let stmt = "SELECT average(time_weight('Linear', ts, val ORDER BY random())) FROM test"; @@ -542,6 +621,11 @@ mod tests { let stmt = "SELECT average(time_weight('LOCF', ts, val ORDER BY random())) FROM test"; assert!((select_one!(client, stmt, f64) - 17.75).abs() < f64::EPSILON); + let stmt = "SELECT toolkit_experimental.integral(time_weight('Linear', ts, val ORDER BY random()), 'seconds') FROM test"; + assert!((select_one!(client, stmt, f64) - 25500.0).abs() < f64::EPSILON); + let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val ORDER BY random())) FROM test"; + assert!((select_one!(client, stmt, f64) - 21300.0).abs() < f64::EPSILON); + // make sure we get the same result if we do multi-level aggregation let stmt = "WITH t AS (SELECT date_trunc('minute', ts), time_weight('Linear', ts, val) AS tws FROM test GROUP BY 1) SELECT average(rollup(tws)) FROM t"; assert!((select_one!(client, stmt, f64) - 21.25).abs() < f64::EPSILON); @@ -691,7 +775,7 @@ mod tests { agg, bucket, '1 day'::interval, - LAG(agg) OVER (ORDER BY bucket), + LAG(agg) OVER (ORDER BY bucket), LEAD(agg) OVER (ORDER BY bucket) ) FROM ( SELECT bucket, time_weight('LOCF', time, value) as agg @@ -702,23 +786,54 @@ mod tests { None, None, ); + let mut integrals = client.select( + r#"SELECT + toolkit_experimental.interpolated_integral( + agg, + bucket, + '1 day'::interval, + LAG(agg) OVER (ORDER BY bucket), + LEAD(agg) OVER (ORDER BY bucket), + 'hours' + ) FROM ( + SELECT bucket, time_weight('LOCF', time, value) as agg + FROM test + GROUP BY bucket + ) s + ORDER BY bucket"#, + None, + None, + ); // Day 1, 4 hours @ 10, 4 @ 40, 8 @ 20 assert_eq!( averages.next().unwrap()[1].value(), Some((4. * 10. + 4. * 40. + 8. * 20.) / 16.) ); + assert_eq!( + integrals.next().unwrap()[1].value(), + Some(4. * 10. + 4. * 40. + 8. * 20.) + ); // Day 2, 2 hours @ 20, 10 @ 15, 8 @ 50, 4 @ 25 assert_eq!( averages.next().unwrap()[1].value(), Some((2. * 20. + 10. * 15. + 8. * 50. + 4. * 25.) / 24.) ); + assert_eq!( + integrals.next().unwrap()[1].value(), + Some(2. * 20. + 10. * 15. + 8. * 50. + 4. * 25.) + ); // Day 3, 10 hours @ 25, 2 @ 30, 4 @ 0 assert_eq!( averages.next().unwrap()[1].value(), Some((10. * 25. + 2. * 30.) / 16.) ); + assert_eq!( + integrals.next().unwrap()[1].value(), + Some(10. * 25. + 2. * 30.) + ); assert!(averages.next().is_none()); + assert!(integrals.next().is_none()); }); } }