Skip to content

Commit

Permalink
Merge #597
Browse files Browse the repository at this point in the history
597: Revert now unneeded pgx workaround r=Smittyvb a=Smittyvb

We don't need to copy everything into memory owned by Rust anymore, since pgx 0.5.4 fixed the bug that made doing that necessary (pgcentralfoundation/pgrx#784).

This reverts commit fe8b386.

I didn't add anything to the changelog since this is an internal change that doesn't affect end users.

Co-authored-by: Smitty <smitty@timescale.com>
  • Loading branch information
bors[bot] and syvb authored Oct 31, 2022
2 parents 62581ba + 723e9aa commit d432bec
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 93 deletions.
109 changes: 41 additions & 68 deletions extension/src/frequency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -976,11 +976,11 @@ extension_sql!(
name = "into_values",
schema = "toolkit_experimental"
)]
pub fn freq_iter<'a, 'b>(
pub fn freq_iter<'a>(
agg: SpaceSavingAggregate<'a>,
ty: AnyElement,
) -> TableIterator<
'b,
'a,
(
name!(value, AnyElement),
name!(min_freq, f64),
Expand All @@ -992,22 +992,15 @@ pub fn freq_iter<'a, 'b>(
pgx::error!("mischatched types")
}
let counts = agg.counts.slice().iter().zip(agg.overcounts.slice().iter());
TableIterator::new(
agg.datums
.clone()
.into_iter()
.zip(counts)
.map_while(move |(value, (&count, &overcount))| {
let total = agg.values_seen as f64;
let value =
AnyElement::from_polymorphic_datum(value, false, agg.type_oid).unwrap();
let min_freq = (count - overcount) as f64 / total;
let max_freq = count as f64 / total;
Some((value, min_freq, max_freq))
})
.collect::<Vec<_>>()
.into_iter(),
)
TableIterator::new(agg.datums.clone().into_iter().zip(counts).map_while(
move |(value, (&count, &overcount))| {
let total = agg.values_seen as f64;
let value = AnyElement::from_polymorphic_datum(value, false, agg.type_oid).unwrap();
let min_freq = (count - overcount) as f64 / total;
let max_freq = count as f64 / total;
Some((value, min_freq, max_freq))
},
))
}
}

Expand All @@ -1017,31 +1010,25 @@ pub fn freq_iter<'a, 'b>(
name = "into_values",
schema = "toolkit_experimental"
)]
pub fn freq_bigint_iter<'a, 'b>(
pub fn freq_bigint_iter<'a>(
agg: SpaceSavingBigIntAggregate<'a>,
) -> TableIterator<
'b,
'a,
(
name!(value, i64),
name!(min_freq, f64),
name!(max_freq, f64),
),
> {
let counts = agg.counts.slice().iter().zip(agg.overcounts.slice().iter());
TableIterator::new(
agg.datums
.clone()
.into_iter()
.zip(counts)
.map_while(move |(value, (&count, &overcount))| {
let total = agg.values_seen as f64;
let min_freq = (count - overcount) as f64 / total;
let max_freq = count as f64 / total;
Some((value, min_freq, max_freq))
})
.collect::<Vec<_>>()
.into_iter(),
)
TableIterator::new(agg.datums.clone().into_iter().zip(counts).map_while(
move |(value, (&count, &overcount))| {
let total = agg.values_seen as f64;
let min_freq = (count - overcount) as f64 / total;
let max_freq = count as f64 / total;
Some((value, min_freq, max_freq))
},
))
}

#[pg_extern(
Expand All @@ -1050,32 +1037,26 @@ pub fn freq_bigint_iter<'a, 'b>(
name = "into_values",
schema = "toolkit_experimental"
)]
pub fn freq_text_iter<'a, 'b>(
pub fn freq_text_iter<'a>(
agg: SpaceSavingTextAggregate<'a>,
) -> TableIterator<
'b,
'a,
(
name!(value, String),
name!(min_freq, f64),
name!(max_freq, f64),
),
> {
let counts = agg.counts.slice().iter().zip(agg.overcounts.slice().iter());
TableIterator::new(
agg.datums
.clone()
.into_iter()
.zip(counts)
.map_while(move |(value, (&count, &overcount))| {
let total = agg.values_seen as f64;
let data = unsafe { varlena_to_string(value.cast_mut_ptr()) };
let min_freq = (count - overcount) as f64 / total;
let max_freq = count as f64 / total;
Some((data, min_freq, max_freq))
})
.collect::<Vec<_>>()
.into_iter(),
)
TableIterator::new(agg.datums.clone().into_iter().zip(counts).map_while(
move |(value, (&count, &overcount))| {
let total = agg.values_seen as f64;
let data = unsafe { varlena_to_string(value.cast_mut_ptr()) };
let min_freq = (count - overcount) as f64 / total;
let max_freq = count as f64 / total;
Some((data, min_freq, max_freq))
},
))
}

fn validate_topn_for_topn_agg(
Expand Down Expand Up @@ -1134,9 +1115,7 @@ pub fn topn(agg: SpaceSavingAggregate<'_>, n: i32, ty: AnyElement) -> SetOfItera
// TODO Shouldn't failure to convert to AnyElement cause error, not early stop?
.map_while(move |value| unsafe {
AnyElement::from_polymorphic_datum(value, false, type_oid)
})
.collect::<Vec<_>>()
.into_iter(),
}),
)
}

Expand Down Expand Up @@ -1170,17 +1149,13 @@ pub fn topn_bigint(agg: SpaceSavingBigIntAggregate<'_>, n: i32) -> SetOfIterator
);
let min_freq = if agg.topn == 0 { agg.freq_param } else { 0. };

SetOfIterator::new(
TopNIterator::new(
agg.datums.clone().into_iter(),
agg.counts.clone().into_vec(),
agg.values_seen as f64,
n,
min_freq,
)
.collect::<Vec<_>>()
.into_iter(),
)
SetOfIterator::new(TopNIterator::new(
agg.datums.clone().into_iter(),
agg.counts.clone().into_vec(),
agg.values_seen as f64,
n,
min_freq,
))
}

#[pg_extern(
Expand Down Expand Up @@ -1221,9 +1196,7 @@ pub fn topn_text(agg: SpaceSavingTextAggregate<'_>, n: i32) -> SetOfIterator<Str
n,
min_freq,
)
.map(|value| unsafe { varlena_to_string(value.cast_mut_ptr()) })
.collect::<Vec<_>>()
.into_iter(),
.map(|value| unsafe { varlena_to_string(value.cast_mut_ptr()) }),
)
}

Expand Down
21 changes: 7 additions & 14 deletions extension/src/state_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,22 +374,15 @@ pub fn interpolated_duration_in<'a>(
}

#[pg_extern(immutable, parallel_safe, schema = "toolkit_experimental")]
pub fn into_values<'a, 'b>(
pub fn into_values<'a>(
agg: StateAgg<'a>,
) -> TableIterator<'b, (pgx::name!(state, String), pgx::name!(duration, i64))> {
) -> TableIterator<'a, (pgx::name!(state, String), pgx::name!(duration, i64))> {
let states: String = agg.states_as_str().to_owned();
TableIterator::new(
agg.durations
.clone()
.into_iter()
.map(move |record| {
let beg = record.state_beg as usize;
let end = record.state_end as usize;
(states[beg..end].to_owned(), record.duration)
})
.collect::<Vec<_>>()
.into_iter(),
)
TableIterator::new(agg.durations.clone().into_iter().map(move |record| {
let beg = record.state_beg as usize;
let end = record.state_end as usize;
(states[beg..end].to_owned(), record.duration)
}))
}

#[derive(Clone, Debug, Deserialize, Eq, FlatSerializable, PartialEq, Serialize)]
Expand Down
8 changes: 3 additions & 5 deletions extension/src/time_vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,13 @@ pub static TIMEVECTOR_OID: once_cell::sync::Lazy<pg_sys::Oid> =
once_cell::sync::Lazy::new(Timevector_TSTZ_F64::type_oid);

#[pg_extern(immutable, parallel_safe)]
pub fn unnest<'a, 'b>(
pub fn unnest<'a>(
series: Timevector_TSTZ_F64<'a>,
) -> TableIterator<'b, (name!(time, crate::raw::TimestampTz), name!(value, f64))> {
) -> TableIterator<'a, (name!(time, crate::raw::TimestampTz), name!(value, f64))> {
TableIterator::new(
series
.into_iter()
.map(|points| (points.ts.into(), points.val))
.collect::<Vec<_>>()
.into_iter(),
.map(|points| (points.ts.into(), points.val)),
)
}

Expand Down
4 changes: 1 addition & 3 deletions extension/src/time_vector/pipeline/lambda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,7 @@ pub fn trace_lambda<'a>(
SetOfIterator::new(
trace
.into_iter()
.map(move |(e, v)| format!("{:>width$}: {:?}", e, v, width = col1_size))
.collect::<Vec<_>>()
.into_iter(),
.map(move |(e, v)| format!("{:>width$}: {:?}", e, v, width = col1_size)),
)
}

Expand Down
4 changes: 1 addition & 3 deletions extension/src/utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ pub fn generate_periodic_normal_series(
* periodic_magnitude;
let error = distribution.sample(&mut rng);
(time.into(), base + error)
})
.collect::<Vec<_>>()
.into_iter(),
}),
)
}

Expand Down

0 comments on commit d432bec

Please sign in to comment.