feat(substrait): modular substrait producer #13931

Open
wants to merge 13 commits into
base: main

vbarua
Contributor

@vbarua vbarua commented Dec 27, 2024

Which issue does this PR close?

Closes #13901

Rationale for this change

This is the producer equivalent to the consumer changes in #13803

Improves the reusability of the Substrait Producer for users that wish to customize how DataFusion relations and expressions are converted into Substrait.

This is especially useful for controlling how UserDefinedLogicalNodes are converted into Substrait.

What changes are included in this PR?

Refactoring

  • Relation and expression handling code has been extracted into a series of from_* functions (e.g. from_projection, from_filter, from_between) to aid re-use.
  • A SubstraitProducer trait has been introduced with default methods that handle relations and expressions using the above functions.
  • All conversion methods take a &mut impl SubstraitProducer as their first argument.
  • SubstraitPlanningState has been fully removed, as it is no longer used anywhere.
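To illustrate the shape of this refactoring, here is a minimal, self-contained sketch of the pattern: free from_* functions that take the producer as their first argument, and a trait whose default methods delegate to them. The Plan enum, the Producer trait, the handle_* method names, and the string output are all toy stand-ins invented for this example; they are not the actual DataFusion or Substrait types.

```rust
// Toy stand-in for a logical plan; the real DataFusion type is far richer.
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Projection(Box<Plan>),
    Filter(Box<Plan>),
    Scan(String),
}

// Free functions hold the conversion logic. They take the producer first,
// so nested plans are routed back through the (possibly overridden) trait.
pub fn from_projection(producer: &mut impl Producer, input: &Plan) -> String {
    format!("project({})", producer.handle_plan(input))
}

pub fn from_filter(producer: &mut impl Producer, input: &Plan) -> String {
    format!("filter({})", producer.handle_plan(input))
}

pub fn from_scan(_producer: &mut impl Producer, table: &str) -> String {
    format!("read({table})")
}

// Default methods delegate to the free functions above.
pub trait Producer: Sized {
    fn handle_plan(&mut self, plan: &Plan) -> String {
        match plan {
            Plan::Projection(input) => self.handle_projection(input),
            Plan::Filter(input) => self.handle_filter(input),
            Plan::Scan(name) => self.handle_scan(name),
        }
    }
    fn handle_projection(&mut self, input: &Plan) -> String {
        from_projection(self, input)
    }
    fn handle_filter(&mut self, input: &Plan) -> String {
        from_filter(self, input)
    }
    fn handle_scan(&mut self, table: &str) -> String {
        from_scan(self, table)
    }
}

// Uses every default unchanged.
pub struct DefaultProducer;
impl Producer for DefaultProducer {}

// Overrides a single method; everything else keeps the default behaviour.
pub struct PrefixingProducer;
impl Producer for PrefixingProducer {
    fn handle_scan(&mut self, table: &str) -> String {
        format!("read(prod.{table})")
    }
}
```

Because from_filter calls back into producer.handle_plan, the override in PrefixingProducer also applies to scans nested under other relations, which is the main benefit of threading the producer through every conversion function.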

Code Changes

The conversion of joins has been simplified to no longer require a column offset when converting the join condition. This allowed for the removal of the col_ref_offset argument from all methods that used it, simplifying the API.

Are these changes tested?

These changes refactor existing code and leverage its tests.

A test was added to verify the behaviour when converting joins, as there is a code change there.

Are there any user-facing changes?

  • The top-level to_substrait_plan function now consumes a &SessionState directly instead of a &dyn SubstraitPlanningState, which most users should not notice.
  • Public functions like to_substrait_rel and to_substrait_rex have had their APIs changed, but this should not affect most users.
  • The SubstraitPlanningState has been entirely removed. Its functionality has been superseded by the SubstraitConsumer and SubstraitProducer traits.

@vbarua vbarua force-pushed the vbarua/modular-substrait-producer branch from a8273c0 to 4a464cb Compare December 27, 2024 19:42

fn consume_plan(&mut self, plan: &LogicalPlan) -> Result<Box<Rel>> {
    to_substrait_rel(self, plan)
}
Contributor Author

Even though this is the SubstraitProducer, I used consume as the verb for the API as it consumes DataFusion and produces Substrait.

I thought about using produce_plan, produce_projection, etc., but found that pattern a little weird reading-wise.

For example, does produce_between create a Substrait Between expression (which does not exist), or does it convert a Between expression into a Substrait equivalent? Because DataFusion relations and expressions don't map 1-1 with Substrait, I found it easier to think of this as consuming DataFusion. Just my 2 cents.

Contributor

Yeah, I agree that "produce" doesn't make sense here, as it's more logical to think of the functions in terms of processing DF concepts rather than in producing Substrait things. However, the "consume" in producer can be a bit confusing w.r.t "consumer" - would it make sense to use some alternative, like "from" (which is already used for the functions) or "handle", "process", or something?

Contributor Author

There's actually a lint that stops me from using from. I think handle makes sense, I'll switch to that.

self.extensions
}

fn consume_extension(&mut self, plan: &Extension) -> Result<Box<Rel>> {
Contributor Author

The following was copied from the existing code for handling LogicalPlan::Extension nodes found later on.

state: &dyn SubstraitPlanningState,
) -> Result<Box<Plan>> {
let mut extensions = Extensions::default();
pub fn to_substrait_plan(plan: &LogicalPlan, state: &SessionState) -> Result<Box<Plan>> {
Contributor Author

The public API stays mostly the same, taking a &SessionState instead of a &dyn SubstraitPlanningState, which most users shouldn't notice.

maintain_singular_struct: false,
});
pub fn from_table_scan(
_producer: &mut impl SubstraitProducer,
Contributor Author

This currently isn't used. However, in the future we're likely going to want to use this producer when converting the DataFusion schema into Substrait, especially after the logical type work lands and we can potentially add user-defined logical types.

Contributor

likely

Indicates to me we shouldn't add it there yet, since there's a risk it won't be used :) And I think it'll be fine to add it later - it'll be an API break, but only for those customizing the usage, and at least it'll be a clear break.

Contributor Author

since there's a risk it won't be used :)

Looking at this again, the presence of the filters field on the TableScan

pub filters: Vec<Expr>,

means that we can upgrade my "likely" to 100% as we'll need the producer to convert the filter expressions.

} else {
Operator::Eq
};
let join_on = to_substrait_join_expr(producer, &join.on, eq_op, &in_join_schema)?;
Contributor Author

The code here has changed slightly. to_substrait_join_expr now takes the output schema of the join which makes it easier to process the join condition. More details below.

};
Ok(Box::new(Rel {
rel_type: Some(rel_type),
}))
Contributor Author

Moved up into the DefaultSubstraitProducer

extensions,
)?;
let l = producer.consume_expr(left, join_schema)?;
let r = producer.consume_expr(right, join_schema)?;
Contributor Author

@vbarua vbarua Dec 27, 2024

We no longer need to track the column offset explicitly.

The column offset code was added as part of #6135 to handle queries like

SELECT d1.b, d2.c
FROM data d1
JOIN data d2 ON d1.b = d2.e

which caused issues because the left and right inputs both had the same name. This could potentially cause column name references in DataFusion to be converted incorrectly into Substrait column indices in some cases. Additionally, there were issues with duplicate schema errors.

However, the introduction and usage of

/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
/// conflict with the columns from the other.
/// Substrait doesn't currently allow specifying aliases, neither for columns nor for tables. For
/// Substrait the names don't matter since it only refers to columns by indices, however DataFusion
/// requires columns to be uniquely identifiable, in some places (see e.g. DFSchema::check_names).
fn requalify_sides_if_needed(
which was needed because different tables might have columns with the same name, now means that we can concatenate the left and right schemas of a join without issues. Then if the DataFusion column references are unambiguous, we can simply look up the columns in the join schema to get the correct index.

This was the only place where the column offset was used. Removing it here allowed me to remove the col_ref_offset argument from a number of functions, which IMO simplifies the API substantially.

For further verification, a test has been added for this in roundtrip_logical_plan.rs
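As a rough illustration of the lookup described here, the sketch below concatenates the two sides of a join and resolves a qualified column by position in the combined schema, with no separate offset. The Field struct, the helper names, and the three-column layout of data are assumptions made for this example, not the DataFusion DFSchema API.

```rust
// Toy qualified field; a hypothetical stand-in for an entry in a join schema.
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub qualifier: String,
    pub name: String,
}

pub fn field(qualifier: &str, name: &str) -> Field {
    Field {
        qualifier: qualifier.to_string(),
        name: name.to_string(),
    }
}

/// Concatenate the left and right input schemas, left fields first.
pub fn join_schema(left: &[Field], right: &[Field]) -> Vec<Field> {
    left.iter().chain(right.iter()).cloned().collect()
}

/// Resolve a qualified column to its position in the combined schema.
/// The position is usable directly as a column index, so right-side columns
/// need no explicit offset added by the caller.
pub fn index_of(schema: &[Field], qualifier: &str, name: &str) -> Option<usize> {
    schema
        .iter()
        .position(|f| f.qualifier == qualifier && f.name == name)
}
```

With data aliased as d1 and d2 and assumed columns b, c, e, the condition d1.b = d2.e resolves to indices 0 and 5 in the combined schema.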

Contributor

I'm not sure I follow how the requalify_sides_if_needed (added by me in #11049, just for reference) affects the need for this handling, given it's on the consumer side and this is on the producer. https://github.com/apache/datafusion/pull/6135/files#r1215611954 seems to indicate the re-added test doesn't catch the issue. Does this change affect the produced substrait plan?

Contributor Author

Does this change affect the produced substrait plan?

Checking with the test I added, it does not actually.

Taking a step back as well. When I was updating this code I was curious why the code in #6135 was added, and I may have misunderstood what it was trying to fix.

What I did notice when refactoring this code was that we were already concatenating the schemas and then using that to convert the filter on the join

let in_join_schema = join.left.schema().join(join.right.schema())?;
let join_filter = match &join.filter {
    Some(filter) => Some(to_substrait_rex(
        state,
        filter,
        &Arc::new(in_join_schema),
        0,
        extensions,
    )?),
    None => None,
};

If it can be used to convert the filter, which itself can contain expressions referencing either side of the join, then it should be possible to use that schema to convert the expressions in the join condition as well. Based on this, I removed the column offset code.

As I understand it, something like

SELECT *
FROM foo
  JOIN foo ON id = id

where foo has an id column, is rejected with

Schema error: Ambiguous reference to unqualified field table_name

If we qualify both sides

SELECT *
FROM foo l
  JOIN foo r ON l.id = r.id

it works fine. If DataFusion rejects queries where the column name references are ambiguous, it should be possible to look up the column in the combined schema generally.
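The argument can be sketched with a toy resolver that rejects ambiguous references up front, so every reference that survives maps to exactly one index in the combined schema. The resolve function, the (qualifier, name) tuple layout, and the error strings are hypothetical; DataFusion's own name resolution is more involved.

```rust
/// Resolve a column reference against a combined schema of
/// (qualifier, name) pairs. An unqualified reference matching more than one
/// field is rejected, mirroring the "ambiguous reference" behaviour above.
pub fn resolve(
    schema: &[(&str, &str)],
    qualifier: Option<&str>,
    name: &str,
) -> Result<usize, String> {
    let mut matches = schema
        .iter()
        .enumerate()
        .filter(|(_, (q, n))| *n == name && qualifier.map_or(true, |want| want == *q))
        .map(|(i, _)| i);

    // Pull at most two matches: one is a hit, two means ambiguity.
    match (matches.next(), matches.next()) {
        (Some(index), None) => Ok(index),
        (Some(_), Some(_)) => Err(format!("ambiguous reference to field {name}")),
        (None, _) => Err(format!("no field named {name}")),
    }
}
```

Against a self-join schema of l.id and r.id, the unqualified id is rejected while l.id and r.id resolve to indices 0 and 1.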

Your work in #11049 made it possible to read plans where both sides of the join had columns with the same name, which would otherwise fail. That probably affected the testing code, but not the producer behaviour.

Expr::Negative(arg) => ("negate", arg),
expr => not_impl_err!("Unsupported expression: {expr:?}")?,
};
to_substrait_unary_scalar_fn(producer, fn_name, arg, schema)
Contributor Author

Consolidated the handling of unary expressions like Not, IsNull, IsNotNull, etc. into a single function for improved readability.
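A minimal sketch of that consolidation, using a toy Expr enum rather than the DataFusion one: a single match maps each unary variant to a function name plus its argument, and one shared code path handles the rest. Only "negate" appears in the snippet above; the other function names here are assumptions for illustration.

```rust
// Toy expression type; a hypothetical stand-in for DataFusion's Expr.
#[derive(Debug)]
pub enum Expr {
    Column(String),
    Not(Box<Expr>),
    IsNull(Box<Expr>),
    IsNotNull(Box<Expr>),
    Negative(Box<Expr>),
}

/// Map a unary expression to (function name, argument) in one place, so the
/// conversion of the argument and the function call is written only once.
pub fn unary_fn(expr: &Expr) -> Result<(&'static str, &Expr), String> {
    let (fn_name, arg) = match expr {
        Expr::Not(arg) => ("not", arg),
        Expr::IsNull(arg) => ("is_null", arg),
        Expr::IsNotNull(arg) => ("is_not_null", arg),
        Expr::Negative(arg) => ("negate", arg),
        other => return Err(format!("Unsupported expression: {other:?}")),
    };
    Ok((fn_name, arg.as_ref()))
}
```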

@vbarua vbarua marked this pull request as ready for review December 27, 2024 21:40
@vbarua
Contributor Author

vbarua commented Dec 27, 2024

@Blizzara, @ccciudatu I would appreciate if y'all could take a look when you have an opportunity.

Contributor

@Blizzara Blizzara left a comment

Looks great, thanks @vbarua! I left some comments or thoughts, but nothing major.

fn consume_scalar_function(
&mut self,
scalar_fn: &expr::ScalarFunction,
Contributor

I guess this is to de-conflict with Substrait's ScalarFunction, which is imported? 👍

Contributor Author

That is indeed the case.

if e.produce_one_row {
return not_impl_err!("Producing a row from empty relation is unsupported");
}
#[allow(deprecated)]
Contributor

I think previously it was allowed on even higher level so this is fine, but ooc, what's deprecated in all these?

Contributor Author

The deprecation warnings are for fields on the generated protobufs. For example here it's the values field on the VirtualFields.

My intent with moving the #[allow(deprecated)] to the statement declarations was to more tightly associate it with the code with the deprecated fields.
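For readers unfamiliar with the technique, here is a small sketch of scoping the lint exemption to a single statement. LegacyMessage and its fields are invented for this example and only mimic a generated struct that carries a deprecated field.

```rust
// Hypothetical generated-style struct with one deprecated field.
pub struct LegacyMessage {
    #[deprecated(note = "use `expressions` instead")]
    pub values: Vec<i64>,
    pub expressions: Vec<i64>,
}

pub fn build_message(rows: Vec<i64>) -> LegacyMessage {
    // The exemption covers only this statement, so any other use of a
    // deprecated item elsewhere in the function would still warn.
    #[allow(deprecated)]
    let message = LegacyMessage {
        values: vec![],
        expressions: rows,
    };
    message
}
```

Placing the attribute on the statement rather than on the function or module keeps the exemption next to the code that actually touches the deprecated field.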

Comment on lines 1430 to 1432
let substrait_expr = producer.consume_expr(expr.as_ref(), schema)?;
let substrait_low = producer.consume_expr(low.as_ref(), schema)?;
let substrait_high = producer.consume_expr(high.as_ref(), schema)?;
Contributor

unrelated to this PR and probs better not to change now to keep diff small(er), but I think there's no reason to duplicate these below, they could just happen above the if
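A sketch of what that follow-up could look like, with the three operand conversions done once before branching. The convert helper, the string output, and the exact comparison shapes for the negated and non-negated cases are assumptions for illustration, not the code in this PR.

```rust
// Hypothetical stand-in for converting one expression.
pub fn convert(expr: &str) -> String {
    format!("rex({expr})")
}

pub fn from_between(expr: &str, low: &str, high: &str, negated: bool) -> String {
    // Convert each operand once, above the branch, instead of in both arms.
    let e = convert(expr);
    let l = convert(low);
    let h = convert(high);

    if negated {
        // NOT BETWEEN: expr < low OR expr > high
        format!("or(lt({e}, {l}), gt({e}, {h}))")
    } else {
        // BETWEEN: expr >= low AND expr <= high
        format!("and(gte({e}, {l}), lte({e}, {h}))")
    }
}
```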

Contributor Author

probs better not to change now to keep diff small(er),

I agree. Do small code improvements also require a full issue to be linked to them?

fn consume_extension(&mut self, plan: &Extension) -> Result<Box<Rel>> {
let extension_bytes = self
.state
Contributor

I think this is the only use for SessionState in the DefaultSubstraitProducer, so presumably it wouldn't need the full state to operate with... But given users have the option of making their own producer if they care, maybe that's fine and better to just have the state here for future needs?

Contributor Author

Actually, this makes me want to only store the SerializerRegistry. If we need the state (or other data) we can always add it in later.

Part of the reason to switch to the producer trait is that we can modify the internal details of the DefaultSubstraitProducer without it impacting users.
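A rough sketch of the idea of holding only the registry. ToyRegistry, NameBytesRegistry, and the handle_extension signature are hypothetical and deliberately much simpler than DataFusion's SerializerRegistry; the point is only that the producer depends on the one capability it uses.

```rust
use std::sync::Arc;

// Hypothetical, simplified registry: turns a node name into bytes.
pub trait ToyRegistry {
    fn serialize(&self, node_name: &str) -> Result<Vec<u8>, String>;
}

// Trivial implementation that encodes the node name as UTF-8 bytes.
pub struct NameBytesRegistry;

impl ToyRegistry for NameBytesRegistry {
    fn serialize(&self, node_name: &str) -> Result<Vec<u8>, String> {
        if node_name.is_empty() {
            return Err("cannot serialize an unnamed node".to_string());
        }
        Ok(node_name.as_bytes().to_vec())
    }
}

// The producer stores only the registry instead of a whole session state.
// More fields can be added later without changing the trait users implement.
pub struct DefaultProducer {
    serializer_registry: Arc<dyn ToyRegistry>,
}

impl DefaultProducer {
    pub fn new(serializer_registry: Arc<dyn ToyRegistry>) -> Self {
        Self { serializer_registry }
    }

    pub fn handle_extension(&self, node_name: &str) -> Result<Vec<u8>, String> {
        self.serializer_registry.serialize(node_name)
    }
}
```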

@@ -571,6 +571,21 @@ async fn roundtrip_self_implicit_cross_join() -> Result<()> {
roundtrip("SELECT left.a left_a, left.b, right.a right_a, right.c FROM data AS left, data AS right").await
}

#[tokio::test]
async fn self_join_introduces_aliases() -> Result<()> {
Contributor

This is adding back this test, right? I seem to have argued back then that it is unnecessary given the roundtrip_self_join test.

Contributor Author

This is adding that test back with the SubqueryAlias, yes. If you think it's redundant with roundtrip_self_join I'm happy to remove it.

@alamb
Contributor

alamb commented Dec 30, 2024

This PR is looking nice -- @vbarua and @Blizzara let me know when it is ready for a final review.

@vbarua
Contributor Author

vbarua commented Dec 30, 2024

Thanks for the feedback @Blizzara. Went ahead and made changes based on what you suggested, and also answered some questions.

Contributor

@Blizzara Blizzara left a comment

Thanks, LGTM! 🚀

Successfully merging this pull request may close these issues.

[substrait] customizable producer
3 participants