Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixing interpolation logic #514

Merged
merged 1 commit into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 59 additions & 2 deletions extension/src/counter_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ impl<'input> CounterSummary<'input> {
prev: Option<CounterSummary>,
next: Option<CounterSummary>,
) -> CounterSummary<'static> {
let prev = prev.map(|summary| {
let prev = if self.first.ts > interval_start {
epgts marked this conversation as resolved.
Show resolved Hide resolved
prev.map(|summary| {
let first = if summary.last.val > self.first.val {
TSPoint{ ts: summary.last.ts, val: 0.}
} else {
Expand All @@ -101,7 +102,10 @@ impl<'input> CounterSummary<'input> {
time_weighted_average::TimeWeightMethod::Linear
.interpolate(first, Some(self.first), interval_start)
.expect("unable to interpolate lower bound")
});
})
} else {
None
};

let next = next.map(|summary| {
let last = if self.last.val > summary.first.val {
Expand Down Expand Up @@ -1221,6 +1225,7 @@ mod tests {
deltas.next().unwrap()[1].value(),
Some(35. + 30. - 27.5)
);
assert!(deltas.next().is_none());

let mut rates = client.select(
r#"SELECT
Expand Down Expand Up @@ -1255,6 +1260,58 @@ mod tests {
rates.next().unwrap()[1].value(),
Some((35. + 30. - 27.5)/(16. * 60. * 60.))
);
assert!(rates.next().is_none());
});
}

#[pg_test]
fn interpolated_delta_with_aligned_point() {
Spi::execute(|client| {
client.select(
"CREATE TABLE test(time timestamptz, value double precision, bucket timestamptz)",
None,
None,
);
client.select(
r#"INSERT INTO test VALUES
('2020-1-1 10:00'::timestamptz, 10.0, '2020-1-1'::timestamptz),
('2020-1-1 12:00'::timestamptz, 40.0, '2020-1-1'::timestamptz),
('2020-1-1 16:00'::timestamptz, 20.0, '2020-1-1'::timestamptz),
('2020-1-2 0:00'::timestamptz, 15.0, '2020-1-2'::timestamptz),
('2020-1-2 12:00'::timestamptz, 50.0, '2020-1-2'::timestamptz),
('2020-1-2 20:00'::timestamptz, 25.0, '2020-1-2'::timestamptz)"#,
None,
None,
);

let mut deltas = client.select(
r#"SELECT
toolkit_experimental.interpolated_delta(
agg,
bucket,
'1 day'::interval,
LAG(agg) OVER (ORDER BY bucket),
LEAD(agg) OVER (ORDER BY bucket)
) FROM (
SELECT bucket, counter_agg(time, value) as agg
FROM test
GROUP BY bucket
) s
ORDER BY bucket"#,
None,
None,
);
// Day 1, start at 10, interpolated end of day is 15 (after reset), reset at 40 and 20
assert_eq!(
deltas.next().unwrap()[1].value(),
Some(15. + 40. + 20. - 10.)
);
// Day 2, start is 15, end is 25, reset at 50
assert_eq!(
deltas.next().unwrap()[1].value(),
Some(25. + 50. - 15.)
);
assert!(deltas.next().is_none());
});
}

Expand Down
65 changes: 60 additions & 5 deletions extension/src/gauge_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,15 @@ mod toolkit_experimental {
let prev = prev.map(MetricSummary::from);
let next = next.map(MetricSummary::from);

let prev = prev.map(|summary|
time_weighted_average::TimeWeightMethod::Linear
.interpolate(summary.last, Some(this.first), interval_start)
.expect("unable to interpolate lower bound")
);
let prev = if this.first.ts > interval_start {
prev.map(|summary|
time_weighted_average::TimeWeightMethod::Linear
.interpolate(summary.last, Some(this.first), interval_start)
.expect("unable to interpolate lower bound")
)
} else {
None
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get a test for gauge_agg too? One of these days I need to get back to deduplicating these...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, copied the counter_agg test here. We should look at deduplicating this for the next release or the following one (probably as part of stabilizing gauge_agg).


let next = next.map(|summary|
time_weighted_average::TimeWeightMethod::Linear
Expand Down Expand Up @@ -1052,6 +1056,57 @@ mod tests {
});
}

#[pg_test]
fn guage_agg_interpolated_delta_with_aligned_point() {
Spi::execute(|client| {
client.select(
"CREATE TABLE test(time timestamptz, value double precision, bucket timestamptz)",
None,
None,
);
client.select(
r#"INSERT INTO test VALUES
('2020-1-1 10:00'::timestamptz, 10.0, '2020-1-1'::timestamptz),
('2020-1-1 12:00'::timestamptz, 40.0, '2020-1-1'::timestamptz),
('2020-1-1 16:00'::timestamptz, 20.0, '2020-1-1'::timestamptz),
('2020-1-2 0:00'::timestamptz, 15.0, '2020-1-2'::timestamptz),
('2020-1-2 12:00'::timestamptz, 50.0, '2020-1-2'::timestamptz),
('2020-1-2 20:00'::timestamptz, 25.0, '2020-1-2'::timestamptz)"#,
None,
None,
);

let mut deltas = client.select(
r#"SELECT
toolkit_experimental.interpolated_delta(
agg,
bucket,
'1 day'::interval,
LAG(agg) OVER (ORDER BY bucket),
LEAD(agg) OVER (ORDER BY bucket)
) FROM (
SELECT bucket, toolkit_experimental.gauge_agg(time, value) as agg
FROM test
GROUP BY bucket
) s
ORDER BY bucket"#,
None,
None,
);
// Day 1, start at 10, interpolated end of day is 15 (after reset)
assert_eq!(
deltas.next().unwrap()[1].value(),
Some(15. - 10.)
);
// Day 2, start is 15, end is 25
assert_eq!(
deltas.next().unwrap()[1].value(),
Some(25. - 15.)
);
assert!(deltas.next().is_none());
});
}

// TODO https://github.com/timescale/timescaledb-toolkit/issues/362
// TODO why doesn't this catch the error under github actions?
// #[pg_test(error = "returned Datum was NULL")]
Expand Down
4 changes: 2 additions & 2 deletions extension/src/time_weighted_average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl<'input> TimeWeightSummary<'input> {
prev: Option<TimeWeightSummary>,
next: Option<TimeWeightSummary>,
) -> TimeWeightSummary<'static> {
assert!(interval_start <= self.first.ts && interval_start + interval_len >= self.last.ts);
assert!(interval_start <= self.first.ts && interval_start + interval_len > self.last.ts);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test still passes without this change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just fixing the assert to reflect that the aggregate is exclusive on the upper bound. If this assert fails, then the caller is trying to call an interpolated function using an interval that's smaller than their bucketing interval. It might still be useful to add a test that we handle invalid input correctly, but pg_test still has trouble with failure tests.

let mut new_sum = self.weighted_sum;
let new_start = match prev {
Some(prev) if interval_start < self.first.ts => {
Expand All @@ -62,7 +62,7 @@ impl<'input> TimeWeightSummary<'input> {
_ => self.first
};
let new_end = match next {
Some(next) if self.last.ts < interval_start + interval_len => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The removed if should never have failed on valid input (and with the assert fixed is guaranteed to not fail).

Some(next) => {
let new_end =
self.method
.interpolate(self.last, Some(next.first), interval_start + interval_len)
Expand Down