
Allow declaring partition columns in PARTITION BY clause, backwards compatible #9599

Merged: 11 commits, Mar 30, 2024
59 changes: 55 additions & 4 deletions datafusion/sql/src/parser.rs
@@ -175,7 +175,7 @@ pub(crate) type LexOrdering = Vec<OrderByExpr>;
/// [ WITH HEADER ROW ]
/// [ DELIMITER <char> ]
/// [ COMPRESSION TYPE <GZIP | BZIP2 | XZ | ZSTD> ]
-/// [ PARTITIONED BY (<column list>) ]
+/// [ PARTITIONED BY (<column_definition list> | <column list>) ]
Contributor: 👍
/// [ WITH ORDER (<ordered column list>) ]
/// [ OPTIONS (<key_value_list>) ]
/// LOCATION <literal>
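For illustration, here are the two forms the updated grammar accepts; a minimal sketch with made-up table and column names (the parser tests further down exercise the same shapes):

fn main() {
    // Pre-existing form: bare names referring to columns already declared
    // in the table's column list.
    let names = "CREATE EXTERNAL TABLE t(c1 INT, p1 INT) STORED AS CSV \
                 PARTITIONED BY (p1) LOCATION 'foo.csv'";

    // New form: column definitions; the parser appends p1 to the schema.
    let defs = "CREATE EXTERNAL TABLE t(c1 INT) STORED AS CSV \
                PARTITIONED BY (p1 INT) LOCATION 'foo.csv'";

    println!("{names}\n{defs}");
}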
@@ -693,7 +693,7 @@ impl<'a> DFParser<'a> {
        self.parser
            .parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
        let table_name = self.parser.parse_object_name(true)?;
-       let (columns, constraints) = self.parse_columns()?;
+       let (mut columns, constraints) = self.parse_columns()?;

        #[derive(Default)]
        struct Builder {
@@ -754,7 +754,30 @@ impl<'a> DFParser<'a> {
                Keyword::PARTITIONED => {
                    self.parser.expect_keyword(Keyword::BY)?;
                    ensure_not_set(&builder.table_partition_cols, "PARTITIONED BY")?;
-                   builder.table_partition_cols = Some(self.parse_partitions()?);
                    // Expects either a list of column names (col_name [, col_name]*)
                    // or a list of column definitions (col_name datatype [, col_name datatype]*).
                    // Use the token after the name to decide which parsing rule to use.
                    // Note that mixing both names and definitions is not allowed.
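                    // Note: at this point '(' has not been consumed yet, so
                    // peek_nth_token(0) is '(', peek_nth_token(1) is the first
                    // column name, and peek_nth_token(2) is the token right
                    // after that name: ',' or ')' means a bare name list,
                    // anything else should be a data type.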
                    let peeked = self.parser.peek_nth_token(2);
                    if peeked == Token::Comma || peeked == Token::RParen {
MohamedAbdeen21 (author), Mar 13, 2024:
Works fine but feels hacky. Considering replacing this if with a more robust function that tries to apply a parsing rule and falls back (undoing consumed tokens) when the rule fails.

Contributor:
Yeah, it is not immediately clear why this logic works. I think at a minimum we should add a comment explaining the reasoning behind this condition.

MohamedAbdeen21 (author):
I'm not super familiar with the sqlparser crate, but I don't think it allows rewinding tokens; we would have to implement a parsing rule that only uses peeks, which sounds really unnecessary. I will add a comment for now and maybe we can find a better way.

Contributor:
(Comment pointing to sqlparser's consume_tokens.)
MohamedAbdeen21 (author), Mar 29, 2024:
That looks promising. I'll open a follow-up PR if it works out. Thanks for pointing it out.

Edit: I misread the docs; we need a version of consume_tokens that returns true without actually consuming the tokens, because we need to parse the tokens later. It would also be perfect if it could match a pattern to catch mixed syntax in the clause.

                        // list of column names
                        builder.table_partition_cols = Some(self.parse_partitions()?)
                    } else {
                        // list of column defs
                        let (cols, cons) = self.parse_columns()?;
                        builder.table_partition_cols = Some(
                            cols.iter().map(|col| col.name.to_string()).collect(),
                        );

                        columns.extend(cols);

                        if !cons.is_empty() {
                            return Err(ParserError::ParserError(
                                "Constraints on Partition Columns are not supported"
                                    .to_string(),
                            ));
                        }
                    }
                }
                Keyword::OPTIONS => {
                    ensure_not_set(&builder.options, "OPTIONS")?;
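As an aside, here is a minimal, self-contained sketch of the fallback idea from the review thread above. It uses a hypothetical token cursor, not sqlparser's actual Parser, which (per the thread) does not expose this kind of rewinding:

// Toy cursor type; hypothetical, for illustration only.
struct Cursor {
    tokens: Vec<String>,
    pos: usize,
}

impl Cursor {
    fn next_token(&mut self) -> Option<String> {
        let tok = self.tokens.get(self.pos).cloned();
        if tok.is_some() {
            self.pos += 1;
        }
        tok
    }

    /// Apply `rule`; if it fails, rewind to where the rule started so the
    /// caller can try a different rule on the same tokens.
    fn try_rule<T>(
        &mut self,
        rule: impl FnOnce(&mut Self) -> Result<T, String>,
    ) -> Result<T, String> {
        let saved = self.pos;
        let out = rule(self);
        if out.is_err() {
            self.pos = saved;
        }
        out
    }
}

fn main() {
    let mut c = Cursor {
        tokens: vec!["p1".to_string(), "int".to_string()],
        pos: 0,
    };
    // Rule: a single bare column name followed by end of input.
    let bare_name = |c: &mut Cursor| {
        let _name = c.next_token().ok_or("expected a column name".to_string())?;
        match c.next_token() {
            None => Ok(()), // name followed by end: bare name list
            Some(tok) => Err(format!("unexpected token after name: {tok}")),
        }
    };
    // "p1 int" is a column definition, so the rule fails and the cursor
    // rewinds; a definition-list rule could now parse the same tokens.
    assert!(c.try_rule(bare_name).is_err());
    assert_eq!(c.pos, 0);
    println!("rewound to {}", c.pos);
}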
@@ -1167,9 +1190,37 @@ mod tests {
        });
        expect_parse_ok(sql, expected)?;

-       // Error cases: partition column does not support type
+       // positive case: column definition allowed in 'PARTITIONED BY' clause
        let sql =
            "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1 int) LOCATION 'foo.csv'";
        let expected = Statement::CreateExternalTable(CreateExternalTable {
            name: "t".into(),
            columns: vec![
                make_column_def("c1", DataType::Int(None)),
                make_column_def("p1", DataType::Int(None)),
            ],
            file_type: "CSV".to_string(),
            has_header: false,
            delimiter: ',',
            location: "foo.csv".into(),
            table_partition_cols: vec!["p1".to_string()],
            order_exprs: vec![],
            if_not_exists: false,
            file_compression_type: UNCOMPRESSED,
            unbounded: false,
            options: HashMap::new(),
            constraints: vec![],
        });
        expect_parse_ok(sql, expected)?;

        // negative case: mixed column defs and column names in `PARTITIONED BY` clause (definition first)
        let sql =
            "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1 int, c1) LOCATION 'foo.csv'";
        expect_parse_error(sql, "sql parser error: Expected a data type name, found: )");

        // negative case: mixed column defs and column names in `PARTITIONED BY` clause (bare name first)
        let sql =
            "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (c1, p1 int) LOCATION 'foo.csv'";
        expect_parse_error(sql, "sql parser error: Expected ',' or ')' after partition definition, found: int");

        // positive case: additional options (one entry) can be specified
10 changes: 9 additions & 1 deletion datafusion/sqllogictest/test_files/create_external_table.slt
@@ -100,9 +100,17 @@ CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH LOCATION 'foo.csv';
statement error DataFusion error: SQL error: ParserError\("Unexpected token FOOBAR"\)
CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV FOOBAR BARBAR BARFOO LOCATION 'foo.csv';

# Missing partition column
statement error DataFusion error: Arrow error: Schema error: Unable to get field named "c2". Valid fields: \["c1"\]
create EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (c2) LOCATION 'foo.csv'

# Duplicate Column in `PARTITIONED BY` clause
statement error DataFusion error: Schema error: Schema contains duplicate unqualified field name c1
create EXTERNAL TABLE t(c1 int, c2 int) STORED AS CSV PARTITIONED BY (c1 int) LOCATION 'foo.csv'

# Conflicting options
statement error DataFusion error: Invalid or Unsupported Configuration: Config value "column_index_truncate_length" not found on CsvOptions
CREATE EXTERNAL TABLE csv_table (column1 int)
STORED AS CSV
LOCATION 'foo.csv'
OPTIONS ('format.delimiter' ';', 'format.column_index_truncate_length' '123')
36 changes: 30 additions & 6 deletions datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -42,7 +42,7 @@ LOCATION '../../testing/data/csv/aggregate_test_100.csv'


statement ok
create table dictionary_encoded_values as values
('a', arrow_cast('foo', 'Dictionary(Int32, Utf8)')), ('b', arrow_cast('bar', 'Dictionary(Int32, Utf8)'));

query TTT
@@ -55,13 +55,13 @@ statement ok
CREATE EXTERNAL TABLE dictionary_encoded_parquet_partitioned(
a varchar,
b varchar,
)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned/'
PARTITIONED BY (b);

query TT
insert into dictionary_encoded_parquet_partitioned
select * from dictionary_encoded_values
----
2
@@ -76,21 +76,21 @@ statement ok
CREATE EXTERNAL TABLE dictionary_encoded_arrow_partitioned(
a varchar,
b varchar,
)
STORED AS arrow
LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/'
PARTITIONED BY (b);

query TT
insert into dictionary_encoded_arrow_partitioned
select * from dictionary_encoded_values
----
2

statement ok
CREATE EXTERNAL TABLE dictionary_encoded_arrow_test_readback(
a varchar,
)
STORED AS arrow
LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/b=bar/';

@@ -185,6 +185,30 @@ select * from partitioned_insert_test_verify;
1
2

statement ok
CREATE EXTERNAL TABLE
partitioned_insert_test_hive(c bigint)
STORED AS csv
LOCATION 'test_files/scratch/insert_to_external/insert_to_partitioned'
PARTITIONED BY (a string, b string);

query ITT
INSERT INTO partitioned_insert_test_hive VALUES (3,30,300);
----
1

query ITT
SELECT * FROM partitioned_insert_test_hive order by a,b,c;
----
1 10 100
1 10 200
1 20 100
2 20 100
1 20 200
2 20 200
3 30 300
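For context, hive-style partitioning encodes partition values in directory names, the same convention the earlier 'b=bar/' LOCATION relies on. Assuming arbitrary data-file names, the table's location after these inserts would look roughly like:

test_files/scratch/insert_to_external/insert_to_partitioned/
  a=10/b=100/<data file>
  a=10/b=200/<data file>
  a=20/b=100/<data file>
  a=20/b=200/<data file>
  a=30/b=300/<data file>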


statement ok
CREATE EXTERNAL TABLE
partitioned_insert_test_json(a string, b string)