Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate partitions columns in CREATE EXTERNAL TABLE if table already exists. #9912

Merged
merged 14 commits into from
Apr 5, 2024
Merged
107 changes: 107 additions & 0 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use datafusion_physical_expr::{

use async_trait::async_trait;
use futures::{future, stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use object_store::ObjectStore;

/// Configuration for creating a [`ListingTable`]
Expand Down Expand Up @@ -438,6 +439,112 @@ impl ListingOptions {

self.format.infer_schema(state, &store, &files).await
}

/// Infers the partition columns stored in `LOCATION` and compares
/// them with the columns provided in `PARTITIONED BY` to help prevent
/// accidental corrupts of partitioned tables.
///
/// Allows specifying partial partitions.
pub async fn validate_partitions(
&self,
state: &SessionState,
table_path: &ListingTableUrl,
) -> Result<()> {
if self.table_partition_cols.is_empty() {
return Ok(());
}

if !table_path.is_collection() {
return plan_err!(
"Can't create a partitioned table backed by a single file, \
perhaps the URL is missing a trailing slash?"
);
}

let inferred = self.infer_partitions(state, table_path).await?;

// no partitioned files found on disk
if inferred.is_empty() {
return Ok(());
}

let table_partition_names = self
.table_partition_cols
.iter()
.map(|(col_name, _)| col_name.clone())
.collect_vec();
MohamedAbdeen21 marked this conversation as resolved.
Show resolved Hide resolved

if inferred.len() < table_partition_names.len() {
return plan_err!(
"Inferred partitions to be {:?}, but got {:?}",
inferred,
table_partition_names
);
}

// match prefix to allow creating tables with partial partitions
for (idx, col) in table_partition_names.iter().enumerate() {
if &inferred[idx] != col {
return plan_err!(
"Inferred partitions to be {:?}, but got {:?}",
inferred,
table_partition_names
);
}
}

Ok(())
}

/// Infer the partitioning at the given path on the provided object store.
/// For performance reasons, it doesn't read all the files on disk
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

/// and therefore may fail to detect invalid partitioning.
async fn infer_partitions(
&self,
state: &SessionState,
table_path: &ListingTableUrl,
) -> Result<Vec<String>> {
let store = state.runtime_env().object_store(table_path)?;

// only use 10 files for inference
// This can fail to detect inconsistent partition keys
// A DFS traversal approach of the store can help here
let files: Vec<_> = table_path
.list_all_files(state, store.as_ref(), &self.file_extension)
.await?
.take(10)
.try_collect()
.await?;

let stripped_path_parts = files.iter().map(|file| {
table_path
.strip_prefix(&file.location)
.unwrap()
.collect_vec()
});

let partition_keys = stripped_path_parts
.map(|path_parts| {
path_parts
.into_iter()
.rev()
.skip(1) // get parents only; skip the file itself
.rev()
.map(|s| s.split('=').take(1).collect())
Copy link
Contributor

@Jefffrey Jefffrey Apr 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the partition column name containing a = a cheeky edge case we need to consider here?

Edit: related #9269 (comment)

Copy link
Contributor Author

@MohamedAbdeen21 MohamedAbdeen21 Apr 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do need to consider this. Now that you mention it, I realize that I never actually thought about how these column names are written to disk. I'll have to take a look on how other systems do it before suggesting anything.

After looking at the linked issue, I think this should be addressed in a follow-up since it may require changing other parts of DF first.

.collect_vec()
})
.collect_vec();

match partition_keys.into_iter().all_equal_value() {
Ok(v) => Ok(v),
Err(None) => Ok(vec![]),
Err(Some(diff)) => {
let mut sorted_diff = [diff.0, diff.1];
sorted_diff.sort();
plan_err!("Found mixed partition values on disk {:?}", sorted_diff)
}
}
}
}

/// Reads data from one or more files via an
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ impl TableProviderFactory for ListingTableFactory {
.with_table_partition_cols(table_partition_cols)
.with_file_sort_order(cmd.order_exprs.clone());

options.validate_partitions(state, &table_path).await?;

let resolved_schema = match provided_schema {
None => options.infer_schema(state, &table_path).await?,
Some(s) => s,
Expand Down
12 changes: 10 additions & 2 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use arrow::datatypes::{ArrowNativeType, UInt16Type};
use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_common::stats::Precision;
use datafusion_common::{exec_err, ColumnStatistics, Statistics};
use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics};
use datafusion_physical_expr::LexOrdering;

use log::warn;
Expand Down Expand Up @@ -256,9 +256,17 @@ impl PartitionColumnProjector {
file_batch.columns().len()
);
}

let mut cols = file_batch.columns().to_vec();
for &(pidx, sidx) in &self.projected_partition_indexes {
let mut partition_value = Cow::Borrowed(&partition_values[pidx]);
let p_value =
partition_values
.get(pidx)
.ok_or(DataFusionError::Execution(
"Invalid partitioning found on disk".to_string(),
))?;

let mut partition_value = Cow::Borrowed(p_value);

// check if user forgot to dict-encode the partition value
let field = self.projected_schema.field(sidx);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,7 @@ impl SessionContext {
table_ref: impl Into<TableReference<'a>>,
provider: Arc<dyn TableProvider>,
) -> Result<Option<Arc<dyn TableProvider>>> {
let table_ref = table_ref.into();
let table_ref: TableReference = table_ref.into();
let table = table_ref.table().to_owned();
self.state
.read()
Expand Down
93 changes: 92 additions & 1 deletion datafusion/sqllogictest/test_files/create_external_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,95 @@ statement error DataFusion error: Invalid or Unsupported Configuration: Config v
CREATE EXTERNAL TABLE csv_table (column1 int)
STORED AS CSV
LOCATION 'foo.csv'
OPTIONS ('format.delimiter' ';', 'format.column_index_truncate_length' '123')
OPTIONS ('format.delimiter' ';', 'format.column_index_truncate_length' '123')

# Partitioned table on a single file
query error DataFusion error: Error during planning: Can't create a partitioned table backed by a single file, perhaps the URL is missing a trailing slash\?
CREATE EXTERNAL TABLE single_file_partition(c1 int)
PARTITIONED BY (p2 string, p1 string)
STORED AS CSV
LOCATION 'foo.csv';

# Wrong partition order error

statement ok
CREATE EXTERNAL TABLE partitioned (c1 int)
PARTITIONED BY (p1 string, p2 string)
STORED AS parquet
LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';

query ITT
INSERT INTO partitioned VALUES (1, 'x', 'y');
----
1

query error DataFusion error: Error during planning: Inferred partitions to be \["p1", "p2"\], but got \["p2", "p1"\]
CREATE EXTERNAL TABLE wrong_order_partitioned (c1 int)
PARTITIONED BY (p2 string, p1 string)
STORED AS parquet
LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';

statement error DataFusion error: Error during planning: Inferred partitions to be \["p1", "p2"\], but got \["p2"\]
CREATE EXTERNAL TABLE wrong_order_partitioned (c1 int)
PARTITIONED BY (p2 string)
STORED AS parquet
LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';

# But allows partial partition selection

statement ok
CREATE EXTERNAL TABLE partial_partitioned (c1 int)
PARTITIONED BY (p1 string)
STORED AS parquet
LOCATION 'test_files/scratch/create_external_table/bad_partitioning/';

query IT
SELECT * FROM partial_partitioned;
----
1 x

statement ok
CREATE EXTERNAL TABLE inner_partition (c1 int)
PARTITIONED BY (p2 string)
STORED AS parquet
LOCATION 'test_files/scratch/create_external_table/bad_partitioning/p1=x/';

query IT
SELECT * FROM inner_partition;
----
1 y

# Simulate manual creation of invalid (mixed) partitions on disk

statement ok
CREATE EXTERNAL TABLE test(name string)
PARTITIONED BY (year string, month string)
STORED AS parquet
LOCATION 'test_files/scratch/create_external_table/manual_partitioning/';

statement ok
-- passes the partition check since the previous statement didn't write to disk
CREATE EXTERNAL TABLE test2(name string)
PARTITIONED BY (month string, year string)
STORED AS parquet
LOCATION 'test_files/scratch/create_external_table/manual_partitioning/';

query TTT
-- creates year -> month partitions
INSERT INTO test VALUES('name', '2024', '03');
----
1

query TTT
-- creates month -> year partitions.
-- now table have both partitions (year -> month and month -> year)
INSERT INTO test2 VALUES('name', '2024', '03');
----
1

statement error DataFusion error: Error during planning: Found mixed partition values on disk \[\["month", "year"\], \["year", "month"\]\]
-- fails to infer as partitions are not consistent
CREATE EXTERNAL TABLE test3(name string)
PARTITIONED BY (month string, year string)
STORED AS parquet
LOCATION 'test_files/scratch/create_external_table/manual_partitioning/';
Loading