Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add assert_err and assert_warn #20100

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/polars-error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ impl Display for ErrString {

#[derive(Debug, thiserror::Error)]
pub enum PolarsError {
#[error("{0}")]
AssertionFailed(ErrString),
#[error("not found: {0}")]
ColumnNotFound(ErrString),
#[error("{0}")]
Expand Down Expand Up @@ -207,6 +209,7 @@ impl PolarsError {
pub fn wrap_msg<F: FnOnce(&str) -> String>(&self, func: F) -> Self {
use PolarsError::*;
match self {
AssertionFailed(msg) => AssertionFailed(func(msg).into()),
ColumnNotFound(msg) => ColumnNotFound(func(msg).into()),
ComputeError(msg) => ComputeError(func(msg).into()),
Duplicate(msg) => Duplicate(func(msg).into()),
Expand Down
14 changes: 14 additions & 0 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1888,6 +1888,20 @@ impl LazyFrame {
})),
)
}

pub fn assert(self, name: Option<PlSmallStr>, predicate: Expr, flags: AssertFlags) -> Self {
LazyFrame::from_logical_plan(
DslPlan::MapFunction {
input: Arc::new(self.logical_plan),
function: DslFunction::Assert {
name,
predicate,
flags,
},
},
self.opt_state,
)
}
}

/// Utility struct for lazy group_by operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ pub(crate) fn insert_streaming_nodes(
},
// Streamable functions will be converted
lp @ MapFunction { input, function } => {
if function.is_streamable() {
if function.is_streamable(expr_arena) {
state.streamable = true;
state.operators_sinks.push(PipelineNode::Operator(root));
stack.push(StackFrame::new(*input, state, current_idx))
Expand Down
14 changes: 13 additions & 1 deletion crates/polars-mem-engine/src/executors/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use super::*;

pub(crate) struct UdfExec {
pub(crate) input: Box<dyn Executor>,

pub(crate) exprs: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) function: FunctionIR,
}

Expand All @@ -21,6 +23,16 @@ impl Executor for UdfExec {
} else {
Cow::Borrowed("")
};
state.record(|| self.function.evaluate(df), profile_name)
state.record(
|| {
let exprs = self
.exprs
.iter()
.map(|e| e.evaluate(&df, state))
.collect::<PolarsResult<Vec<_>>>()?;
self.function.evaluate(df, &exprs)
},
profile_name,
)
}
}
24 changes: 23 additions & 1 deletion crates/polars-mem-engine/src/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,8 +662,30 @@ fn create_physical_plan_impl(
MapFunction {
input, function, ..
} => {
let exprs = function.get_exprs();
let exprs = if !exprs.is_empty() {
let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
let mut state = ExpressionConversionState::new(
POOL.current_num_threads() > exprs.len(),
state.expr_depth,
);
create_physical_expressions_from_irs(
&exprs,
Context::Default,
expr_arena,
&input_schema,
&mut state,
)?
} else {
Vec::new()
};
let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?;
Ok(Box::new(executors::UdfExec { input, function }))

Ok(Box::new(executors::UdfExec {
input,
function,
exprs,
}))
},
ExtContext {
input, contexts, ..
Expand Down
12 changes: 8 additions & 4 deletions crates/polars-pipe/src/executors/operators/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ pub struct FunctionOperator {

impl FunctionOperator {
pub(crate) fn new(function: FunctionIR) -> Self {
if matches!(function, FunctionIR::Assert { .. }) {
panic!("Assertions are not supported in streaming engine");
}

FunctionOperator {
n_threads: POOL.current_num_threads(),
function,
Expand All @@ -27,9 +31,9 @@ impl FunctionOperator {
}

fn execute_no_expanding(&mut self, chunk: &DataChunk) -> PolarsResult<OperatorResult> {
Ok(OperatorResult::Finished(
chunk.with_data(self.function.evaluate(chunk.data.clone())?),
))
Ok(OperatorResult::Finished(chunk.with_data(
self.function.evaluate(chunk.data.clone(), &[])?,
)))
}

// Combine every two `(offset, len)` pairs so that we double the chunk size
Expand Down Expand Up @@ -74,7 +78,7 @@ impl Operator for FunctionOperator {
}
if let Some((offset, len)) = self.offsets.pop_front() {
let df = chunk.data.slice(offset as i64, len);
let output = self.function.evaluate(df)?;
let output = self.function.evaluate(df, &[])?;
if output.height() * 2 < chunk.data.height()
&& output.height() * 2 < chunk_size_ambition
{
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
return run_conversion(lp, ctxt, "stats");
},
_ => {
let function = function.into_function_ir(&input_schema)?;
let function = function.into_function_ir(&input_schema, ctxt.expr_arena)?;
IR::MapFunction { input, function }
},
}
Expand Down
4 changes: 1 addition & 3 deletions crates/polars-plan/src/plans/expr_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ impl Borrow<Node> for ExprIR {

impl ExprIR {
pub fn new(node: Node, output_name: OutputName) -> Self {
debug_assert!(!output_name.is_none());
ExprIR { output_name, node }
}

Expand Down Expand Up @@ -151,8 +150,7 @@ impl ExprIR {
self.node = node;
}

#[cfg(feature = "cse")]
pub(crate) fn set_alias(&mut self, name: PlSmallStr) {
pub fn set_alias(&mut self, name: PlSmallStr) {
self.output_name = OutputName::Alias(name)
}

Expand Down
25 changes: 24 additions & 1 deletion crates/polars-plan/src/plans/functions/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ pub enum DslFunction {
/// FillValue
FillNan(Expr),
Drop(DropFunction),
Assert {
name: Option<PlSmallStr>,
predicate: Expr,
flags: AssertFlags,
},
}

#[derive(Clone)]
Expand Down Expand Up @@ -94,7 +99,11 @@ pub(crate) fn validate_columns_in_input<S: AsRef<str>, I: IntoIterator<Item = S>
}

impl DslFunction {
pub(crate) fn into_function_ir(self, input_schema: &Schema) -> PolarsResult<FunctionIR> {
pub(crate) fn into_function_ir(
self,
input_schema: &Schema,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<FunctionIR> {
let function = match self {
#[cfg(feature = "pivot")]
DslFunction::Unpivot { args } => {
Expand Down Expand Up @@ -144,6 +153,20 @@ impl DslFunction {
},
#[cfg(feature = "python")]
DslFunction::OpaquePython(inner) => FunctionIR::OpaquePython(inner),
DslFunction::Assert {
name,
predicate,
flags,
} => {
let predicate = to_expr_ir_ignore_alias(predicate, expr_arena)?;
let expr_format = predicate.display(expr_arena).to_string().into();
FunctionIR::Assert {
name,
predicate: AexprNode::new(predicate.node()),
flags,
expr_format,
}
},
DslFunction::Stats(_)
| DslFunction::FillNan(_)
| DslFunction::Drop(_)
Expand Down
Loading
Loading