diff --git a/extension/src/gauge_agg.rs b/extension/src/gauge_agg.rs index 99d6b9aa7..dc24c4828 100644 --- a/extension/src/gauge_agg.rs +++ b/extension/src/gauge_agg.rs @@ -45,6 +45,10 @@ mod toolkit_experimental { } ron_inout_funcs!(GaugeSummary); + + // hack to allow us to qualify names with "toolkit_experimental" + // so that pgx generates the correct SQL + pub(crate) use crate::accessors::toolkit_experimental::*; } use toolkit_experimental::*; @@ -92,7 +96,7 @@ impl GaugeSummaryTransState { // TODO build method should check validity // check bounds only after we've combined all the points, so we aren't doing it all the time. if !summary.bounds_valid() { - panic!("counter bounds invalid") + panic!("Metric bounds invalid") } self.summary_buffer.push(summary.build()); } @@ -284,7 +288,7 @@ fn gauge_agg_final_inner( Some(st) => { // there are some edge cases that this should prevent, but I'm not sure it's necessary, we do check the bounds in the functions that use them. if !st.bounds_valid() { - panic!("counter bounds invalid") + panic!("Metric bounds invalid") } Some(GaugeSummary::from(st)) } @@ -362,39 +366,234 @@ extension_sql!( ], ); -#[pg_extern( - name = "delta", - strict, - immutable, - parallel_safe, - schema = "toolkit_experimental" -)] -fn gauge_agg_delta(summary: GaugeSummary) -> f64 { +// TODO Reconsider using the same pg_type for counter and gauge aggregates to avoid duplicating all these functions. + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +fn arrow_delta(sketch: GaugeSummary, _accessor: toolkit_experimental::AccessorDelta) -> f64 { + delta(sketch) +} + +#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")] +fn delta(summary: GaugeSummary) -> f64 { MetricSummary::from(summary).delta() } -#[pg_extern( - name = "idelta_left", - strict, - immutable, - parallel_safe, - schema = "toolkit_experimental" -)] -fn gauge_agg_idelta_left(summary: GaugeSummary) -> f64 { +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +fn arrow_gauge_agg_rate( + sketch: GaugeSummary, + _accessor: toolkit_experimental::AccessorRate, +) -> Option { + rate(sketch) +} + +#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")] +fn rate(summary: GaugeSummary) -> Option { + MetricSummary::from(summary).rate() +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +fn arrow_time_delta( + sketch: GaugeSummary, + _accessor: toolkit_experimental::AccessorTimeDelta, +) -> f64 { + time_delta(sketch) +} + +#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")] +fn time_delta(summary: GaugeSummary) -> f64 { + MetricSummary::from(summary).time_delta() +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +fn arrow_irate_left( + sketch: GaugeSummary, + _accessor: toolkit_experimental::AccessorIRateLeft, +) -> Option { + irate_left(sketch) +} + +#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")] +fn irate_left(summary: GaugeSummary) -> Option { + MetricSummary::from(summary).irate_left() +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +fn arrow_irate_right( + sketch: GaugeSummary, + _accessor: toolkit_experimental::AccessorIRateRight, +) -> Option { + irate_right(sketch) +} + +#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")] +fn irate_right(summary: GaugeSummary) -> Option { + MetricSummary::from(summary).irate_right() +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +fn arrow_idelta_left( + sketch: GaugeSummary, + _accessor: toolkit_experimental::AccessorIDeltaLeft, +) -> f64 { + idelta_left(sketch) +} + +#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")] +fn idelta_left(summary: GaugeSummary) -> f64 { MetricSummary::from(summary).idelta_left() } -#[pg_extern( - name = "idelta_right", - strict, - immutable, - parallel_safe, - schema = "toolkit_experimental" -)] -fn gauge_agg_idelta_right(summary: GaugeSummary) -> f64 { +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +fn arrow_idelta_right( + sketch: GaugeSummary, + _accessor: toolkit_experimental::AccessorIDeltaRight, +) -> f64 { + idelta_right(sketch) +} + +#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")] +fn idelta_right(summary: GaugeSummary) -> f64 { MetricSummary::from(summary).idelta_right() } +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +fn arrow_with_bounds( + sketch: GaugeSummary, + accessor: toolkit_experimental::AccessorWithBounds, +) -> GaugeSummary<'static> { + let mut builder = GaugeSummaryBuilder::from(MetricSummary::from(sketch)); + builder.set_bounds(accessor.bounds()); + builder.build().into() +} + +#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")] +fn with_bounds(summary: GaugeSummary, bounds: tstzrange) -> GaugeSummary { + // TODO dedup with previous by using apply_bounds + unsafe { + let ptr = bounds.0 as *mut pg_sys::varlena; + let mut builder = GaugeSummaryBuilder::from(MetricSummary::from(summary)); + builder.set_bounds(get_range(ptr)); + builder.build().into() + } +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +fn arrow_extrapolated_delta( + sketch: GaugeSummary, + _accessor: toolkit_experimental::AccessorExtrapolatedDelta, +) -> Option { + extrapolated_delta(sketch) +} + +#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")] +fn extrapolated_delta(summary: GaugeSummary) -> Option { + MetricSummary::from(summary).prometheus_delta().unwrap() +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +fn arrow_extrapolated_rate( + sketch: GaugeSummary, + _accessor: toolkit_experimental::AccessorExtrapolatedRate, +) -> Option { + extrapolated_rate(sketch) +} + +#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")] +fn extrapolated_rate(summary: GaugeSummary) -> Option { + MetricSummary::from(summary).prometheus_rate().unwrap() +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +fn arrow_num_elements( + sketch: GaugeSummary, + _accessor: toolkit_experimental::AccessorNumElements, +) -> i64 { + num_elements(sketch) +} + +#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")] +fn num_elements(summary: GaugeSummary) -> i64 { + MetricSummary::from(summary).stats.n as i64 +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +fn arrow_num_changes( + sketch: GaugeSummary, + _accessor: toolkit_experimental::AccessorNumChanges, +) -> i64 { + num_changes(sketch) +} + +#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")] +fn num_changes(summary: GaugeSummary) -> i64 { + MetricSummary::from(summary).num_changes as i64 +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +fn arrow_slope( + sketch: GaugeSummary, + _accessor: toolkit_experimental::AccessorSlope, +) -> Option { + slope(sketch) +} + +#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")] +fn slope(summary: GaugeSummary) -> Option { + MetricSummary::from(summary).stats.slope() +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +fn arrow_intercept( + sketch: GaugeSummary, + _accessor: toolkit_experimental::AccessorIntercept, +) -> Option { + intercept(sketch) +} + +#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")] +fn intercept(summary: GaugeSummary) -> Option { + MetricSummary::from(summary).stats.intercept() +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +fn arrow_corr(sketch: GaugeSummary, _accessor: toolkit_experimental::AccessorCorr) -> Option { + corr(sketch) +} + +#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")] +fn corr(summary: GaugeSummary) -> Option { + MetricSummary::from(summary).stats.corr() +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +fn arrow_zero_time( + sketch: GaugeSummary, + __accessor: toolkit_experimental::AccessorZeroTime, +) -> Option { + gauge_zero_time(sketch) +} + +#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")] +fn gauge_zero_time(summary: GaugeSummary) -> Option { + Some(((MetricSummary::from(summary).stats.x_intercept()? * 1_000_000.0) as i64).into()) +} + impl From> for MetricSummary { fn from(pg: GaugeSummary<'_>) -> Self { Self {