Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce ProjectionMask To Allow Nested Projection Pushdown #2581

Open
tustvold opened this issue May 21, 2022 · 17 comments
Open

Introduce ProjectionMask To Allow Nested Projection Pushdown #2581

tustvold opened this issue May 21, 2022 · 17 comments
Labels
enhancement New feature or request

Comments

@tustvold
Copy link
Contributor

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Currently projection indices are pushed down to scans as Vec<usize>. This creates some ambiguities:

To demonstrate how these problems intertwine, consider the case of

Struct {
   first: Struct {
      a: Integer,
      b: Integer,
   },
   second: Struct {
      c: Integer
   }
}

If I project ["first.a", "second.c", "first.b"] what is the resulting schema?

Describe the solution you'd like

I would like to propose we instead pushdown a leaf column mask, where leaf columns are fields with no children, as enumerated by a depth-first-scan of the schema tree. This avoids any ordering ambiguities, whilst also being relatively straightforward to implement and interpret.

I recently introduced a similar concept to the parquet reader apache/arrow-rs#1716. We could theoretically lift this into arrow-rs, potentially adding support to RecordBatch for it, and then use this in DataFusion.

Describe alternatives you've considered

We could not support nested pushdown

Additional context

Currently pushdown for nested types in ParquetExec is broken - #2453

Thoughts @andygrove @alamb

@tustvold tustvold added the enhancement New feature or request label May 21, 2022
@alamb
Copy link
Contributor

alamb commented May 23, 2022

I like the idea of a single Projection structure that understand how to select nested types.

We could theoretically lift this into arrow-rs

As long as we can maintain independence for parquet (aka one can select subsets of parquet files without having to use Arrow) that is great.

If we don't get the Projection into arrow-rs in time, we could also copy/paste the Projection code into DataFusion and provide a conversion back and forth during the interim period

@kesavkolla
Copy link

It gets little more indepth if the struct has members which are list of struct again. How will the schema pushdown will happen to them? Eg:

Struct Employee {
  name: String,
  departments: Vec<Department>,
}
Struct Department {
    id: u32,
    name: String,
}

How will the proection appear for this?

@tustvold
Copy link
Contributor Author

How will the projection appear for this?

In this case you have roots of

  1. name
  2. departments

And leaves of

  1. name
  2. department.id
  3. department.name

So if say you projected with leaves [1, 3] you would get a schema of

Struct Employee {
  name: String,
  departments: Vec<ProjectedDepartment>
}
struct ProjectedDepartment {
  name: String
}

Or in terms of the arrow datatype

DataType::Struct(vec![
  Field("name", DataType::Utf8, <nullable>),
  Field("departments", DataType::List(Box::new(
    Field::new("element", DataType::Struct(vec![
      Field::new("name", DataType::Utf8, <nullable>)
    ]), <nullable>)
  )), <nullable>)
])

Does that make sense?

@kesavkolla
Copy link

My question is will the projection goes to nested levels?

Eg: employee.departments[*].name, employee.departments[0].name

@tustvold
Copy link
Contributor Author

tustvold commented May 25, 2022

At least in the case of arrow and parquet, list indexing is more of a filter than a cheap projection - it requires rewriting the buffers.

Perhaps we could do the common case as described here, and potentially add list index pushdown as an extension once we have workloads that stand to benefit?? Or did you have a particular suggestion on how to handle it?

@kesavkolla
Copy link

I agree with your thought process. List indexing is not push down as much as the column itself. I am guessing at some point datafusion will have support at the SQL level for the list indexing.

@nl5887
Copy link
Contributor

nl5887 commented Jun 13, 2022

@tustvold is this something you're working on already?

@nl5887
Copy link
Contributor

nl5887 commented Jun 13, 2022

This PR is slightly related, as predicates aren't being pushed down currently:

#2724

@tustvold
Copy link
Contributor Author

@nl5887 I am not currently working on this, but would be happy to assist if you or someone else wanted to pick it up 😀

@nl5887
Copy link
Contributor

nl5887 commented Jun 24, 2022

@tustvold I think quite a lot needs to be changed. Most of the code will do column selection by name, whereas the relevant data of the sql parsing (the indexed field structure) is lost.

Correct me if I'm wrong, but I think the datafusion core column needs to be converted to an enum consisting of Column and IndexedField. The retrieval from the DFSchema needs to be done using the column itself instead of (qualified) name, and the required_columns shouldn't be derived from the output schema, but from the plan itself.

Probably a lot more needs to be done, but this is necessary to be able to be able to push down the projections.

Looking forward to your thoughts!

@tustvold
Copy link
Contributor Author

but I think the datafusion core column needs to be converted to an enum consisting of Column and IndexedField

As described above, list index pushdown is likely to yield limited benefits for any of the formats we currently support. As such I don't think we need to support it in projection pushdown as a first step.

whereas the relevant data of the sql parsing (the indexed field structure) is lost.

I'm not sure I follow what you mean, I would have expected list indexing to just be a different type of PhysicalExpr without the need to leak into the DFSchema at all?

@gatesn
Copy link

gatesn commented Jan 2, 2025

Is there a reason that the projection would be expressed as a mask over the nested fields, instead of perhaps being an arbitrary scalar expression?

I realize most table providers wouldn't support projection push-down of arbitrary expressions (although I hope to in Vortex: spiraldb/vortex#1782), but if a provider declared support for Column and GetItem expressions then they should be able to map that themselves into a field mask.

I wonder if Parquet could also relatively easily support push-down of some Cast exprs?

@tustvold
Copy link
Contributor Author

tustvold commented Jan 2, 2025

I think that would be conflating two separate ideas, which I think would get confusing very quickly. Projection is a way to quickly and efficiently discard columns, expression evaluation is a way to then process what remains.

Or to phrase it differently, the projection mask would be computed based on the expression(s) in the query, and the projection mask is then pushed down further, with these components able to just focus on what the expression wants not why it wants it.

@gatesn
Copy link

gatesn commented Jan 3, 2025

I'm not sure I agree that these are two separate ideas, rather, a generalization of the existing notion of projection.

Projection today is all about selecting some subset of columns as-is from a table. A projection expression generalizes this to selecting some subset of columns in some form from a table.

I also don't think it adds that much complexity. Any sufficiently advanced table provider already has the logic to compute the equivalent of a projection mask from an expression for filter push-down. The code paths would be very similar.

There are also real optimizations available here. For example, suppose I write an Arrow int8 column to Parquet. The Arrow schema is serialized into Parquet metadata so at read time the column is read back as int8. If a scalar expression tries to sum this column with an i32, e.g. SELECT col + 10i32, then DataFusion inserts an upcast. Today, this results in decoding the Parquet column (whose smallest physical integer type is int32) into an Arrow int32 array, then casting to an int8, then DataFusion casting back to an int32.

@alamb
Copy link
Contributor

alamb commented Jan 4, 2025

There are also real optimizations available here. For example, suppose I write an Arrow int8 column to Parquet. The Arrow schema is serialized into Parquet metadata so at read time the column is read back as int8. If a scalar expression tries to sum this column with an i32, e.g. SELECT col + 10i32, then DataFusion inserts an upcast. Today, this results in decoding the Parquet column (whose smallest physical integer type is int32) into an Arrow int32 array, then casting to an int8, then DataFusion casting back to an int32.

I tried to reproduce the issue you described and I could not

Specifically, I think in this case DataFusion actully casts the 10 to Int8 and evaluate that directly against the contents of the column. Here is what I tried:

DataFusion CLI v44.0.0
> copy (select arrow_cast(1, 'Int8') as x) to '/tmp/foo.parquet';
+-------+
| count |
+-------+
| 1     |
+-------+
1 row(s) fetched.
Elapsed 0.062 seconds.

> describe '/tmp/foo.parquet';
+-------------+-----------+-------------+
| column_name | data_type | is_nullable |
+-------------+-----------+-------------+
| x           | Int8      | NO          |
+-------------+-----------+-------------+
1 row(s) fetched.
Elapsed 0.012 seconds.

> explain select x = arrow_cast(10, 'Int32') from '/tmp/foo.parquet';
+---------------+-------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                  |
+---------------+-------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: /tmp/foo.parquet.x = Int8(10) AS /tmp/foo.parquet.x = arrow_cast(Int64(10),Utf8("Int32")) |
|               |   TableScan: /tmp/foo.parquet projection=[x]                                                          |
| physical_plan | ProjectionExec: expr=[x@0 = 10 as /tmp/foo.parquet.x = arrow_cast(Int64(10),Utf8("Int32"))]           |
|               |   ParquetExec: file_groups={1 group: [[tmp/foo.parquet]]}, projection=[x]                             |
|               |                                                                                                       |
+---------------+-------------------------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.017 seconds.

Specifically this line:

| physical_plan | ProjectionExec: expr=[x@0 = 10 as /tmp/foo.parquet.x = arrow_cast(Int64(10),Utf8("Int32"))] |

the x@0 = 10 means the 10 was cast to Int8 to match the column type, not the other way around -- this is done by the UnwrapToCast AnalyzerPass

There may be more complex examples (e.g. with nested types) where the constant cast can't be unwrapped. I think we can probably improve the unwrapping logic for such cases

@gatesn
Copy link

gatesn commented Jan 7, 2025

Apologies, I should have checked the example value. 10_000 shows what I mean:

explain select x = cast(10000 AS int) from '/tmp/foo.parquet';
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                              |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: CAST(/tmp/foo.parquet.x AS Int32) = Int32(10000) AS /tmp/foo.parquet.x = Int64(10000) |
|               |   TableScan: /tmp/foo.parquet projection=[x]                                                      |
| physical_plan | ProjectionExec: expr=[CAST(x@0 AS Int32) = 10000 as /tmp/foo.parquet.x = Int64(10000)]            |
|               |   ParquetExec: file_groups={1 group: [[tmp/foo.parquet]]}, projection=[x]                         |
|               |                                                                                                   |
+---------------+---------------------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.004 seconds.

A side note, but perhaps we're missing a rule somewhere to know that x can never = 10000 when it started out as a u8? Perhaps my change in #13736 that preserves min/max stats through cast expressions?

But we can see in the physical plan the DataFusion cast from x to Int32, even though x is stored as an Int32 inside Parquet, is read back into an Int32 Arrow array, and down-casted to an Int8 arrow array, all before being returned to DataFusion to be cast back up to Int32.

Admittedly, this could be solved by providing a "target type" in the projection mask, short of full generic projection expression push-down. But it remains interesting that many file formats have the ability to optimize some subset of projection expressions. Even the Parquet reader could push-down projection expressions over dictionary values prior to a full dictionary decode.

@alamb
Copy link
Contributor

alamb commented Jan 7, 2025

Apologies, I should have checked the example value. 10_000 shows what I mean:

Ah, yes, in this case the UnwrapCastInComparison rule can't convert 10,000 to an Int8 and so the conversion is done at runtime

I just ran across a seemingly similar request / PR in arrow-rs from @gruuya

A side note, but perhaps we're missing a rule somewhere to know that x can never = 10000 when it started out as a u8?

Yes I agree this would be a better plan.

Perhaps my change in #13736 that preserves min/max stats through cast expressions?

Indeed that would be quite sweet. We could do the conversion of x = 10000 to false based on:

  1. If the range of the type of x can not represent 10,000 there is no way it can be true
  2. If the known range of x (Make tests for simplify and Simplifer consistent #1376 style) can not represent 10,000 also there is no way it can be true

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

5 participants