Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Move hive partitioning/multi-file handling outside of readers #20203

Merged
merged 15 commits into from
Dec 20, 2024
Merged
9 changes: 9 additions & 0 deletions crates/polars-expr/src/expressions/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,10 @@ impl PhysicalExpr for AggregationExpr {
}
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.input.collect_live_columns(lv);
}

fn is_scalar(&self) -> bool {
true
}
Expand Down Expand Up @@ -757,6 +761,11 @@ impl PhysicalExpr for AggQuantileExpr {
))
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.input.collect_live_columns(lv);
self.quantile.collect_live_columns(lv);
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.input.to_field(input_schema)
}
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-expr/src/expressions/alias.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ impl PhysicalExpr for AliasExpr {
Ok(ac)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.physical_expr.collect_live_columns(lv);
lv.insert(self.name.clone());
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
Ok(Field::new(
self.name.clone(),
Expand Down
45 changes: 45 additions & 0 deletions crates/polars-expr/src/expressions/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::expressions::{
AggState, AggregationContext, PartitionedAggregation, PhysicalExpr, UpdateGroups,
};

#[derive(Clone)]
pub struct ApplyExpr {
inputs: Vec<Arc<dyn PhysicalExpr>>,
function: SpecialEq<Arc<dyn ColumnsUdf>>,
Expand Down Expand Up @@ -426,6 +427,50 @@ impl PhysicalExpr for ApplyExpr {
}
}
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
for i in &self.inputs {
i.collect_live_columns(lv);
}
}
fn replace_elementwise_const_columns(
&self,
const_columns: &PlHashMap<PlSmallStr, AnyValue<'static>>,
) -> Option<Arc<dyn PhysicalExpr>> {
if self.collect_groups == ApplyOptions::ElementWise {
let mut new_inputs = Vec::new();
for i in 0..self.inputs.len() {
match self.inputs[i].replace_elementwise_const_columns(const_columns) {
None => continue,
Some(new) => {
new_inputs.reserve(self.inputs.len());
new_inputs.extend(self.inputs[..i].iter().cloned());
new_inputs.push(new);
break;
},
}
}

// Only copy inputs if it is actually needed
if new_inputs.is_empty() {
return None;
}

new_inputs.extend(self.inputs[new_inputs.len()..].iter().map(|i| {
match i.replace_elementwise_const_columns(const_columns) {
None => i.clone(),
Some(new) => new,
}
}));

let mut slf = self.clone();
slf.inputs = new_inputs;
return Some(Arc::new(slf));
}

None
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.expr.to_field(input_schema, Context::Default)
}
Expand Down
26 changes: 26 additions & 0 deletions crates/polars-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::expressions::{
AggState, AggregationContext, PartitionedAggregation, PhysicalExpr, UpdateGroups,
};

#[derive(Clone)]
pub struct BinaryExpr {
left: Arc<dyn PhysicalExpr>,
op: Operator,
Expand Down Expand Up @@ -265,6 +266,31 @@ impl PhysicalExpr for BinaryExpr {
}
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.left.collect_live_columns(lv);
self.right.collect_live_columns(lv);
}
fn replace_elementwise_const_columns(
&self,
const_columns: &PlHashMap<PlSmallStr, AnyValue<'static>>,
) -> Option<Arc<dyn PhysicalExpr>> {
let rcc_left = self.left.replace_elementwise_const_columns(const_columns);
let rcc_right = self.right.replace_elementwise_const_columns(const_columns);

if rcc_left.is_some() || rcc_right.is_some() {
let mut slf = self.clone();
if let Some(left) = rcc_left {
slf.left = left;
}
if let Some(right) = rcc_right {
slf.right = right;
}
return Some(Arc::new(slf));
}

None
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.expr.to_field(input_schema, Context::Default)
}
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-expr/src/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ impl PhysicalExpr for CastExpr {
Ok(ac)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.input.collect_live_columns(lv);
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.input.to_field(input_schema).map(|mut fld| {
fld.coerce(self.dtype.clone());
Expand Down
17 changes: 17 additions & 0 deletions crates/polars-expr/src/expressions/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ impl PhysicalExpr for ColumnExpr {
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
let out = match self.schema.get_full(&self.name) {
Some((idx, _, _)) => {
Expand Down Expand Up @@ -178,6 +179,22 @@ impl PhysicalExpr for ColumnExpr {
Some(self)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
lv.insert(self.name.clone());
}
fn replace_elementwise_const_columns(
&self,
const_columns: &PlHashMap<PlSmallStr, AnyValue<'static>>,
) -> Option<Arc<dyn PhysicalExpr>> {
if let Some(av) = const_columns.get(&self.name) {
let lv = LiteralValue::from(av.clone());
let le = LiteralExpr::new(lv, self.expr.clone());
return Some(Arc::new(le));
}

None
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
input_schema.get_field(&self.name).ok_or_else(|| {
polars_err!(
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-expr/src/expressions/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ impl PhysicalExpr for CountExpr {
Ok(AggregationContext::new(c, Cow::Borrowed(groups), true))
}

fn collect_live_columns(&self, _lv: &mut PlIndexSet<PlSmallStr>) {}

fn to_field(&self, _input_schema: &Schema) -> PolarsResult<Field> {
Ok(Field::new(PlSmallStr::from_static(LEN), IDX_DTYPE))
}
Expand Down
6 changes: 6 additions & 0 deletions crates/polars-expr/src/expressions/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ impl PhysicalExpr for FilterExpr {
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
let s_f = || self.input.evaluate(df, state);
let predicate_f = || self.by.evaluate(df, state);
Expand Down Expand Up @@ -145,6 +146,11 @@ impl PhysicalExpr for FilterExpr {
}
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.input.collect_live_columns(lv);
self.by.collect_live_columns(lv);
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.input.to_field(input_schema)
}
Expand Down
6 changes: 6 additions & 0 deletions crates/polars-expr/src/expressions/gather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ impl PhysicalExpr for GatherExpr {
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
let series = self.phys_expr.evaluate(df, state)?;
self.finish(df, state, series)
Expand Down Expand Up @@ -88,6 +89,11 @@ impl PhysicalExpr for GatherExpr {
Ok(ac)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.phys_expr.collect_live_columns(lv);
self.idx.collect_live_columns(lv);
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.phys_expr.to_field(input_schema)
}
Expand Down
3 changes: 3 additions & 0 deletions crates/polars-expr/src/expressions/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl PhysicalExpr for LiteralExpr {
fn as_expression(&self) -> Option<&Expr> {
Some(&self.1)
}

fn evaluate(&self, _df: &DataFrame, _state: &ExecutionState) -> PolarsResult<Column> {
self.as_column()
}
Expand Down Expand Up @@ -148,6 +149,8 @@ impl PhysicalExpr for LiteralExpr {
Some(self)
}

fn collect_live_columns(&self, _lv: &mut PlIndexSet<PlSmallStr>) {}

fn to_field(&self, _input_schema: &Schema) -> PolarsResult<Field> {
let dtype = self.0.get_datatype();
Ok(Field::new(PlSmallStr::from_static("literal"), dtype))
Expand Down
20 changes: 18 additions & 2 deletions crates/polars-expr/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,22 @@ pub trait PhysicalExpr: Send + Sync {
None
}

/// Get the variables that are used in the expression i.e. live variables.
/// This can contain duplicates.
fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>);

/// Replace columns that are known to be a constant value with their const value.
///
/// This should not replace values that are calculated non-elementwise e.g. col.max(),
/// col.std(), etc.
fn replace_elementwise_const_columns(
&self,
const_columns: &PlHashMap<PlSmallStr, AnyValue<'static>>,
) -> Option<Arc<dyn PhysicalExpr>> {
_ = const_columns;
None
}

/// Can take &dyn Statistics and determine of a file should be
/// read -> `true`
/// or not -> `false`
Expand Down Expand Up @@ -630,8 +646,8 @@ impl PhysicalIoExpr for PhysicalIoHelper {
.map(|c| c.take_materialized_series())
}

fn live_variables(&self) -> Option<Vec<PlSmallStr>> {
Some(expr_to_leaf_column_names(self.expr.as_expression()?))
fn collect_live_columns(&self, live_columns: &mut PlIndexSet<PlSmallStr>) {
self.expr.collect_live_columns(live_columns);
}

#[cfg(feature = "parquet")]
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-expr/src/expressions/rolling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ impl PhysicalExpr for RollingExpr {
polars_bail!(InvalidOperation: "rolling expression not allowed in aggregation");
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.phys_function.collect_live_columns(lv);
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.function.to_field(input_schema, Context::Default)
}
Expand Down
6 changes: 6 additions & 0 deletions crates/polars-expr/src/expressions/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,12 @@ impl PhysicalExpr for SliceExpr {
Ok(ac)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.input.collect_live_columns(lv);
self.offset.collect_live_columns(lv);
self.length.collect_live_columns(lv);
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.input.to_field(input_schema)
}
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-expr/src/expressions/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl PhysicalExpr for SortExpr {
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
let series = self.physical_expr.evaluate(df, state)?;
series.sort_with(self.options)
Expand Down Expand Up @@ -104,6 +105,10 @@ impl PhysicalExpr for SortExpr {
Ok(ac)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.physical_expr.collect_live_columns(lv);
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.physical_expr.to_field(input_schema)
}
Expand Down
8 changes: 8 additions & 0 deletions crates/polars-expr/src/expressions/sortby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ impl PhysicalExpr for SortByExpr {
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
let series_f = || self.input.evaluate(df, state);
if self.by.is_empty() {
Expand Down Expand Up @@ -374,6 +375,13 @@ impl PhysicalExpr for SortByExpr {
Ok(ac_in)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.input.collect_live_columns(lv);
for i in &self.by {
i.collect_live_columns(lv);
}
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.input.to_field(input_schema)
}
Expand Down
6 changes: 6 additions & 0 deletions crates/polars-expr/src/expressions/ternary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,12 @@ impl PhysicalExpr for TernaryExpr {
Some(self)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.predicate.collect_live_columns(lv);
self.truthy.collect_live_columns(lv);
self.falsy.collect_live_columns(lv);
}

fn is_scalar(&self) -> bool {
self.returns_scalar
}
Expand Down
10 changes: 10 additions & 0 deletions crates/polars-expr/src/expressions/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,16 @@ impl PhysicalExpr for WindowExpr {
false
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
for i in &self.group_by {
i.collect_live_columns(lv);
}
if let Some((i, _)) = &self.order_by {
i.collect_live_columns(lv);
}
self.phys_function.collect_live_columns(lv);
}

#[allow(clippy::ptr_arg)]
fn evaluate_on_groups<'a>(
&self,
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-io/src/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ or set 'streaming'",

pub use options::{ParallelStrategy, ParquetOptions};
use polars_error::{ErrString, PolarsError};
pub use polars_parquet::arrow::read::infer_schema;
pub use polars_parquet::read::FileMetadata;
pub use read_impl::{create_sorting_map, try_set_sorted_flag};
#[cfg(feature = "cloud")]
pub use reader::ParquetAsyncReader;
Expand Down
Loading
Loading