diff --git a/Changelog.md b/Changelog.md index 3dda7e7e..eb3b6bb3 100644 --- a/Changelog.md +++ b/Changelog.md @@ -17,6 +17,8 @@ This changelog should be updated as part of a PR if the work is worth noting (mo - [#692](https://github.com/timescale/timescaledb-toolkit/pull/692): Support specifying a range to `duration_in` to specify a time range to get states in for state aggregates - [#692](https://github.com/timescale/timescaledb-toolkit/pull/692): Removed `next` parameter from interpolated state aggregate functions - [#692](https://github.com/timescale/timescaledb-toolkit/pull/692): Renamed `state_agg` to `compact_state_agg` and `timeline_agg` to `state_agg` +- [#699](https://github.com/timescale/timescaledb-toolkit/pull/699): `interpolated_duration_in`/`duration_in`/`interpolated_state_periods`/`state_periods` have the first two arguments swapped: now the aggregate is first and the state is second +- [#699](https://github.com/timescale/timescaledb-toolkit/pull/699): `into_values`/`into_int_values` now returns a table with intervals instead of microseconds #### Shout-outs diff --git a/docs/state_agg.md b/docs/state_agg.md index dbf10e99..1126fbf4 100644 --- a/docs/state_agg.md +++ b/docs/state_agg.md @@ -46,7 +46,7 @@ INSERT INTO states_test_5 VALUES Compute the amount of time spent in a state as INTERVAL. ```SQL -SELECT toolkit_experimental.duration_in('ERROR', toolkit_experimental.compact_state_agg(ts, state)) FROM states_test; +SELECT toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 'ERROR') FROM states_test; ``` ```output interval @@ -54,7 +54,7 @@ SELECT toolkit_experimental.duration_in('ERROR', toolkit_experimental.compact_st 00:00:03 ``` ```SQL -SELECT toolkit_experimental.duration_in(2, toolkit_experimental.compact_state_agg(ts, state)) FROM states_test_4; +SELECT toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 2) FROM states_test_4; ``` ```output interval @@ -67,7 +67,7 @@ Extract as number of seconds: ```SQL SELECT EXTRACT(epoch FROM - toolkit_experimental.duration_in('ERROR', toolkit_experimental.compact_state_agg(ts, state)) + toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 'ERROR') )::INTEGER FROM states_test; ``` @@ -79,7 +79,7 @@ FROM states_test; #### duration_in for a range ```SQL -SELECT toolkit_experimental.duration_in('OK', toolkit_experimental.state_agg(ts, state), '2020-01-01 00:01:00+00', '2 days') FROM states_test; +SELECT toolkit_experimental.duration_in(toolkit_experimental.state_agg(ts, state), 'OK', '2020-01-01 00:01:00+00', '2 days') FROM states_test; ``` ```output duration_in @@ -87,7 +87,7 @@ SELECT toolkit_experimental.duration_in('OK', toolkit_experimental.state_agg(ts, 00:00:57 ``` ```SQL -SELECT toolkit_experimental.duration_in('OK', toolkit_experimental.state_agg(ts, state), '2020-01-01 00:01:00+00', NULL) FROM states_test; +SELECT toolkit_experimental.duration_in(toolkit_experimental.state_agg(ts, state), 'OK', '2020-01-01 00:01:00+00', NULL) FROM states_test; ``` ```output duration_in @@ -95,7 +95,7 @@ SELECT toolkit_experimental.duration_in('OK', toolkit_experimental.state_agg(ts, 00:00:57 ``` ```SQL -SELECT toolkit_experimental.duration_in('OK', toolkit_experimental.state_agg(ts, state), '2020-01-01 00:01:00+00') FROM states_test; +SELECT toolkit_experimental.duration_in(toolkit_experimental.state_agg(ts, state), 'OK', '2020-01-01 00:01:00+00') FROM states_test; ``` ```output duration_in @@ -103,7 +103,7 @@ SELECT toolkit_experimental.duration_in('OK', toolkit_experimental.state_agg(ts, 00:00:57 ``` ```SQL -SELECT toolkit_experimental.duration_in(51351, toolkit_experimental.state_agg(ts, state), '2020-01-01 00:01:00+00', '2 days') FROM states_test_4; +SELECT toolkit_experimental.duration_in(toolkit_experimental.state_agg(ts, state), 51351, '2020-01-01 00:01:00+00', '2 days') FROM states_test_4; ``` ```output duration_in @@ -111,7 +111,7 @@ SELECT toolkit_experimental.duration_in(51351, toolkit_experimental.state_agg(ts 00:00:57 ``` ```SQL -SELECT toolkit_experimental.duration_in(51351, toolkit_experimental.state_agg(ts, state), '2020-01-01 00:01:00+00', NULL) FROM states_test_4; +SELECT toolkit_experimental.duration_in(toolkit_experimental.state_agg(ts, state), 51351, '2020-01-01 00:01:00+00', NULL) FROM states_test_4; ``` ```output duration_in @@ -120,7 +120,7 @@ SELECT toolkit_experimental.duration_in(51351, toolkit_experimental.state_agg(ts ``` ```SQL -SELECT toolkit_experimental.duration_in('OK', toolkit_experimental.state_agg(ts, state), '2020-01-01 00:00:15+00', '30 seconds') FROM states_test; +SELECT toolkit_experimental.duration_in(toolkit_experimental.state_agg(ts, state), 'OK', '2020-01-01 00:00:15+00', '30 seconds') FROM states_test; ``` ```output duration_in @@ -129,7 +129,7 @@ SELECT toolkit_experimental.duration_in('OK', toolkit_experimental.state_agg(ts, ``` ```SQL -SELECT toolkit_experimental.duration_in('OK', toolkit_experimental.state_agg(ts, state), '2020-01-01 00:00:15+00', '1 minute 1 second') FROM states_test; +SELECT toolkit_experimental.duration_in(toolkit_experimental.state_agg(ts, state), 'OK', '2020-01-01 00:00:15+00', '1 minute 1 second') FROM states_test; ``` ```output duration_in @@ -147,23 +147,23 @@ SELECT state, duration FROM toolkit_experimental.into_values( ```output state | duration -------+----------- - ERROR | 3000000 - OK | 106000000 - START | 11000000 - STOP | 0 + ERROR | 00:00:03 + OK | 00:01:46 + START | 00:00:11 + STOP | 00:00:00 ``` ```SQL SELECT state, duration FROM toolkit_experimental.into_int_values( - (SELECT toolkit_experimental.compact_state_agg(ts, state) FROM states_test_4)) + (SELECT toolkit_experimental.state_agg(ts, state) FROM states_test_4)) ORDER BY state, duration; ``` ```output state | duration -------+----------- - -9 | 0 - 2 | 3000000 - 4 | 11000000 -51351 | 106000000 + -9 | 00:00:00 + 2 | 00:00:03 + 4 | 00:00:11 +51351 | 00:01:46 ``` ### state_timeline @@ -217,8 +217,8 @@ START | 2019-12-31 00:00:00+00 | 2019-12-31 00:00:11+00 ```SQL SELECT start_time, end_time FROM toolkit_experimental.state_periods( - 'OK', - (SELECT toolkit_experimental.state_agg(ts, state) FROM states_test) + (SELECT toolkit_experimental.state_agg(ts, state) FROM states_test), + 'OK' ) ORDER BY start_time; ``` @@ -232,8 +232,8 @@ start_time | end_time ```SQL SELECT start_time, end_time FROM toolkit_experimental.state_periods( - 51351, - (SELECT toolkit_experimental.state_agg(ts, state) FROM states_test_4) + (SELECT toolkit_experimental.state_agg(ts, state) FROM states_test_4), + 51351 ) ORDER BY start_time; ``` @@ -247,8 +247,8 @@ start_time | end_time ```SQL SELECT start_time, end_time FROM toolkit_experimental.state_periods( - 'ANYTHING', - (SELECT toolkit_experimental.state_agg(ts, state) FROM states_test) + (SELECT toolkit_experimental.state_agg(ts, state) FROM states_test), + 'ANYTHING' ) ORDER BY start_time; ``` @@ -338,8 +338,8 @@ ERROR | 2020-01-01 00:01:00+00 | 2020-01-01 00:01:03+00 ```SQL SELECT start_time, end_time FROM toolkit_experimental.interpolated_state_periods( - 'OK', (SELECT toolkit_experimental.state_agg(ts, state) FROM states_test), + 'OK', '2019-12-31', '1 days', (SELECT toolkit_experimental.state_agg(ts, state) FROM states_test_3) ) @@ -354,8 +354,8 @@ start_time | end_time ```SQL SELECT start_time, end_time FROM toolkit_experimental.interpolated_state_periods( - 'START', (SELECT toolkit_experimental.state_agg(ts, state) FROM states_test), + 'START', '2019-12-31', '5 days', (SELECT toolkit_experimental.state_agg(ts, state) FROM states_test_3) ) @@ -369,8 +369,8 @@ start_time | end_time ```SQL SELECT start_time, end_time FROM toolkit_experimental.interpolated_state_periods( - 'STOP', (SELECT toolkit_experimental.state_agg(ts, state) FROM states_test), + 'STOP', '2019-12-31', '1 days', (SELECT toolkit_experimental.state_agg(ts, state) FROM states_test_2) ) @@ -385,8 +385,8 @@ start_time | end_time ```SQL SELECT start_time, end_time FROM toolkit_experimental.interpolated_state_periods( - 'STOP', (SELECT toolkit_experimental.state_agg(ts, state) FROM states_test), + 'STOP', '2019-12-31', '5 days', (SELECT toolkit_experimental.state_agg(ts, state) FROM states_test_2) ) @@ -408,8 +408,8 @@ WITH buckets AS (SELECT FROM states_test GROUP BY date_trunc('minute', ts)) SELECT toolkit_experimental.duration_in( - 'START', - toolkit_experimental.rollup(buckets.sa) + toolkit_experimental.rollup(buckets.sa), + 'START' ) FROM buckets; ``` @@ -426,8 +426,8 @@ WITH buckets AS (SELECT FROM states_test GROUP BY date_trunc('minute', ts)) SELECT toolkit_experimental.duration_in( - 'OK', - toolkit_experimental.rollup(buckets.sa) + toolkit_experimental.rollup(buckets.sa), + 'OK' ) FROM buckets; ``` diff --git a/extension/src/raw.rs b/extension/src/raw.rs index 099b4b67..40daf8fd 100644 --- a/extension/src/raw.rs +++ b/extension/src/raw.rs @@ -128,12 +128,27 @@ impl From for Interval { time: interval, ..Default::default() }; - unsafe { + let interval = unsafe { let ptr = pg_sys::palloc(std::mem::size_of::()) as *mut pg_sys::Interval; *ptr = interval; Interval(pg_sys::Datum::from(ptr)) - } + }; + // Now we have a valid Interval in at least one sense. But we have the + // microseconds in the `time` field and `day` and `month` are both 0, + // which is legal. However, directly converting one of these to TEXT + // comes out quite ugly if the number of microseconds is greater than 1 day: + // 8760:02:00 + // Should be: + // 365 days 00:02:00 + // How does postgresql do it? It happens in src/backend/utils/adt/timestamp.c:timestamp_mi: + // result->time = dt1 - dt2; + // result = DatumGetIntervalP(DirectFunctionCall1(interval_justify_hours, + // IntervalPGetDatum(result))); + // So if we want the same behavior, we need to call interval_justify_hours too: + let function_args = vec![Some(pg_sys::Datum::from(interval))]; + unsafe { pgx::direct_function_call(pg_sys::interval_justify_hours, function_args) } + .expect("interval_justify_hours does not return None") } } diff --git a/extension/src/state_aggregate.rs b/extension/src/state_aggregate.rs index fab49ddc..3ab5ed4f 100644 --- a/extension/src/state_aggregate.rs +++ b/extension/src/state_aggregate.rs @@ -736,8 +736,8 @@ impl CompactStateAggTransState { } fn duration_in_inner<'a>( - state: Option, aggregate: Option>, + state: Option, range: Option<(i64, Option)>, // start and interval ) -> crate::raw::Interval { let time: i64 = if let Some((start, interval)) = range { @@ -776,43 +776,18 @@ fn duration_in_inner<'a>( } else { state.and_then(|state| aggregate?.get(state)).unwrap_or(0) }; - let interval = pg_sys::Interval { - time, - ..Default::default() - }; - let interval: *const pg_sys::Interval = to_palloc(interval); - // Now we have a valid Interval in at least one sense. But we have the - // microseconds in the `time` field and `day` and `month` are both 0, - // which is legal. However, directly converting one of these to TEXT - // comes out quite ugly if the number of microseconds is greater than 1 day: - // 8760:02:00 - // Should be: - // 365 days 00:02:00 - // How does postgresql do it? It happens in src/backend/utils/adt/timestamp.c:timestamp_mi: - // result->time = dt1 - dt2; - // result = DatumGetIntervalP(DirectFunctionCall1(interval_justify_hours, - // IntervalPGetDatum(result))); - // So if we want the same behavior, we need to call interval_justify_hours too: - let function_args = vec![Some(pg_sys::Datum::from(interval))]; - unsafe { pgx::direct_function_call(pg_sys::interval_justify_hours, function_args) } - .expect("interval_justify_hours does not return None") + time.into() } #[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] -pub fn duration_in<'a>( - state: String, - aggregate: Option>, -) -> crate::raw::Interval { - if let Some(ref aggregate) = aggregate { - aggregate.assert_str() +pub fn duration_in<'a>(agg: Option>, state: String) -> crate::raw::Interval { + if let Some(ref agg) = agg { + agg.assert_str() }; - duration_in_inner( - aggregate.as_ref().and_then(|aggregate| { - StateEntry::try_from_existing_str(aggregate.states_as_str(), &state) - }), - aggregate, - None, - ) + let state = agg + .as_ref() + .and_then(|agg| StateEntry::try_from_existing_str(agg.states_as_str(), &state)); + duration_in_inner(agg, state, None) } #[pg_extern( @@ -821,14 +796,11 @@ pub fn duration_in<'a>( name = "duration_in", schema = "toolkit_experimental" )] -pub fn duration_in_int<'a>( - state: i64, - aggregate: Option>, -) -> crate::raw::Interval { - if let Some(ref aggregate) = aggregate { - aggregate.assert_int() +pub fn duration_in_int<'a>(agg: Option>, state: i64) -> crate::raw::Interval { + if let Some(ref agg) = agg { + agg.assert_int() }; - duration_in_inner(Some(StateEntry::from_integer(state)), aggregate, None) + duration_in_inner(agg, Some(StateEntry::from_integer(state)), None) } #[pg_extern( @@ -837,11 +809,11 @@ pub fn duration_in_int<'a>( name = "duration_in", schema = "toolkit_experimental" )] -pub fn duration_in_tl<'a>(state: String, aggregate: Option>) -> crate::raw::Interval { - if let Some(ref aggregate) = aggregate { - aggregate.assert_str() +pub fn duration_in_tl<'a>(agg: Option>, state: String) -> crate::raw::Interval { + if let Some(ref agg) = agg { + agg.assert_str() }; - duration_in(state, aggregate.map(StateAgg::as_compact_state_agg)) + duration_in(agg.map(StateAgg::as_compact_state_agg), state) } #[pg_extern( @@ -850,13 +822,13 @@ pub fn duration_in_tl<'a>(state: String, aggregate: Option>) -> cra name = "duration_in", schema = "toolkit_experimental" )] -pub fn duration_in_tl_int<'a>(state: i64, aggregate: Option>) -> crate::raw::Interval { - if let Some(ref aggregate) = aggregate { - aggregate.assert_int() +pub fn duration_in_tl_int<'a>(agg: Option>, state: i64) -> crate::raw::Interval { + if let Some(ref agg) = agg { + agg.assert_int() }; duration_in_inner( + agg.map(StateAgg::as_compact_state_agg), Some(StateEntry::from_integer(state)), - aggregate.map(StateAgg::as_compact_state_agg), None, ) } @@ -868,24 +840,21 @@ pub fn duration_in_tl_int<'a>(state: i64, aggregate: Option>) -> cr schema = "toolkit_experimental" )] pub fn duration_in_range<'a>( + agg: Option>, state: String, - aggregate: Option>, start: TimestampTz, interval: default!(Option, "NULL"), ) -> crate::raw::Interval { - if let Some(ref aggregate) = aggregate { - aggregate.assert_str() + if let Some(ref agg) = agg { + agg.assert_str() }; - let aggregate = aggregate.map(StateAgg::as_compact_state_agg); + let agg = agg.map(StateAgg::as_compact_state_agg); let interval = interval.map(|interval| crate::datum_utils::interval_to_ms(&start, &interval)); let start = start.into(); - duration_in_inner( - aggregate.as_ref().and_then(|aggregate| { - StateEntry::try_from_existing_str(aggregate.states_as_str(), &state) - }), - aggregate, - Some((start, interval)), - ) + let state = agg + .as_ref() + .and_then(|agg| StateEntry::try_from_existing_str(agg.states_as_str(), &state)); + duration_in_inner(agg, state, Some((start, interval))) } #[pg_extern( @@ -895,26 +864,26 @@ pub fn duration_in_range<'a>( schema = "toolkit_experimental" )] pub fn duration_in_range_int<'a>( + agg: Option>, state: i64, - aggregate: Option>, start: TimestampTz, interval: default!(Option, "NULL"), ) -> crate::raw::Interval { - if let Some(ref aggregate) = aggregate { - aggregate.assert_int() + if let Some(ref agg) = agg { + agg.assert_int() }; let interval = interval.map(|interval| crate::datum_utils::interval_to_ms(&start, &interval)); let start = start.into(); duration_in_inner( + agg.map(StateAgg::as_compact_state_agg), Some(StateEntry::from_integer(state)), - aggregate.map(StateAgg::as_compact_state_agg), Some((start, interval)), ) } fn interpolated_duration_in_inner<'a>( - state: Option, aggregate: Option>, + state: Option, start: TimestampTz, interval: crate::raw::Interval, prev: Option>, @@ -948,24 +917,24 @@ fn interpolated_duration_in_inner<'a>( let new_agg = aggregate.interpolate(start, interval, prev); let state_entry = state.and_then(|state| state.try_existing_entry(new_agg.states_as_str())); - duration_in_inner(state_entry, Some(new_agg), range) + duration_in_inner(Some(new_agg), state_entry, range) } } } #[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] pub fn interpolated_duration_in<'a>( + agg: Option>, state: String, - aggregate: Option>, start: TimestampTz, interval: crate::raw::Interval, prev: Option>, ) -> crate::raw::Interval { - if let Some(ref aggregate) = aggregate { - aggregate.assert_str() + if let Some(ref agg) = agg { + agg.assert_str() }; interpolated_duration_in_inner( + agg, Some(MaterializedState::String(state)), - aggregate, start, interval, prev, @@ -979,18 +948,18 @@ pub fn interpolated_duration_in<'a>( schema = "toolkit_experimental" )] pub fn interpolated_duration_in_tl<'a>( + agg: Option>, state: String, - aggregate: Option>, start: TimestampTz, interval: crate::raw::Interval, prev: Option>, ) -> crate::raw::Interval { - if let Some(ref aggregate) = aggregate { - aggregate.assert_str() + if let Some(ref agg) = agg { + agg.assert_str() }; interpolated_duration_in( + agg.map(StateAgg::as_compact_state_agg), state, - aggregate.map(StateAgg::as_compact_state_agg), start, interval, prev.map(StateAgg::as_compact_state_agg), @@ -1004,18 +973,18 @@ pub fn interpolated_duration_in_tl<'a>( name = "interpolated_duration_in" )] pub fn interpolated_duration_in_int<'a>( + agg: Option>, state: i64, - aggregate: Option>, start: TimestampTz, interval: crate::raw::Interval, prev: Option>, ) -> crate::raw::Interval { - if let Some(ref aggregate) = aggregate { - aggregate.assert_int() + if let Some(ref agg) = agg { + agg.assert_int() }; interpolated_duration_in_inner( + agg, Some(MaterializedState::Integer(state)), - aggregate, start, interval, prev, @@ -1029,18 +998,18 @@ pub fn interpolated_duration_in_int<'a>( schema = "toolkit_experimental" )] pub fn interpolated_duration_in_tl_int<'a>( + agg: Option>, state: i64, - aggregate: Option>, start: TimestampTz, interval: crate::raw::Interval, prev: Option>, ) -> crate::raw::Interval { - if let Some(ref aggregate) = aggregate { - aggregate.assert_int() + if let Some(ref agg) = agg { + agg.assert_int() }; interpolated_duration_in_int( + agg.map(StateAgg::as_compact_state_agg), state, - aggregate.map(StateAgg::as_compact_state_agg), start, interval, prev.map(StateAgg::as_compact_state_agg), @@ -1050,6 +1019,8 @@ pub fn interpolated_duration_in_tl_int<'a>( fn duration_in_bad_args_inner() -> ! { panic!("The start and interval parameters cannot be used for duration_in with a compact state aggregate") } + +#[allow(unused_variables)] // can't underscore-prefix since argument names are used by pgx #[pg_extern( immutable, parallel_safe, @@ -1057,13 +1028,14 @@ fn duration_in_bad_args_inner() -> ! { schema = "toolkit_experimental" )] pub fn duration_in_bad_args<'a>( - _state: String, - _aggregate: Option>, - _start: TimestampTz, - _interval: crate::raw::Interval, + agg: Option>, + state: String, + start: TimestampTz, + interval: crate::raw::Interval, ) -> crate::raw::Interval { duration_in_bad_args_inner() } +#[allow(unused_variables)] // can't underscore-prefix since argument names are used by pgx #[pg_extern( immutable, parallel_safe, @@ -1071,10 +1043,10 @@ pub fn duration_in_bad_args<'a>( schema = "toolkit_experimental" )] pub fn duration_in_int_bad_args<'a>( - _state: i64, - _aggregate: Option>, - _start: TimestampTz, - _interval: crate::raw::Interval, + agg: Option>, + state: i64, + start: TimestampTz, + interval: crate::raw::Interval, ) -> crate::raw::Interval { duration_in_bad_args_inner() } @@ -1082,26 +1054,38 @@ pub fn duration_in_int_bad_args<'a>( #[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] pub fn into_values<'a>( agg: CompactStateAgg<'a>, -) -> TableIterator<'a, (pgx::name!(state, String), pgx::name!(duration, i64))> { +) -> TableIterator< + 'a, + ( + pgx::name!(state, String), + pgx::name!(duration, crate::raw::Interval), + ), +> { agg.assert_str(); let states: String = agg.states_as_str().to_owned(); - TableIterator::new( - agg.durations - .clone() - .into_iter() - .map(move |record| (record.state.as_str(&states).to_string(), record.duration)), - ) + TableIterator::new(agg.durations.clone().into_iter().map(move |record| { + ( + record.state.as_str(&states).to_string(), + record.duration.into(), + ) + })) } #[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] pub fn into_int_values<'a>( agg: CompactStateAgg<'a>, -) -> TableIterator<'a, (pgx::name!(state, i64), pgx::name!(duration, i64))> { +) -> TableIterator< + 'a, + ( + pgx::name!(state, i64), + pgx::name!(duration, crate::raw::Interval), + ), +> { agg.assert_int(); TableIterator::new( agg.durations .clone() .into_iter() - .map(move |record| (record.state.as_integer(), record.duration)) + .map(move |record| (record.state.as_integer(), record.duration.into())) .collect::>() .into_iter(), // make map panic now instead of at iteration time ) @@ -1113,10 +1097,16 @@ pub fn into_int_values<'a>( schema = "toolkit_experimental" )] pub fn into_values_tl<'a>( - aggregate: StateAgg<'a>, -) -> TableIterator<'a, (pgx::name!(state, String), pgx::name!(duration, i64))> { - aggregate.assert_str(); - into_values(aggregate.as_compact_state_agg()) + agg: StateAgg<'a>, +) -> TableIterator< + 'a, + ( + pgx::name!(state, String), + pgx::name!(duration, crate::raw::Interval), + ), +> { + agg.assert_str(); + into_values(agg.as_compact_state_agg()) } #[pg_extern( immutable, @@ -1125,10 +1115,16 @@ pub fn into_values_tl<'a>( schema = "toolkit_experimental" )] pub fn into_values_tl_int<'a>( - aggregate: StateAgg<'a>, -) -> TableIterator<'a, (pgx::name!(state, i64), pgx::name!(duration, i64))> { - aggregate.assert_int(); - into_int_values(aggregate.as_compact_state_agg()) + agg: StateAgg<'a>, +) -> TableIterator< + 'a, + ( + pgx::name!(state, i64), + pgx::name!(duration, crate::raw::Interval), + ), +> { + agg.assert_int(); + into_int_values(agg.as_compact_state_agg()) } fn state_timeline_inner<'a>( @@ -1220,7 +1216,7 @@ pub fn state_int_timeline<'a>( #[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] pub fn interpolated_state_timeline<'a>( - aggregate: Option>, + agg: Option>, start: TimestampTz, interval: crate::raw::Interval, prev: Option>, @@ -1232,17 +1228,17 @@ pub fn interpolated_state_timeline<'a>( pgx::name!(end_time, TimestampTz), ), > { - if let Some(ref aggregate) = aggregate { - aggregate.assert_str() + if let Some(ref agg) = agg { + agg.assert_str() }; - match aggregate { + match agg { None => pgx::error!( "when interpolating data between grouped data, all groups must contain some data" ), - Some(aggregate) => { + Some(agg) => { let interval = crate::datum_utils::interval_to_ms(&start, &interval); TableIterator::new( - state_timeline_inner(aggregate.as_compact_state_agg().interpolate( + state_timeline_inner(agg.as_compact_state_agg().interpolate( start.into(), interval, prev.map(StateAgg::as_compact_state_agg), @@ -1255,7 +1251,7 @@ pub fn interpolated_state_timeline<'a>( } #[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] pub fn interpolated_int_state_timeline<'a>( - aggregate: Option>, + agg: Option>, start: TimestampTz, interval: crate::raw::Interval, prev: Option>, @@ -1267,17 +1263,17 @@ pub fn interpolated_int_state_timeline<'a>( pgx::name!(end_time, TimestampTz), ), > { - if let Some(ref aggregate) = aggregate { - aggregate.assert_int() + if let Some(ref agg) = agg { + agg.assert_int() }; - match aggregate { + match agg { None => pgx::error!( "when interpolating data between grouped data, all groups must contain some data" ), - Some(aggregate) => { + Some(agg) => { let interval = crate::datum_utils::interval_to_ms(&start, &interval); TableIterator::new( - state_int_timeline_inner(aggregate.as_compact_state_agg().interpolate( + state_int_timeline_inner(agg.as_compact_state_agg().interpolate( start.into(), interval, prev.map(StateAgg::as_compact_state_agg), @@ -1323,8 +1319,8 @@ fn state_periods_inner<'a>( #[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] pub fn state_periods<'a>( - state: String, agg: StateAgg<'a>, + state: String, ) -> TableIterator< 'a, ( @@ -1343,8 +1339,8 @@ pub fn state_periods<'a>( name = "state_periods" )] pub fn state_int_periods<'a>( - state: i64, agg: StateAgg<'a>, + state: i64, ) -> TableIterator< 'a, ( @@ -1360,8 +1356,8 @@ pub fn state_int_periods<'a>( } fn interpolated_state_periods_inner<'a>( - state: MaterializedState, aggregate: Option>, + state: MaterializedState, start: TimestampTz, interval: crate::raw::Interval, prev: Option>, @@ -1395,8 +1391,8 @@ fn interpolated_state_periods_inner<'a>( } #[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")] pub fn interpolated_state_periods<'a>( + agg: Option>, state: String, - aggregate: Option>, start: TimestampTz, interval: crate::raw::Interval, prev: Option>, @@ -1407,16 +1403,10 @@ pub fn interpolated_state_periods<'a>( pgx::name!(end_time, TimestampTz), ), > { - if let Some(ref aggregate) = aggregate { - aggregate.assert_str() + if let Some(ref agg) = agg { + agg.assert_str() }; - interpolated_state_periods_inner( - MaterializedState::String(state), - aggregate, - start, - interval, - prev, - ) + interpolated_state_periods_inner(agg, MaterializedState::String(state), start, interval, prev) } #[pg_extern( immutable, @@ -1425,8 +1415,8 @@ pub fn interpolated_state_periods<'a>( name = "interpolated_state_periods" )] pub fn interpolated_state_periods_int<'a>( + agg: Option>, state: i64, - aggregate: Option>, start: TimestampTz, interval: crate::raw::Interval, prev: Option>, @@ -1437,12 +1427,12 @@ pub fn interpolated_state_periods_int<'a>( pgx::name!(end_time, TimestampTz), ), > { - if let Some(ref aggregate) = aggregate { - aggregate.assert_int() + if let Some(ref agg) = agg { + agg.assert_int() }; interpolated_state_periods_inner( + agg, MaterializedState::Integer(state), - aggregate, start, interval, prev, @@ -1511,14 +1501,6 @@ struct Record { time: i64, } -fn to_palloc(value: T) -> *const T { - unsafe { - let ptr = pg_sys::palloc(std::mem::size_of::()) as *mut T; - *ptr = value; - ptr - } -} - #[cfg(any(test, feature = "pg_test"))] #[pg_schema] mod tests { @@ -1546,7 +1528,7 @@ mod tests { "365 days 00:02:00", select_one!( client, - "SELECT toolkit_experimental.duration_in('one', toolkit_experimental.compact_state_agg(ts, state), '2020-01-01', '1 day')::TEXT FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 'one', '2020-01-01', '1 day')::TEXT FROM test", &str ) ); @@ -1569,7 +1551,7 @@ mod tests { "365 days 00:02:00", select_one!( client, - "SELECT toolkit_experimental.duration_in('one', toolkit_experimental.compact_state_agg(ts, state))::TEXT FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 'one')::TEXT FROM test", &str ) ); @@ -1577,7 +1559,7 @@ mod tests { "365 days 00:02:00", select_one!( client, - "SELECT toolkit_experimental.duration_in('one', toolkit_experimental.state_agg(ts, state))::TEXT FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.state_agg(ts, state), 'one')::TEXT FROM test", &str ) ); @@ -1602,7 +1584,7 @@ mod tests { "00:01:00", select_one!( client, - "SELECT toolkit_experimental.duration_in('one', toolkit_experimental.compact_state_agg(ts, state))::TEXT FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 'one')::TEXT FROM test", &str ) ); @@ -1610,7 +1592,7 @@ mod tests { "365 days 00:01:00", select_one!( client, - "SELECT toolkit_experimental.duration_in('two', toolkit_experimental.compact_state_agg(ts, state))::TEXT FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 'two')::TEXT FROM test", &str ) ); @@ -1636,7 +1618,7 @@ mod tests { "365 days 00:01:00", select_one!( client, - "SELECT toolkit_experimental.duration_in('one', toolkit_experimental.compact_state_agg(ts, state))::TEXT FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 'one')::TEXT FROM test", &str ) ); @@ -1644,7 +1626,7 @@ mod tests { "00:01:00", select_one!( client, - "SELECT toolkit_experimental.duration_in('two', toolkit_experimental.compact_state_agg(ts, state))::TEXT FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 'two')::TEXT FROM test", &str ) ); @@ -1653,7 +1635,7 @@ mod tests { "365 days 00:01:00", select_one!( client, - "SELECT toolkit_experimental.duration_in('one', toolkit_experimental.state_agg(ts, state))::TEXT FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.state_agg(ts, state), 'one')::TEXT FROM test", &str ) ); @@ -1661,7 +1643,7 @@ mod tests { "00:01:00", select_one!( client, - "SELECT toolkit_experimental.duration_in('two', toolkit_experimental.state_agg(ts, state))::TEXT FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.state_agg(ts, state), 'two')::TEXT FROM test", &str ) ); @@ -1687,7 +1669,7 @@ mod tests { "365 days 00:01:00", select_one!( client, - "SELECT toolkit_experimental.duration_in('one', toolkit_experimental.compact_state_agg(ts, state))::TEXT FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 'one')::TEXT FROM test", &str ) ); @@ -1695,7 +1677,7 @@ mod tests { "00:01:00", select_one!( client, - "SELECT toolkit_experimental.duration_in('two', toolkit_experimental.compact_state_agg(ts, state))::TEXT FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 'two')::TEXT FROM test", &str ) ); @@ -1722,7 +1704,7 @@ mod tests { "00:02:00", select_one!( client, - "SELECT toolkit_experimental.duration_in('one', toolkit_experimental.compact_state_agg(ts, state))::TEXT FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 'one')::TEXT FROM test", &str ) ); @@ -1730,7 +1712,7 @@ mod tests { "365 days", select_one!( client, - "SELECT toolkit_experimental.duration_in('two', toolkit_experimental.compact_state_agg(ts, state))::TEXT FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 'two')::TEXT FROM test", &str ) ); @@ -1754,7 +1736,7 @@ mod tests { "00:01:00", select_one!( client, - "SELECT toolkit_experimental.duration_in('one', toolkit_experimental.compact_state_agg(ts, state))::TEXT FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 'one')::TEXT FROM test", &str ) ); @@ -1762,7 +1744,7 @@ mod tests { "365 days 00:01:00", select_one!( client, - "SELECT toolkit_experimental.duration_in('two', toolkit_experimental.compact_state_agg(ts, state))::TEXT FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 'two')::TEXT FROM test", &str ) ); @@ -1786,7 +1768,7 @@ mod tests { "00:01:00", select_one!( client, - "SELECT toolkit_experimental.duration_in('two', toolkit_experimental.compact_state_agg(ts, state))::TEXT FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 'two')::TEXT FROM test", &str ) ); @@ -1812,7 +1794,7 @@ insert into test select '2020-01-02 UTC'::timestamptz + make_interval(days=>v), "2 days", select_one!( client, - "SELECT toolkit_experimental.duration_in('one', toolkit_experimental.compact_state_agg(ts, state))::TEXT FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 'one')::TEXT FROM test", &str ) ); @@ -1885,9 +1867,9 @@ SELECT toolkit_experimental.duration_in('one', toolkit_experimental.compact_stat assert_eq!( client .select( - r#"SELECT toolkit_experimental.duration_in('ERROR', states)::TEXT as error, - toolkit_experimental.duration_in('START', states)::TEXT as start, - toolkit_experimental.duration_in('STOPPED', states)::TEXT as stopped + r#"SELECT toolkit_experimental.duration_in(states, 'ERROR')::TEXT as error, + toolkit_experimental.duration_in(states, 'START')::TEXT as start, + toolkit_experimental.duration_in(states, 'STOPPED')::TEXT as stopped FROM (SELECT toolkit_experimental.compact_state_agg(ts, state) as states FROM test) as foo"#, None, None, @@ -1899,9 +1881,9 @@ SELECT toolkit_experimental.duration_in('one', toolkit_experimental.compact_stat assert_eq!( client .select( - r#"SELECT toolkit_experimental.duration_in('ERROR', states)::TEXT as error, - toolkit_experimental.duration_in('START', states)::TEXT as start, - toolkit_experimental.duration_in('STOPPED', states)::TEXT as stopped + r#"SELECT toolkit_experimental.duration_in(states, 'ERROR')::TEXT as error, + toolkit_experimental.duration_in(states, 'START')::TEXT as start, + toolkit_experimental.duration_in(states, 'STOPPED')::TEXT as stopped FROM (SELECT toolkit_experimental.state_agg(ts, state) as states FROM test) as foo"#, None, None, @@ -1952,8 +1934,8 @@ SELECT toolkit_experimental.duration_in('one', toolkit_experimental.compact_stat let mut durations = client.select( r#"SELECT toolkit_experimental.interpolated_duration_in( - 'three', agg, + 'three', '2019-12-31 0:00'::timestamptz + (bucket * '1 day'::interval), '1 day'::interval, LAG(agg) OVER (ORDER BY bucket) )::TEXT FROM ( @@ -1977,8 +1959,8 @@ SELECT toolkit_experimental.duration_in('one', toolkit_experimental.compact_stat let mut durations = client.select( r#"SELECT toolkit_experimental.interpolated_duration_in( + agg, 'three', - agg, '2019-12-31 0:00'::timestamptz + (bucket * '1 day'::interval), '1 day'::interval, LAG(agg) OVER (ORDER BY bucket) )::TEXT FROM ( @@ -2002,8 +1984,8 @@ SELECT toolkit_experimental.duration_in('one', toolkit_experimental.compact_stat let mut durations = client.select( r#"SELECT toolkit_experimental.interpolated_duration_in( + agg, 10003, - agg, '2019-12-31 0:00'::timestamptz + (bucket * '1 day'::interval), '1 day'::interval, LAG(agg) OVER (ORDER BY bucket) )::TEXT FROM ( @@ -2040,12 +2022,12 @@ SELECT toolkit_experimental.duration_in('one', toolkit_experimental.compact_stat None, ); client.select( - "SELECT toolkit_experimental.duration_in('one', toolkit_experimental.compact_state_agg(ts, state)) FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.compact_state_agg(ts, state), 'one') FROM test", None, None, ); client.select( - "SELECT toolkit_experimental.duration_in('one', toolkit_experimental.state_agg(ts, state)) FROM test", + "SELECT toolkit_experimental.duration_in(toolkit_experimental.state_agg(ts, state), 'one') FROM test", None, None, ); @@ -2075,8 +2057,8 @@ SELECT toolkit_experimental.duration_in('one', toolkit_experimental.compact_stat let mut durations = client.select( r#"SELECT toolkit_experimental.interpolated_duration_in( - 'running', - agg, + agg, + 'running', '2019-12-31 0:00'::timestamptz + (bucket * '1 day'::interval), '1 day'::interval, LAG(agg) OVER (ORDER BY bucket) )::TEXT FROM ( @@ -2097,8 +2079,8 @@ SELECT toolkit_experimental.duration_in('one', toolkit_experimental.compact_stat let mut durations = client.select( r#"SELECT toolkit_experimental.interpolated_duration_in( - 'running', - agg, + agg, + 'running', '2019-12-31 0:00'::timestamptz + (bucket * '1 day'::interval), '1 day'::interval, LAG(agg) OVER (ORDER BY bucket) )::TEXT FROM (