diff --git a/extension/src/counter_agg.rs b/extension/src/counter_agg.rs index 6c3fcbe2d..b2d21ead9 100644 --- a/extension/src/counter_agg.rs +++ b/extension/src/counter_agg.rs @@ -92,7 +92,8 @@ impl<'input> CounterSummary<'input> { prev: Option, next: Option, ) -> CounterSummary<'static> { - let prev = prev.map(|summary| { + let prev = if self.first.ts > interval_start { + prev.map(|summary| { let first = if summary.last.val > self.first.val { TSPoint{ ts: summary.last.ts, val: 0.} } else { @@ -101,7 +102,10 @@ impl<'input> CounterSummary<'input> { time_weighted_average::TimeWeightMethod::Linear .interpolate(first, Some(self.first), interval_start) .expect("unable to interpolate lower bound") - }); + }) + } else { + None + }; let next = next.map(|summary| { let last = if self.last.val > summary.first.val { @@ -1221,6 +1225,7 @@ mod tests { deltas.next().unwrap()[1].value(), Some(35. + 30. - 27.5) ); + assert!(deltas.next().is_none()); let mut rates = client.select( r#"SELECT @@ -1255,6 +1260,58 @@ mod tests { rates.next().unwrap()[1].value(), Some((35. + 30. - 27.5)/(16. * 60. * 60.)) ); + assert!(rates.next().is_none()); + }); + } + + #[pg_test] + fn interpolated_delta_with_aligned_point() { + Spi::execute(|client| { + client.select( + "CREATE TABLE test(time timestamptz, value double precision, bucket timestamptz)", + None, + None, + ); + client.select( + r#"INSERT INTO test VALUES + ('2020-1-1 10:00'::timestamptz, 10.0, '2020-1-1'::timestamptz), + ('2020-1-1 12:00'::timestamptz, 40.0, '2020-1-1'::timestamptz), + ('2020-1-1 16:00'::timestamptz, 20.0, '2020-1-1'::timestamptz), + ('2020-1-2 0:00'::timestamptz, 15.0, '2020-1-2'::timestamptz), + ('2020-1-2 12:00'::timestamptz, 50.0, '2020-1-2'::timestamptz), + ('2020-1-2 20:00'::timestamptz, 25.0, '2020-1-2'::timestamptz)"#, + None, + None, + ); + + let mut deltas = client.select( + r#"SELECT + toolkit_experimental.interpolated_delta( + agg, + bucket, + '1 day'::interval, + LAG(agg) OVER (ORDER BY bucket), + LEAD(agg) OVER (ORDER BY bucket) + ) FROM ( + SELECT bucket, counter_agg(time, value) as agg + FROM test + GROUP BY bucket + ) s + ORDER BY bucket"#, + None, + None, + ); + // Day 1, start at 10, interpolated end of day is 15 (after reset), reset at 40 and 20 + assert_eq!( + deltas.next().unwrap()[1].value(), + Some(15. + 40. + 20. - 10.) + ); + // Day 2, start is 15, end is 25, reset at 50 + assert_eq!( + deltas.next().unwrap()[1].value(), + Some(25. + 50. - 15.) + ); + assert!(deltas.next().is_none()); }); } diff --git a/extension/src/gauge_agg.rs b/extension/src/gauge_agg.rs index a18b480e6..b16602dc0 100644 --- a/extension/src/gauge_agg.rs +++ b/extension/src/gauge_agg.rs @@ -61,11 +61,15 @@ mod toolkit_experimental { let prev = prev.map(MetricSummary::from); let next = next.map(MetricSummary::from); - let prev = prev.map(|summary| - time_weighted_average::TimeWeightMethod::Linear - .interpolate(summary.last, Some(this.first), interval_start) - .expect("unable to interpolate lower bound") - ); + let prev = if this.first.ts > interval_start { + prev.map(|summary| + time_weighted_average::TimeWeightMethod::Linear + .interpolate(summary.last, Some(this.first), interval_start) + .expect("unable to interpolate lower bound") + ) + } else { + None + }; let next = next.map(|summary| time_weighted_average::TimeWeightMethod::Linear diff --git a/extension/src/time_weighted_average.rs b/extension/src/time_weighted_average.rs index 12139d06c..b10e1e058 100644 --- a/extension/src/time_weighted_average.rs +++ b/extension/src/time_weighted_average.rs @@ -48,7 +48,7 @@ impl<'input> TimeWeightSummary<'input> { prev: Option, next: Option, ) -> TimeWeightSummary<'static> { - assert!(interval_start <= self.first.ts && interval_start + interval_len >= self.last.ts); + assert!(interval_start <= self.first.ts && interval_start + interval_len > self.last.ts); let mut new_sum = self.weighted_sum; let new_start = match prev { Some(prev) if interval_start < self.first.ts => { @@ -62,7 +62,7 @@ impl<'input> TimeWeightSummary<'input> { _ => self.first }; let new_end = match next { - Some(next) if self.last.ts < interval_start + interval_len => { + Some(next) => { let new_end = self.method .interpolate(self.last, Some(next.first), interval_start + interval_len)