Skip to content

Commit

Permalink
Add tests with Projection Pushdown enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Aug 14, 2024
1 parent c7f0598 commit 481a0dd
Showing 1 changed file with 149 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,23 @@ mod tests {
)
}

/// Helper that creates an optimizer with the SplitExprByStatefulUDF rule registered, optimizes
/// the provided plan with said optimizer, and compares the optimized plan with
/// the provided expected plan.
fn assert_optimized_plan_eq_with_projection_pushdown(
plan: Arc<LogicalPlan>,
expected: Arc<LogicalPlan>,
) -> DaftResult<()> {
assert_optimized_plan_with_rules_eq(
plan,
expected,
vec![
Box::new(SplitActorPoolProjects {}),
Box::new(PushDownProjection::new()),
],
)
}

#[cfg(not(feature = "python"))]
fn create_stateful_udf(inputs: Vec<ExprRef>) -> ExprRef {
Expr::Function {
Expand Down Expand Up @@ -483,6 +500,28 @@ mod tests {
.arced();
assert_optimized_plan_eq(project_plan.clone(), expected.clone())?;

// With Projection Pushdown, elide intermediate Projects and also perform column pushdown
let expected = scan_plan.select(vec![col("a").alias("a")])?.build();
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected,
vec![
col("a"),
stateful_project_expr.clone().alias(factored_column_name),
],
NUM_ACTORS,
)?)
.arced();
let expected = LogicalPlan::Project(Project::try_new(
expected,
vec![
col("a"),
col(factored_column_name).alias("b"),
col(factored_column_name).alias("c"),
],
)?)
.arced();
assert_optimized_plan_eq_with_projection_pushdown(project_plan, expected)?;

Ok(())
}

Expand All @@ -499,12 +538,6 @@ mod tests {
.with_columns(vec![stacked_stateful_project_expr.clone().alias("b")], None)?
.build();

// Project([col("a")])
// --> ActorPoolProject([col("a"), foo(col("a")).alias("__SplitExprByStatefulUDF_0-0_stateful_child__")])
// --> Project([col("a"), col("__SplitExprByStatefulUDF_0-0_stateful_child__")])
// --> Project([col("a"), col("__SplitExprByStatefulUDF_0-0_stateful_child__")])
// --> ActorPoolProject([col("a"), foo(col("__SplitExprByStatefulUDF_0-0_stateful_child__")).alias("b")])
// --> Project([col("a"), col("b")])
let intermediate_name = "__SplitExprByStatefulUDF_0-0_stateful_child__";
let expected = scan_plan.select(vec![col("a")])?.build();
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
Expand Down Expand Up @@ -542,8 +575,38 @@ mod tests {
.arced();
let expected =
LogicalPlan::Project(Project::try_new(expected, vec![col("a"), col("b")])?).arced();
assert_optimized_plan_eq(project_plan.clone(), expected.clone())?;

assert_optimized_plan_eq(project_plan, expected)?;
// With Projection Pushdown, elide intermediate Projects and also perform column pushdown
let expected = scan_plan.build();
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected,
vec![
col("a"),
create_stateful_udf(vec![col("a")])
.clone()
.alias(intermediate_name),
],
NUM_ACTORS,
)?)
.arced();
let expected = LogicalPlan::Project(Project::try_new(
expected,
vec![col(intermediate_name), col("a")],
)?)
.arced();
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected,
vec![
col("a"),
create_stateful_udf(vec![col(intermediate_name)])
.clone()
.alias("b"),
],
NUM_ACTORS,
)?)
.arced();
assert_optimized_plan_eq_with_projection_pushdown(project_plan, expected)?;
Ok(())
}

Expand All @@ -565,13 +628,6 @@ mod tests {
.select(vec![stacked_stateful_project_expr.clone().alias("c")])?
.build();

// Project([col("a"), col("b")])
// --> ActorPoolProject([col("b"), foo(col("a")).alias("__SplitExprByStatefulUDF_0-0_stateful_child__")])
// --> ActorPoolProject([col("__SplitExprByStatefulUDF_0-0_stateful_child__"), foo(col("b")).alias("__SplitExprByStatefulUDF_0-1_stateful_child__")])
// --> Project([col("__SplitExprByStatefulUDF_0-0_stateful_child__"), col("__SplitExprByStatefulUDF_0-1_stateful_child__")])
// --> Project([col("__SplitExprByStatefulUDF_0-0_stateful_child__"), col("__SplitExprByStatefulUDF_0-1_stateful_child__")])
// --> ActorPoolProject([foo(col("__SplitExprByStatefulUDF_0-0_stateful_child__"), col("__SplitExprByStatefulUDF_0-1_stateful_child__")).alias("c")])
// --> Project([col("c")])
let intermediate_name_0 = "__SplitExprByStatefulUDF_0-0_stateful_child__";
let intermediate_name_1 = "__SplitExprByStatefulUDF_0-1_stateful_child__";
let expected = scan_plan.select(vec![col("a"), col("b")])?.build();
Expand Down Expand Up @@ -623,8 +679,44 @@ mod tests {
)?)
.arced();
let expected = LogicalPlan::Project(Project::try_new(expected, vec![col("c")])?).arced();
assert_optimized_plan_eq(project_plan.clone(), expected.clone())?;

assert_optimized_plan_eq(project_plan, expected)?;
// With Projection Pushdown, elide intermediate Projects and also perform column pushdown
let expected = scan_plan.build();
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected,
vec![
col("a"), // TODO: This should be able to be pruned as well, but it seems Projection Pushdown isn't working as intended
col("b"),
create_stateful_udf(vec![col("a")])
.clone()
.alias(intermediate_name_0),
],
NUM_ACTORS,
)?)
.arced();
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected,
vec![
col(intermediate_name_0),
create_stateful_udf(vec![col("b")])
.clone()
.alias(intermediate_name_1),
],
NUM_ACTORS,
)?)
.arced();
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected,
vec![
create_stateful_udf(vec![col(intermediate_name_0), col(intermediate_name_1)])
.clone()
.alias("c"),
],
NUM_ACTORS,
)?)
.arced();
assert_optimized_plan_eq_with_projection_pushdown(project_plan.clone(), expected.clone())?;
Ok(())
}

Expand Down Expand Up @@ -709,8 +801,49 @@ mod tests {
)?)
.arced();
let expected = LogicalPlan::Project(Project::try_new(expected, vec![col("c")])?).arced();
assert_optimized_plan_eq(project_plan.clone(), expected.clone())?;

assert_optimized_plan_eq(project_plan, expected)?;
// With Projection Pushdown, elide intermediate Projects and also perform column pushdown
let expected = scan_plan.build();
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected,
vec![
col("a"), // TODO: This should be pruned by Projection Pushdown, but isn't for some reason
col("b"),
create_stateful_udf(vec![col("a")])
.clone()
.alias(intermediate_name_0),
],
NUM_ACTORS,
)?)
.arced();
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected,
vec![
col(intermediate_name_0),
create_stateful_udf(vec![col("b")])
.clone()
.alias(intermediate_name_1),
],
NUM_ACTORS,
)?)
.arced();
let expected = LogicalPlan::Project(Project::try_new(
expected,
vec![col(intermediate_name_0)
.add(col(intermediate_name_1))
.alias(intermediate_name_2)],
)?)
.arced();
let expected = LogicalPlan::ActorPoolProject(ActorPoolProject::try_new(
expected,
vec![create_stateful_udf(vec![col(intermediate_name_2)])
.clone()
.alias("c")],
NUM_ACTORS,
)?)
.arced();
assert_optimized_plan_eq_with_projection_pushdown(project_plan.clone(), expected.clone())?;
Ok(())
}
}

0 comments on commit 481a0dd

Please sign in to comment.