
feat(planner): Allow setting sort order of parquet files without specifying the schema #12466

Open
wants to merge 7 commits into main

Conversation

@devanbenz (Contributor) commented Sep 14, 2024

Which issue does this PR close?

Closes #7317

Rationale for this change

This allows setting the sort order when creating tables from parquet files without having to specify the schema. Since parquet already has the schema readily available in its metadata, this is a relatively quick fix that makes downstream usage less cumbersome, specifically when setting up reproductions of issues.

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the sql SQL Planner label Sep 14, 2024
@devanbenz devanbenz marked this pull request as draft September 14, 2024 17:12
@devanbenz devanbenz marked this pull request as ready for review September 14, 2024 19:06
@alamb (Contributor) left a comment

Thank you @devanbenz -- I am sorry, I thought I had left a review of this PR before but apparently I had not hit submit

@@ -1028,8 +1030,26 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.into_iter()
.collect();

let schema = self.build_schema(columns)?;
let df_schema = schema.to_dfschema_ref()?;
let df_schema = match file_type.as_str() {
Contributor

I am sorry for the delayed feedback @devanbenz -- I swear I typed this feedback but I must not have clicked "submit"

Basically my concerns about this approach are twofold:

  1. This code assumes the parquet file is on the local filesystem (whereas for many systems it may be on remote object storage)
  2. It also adds a dependency on the parquet format to SQL parsing. Since parquet has quite a few dependencies, this new dependency is likely non-ideal for systems that are using DataFusion for SQL parsing (like dask-sql, for example)

Perhaps you could delay the creation of the ORDER BY until the table provider is resolved?

The table provider: https://github.com/apache/datafusion/blob/2521043ddcb3895a2010b8e328f3fa10f77fc094/datafusion/expr/src/planner.rs#L35-L34

Once the table provider is resolved, the table's schema can be known

Another benefit of this approach is that it would work for all formats, not just parquet
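
For illustration, here is a rough sketch of that deferred approach (the helper name and import paths are assumptions for this sketch, not code from this PR): once the listing table is being resolved, the schema either comes from the declared columns or is inferred from the files, and only at that point would the ORDER BY be validated against it.

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
use datafusion::error::Result;
use datafusion::execution::context::SessionState;

/// Hypothetical helper: if CREATE EXTERNAL TABLE declared no columns,
/// infer the schema from the files at table-provider resolution time.
/// The SQL planner never opens the file itself, so it needs no parquet
/// dependency, and the same path works for CSV, JSON, and other formats.
async fn schema_for_external_table(
    declared: Option<SchemaRef>,
    options: &ListingOptions,
    state: &SessionState,
    table_path: &ListingTableUrl,
) -> Result<SchemaRef> {
    match declared {
        Some(schema) => Ok(schema),
        None => options.infer_schema(state, table_path).await,
    }
}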

Contributor Author

I am sorry for the delayed feedback @devanbenz -- I swear I typed this feedback but I must not have clicked "submit"

That's alright - happens to me all the time 😅

Perhaps you could delay the creation of the ORDER BY until the table provider is resolved?

Sounds good, I like this idea. 👍

@devanbenz (Contributor, Author)
Converting to a draft until I have the final implementation done 👍

@github-actions github-actions bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Sep 19, 2024
…ecifying the schema

This PR allows for the following SQL query to be passed without a schema

create external table cpu stored as parquet location 'cpu.parquet' with order (time);

closes apache#7317
@devanbenz (Contributor, Author) commented Sep 19, 2024

@alamb I have this working but I'm unsure if the original implementation is working as expected. Shouldn't the times be descending in this first selection?

> create external table cpu(time timestamp) stored as parquet location '/Users/devan/Downloads/cpu.parquet' with order (time desc);

0 row(s) fetched. 
Elapsed 0.013 seconds.

> select * from cpu
;
+---------------------+
| time                |
+---------------------+
| 2023-03-01T00:00:00 |
| 2023-03-02T00:00:00 |
+---------------------+
2 row(s) fetched. 
Elapsed 0.018 seconds.

> drop table cpu;
0 row(s) fetched. 
Elapsed 0.002 seconds.

> create external table cpu(time timestamp) stored as parquet location '/Users/devan/Downloads/cpu.parquet' with order (time asc);
0 row(s) fetched. 
Elapsed 0.004 seconds.

> select * from cpu;
+---------------------+
| time                |
+---------------------+
| 2023-03-01T00:00:00 |
| 2023-03-02T00:00:00 |
+---------------------+
2 row(s) fetched. 
Elapsed 0.008 seconds.

Note: this is using the already existing code

EDIT: Reading through the documentation I see:

It’s important to understand that using the WITH ORDER clause in the CREATE EXTERNAL TABLE statement only specifies the order in which the data should be read from the external file. If the data in the file is not already sorted according to the specified order, then the results may not be correct.

So it sounds like this is working as expected then 🫡

@devanbenz devanbenz marked this pull request as ready for review September 19, 2024 14:21
);
let mut results = vec![];
for expr in order_exprs {
    for ordered_expr in expr {
Contributor Author

I've noticed that a lot of this codebase prefers maps over for loops like this. I personally think for loops are easier to read, but I can modify this to use a map and collect instead if that is preferred. Not sure if either is "more performant".

Contributor

I think it would be nice to use map / collect to follow the same conventions, but I don't think it is required

It also took me a while to get used to the map/collect pattern. At first I thought it was just functional language hipster stuff, but then I realized that it is often a key optimization (when possible, collect can figure out how big the target container is and do a single allocation rather than having to grow)
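
For what it's worth, a minimal standalone illustration of that allocation point (plain Rust with made-up names, not this PR's code):

// Growing a Vec with push may reallocate several times as it fills up.
fn squares_loop(input: &[i64]) -> Vec<i64> {
    let mut out = Vec::new();
    for x in input {
        out.push(x * x);
    }
    out
}

// The slice iterator reports an exact size hint, so collect() can size
// the target Vec once up front instead of growing it.
fn squares_collect(input: &[i64]) -> Vec<i64> {
    input.iter().map(|x| x * x).collect()
}

fn main() {
    assert_eq!(squares_loop(&[1, 2, 3]), squares_collect(&[1, 2, 3]));
}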

Contributor Author

Sounds good, I think I'll modify this to use map/collect so I can be hip (and get a single allocation) 😎

@alamb (Contributor) left a comment

Thank you @devanbenz -- I had this checked out to review and wanted to write a few more tests. Rather than explaining the tests in words and then having you push them, I figured I would just push them directly

// specifically for parquet file format.
// See: https://github.com/apache/datafusion/issues/7317
None => {
    let schema = options.infer_schema(session_state, &table_path).await?;
Contributor

This looks great. Thank you @devanbenz -- perfect


# query should fail with bad column
statement error
CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (foo);
Contributor

Another reason this will fail is that there is already a table named t -- so it is probably good to check the actual error

Contributor Author

👍


# query should succeed
statement ok
CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (id);
Contributor

Can you also add a test that shows the table is actually ordered correctly?

Contributor Author

Can do

statement ok
CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (id DESC NULLS FIRST);

## Verify that the table is created with a sort order. Explain should show output_ordering=[id@0 DESC NULLS FIRST]
Contributor

I think this test shows one small bug (the output ordering is DESC not DESC NULLS FIRST)

I think this is due to the fact that this PR computes nulls first like this:

                   let nulls_first = ordered_expr.nulls_first.unwrap_or(true);

But the SQL planner computes it like this:

// when asc is true, by default nulls last to be consistent with postgres
// postgres rule: https://www.postgresql.org/docs/current/queries-order.html
nulls_first.unwrap_or(!asc),
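
For reference, a tiny self-contained sketch of that planner default (an illustrative function, not this PR's code): unless the user spells it out, ASC implies NULLS LAST and DESC implies NULLS FIRST.

// Postgres-compatible default: when NULLS FIRST/LAST is not given,
// ASC sorts nulls last and DESC sorts nulls first.
fn resolve_nulls_first(asc: Option<bool>, nulls_first: Option<bool>) -> bool {
    let asc = asc.unwrap_or(true);
    nulls_first.unwrap_or(!asc)
}

fn main() {
    assert_eq!(resolve_nulls_first(Some(true), None), false); // ASC  -> NULLS LAST
    assert_eq!(resolve_nulls_first(Some(false), None), true); // DESC -> NULLS FIRST
    assert_eq!(resolve_nulls_first(Some(false), Some(false)), false); // explicit NULLS LAST wins
}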

@alamb (Contributor) left a comment

Thanks again @devanbenz -- I think there is one very small bug to fix in this PR, but then it will be good to go.

I am happy to fix the bug too and push to your branch; just let me know

datafusion/sql/src/statement.rs (outdated review comment, resolved)
@alamb (Contributor) commented Sep 20, 2024

Another test I thought of is testing ordering with more than one column WITH ORDER (a, b)

devanbenz and others added 2 commits September 20, 2024 10:27
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
@alamb (Contributor) left a comment

Love it -- thank you so much @devanbenz

Labels
core (Core DataFusion crate), sql (SQL Planner), sqllogictest (SQL Logic Tests (.slt))
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Allowing setting sort order of parquet files without specifying the schema
2 participants