Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy almost all the counter_agg functions for gauge_agg. #394

Merged
merged 1 commit into from
Apr 25, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 225 additions & 26 deletions extension/src/gauge_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ mod toolkit_experimental {
}

ron_inout_funcs!(GaugeSummary);

// hack to allow us to qualify names with "toolkit_experimental"
// so that pgx generates the correct SQL
pub(crate) use crate::accessors::toolkit_experimental::*;
}

use toolkit_experimental::*;
Expand Down Expand Up @@ -92,7 +96,7 @@ impl GaugeSummaryTransState {
// TODO build method should check validity
// check bounds only after we've combined all the points, so we aren't doing it all the time.
if !summary.bounds_valid() {
panic!("counter bounds invalid")
panic!("Metric bounds invalid")
}
self.summary_buffer.push(summary.build());
}
Expand Down Expand Up @@ -284,7 +288,7 @@ fn gauge_agg_final_inner(
Some(st) => {
// there are some edge cases that this should prevent, but I'm not sure it's necessary, we do check the bounds in the functions that use them.
if !st.bounds_valid() {
panic!("counter bounds invalid")
panic!("Metric bounds invalid")
}
Some(GaugeSummary::from(st))
}
Expand Down Expand Up @@ -362,39 +366,234 @@ extension_sql!(
],
);

#[pg_extern(
name = "delta",
strict,
immutable,
parallel_safe,
schema = "toolkit_experimental"
)]
fn gauge_agg_delta(summary: GaugeSummary) -> f64 {
// TODO Reconsider using the same pg_type for counter and gauge aggregates to avoid duplicating all these functions.

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_delta(sketch: GaugeSummary, _accessor: toolkit_experimental::AccessorDelta) -> f64 {
delta(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn delta(summary: GaugeSummary) -> f64 {
MetricSummary::from(summary).delta()
}

#[pg_extern(
name = "idelta_left",
strict,
immutable,
parallel_safe,
schema = "toolkit_experimental"
)]
fn gauge_agg_idelta_left(summary: GaugeSummary) -> f64 {
#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_gauge_agg_rate(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorRate,
) -> Option<f64> {
rate(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn rate(summary: GaugeSummary) -> Option<f64> {
MetricSummary::from(summary).rate()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_time_delta(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorTimeDelta,
) -> f64 {
time_delta(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn time_delta(summary: GaugeSummary) -> f64 {
MetricSummary::from(summary).time_delta()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_irate_left(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorIRateLeft,
) -> Option<f64> {
irate_left(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn irate_left(summary: GaugeSummary) -> Option<f64> {
MetricSummary::from(summary).irate_left()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_irate_right(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorIRateRight,
) -> Option<f64> {
irate_right(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn irate_right(summary: GaugeSummary) -> Option<f64> {
MetricSummary::from(summary).irate_right()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_idelta_left(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorIDeltaLeft,
) -> f64 {
idelta_left(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn idelta_left(summary: GaugeSummary) -> f64 {
MetricSummary::from(summary).idelta_left()
}

#[pg_extern(
name = "idelta_right",
strict,
immutable,
parallel_safe,
schema = "toolkit_experimental"
)]
fn gauge_agg_idelta_right(summary: GaugeSummary) -> f64 {
#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_idelta_right(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorIDeltaRight,
) -> f64 {
idelta_right(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn idelta_right(summary: GaugeSummary) -> f64 {
MetricSummary::from(summary).idelta_right()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_with_bounds(
sketch: GaugeSummary,
accessor: toolkit_experimental::AccessorWithBounds,
) -> GaugeSummary<'static> {
let mut builder = GaugeSummaryBuilder::from(MetricSummary::from(sketch));
builder.set_bounds(accessor.bounds());
builder.build().into()
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn with_bounds(summary: GaugeSummary, bounds: tstzrange) -> GaugeSummary {
// TODO dedup with previous by using apply_bounds
unsafe {
let ptr = bounds.0 as *mut pg_sys::varlena;
let mut builder = GaugeSummaryBuilder::from(MetricSummary::from(summary));
builder.set_bounds(get_range(ptr));
builder.build().into()
}
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_extrapolated_delta(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorExtrapolatedDelta,
) -> Option<f64> {
extrapolated_delta(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn extrapolated_delta(summary: GaugeSummary) -> Option<f64> {
MetricSummary::from(summary).prometheus_delta().unwrap()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_extrapolated_rate(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorExtrapolatedRate,
) -> Option<f64> {
extrapolated_rate(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn extrapolated_rate(summary: GaugeSummary) -> Option<f64> {
MetricSummary::from(summary).prometheus_rate().unwrap()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_num_elements(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorNumElements,
) -> i64 {
num_elements(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn num_elements(summary: GaugeSummary) -> i64 {
MetricSummary::from(summary).stats.n as i64
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_num_changes(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorNumChanges,
) -> i64 {
num_changes(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn num_changes(summary: GaugeSummary) -> i64 {
MetricSummary::from(summary).num_changes as i64
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_slope(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorSlope,
) -> Option<f64> {
slope(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn slope(summary: GaugeSummary) -> Option<f64> {
MetricSummary::from(summary).stats.slope()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_intercept(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorIntercept,
) -> Option<f64> {
intercept(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn intercept(summary: GaugeSummary) -> Option<f64> {
MetricSummary::from(summary).stats.intercept()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_corr(sketch: GaugeSummary, _accessor: toolkit_experimental::AccessorCorr) -> Option<f64> {
corr(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn corr(summary: GaugeSummary) -> Option<f64> {
MetricSummary::from(summary).stats.corr()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_zero_time(
sketch: GaugeSummary,
__accessor: toolkit_experimental::AccessorZeroTime,
) -> Option<crate::raw::TimestampTz> {
gauge_zero_time(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn gauge_zero_time(summary: GaugeSummary) -> Option<crate::raw::TimestampTz> {
Some(((MetricSummary::from(summary).stats.x_intercept()? * 1_000_000.0) as i64).into())
}

impl From<GaugeSummary<'_>> for MetricSummary {
fn from(pg: GaugeSummary<'_>) -> Self {
Self {
Expand Down