Skip to content

Commit

Permalink
Merge #394
Browse files Browse the repository at this point in the history
394: Copy almost all the counter_agg functions for gauge_agg. r=epgts a=epgts

Leave TODO about deduplicating it all.

Co-authored-by: Eric Gillespie <epg@timescale.com>
  • Loading branch information
bors[bot] and epgts authored Apr 25, 2022
2 parents 3c3e9c0 + 7305373 commit ab8dca0
Showing 1 changed file with 225 additions and 26 deletions.
251 changes: 225 additions & 26 deletions extension/src/gauge_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ mod toolkit_experimental {
}

ron_inout_funcs!(GaugeSummary);

// hack to allow us to qualify names with "toolkit_experimental"
// so that pgx generates the correct SQL
pub(crate) use crate::accessors::toolkit_experimental::*;
}

use toolkit_experimental::*;
Expand Down Expand Up @@ -92,7 +96,7 @@ impl GaugeSummaryTransState {
// TODO build method should check validity
// check bounds only after we've combined all the points, so we aren't doing it all the time.
if !summary.bounds_valid() {
panic!("counter bounds invalid")
panic!("Metric bounds invalid")
}
self.summary_buffer.push(summary.build());
}
Expand Down Expand Up @@ -284,7 +288,7 @@ fn gauge_agg_final_inner(
Some(st) => {
// there are some edge cases that this should prevent, but I'm not sure it's necessary, we do check the bounds in the functions that use them.
if !st.bounds_valid() {
panic!("counter bounds invalid")
panic!("Metric bounds invalid")
}
Some(GaugeSummary::from(st))
}
Expand Down Expand Up @@ -362,39 +366,234 @@ extension_sql!(
],
);

#[pg_extern(
name = "delta",
strict,
immutable,
parallel_safe,
schema = "toolkit_experimental"
)]
fn gauge_agg_delta(summary: GaugeSummary) -> f64 {
// TODO Reconsider using the same pg_type for counter and gauge aggregates to avoid duplicating all these functions.

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_delta(sketch: GaugeSummary, _accessor: toolkit_experimental::AccessorDelta) -> f64 {
delta(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn delta(summary: GaugeSummary) -> f64 {
MetricSummary::from(summary).delta()
}

#[pg_extern(
name = "idelta_left",
strict,
immutable,
parallel_safe,
schema = "toolkit_experimental"
)]
fn gauge_agg_idelta_left(summary: GaugeSummary) -> f64 {
#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_gauge_agg_rate(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorRate,
) -> Option<f64> {
rate(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn rate(summary: GaugeSummary) -> Option<f64> {
MetricSummary::from(summary).rate()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_time_delta(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorTimeDelta,
) -> f64 {
time_delta(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn time_delta(summary: GaugeSummary) -> f64 {
MetricSummary::from(summary).time_delta()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_irate_left(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorIRateLeft,
) -> Option<f64> {
irate_left(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn irate_left(summary: GaugeSummary) -> Option<f64> {
MetricSummary::from(summary).irate_left()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_irate_right(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorIRateRight,
) -> Option<f64> {
irate_right(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn irate_right(summary: GaugeSummary) -> Option<f64> {
MetricSummary::from(summary).irate_right()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_idelta_left(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorIDeltaLeft,
) -> f64 {
idelta_left(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn idelta_left(summary: GaugeSummary) -> f64 {
MetricSummary::from(summary).idelta_left()
}

#[pg_extern(
name = "idelta_right",
strict,
immutable,
parallel_safe,
schema = "toolkit_experimental"
)]
fn gauge_agg_idelta_right(summary: GaugeSummary) -> f64 {
#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_idelta_right(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorIDeltaRight,
) -> f64 {
idelta_right(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn idelta_right(summary: GaugeSummary) -> f64 {
MetricSummary::from(summary).idelta_right()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_with_bounds(
sketch: GaugeSummary,
accessor: toolkit_experimental::AccessorWithBounds,
) -> GaugeSummary<'static> {
let mut builder = GaugeSummaryBuilder::from(MetricSummary::from(sketch));
builder.set_bounds(accessor.bounds());
builder.build().into()
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn with_bounds(summary: GaugeSummary, bounds: tstzrange) -> GaugeSummary {
// TODO dedup with previous by using apply_bounds
unsafe {
let ptr = bounds.0 as *mut pg_sys::varlena;
let mut builder = GaugeSummaryBuilder::from(MetricSummary::from(summary));
builder.set_bounds(get_range(ptr));
builder.build().into()
}
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_extrapolated_delta(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorExtrapolatedDelta,
) -> Option<f64> {
extrapolated_delta(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn extrapolated_delta(summary: GaugeSummary) -> Option<f64> {
MetricSummary::from(summary).prometheus_delta().unwrap()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_extrapolated_rate(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorExtrapolatedRate,
) -> Option<f64> {
extrapolated_rate(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn extrapolated_rate(summary: GaugeSummary) -> Option<f64> {
MetricSummary::from(summary).prometheus_rate().unwrap()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_num_elements(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorNumElements,
) -> i64 {
num_elements(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn num_elements(summary: GaugeSummary) -> i64 {
MetricSummary::from(summary).stats.n as i64
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_num_changes(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorNumChanges,
) -> i64 {
num_changes(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn num_changes(summary: GaugeSummary) -> i64 {
MetricSummary::from(summary).num_changes as i64
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_slope(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorSlope,
) -> Option<f64> {
slope(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn slope(summary: GaugeSummary) -> Option<f64> {
MetricSummary::from(summary).stats.slope()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_intercept(
sketch: GaugeSummary,
_accessor: toolkit_experimental::AccessorIntercept,
) -> Option<f64> {
intercept(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn intercept(summary: GaugeSummary) -> Option<f64> {
MetricSummary::from(summary).stats.intercept()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_corr(sketch: GaugeSummary, _accessor: toolkit_experimental::AccessorCorr) -> Option<f64> {
corr(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn corr(summary: GaugeSummary) -> Option<f64> {
MetricSummary::from(summary).stats.corr()
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
fn arrow_zero_time(
sketch: GaugeSummary,
__accessor: toolkit_experimental::AccessorZeroTime,
) -> Option<crate::raw::TimestampTz> {
gauge_zero_time(sketch)
}

#[pg_extern(strict, immutable, parallel_safe, schema = "toolkit_experimental")]
fn gauge_zero_time(summary: GaugeSummary) -> Option<crate::raw::TimestampTz> {
Some(((MetricSummary::from(summary).stats.x_intercept()? * 1_000_000.0) as i64).into())
}

impl From<GaugeSummary<'_>> for MetricSummary {
fn from(pg: GaugeSummary<'_>) -> Self {
Self {
Expand Down

0 comments on commit ab8dca0

Please sign in to comment.