Enable creating and inserting to empty external tables via SQL #7276
Changes from all commits
@@ -217,6 +217,22 @@ pub enum ListingTableInsertMode {
    ///Throw an error if insert into is attempted on this table
    Error,
}

impl FromStr for ListingTableInsertMode {
    type Err = DataFusionError;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let s_lower = s.to_lowercase();
        match s_lower.as_str() {
            "append_to_file" => Ok(ListingTableInsertMode::AppendToFile),
            "append_new_files" => Ok(ListingTableInsertMode::AppendNewFiles),
            "error" => Ok(ListingTableInsertMode::Error),
            _ => Err(DataFusionError::Plan(format!(
                "Unknown or unsupported insert mode {s}. Supported options are \
                 append_to_file, append_new_files, and error."
            ))),
        }
    }
}

/// Options for creating a [`ListingTable`]
#[derive(Clone, Debug)]
pub struct ListingOptions {
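Not part of the diff, but as a quick illustration of how the new FromStr impl is intended to be used once an insert_mode string arrives from a table option: a minimal sketch, assuming ListingTableInsertMode is reachable at datafusion::datasource::listing; only the parsing behaviour shown above is taken as given.

use std::str::FromStr;

use datafusion::datasource::listing::ListingTableInsertMode;
use datafusion::error::Result;

// Hypothetical helper: turn the string value of an `insert_mode` table option
// into the enum. Parsing is case-insensitive because `from_str` lowercases
// its input before matching.
fn insert_mode_from_option(value: &str) -> Result<ListingTableInsertMode> {
    ListingTableInsertMode::from_str(value)
}

fn main() -> Result<()> {
    assert!(matches!(
        insert_mode_from_option("APPEND_NEW_FILES")?,
        ListingTableInsertMode::AppendNewFiles
    ));
    // Unknown values surface as a DataFusionError::Plan listing the supported options.
    assert!(insert_mode_from_option("overwrite").is_err());
    Ok(())
}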
@@ -1607,6 +1623,124 @@ mod tests {
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_csv_defaults() -> Result<()> {
        helper_test_insert_into_sql(
            "csv",
            FileCompressionType::UNCOMPRESSED,
            "OPTIONS (insert_mode 'append_new_files')",
            None,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_csv_defaults_header_row() -> Result<()> {
        helper_test_insert_into_sql(
            "csv",
            FileCompressionType::UNCOMPRESSED,
            "WITH HEADER ROW \
            OPTIONS (insert_mode 'append_new_files')",
            None,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_json_defaults() -> Result<()> {
        helper_test_insert_into_sql(
            "json",
            FileCompressionType::UNCOMPRESSED,
            "OPTIONS (insert_mode 'append_new_files')",
            None,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_parquet_defaults() -> Result<()> {
        helper_test_insert_into_sql(
            "parquet",
            FileCompressionType::UNCOMPRESSED,
            "",
            None,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_parquet_session_overrides() -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert(
            "datafusion.execution.parquet.compression".into(),
            "zstd(5)".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.dictionary_enabled".into(),
            "false".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.dictionary_page_size_limit".into(),
            "100".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.staistics_enabled".into(),
            "none".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.max_statistics_size".into(),
            "10".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.max_row_group_size".into(),
            "5".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.created_by".into(),
            "datafusion test".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.column_index_truncate_length".into(),
            "50".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.data_page_row_count_limit".into(),
            "50".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_enabled".into(),
            "true".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_fpp".into(),
            "0.01".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_ndv".into(),
            "1000".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.writer_version".into(),
            "2.0".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.write_batch_size".into(),
            "5".into(),
        );
        helper_test_insert_into_sql(
            "parquet",
            FileCompressionType::UNCOMPRESSED,
            "",
            Some(config_map),
        )
        .await?;
        Ok(())
    }
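The overrides above are plain string key/value pairs, so the same mechanism is available outside the test harness. A minimal sketch, assuming the usual datafusion::prelude re-exports; only from_string_hash_map and with_config are taken from the diff.

use std::collections::HashMap;

use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

// Build a SessionContext whose parquet writer settings come from string
// key/value pairs (e.g. collected from a CLI flag or an env file).
fn context_with_parquet_overrides() -> Result<SessionContext> {
    let mut overrides: HashMap<String, String> = HashMap::new();
    overrides.insert(
        "datafusion.execution.parquet.compression".into(),
        "zstd(5)".into(),
    );
    overrides.insert(
        "datafusion.execution.parquet.max_row_group_size".into(),
        "5".into(),
    );
    // Unknown keys or unparseable values are reported as a DataFusionError here.
    let config = SessionConfig::from_string_hash_map(overrides)?;
    Ok(SessionContext::with_config(config))
}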
    #[tokio::test]
    async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();

@@ -2096,4 +2230,64 @@ mod tests {
        // Return Ok if the function
        Ok(())
    }

    /// tests insert into with end to end sql
    /// create external table + insert into statements
    async fn helper_test_insert_into_sql(
        file_type: &str,
        // TODO test with create statement options such as compression
        _file_compression_type: FileCompressionType,
        external_table_options: &str,
        session_config_map: Option<HashMap<String, String>>,
    ) -> Result<()> {
        // Create the initial context
        let session_ctx = match session_config_map {
            Some(cfg) => {
                let config = SessionConfig::from_string_hash_map(cfg)?;
                SessionContext::with_config(config)
            }
            None => SessionContext::new(),
        };

        // create table
        let tmp_dir = TempDir::new()?;
        let tmp_path = tmp_dir.into_path();
        let str_path = tmp_path.to_str().expect("Temp path should convert to &str");
        session_ctx
            .sql(&format!(
                "create external table foo(a varchar, b varchar, c int) \
                stored as {file_type} \
                location '{str_path}' \
                {external_table_options}"
            ))
            .await?
            .collect()
            .await?;

        // insert data
        session_ctx.sql("insert into foo values ('foo', 'bar', 1),('foo', 'bar', 2), ('foo', 'bar', 3)")
            .await?
            .collect()
            .await?;

        // check count
        let batches = session_ctx
            .sql("select * from foo")
            .await?
            .collect()
            .await?;

        let expected = vec![
            "+-----+-----+---+",
            "| a   | b   | c |",
            "+-----+-----+---+",
            "| foo | bar | 1 |",
            "| foo | bar | 2 |",
            "| foo | bar | 3 |",
            "+-----+-----+---+",
        ];

this is very cool.

        assert_batches_eq!(expected, &batches);

        Ok(())
    }
}
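For readers skimming the diff, the helper above is essentially the user-facing flow this PR enables: create an external table over an empty location, insert into it, and query it back, all through SQL. A rough standalone sketch of that flow follows; the table name, columns, and /tmp/events location are invented, and the directory is assumed to already exist (empty), just as the TempDir does in the test.

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Create an external table over an empty directory. No files need to
    // exist yet; the schema comes from the column definitions.
    ctx.sql(
        "CREATE EXTERNAL TABLE events(msg VARCHAR, level VARCHAR, code INT) \
         STORED AS CSV \
         LOCATION '/tmp/events/' \
         OPTIONS (insert_mode 'append_new_files')",
    )
    .await?
    .collect()
    .await?;

    // Inserting appends new files under the table's location.
    ctx.sql("INSERT INTO events VALUES ('started', 'info', 0)")
        .await?
        .collect()
        .await?;

    // The inserted rows are visible to subsequent queries.
    ctx.sql("SELECT * FROM events").await?.show().await?;
    Ok(())
}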
@@ -1796,11 +1796,15 @@ fn create_external_table_with_compression_type() {
 #[test]
 fn create_external_table_parquet() {
     let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS PARQUET LOCATION 'foo.parquet'";
-    let err = logical_plan(sql).expect_err("query should have failed");
-    assert_eq!(
-        "Plan(\"Column definitions can not be specified for PARQUET files.\")",
-        format!("{err:?}")
-    );
+    let expected = "CreateExternalTable: Bare { table: \"t\" }";
+    quick_test(sql, expected);
 }
+#[test]
+fn create_external_table_parquet_sort_order() {
+    let sql = "create external table foo(a varchar, b varchar, c timestamp) stored as parquet location '/tmp/foo' with order (c)";
+    let expected = "CreateExternalTable: Bare { table: \"foo\" }";
+    quick_test(sql, expected);
+}

 #[test]

🤔 I tried to run this locally and it doesn't quite work, but it is getting very close.

Yes, that error I added intentionally. Inserting into a sorted table would work, but there is nothing to enforce that the sort order is preserved yet. So my concern is that a user inserts unsorted data into a sorted table, and then subsequent queries surprisingly return incorrect results. We could add this as an issue to the streaming writes epic (support inserts to a sorted ListingTable).
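To make the sort-order concern concrete, here is a hedged sketch of the sequence being discussed; the table name, location, and inserted values are invented, and the exact error text is not shown in this thread.

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Planning the sorted external table now succeeds (see the test above).
    ctx.sql(
        "CREATE EXTERNAL TABLE ordered_foo(a VARCHAR, b VARCHAR, c TIMESTAMP) \
         STORED AS PARQUET \
         LOCATION '/tmp/ordered_foo/' \
         WITH ORDER (c)",
    )
    .await?
    .collect()
    .await?;

    // Per the discussion above, an insert into a table with a declared sort
    // order is expected to be rejected for now, because nothing verifies that
    // the incoming rows are actually sorted by `c`.
    let insert_result = async {
        ctx.sql(
            "INSERT INTO ordered_foo VALUES \
             ('x', 'y', '2023-01-01T00:00:00'), \
             ('x', 'y', '2023-01-02T00:00:00')",
        )
        .await?
        .collect()
        .await
    }
    .await;

    match insert_result {
        Err(e) => println!("insert into sorted table rejected: {e}"),
        Ok(_) => println!("insert accepted (ordering is not verified)"),
    }
    Ok(())
}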
I think we could use sqllogictest to test this as well -- perhaps add to https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/sqllogictests/test_files/insert.slt. I can't remember how temporary directories work in sqllogictests though.
I actually worked on this in #7283, which I just opened for copy.slt. Once those updates are in, I can cut another PR to add additional tests to insert.slt.