Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(planner): Allowing setting sort order of parquet files without specifying the schema #12466

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Factory for creating ListingTables with default options

use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;

Expand All @@ -27,7 +28,7 @@ use crate::datasource::listing::{
use crate::execution::context::SessionState;

use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common::{arrow_datafusion_err, DataFusionError};
use datafusion_common::{arrow_datafusion_err, plan_err, DataFusionError, ToDFSchema};
use datafusion_common::{config_datafusion_err, Result};
use datafusion_expr::CreateExternalTable;

Expand Down Expand Up @@ -113,19 +114,39 @@ impl TableProviderFactory for ListingTableFactory {
.with_collect_stat(state.config().collect_statistics())
.with_file_extension(file_extension)
.with_target_partitions(state.config().target_partitions())
.with_table_partition_cols(table_partition_cols)
.with_file_sort_order(cmd.order_exprs.clone());
.with_table_partition_cols(table_partition_cols);

options
.validate_partitions(session_state, &table_path)
.await?;

let resolved_schema = match provided_schema {
None => options.infer_schema(session_state, &table_path).await?,
// We will need to check the table columns against the schema
// this is done so that we can do an ORDER BY for external table creation
// specifically for parquet file format.
// See: https://github.com/apache/datafusion/issues/7317
None => {
let schema = options.infer_schema(session_state, &table_path).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great. Thank you @devanbenz -- perfect

let df_schema = schema.clone().to_dfschema()?;
let column_refs: HashSet<_> = cmd
.order_exprs
.iter()
.flat_map(|sort| sort.iter())
.flat_map(|s| s.expr.column_refs())
.collect();

for column in &column_refs {
if !df_schema.has_column(column) {
return plan_err!("Column {column} is not in schema");
}
}

schema
}
Some(s) => s,
};
let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
.with_listing_options(options.with_file_sort_order(cmd.order_exprs.clone()))
.with_schema(resolved_schema);
let provider = ListingTable::try_new(config)?
.with_cache(state.runtime_env().cache_manager.get_file_statistic_cache());
Expand Down
25 changes: 20 additions & 5 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1136,14 +1136,29 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
schema: &DFSchemaRef,
planner_context: &mut PlannerContext,
) -> Result<Vec<Vec<SortExpr>>> {
// Ask user to provide a schema if schema is empty.
let mut all_results = vec![];
if !order_exprs.is_empty() && schema.fields().is_empty() {
return plan_err!(
"Provide a schema before specifying the order while creating a table."
);
let mut results = vec![];
for expr in order_exprs {
for ordered_expr in expr {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've noticed that a lot of this codebase prefers maps over for loops like this. I personally think for loops are easier to read but I can modify this to use a map and collect instead if that its preferred. Not sure if either is "more performant".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be nice to use map / collect to follow the same conventions, but I don't think it is required

It also took me a while to get used to the map/collect pattern. At first I thought it was just functional language hipster stuff, but then I realized that it is often a key optimization (When possible, collect can figure out how but the target container is and do a single allocation rather than having to grow)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good I think that I'll modify this to use a map/collect so I can be hip (and get a single allocation) 😎

let order_expr = ordered_expr.expr.to_owned();
let order_expr = self.sql_expr_to_logical_expr(
order_expr,
schema,
planner_context,
)?;
let nulls_first = ordered_expr.nulls_first.unwrap_or(true);
devanbenz marked this conversation as resolved.
Show resolved Hide resolved
let asc = ordered_expr.asc.unwrap_or(true);
let sort_expr = SortExpr::new(order_expr, asc, nulls_first);
results.push(sort_expr);
}
let sort_results = &results;
all_results.push(sort_results.to_owned());
}

return Ok(all_results);
}

let mut all_results = vec![];
for expr in order_exprs {
// Convert each OrderByExpr to a SortExpr:
let expr_vec =
Expand Down
7 changes: 7 additions & 0 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2002,6 +2002,13 @@ fn create_external_table_parquet_no_schema() {
quick_test(sql, expected);
}

#[test]
fn create_external_table_parquet_no_schema_sort_order() {
let sql = "CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION 'foo.parquet' WITH ORDER (id)";
let expected = "CreateExternalTable: Bare { table: \"t\" }";
quick_test(sql, expected);
}

#[test]
fn equijoin_explicit_syntax() {
let sql = "SELECT id, order_id \
Expand Down
10 changes: 10 additions & 0 deletions datafusion/sqllogictest/test_files/create_external_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,13 @@ OPTIONS (
format.delimiter '|',
has_header false,
compression gzip);

# Create an external parquet table and infer schema to order by

# query should succeed
statement ok
CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add a test that shows the table is actually ordered correctly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can do


# query should fail with bad column
statement error
CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (foo);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another reason this will fail is that there is already a table named t -- so it is probably good to check the actual error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

7 changes: 0 additions & 7 deletions datafusion/sqllogictest/test_files/order.slt
Original file line number Diff line number Diff line change
Expand Up @@ -653,13 +653,6 @@ physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/te
query error DataFusion error: Error during planning: Column a is not in schema
CREATE EXTERNAL TABLE dt (a_id integer, a_str string, a_bool boolean) STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table';


# Create external table with DDL ordered columns without schema
# When schema is missing the query is expected to fail
query error DataFusion error: Error during planning: Provide a schema before specifying the order while creating a table\.
CREATE EXTERNAL TABLE dt STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table';


# Sort with duplicate sort expressions
# Table is sorted multiple times on the same column name and should not fail
statement ok
Expand Down