Detect data gaps and track elapsed time in time_weight summaries #787
Comments
An SQL-based prototype for LOCF, assuming that the last value before a gap should be carried forward for the duration of the gap detection setting. The sum does not count the last LOCF value carried forward; this must be handled by rollup and accessor methods. An easier-to-use solution should store the method and gap detection setting in the TWS type and refuse to combine results where the gap detection settings do not match, similar to what the toolkit does today for the method parameter.
--- time-weight experiment
CREATE TABLE signal (time TIMESTAMPTZ, value DOUBLE PRECISION);
INSERT INTO signal VALUES ('2022-01-01T00:00:00Z', 1), ('2022-01-01T02:00:00Z', 2), ('2022-01-01T02:30:00Z', 0);
CREATE TYPE tws AS (
    -- weighted sum
    ws DOUBLE PRECISION,
    -- elapsed duration
    d INTERVAL,
    -- first point
    ft TIMESTAMPTZ,
    fv DOUBLE PRECISION,
    -- last point
    lt TIMESTAMPTZ,
    lv DOUBLE PRECISION
);
CREATE OR REPLACE FUNCTION tws_add_locf(s TWS, ts TIMESTAMPTZ, value DOUBLE PRECISION, gap INTERVAL) RETURNS TWS AS $$
DECLARE
    d INTERVAL;
BEGIN
    IF s IS NULL
    THEN
        -- First sample: empty sum and duration, first point = last point.
        RETURN ROW(CAST(0 AS DOUBLE PRECISION), CAST('PT0S' AS INTERVAL), ts, value, ts, value);
    ELSE
        -- Carry the previous value forward, but for at most the gap interval.
        d := ts - s.lt;
        IF d > gap
        THEN
            d := gap;
        END IF;
        RETURN ROW(
            s.ws + (s.lv*EXTRACT(epoch FROM d)),
            s.d + d,
            s.ft,
            s.fv,
            ts,
            value);
    END IF;
END
$$ LANGUAGE plpgsql IMMUTABLE;
CREATE OR REPLACE AGGREGATE tws_locf (ts TIMESTAMPTZ, value DOUBLE PRECISION, gap INTERVAL) (
    sfunc = tws_add_locf,
    stype = tws
);
CREATE OR REPLACE FUNCTION tws_combine_locf(a TWS, b TWS, gap INTERVAL) RETURNS TWS AS $$
DECLARE
    d INTERVAL;
BEGIN
    CASE
        WHEN a IS NULL
        THEN
            RETURN b;
        WHEN b IS NULL
        THEN
            RETURN a;
        ELSE
            -- Time between the last point of a and the first point of b,
            -- capped at the gap interval.
            d := b.ft - a.lt;
            IF d > gap
            THEN
                d := gap;
            END IF;
            RETURN ROW(
                a.ws + b.ws + (a.lv*EXTRACT(epoch FROM d)),
                a.d + b.d + d,
                a.ft,
                a.fv,
                b.lt,
                b.lv);
    END CASE;
END
$$ LANGUAGE plpgsql IMMUTABLE;
CREATE OR REPLACE AGGREGATE rollup_locf(s tws, gap INTERVAL) (
    sfunc = tws_combine_locf,
    stype = tws
);
-- Subquery aliases are required before PostgreSQL 16.
SELECT time_bucket('PT3H', time) AS start_time,
       tws_locf(time, value, 'PT30M'::INTERVAL) AS s
FROM (SELECT time, value FROM signal ORDER BY time) AS ordered
GROUP BY start_time;
-- The rollup depends on input order, so the hourly summaries are ordered explicitly.
SELECT time_bucket('PT3H', start_time) AS start_time2,
       rollup_locf(s, 'PT30M'::INTERVAL ORDER BY start_time)
FROM (
    SELECT time_bucket('PT1H', time) AS start_time,
           tws_locf(time, value, 'PT30M'::INTERVAL) AS s
    FROM (SELECT time, value FROM signal ORDER BY time) AS ordered
    GROUP BY start_time
) AS hourly
GROUP BY start_time2;
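A minimal sketch of what the accessor side could look like for this prototype, assuming the last value stays valid for one full gap interval after the final sample. The names tws_locf_integral and tws_locf_average are hypothetical and not part of the toolkit; both rely on the tws type and the tws_locf aggregate defined above.

```sql
-- Hypothetical accessors for the LOCF prototype (illustrative names only).
-- Assumption: the last value is carried forward for exactly one gap interval.
CREATE OR REPLACE FUNCTION tws_locf_integral(s tws, gap INTERVAL) RETURNS DOUBLE PRECISION AS $$
    -- Stored weighted sum plus the trailing value carried forward for the gap interval.
    SELECT (s).ws + (s).lv * EXTRACT(epoch FROM gap);
$$ LANGUAGE sql IMMUTABLE;

CREATE OR REPLACE FUNCTION tws_locf_average(s tws, gap INTERVAL) RETURNS DOUBLE PRECISION AS $$
    -- Divide by the tracked elapsed time (plus the trailing carry), not by lt - ft.
    SELECT ((s).ws + (s).lv * EXTRACT(epoch FROM gap))
         / NULLIF(EXTRACT(epoch FROM (s).d + gap), 0);
$$ LANGUAGE sql IMMUTABLE;

-- Usage against the sample table, summarising all rows in one state.
SELECT tws_locf_integral(s, 'PT30M') AS integral,
       tws_locf_average(s, 'PT30M') AS average
FROM (
    SELECT tws_locf(time, value, 'PT30M'::INTERVAL ORDER BY time) AS s
    FROM signal
) AS summary;
```

For the three sample rows this should give an integral of 5400 (value times seconds) and an average of 1: each of the values 1, 2 and 0 counts for 30 minutes. Bucketed queries would additionally need to decide whether the trailing carry may spill into the next bucket, which this sketch does not attempt.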
Second prototype, storing the gap detection setting and method in the summary, and also handling linear aggregation:
--- time-weight experiment
CREATE TABLE signal (time TIMESTAMPTZ, value DOUBLE PRECISION);
INSERT INTO signal VALUES ('2022-01-01T00:00:00Z', 1), ('2022-01-01T02:00:00Z', 2), ('2022-01-01T02:30:00Z', 0);
CREATE TYPE tws_interpolation AS ENUM('linear', 'locf');
DROP TYPE tws CASCADE;
CREATE TYPE tws AS (
    -- Identify aggregation type.
    m tws_interpolation,
    gap DOUBLE PRECISION,
    -- Time-weighted sum.
    ws DOUBLE PRECISION,
    -- Aggregated duration.
    d DOUBLE PRECISION,
    -- First point.
    ft TIMESTAMPTZ,
    fv DOUBLE PRECISION,
    -- Last point.
    lt TIMESTAMPTZ,
    lv DOUBLE PRECISION
);
CREATE OR REPLACE FUNCTION add_to_tws(s TWS, ts TIMESTAMPTZ, value DOUBLE PRECISION, gap_seconds DOUBLE PRECISION, method tws_interpolation) RETURNS TWS AS $$
DECLARE
    d DOUBLE PRECISION;
BEGIN
    CASE
        WHEN s IS NULL
        THEN
            -- First sample: record method and gap setting, empty sum and duration.
            RETURN ROW(method, gap_seconds, CAST(0 AS DOUBLE PRECISION), CAST(0 AS DOUBLE PRECISION), ts, value, ts, value);
        WHEN s.m != method
        THEN
            RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'interpolation method must match aggregation state interpolation method';
        WHEN s.gap != gap_seconds
        THEN
            RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'gap detection must match aggregation state';
        WHEN method = 'locf'
        THEN
            -- LOCF: the previous value stays valid for at most gap_seconds.
            d := EXTRACT(epoch FROM (ts - s.lt));
            IF d > gap_seconds
            THEN
                d := gap_seconds;
            END IF;
            RETURN ROW(
                method,
                gap_seconds,
                s.ws + (s.lv*d),
                s.d + d,
                s.ft,
                s.fv,
                ts,
                value);
        WHEN method = 'linear'
        THEN
            -- Linear: a gap starts at the previous point and contributes nothing.
            d := EXTRACT(epoch FROM (ts - s.lt));
            IF d > gap_seconds
            THEN
                d := 0;
            END IF;
            RETURN ROW(
                method,
                gap_seconds,
                s.ws + ((s.lv+value)*d/2),
                s.d + d,
                s.ft,
                s.fv,
                ts,
                value);
        ELSE
            RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'invalid interpolation method';
    END CASE;
END
$$ LANGUAGE plpgsql IMMUTABLE;
CREATE OR REPLACE AGGREGATE time_weight_gap(ts TIMESTAMPTZ, value DOUBLE PRECISION, gap_seconds DOUBLE PRECISION, method tws_interpolation) (
    sfunc = add_to_tws,
    stype = tws
);
CREATE OR REPLACE FUNCTION tws_combine(a TWS, b TWS) RETURNS TWS AS $$
DECLARE
    d DOUBLE PRECISION;
BEGIN
    CASE
        WHEN a IS NULL
        THEN
            RETURN b;
        WHEN b IS NULL
        THEN
            RETURN a;
        WHEN a.m != b.m
        THEN
            RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'summary interpolation method must match';
        WHEN a.gap != b.gap
        THEN
            RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'summary gap detection must match';
        WHEN a.m = 'locf'
        THEN
            -- LOCF: carry the last value of a forward for at most a.gap seconds.
            d := EXTRACT(epoch FROM (b.ft - a.lt));
            IF d > a.gap
            THEN
                d := a.gap;
            END IF;
            RETURN ROW(
                a.m,
                a.gap,
                a.ws + b.ws + (a.lv*d),
                a.d + b.d + d,
                a.ft,
                a.fv,
                b.lt,
                b.lv);
        WHEN a.m = 'linear'
        THEN
            -- Linear: a gap between the two summaries contributes nothing.
            d := EXTRACT(epoch FROM (b.ft - a.lt));
            IF d > a.gap
            THEN
                d := 0;
            END IF;
            RETURN ROW(
                a.m,
                a.gap,
                a.ws + b.ws + ((a.lv+b.fv)*d/2),
                a.d + b.d + d,
                a.ft,
                a.fv,
                b.lt,
                b.lv);
        ELSE
            RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'invalid interpolation method';
    END CASE;
END
$$ LANGUAGE plpgsql IMMUTABLE;
CREATE OR REPLACE AGGREGATE rollup(s tws) (
    sfunc = tws_combine,
    stype = tws
);
-- Subquery aliases are required before PostgreSQL 16.
SELECT time_bucket('PT3H', time) AS start_time,
       time_weight_gap(time, value, 1800.0, 'locf') AS s
FROM (SELECT time, value FROM signal ORDER BY time) AS ordered
GROUP BY start_time;
SELECT time_bucket('PT3H', time) AS start_time,
       time_weight_gap(time, value, 1800.0, 'linear') AS s
FROM (SELECT time, value FROM signal ORDER BY time) AS ordered
GROUP BY start_time;
-- The rollup depends on input order, so the hourly summaries are ordered explicitly.
SELECT time_bucket('PT3H', start_time) AS start_time2,
       rollup(s ORDER BY start_time)
FROM (
    SELECT time_bucket('PT1H', time) AS start_time,
           time_weight_gap(time, value, 1800.0, 'locf') AS s
    FROM (SELECT time, value FROM signal ORDER BY time) AS ordered
    GROUP BY start_time
) AS hourly
GROUP BY start_time2;
SELECT time_bucket('PT3H', start_time) AS start_time2,
       rollup(s ORDER BY start_time)
FROM (
    SELECT time_bucket('PT1H', time) AS start_time,
           time_weight_gap(time, value, 1800.0, 'linear') AS s
    FROM (SELECT time, value FROM signal ORDER BY time) AS ordered
    GROUP BY start_time
) AS hourly
GROUP BY start_time2;
PS! Ideally we would like a similar solution for [compact] stats aggregation with similar logic to
What's the functionality you would like to add
We would like better handling of explicit data gaps for time_weight() summaries and accessors. That is, we want to be able to exclude gaps in data within and between time buckets from the weighted sum, and let this affect the average, integral, interpolated_average and interpolated_integral accessors. For this exclusion to work, we must also track the elapsed time, as we can no longer assume it to be last_time - first_time.
Gaps can be detected in different ways. For us, we would like to declare a gap when there is more than a duration D between two samples. There are other possible ways to detect gaps, such as looking for explicit NULL values (which could e.g. be inserted by pre-processing the source time series before aggregation). We don't mind which method is used as long as there is one method to detect them. There are some details here in how to treat the last value before a gap. Is it valid for duration D? Is it valid for a 0 duration because it's followed by a gap? Is this choice affected by the choice of LOCF or LINEAR algorithm? Let's not get into that just yet, but a choice needs to be made.
What matters is that when you then calculate the TimeWeightSummary, we would like the detected gaps to affect the weighted sum field. We would also propose adding an additional field which accurately tracks the elapsed time, omitting the time spent in gaps.
Finally, let average, integral, interpolated_average and interpolated_integral use the elapsed time value to report their numbers. Also allow an accessor for the elapsed_time value.
How would the function be used
Allow the same time-weight summaries to be used to cover more cases:
We could e.g. query for the average and elapsed time.
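To illustrate the numbers such a query would be expected to return, here is a plain PostgreSQL sketch that does not use the toolkit. It assumes LOCF weighting, a gap detection interval D of 30 minutes (1800 seconds), that the last value before a gap stays valid for D, and that the trailing sample is not carried forward. The three sample rows are the ones used in the prototypes in the comments.

```sql
WITH signal(time, value) AS (
    VALUES ('2022-01-01T00:00:00Z'::timestamptz, 1::double precision),
           ('2022-01-01T02:00:00Z', 2),
           ('2022-01-01T02:30:00Z', 0)
),
steps AS (
    -- Duration each value is held, capped at D = 1800 seconds.
    -- The last sample has no successor, so its duration is NULL.
    SELECT value,
           LEAST(EXTRACT(epoch FROM LEAD(time) OVER (ORDER BY time) - time), 1800) AS d
    FROM signal
)
SELECT SUM(value * d)                       AS weighted_sum,
       SUM(d)                               AS elapsed_seconds,
       SUM(value * d) / NULLIF(SUM(d), 0)   AS average
FROM steps
WHERE d IS NOT NULL;
```

With these assumptions the weighted sum is 5400, the elapsed time is 3600 seconds rather than the 9000 seconds given by last_time - first_time, and the average is 1.5.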
Why should this feature be added?
We have been struggling to find a consistent way of dealing with gaps (not filling them, but not counting them either) using the Timescale two-step aggregations, such as [compact_]state_agg and time_weight. Gaps must be handled correctly both within an evaluation bucket and between buckets (with interpolate).
For us, a gap exists if the distance between two points is larger than a "gap detection" interval D, where D is a configurable attribute per data source. I.e. something similar to heartbeat_agg, except we are interested in weighted sums and state aggregations, not the gaps themselves. Ideally, if using locf, we consider the last value as valid for duration D before a gap, while for linear, we would start the gap at the last point. At least this is what we think makes the most sense.
To explain why we care, consider a time_weight solution that should work generically: the system may log a temperature, for which we want a weighted average, or power consumption (W), which should be integrated into kWh. We cannot rely on inserting fake values into the source signal if we want the same time-weight summary to collect good data for both. I.e. without elapsed time and gap detection for power consumption, if we consider the system to not draw power when it doesn't log data, we would need to insert 0 values before each gap in the source signal before aggregation. If we do the same for temperature, however, we draw the TWA closer to 0, which makes the result invalid.
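The effect can be sketched with made-up numbers in plain PostgreSQL. The query below compares a zero-filled copy of a signal with gap-aware weighting of the raw signal, assuming LOCF and a gap detection interval of 1800 seconds; the sample values and the variant labels are invented for the illustration.

```sql
WITH samples(variant, time, value) AS (
    VALUES
        -- Raw log with a gap between 00:00 and 02:00.
        ('raw',         '2022-01-01T00:00:00Z'::timestamptz, 20::double precision),
        ('raw',         '2022-01-01T02:00:00Z', 22),
        ('raw',         '2022-01-01T02:30:00Z', 21),
        -- Same log with a fake 0 inserted where the gap starts.
        ('zero_filled', '2022-01-01T00:00:00Z', 20),
        ('zero_filled', '2022-01-01T00:30:00Z', 0),
        ('zero_filled', '2022-01-01T02:00:00Z', 22),
        ('zero_filled', '2022-01-01T02:30:00Z', 21)
),
steps AS (
    -- Seconds until the next sample of the same variant (NULL for the last one).
    SELECT variant, value,
           EXTRACT(epoch FROM LEAD(time) OVER (PARTITION BY variant ORDER BY time) - time) AS d
    FROM samples
)
SELECT
    SUM(value * d) FILTER (WHERE variant = 'zero_filled')           AS zero_filled_integral,
    SUM(value * LEAST(d, 1800)) FILTER (WHERE variant = 'raw')      AS gap_aware_integral,
    SUM(value * d) FILTER (WHERE variant = 'zero_filled')
        / SUM(d) FILTER (WHERE variant = 'zero_filled')             AS zero_filled_average,
    SUM(value * LEAST(d, 1800)) FILTER (WHERE variant = 'raw')
        / SUM(LEAST(d, 1800)) FILTER (WHERE variant = 'raw')        AS gap_aware_average
FROM steps
WHERE d IS NOT NULL;
```

Read as watts, both integrals come out at 75600 W·s, so gap-aware weighting gives the power use case what zero filling would. Read as degrees, the zero-filled average is about 8.4 while the gap-aware average is 21, which is why zero filling cannot serve both signals.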
A much better solution, in our view, is to not pre-insert any fake data, but rather add explicit gap detection and tracking of elapsed time.
What scale is this useful at?
Especially useful in general-purpose applications for time series rather than tailor-made systems where you can do what you want.
Drawbacks
Handling backwards compatibility may be a challenge, since the solution proposes changes to the TimeWeightSummary structure.
Open Questions
Are there any questions we'd need to address before releasing this feature?
One must decide how to detect gaps. E.g. either look for explicit NULL values or look for durations longer than a specific period D between two time-stamps.
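Both detection alternatives can be sketched in plain PostgreSQL. The query below flags a sample either when its value is an explicit NULL marker or when more than D has passed since the previous sample; D = 30 minutes and the column names is_null_marker and follows_gap are assumptions made for the illustration.

```sql
WITH signal(time, value) AS (
    VALUES ('2022-01-01T00:00:00Z'::timestamptz, 1::double precision),
           ('2022-01-01T02:00:00Z', 2),
           ('2022-01-01T02:30:00Z', 0)
)
SELECT time,
       value,
       -- Alternative 1: an explicit NULL sample marks a gap.
       value IS NULL AS is_null_marker,
       -- Alternative 2: more than D since the previous sample.
       -- The first sample has no predecessor and is not treated as following a gap.
       COALESCE((time - LAG(time) OVER (ORDER BY time)) > INTERVAL 'PT30M', false) AS follows_gap
FROM signal
ORDER BY time;
```

With the sample rows, only the 02:00 sample follows a gap; the 02:30 sample arrives exactly D after its predecessor, which this sketch does not count as a gap.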
For either alternative, one must decide how the last value before a gap should be counted. In our view, this means considering the last value before a gap valid for the gap detection duration if using the locf algorithm. However, this comes with additional challenges for accessors when the query returns results in time buckets, as you would ideally want to split the final value of each bucket across one or more successive buckets if the next gap is beyond the end of the bucket.
Alternatives
Use custom SQL aggregation functions and accessors.