
Enable creating and inserting to empty external tables via SQL (apache#7276)

* End-to-end SQL test cases, plus the fixes needed to make them pass

* Remove an unneeded repartition statement
devinjdangelo authored Aug 14, 2023
1 parent e253b8e commit 3bbf48a
Showing 5 changed files with 222 additions and 13 deletions.
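The end result of this commit is that an external table can be declared over an empty location and then populated and queried entirely through SQL. A minimal sketch of that flow, assuming the 2023-era `datafusion` and `tokio` crates; the table name `foo` and the path `/tmp/foo_table` (assumed to exist and be empty) are hypothetical stand-ins for the temporary directory used by the new tests below:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Declare an external table over an existing but empty directory. Parquet
    // defaults to the append_new_files insert mode, so no OPTIONS clause is needed.
    ctx.sql(
        "CREATE EXTERNAL TABLE foo(a VARCHAR, b VARCHAR, c INT) \
         STORED AS PARQUET \
         LOCATION '/tmp/foo_table'",
    )
    .await?
    .collect()
    .await?;

    // Each INSERT writes new files under the table's location.
    ctx.sql("INSERT INTO foo VALUES ('foo', 'bar', 1), ('foo', 'bar', 2), ('foo', 'bar', 3)")
        .await?
        .collect()
        .await?;

    // Read the rows back.
    ctx.sql("SELECT * FROM foo").await?.show().await?;
    Ok(())
}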
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -545,12 +545,11 @@ impl DataSink for CsvSink {
// Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
for part_idx in 0..num_partitions {
let header = true;
let header = self.has_header;
let builder = WriterBuilder::new().with_delimiter(self.delimiter);
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
serializers.push(Box::new(serializer));
let file_path = base_path
.prefix()
.child(format!("/{}_{}.csv", write_id, part_idx));
@@ -567,6 +566,8 @@
object_store.clone(),
)
.await?;

serializers.push(Box::new(serializer));
writers.push(writer);
}
}
194 changes: 194 additions & 0 deletions datafusion/core/src/datasource/listing/table.rs
@@ -217,6 +217,22 @@ pub enum ListingTableInsertMode {
///Throw an error if insert into is attempted on this table
Error,
}

impl FromStr for ListingTableInsertMode {
type Err = DataFusionError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let s_lower = s.to_lowercase();
match s_lower.as_str() {
"append_to_file" => Ok(ListingTableInsertMode::AppendToFile),
"append_new_files" => Ok(ListingTableInsertMode::AppendNewFiles),
"error" => Ok(ListingTableInsertMode::Error),
_ => Err(DataFusionError::Plan(format!(
"Unknown or unsupported insert mode {s}. Supported options are \
append_to_file, append_new_files, and error."
))),
}
}
}
/// Options for creating a [`ListingTable`]
#[derive(Clone, Debug)]
pub struct ListingOptions {
@@ -1607,6 +1623,124 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_insert_into_sql_csv_defaults() -> Result<()> {
helper_test_insert_into_sql(
"csv",
FileCompressionType::UNCOMPRESSED,
"OPTIONS (insert_mode 'append_new_files')",
None,
)
.await?;
Ok(())
}

#[tokio::test]
async fn test_insert_into_sql_csv_defaults_header_row() -> Result<()> {
helper_test_insert_into_sql(
"csv",
FileCompressionType::UNCOMPRESSED,
"WITH HEADER ROW \
OPTIONS (insert_mode 'append_new_files')",
None,
)
.await?;
Ok(())
}

#[tokio::test]
async fn test_insert_into_sql_json_defaults() -> Result<()> {
helper_test_insert_into_sql(
"json",
FileCompressionType::UNCOMPRESSED,
"OPTIONS (insert_mode 'append_new_files')",
None,
)
.await?;
Ok(())
}

#[tokio::test]
async fn test_insert_into_sql_parquet_defaults() -> Result<()> {
helper_test_insert_into_sql(
"parquet",
FileCompressionType::UNCOMPRESSED,
"",
None,
)
.await?;
Ok(())
}

#[tokio::test]
async fn test_insert_into_sql_parquet_session_overrides() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
config_map.insert(
"datafusion.execution.parquet.compression".into(),
"zstd(5)".into(),
);
config_map.insert(
"datafusion.execution.parquet.dictionary_enabled".into(),
"false".into(),
);
config_map.insert(
"datafusion.execution.parquet.dictionary_page_size_limit".into(),
"100".into(),
);
config_map.insert(
"datafusion.execution.parquet.staistics_enabled".into(),
"none".into(),
);
config_map.insert(
"datafusion.execution.parquet.max_statistics_size".into(),
"10".into(),
);
config_map.insert(
"datafusion.execution.parquet.max_row_group_size".into(),
"5".into(),
);
config_map.insert(
"datafusion.execution.parquet.created_by".into(),
"datafusion test".into(),
);
config_map.insert(
"datafusion.execution.parquet.column_index_truncate_length".into(),
"50".into(),
);
config_map.insert(
"datafusion.execution.parquet.data_page_row_count_limit".into(),
"50".into(),
);
config_map.insert(
"datafusion.execution.parquet.bloom_filter_enabled".into(),
"true".into(),
);
config_map.insert(
"datafusion.execution.parquet.bloom_filter_fpp".into(),
"0.01".into(),
);
config_map.insert(
"datafusion.execution.parquet.bloom_filter_ndv".into(),
"1000".into(),
);
config_map.insert(
"datafusion.execution.parquet.writer_version".into(),
"2.0".into(),
);
config_map.insert(
"datafusion.execution.parquet.write_batch_size".into(),
"5".into(),
);
helper_test_insert_into_sql(
"parquet",
FileCompressionType::UNCOMPRESSED,
"",
Some(config_map),
)
.await?;
Ok(())
}

#[tokio::test]
async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
@@ -2096,4 +2230,64 @@ mod tests {
// Return Ok if the function
Ok(())
}

/// tests insert into with end to end sql
/// create external table + insert into statements
async fn helper_test_insert_into_sql(
file_type: &str,
// TODO test with create statement options such as compression
_file_compression_type: FileCompressionType,
external_table_options: &str,
session_config_map: Option<HashMap<String, String>>,
) -> Result<()> {
// Create the initial context
let session_ctx = match session_config_map {
Some(cfg) => {
let config = SessionConfig::from_string_hash_map(cfg)?;
SessionContext::with_config(config)
}
None => SessionContext::new(),
};

// create table
let tmp_dir = TempDir::new()?;
let tmp_path = tmp_dir.into_path();
let str_path = tmp_path.to_str().expect("Temp path should convert to &str");
session_ctx
.sql(&format!(
"create external table foo(a varchar, b varchar, c int) \
stored as {file_type} \
location '{str_path}' \
{external_table_options}"
))
.await?
.collect()
.await?;

// insert data
session_ctx.sql("insert into foo values ('foo', 'bar', 1),('foo', 'bar', 2), ('foo', 'bar', 3)")
.await?
.collect()
.await?;

// check count
let batches = session_ctx
.sql("select * from foo")
.await?
.collect()
.await?;

let expected = vec![
"+-----+-----+---+",
"| a | b | c |",
"+-----+-----+---+",
"| foo | bar | 1 |",
"| foo | bar | 2 |",
"| foo | bar | 3 |",
"+-----+-----+---+",
];
assert_batches_eq!(expected, &batches);

Ok(())
}
}
17 changes: 16 additions & 1 deletion datafusion/core/src/datasource/listing_table_factory.rs
@@ -40,6 +40,8 @@ use crate::datasource::provider::TableProviderFactory;
use crate::datasource::TableProvider;
use crate::execution::context::SessionState;

use super::listing::ListingTableInsertMode;

/// A `TableProviderFactory` capable of creating new `ListingTable`s
pub struct ListingTableFactory {}

@@ -131,13 +133,26 @@ impl TableProviderFactory for ListingTableFactory {
// look for 'infinite' as an option
let infinite_source = cmd.unbounded;

let explicit_insert_mode = cmd.options.get("insert_mode");
let insert_mode = match explicit_insert_mode {
Some(mode) => ListingTableInsertMode::from_str(mode),
None => match file_type {
FileType::CSV => Ok(ListingTableInsertMode::AppendToFile),
FileType::PARQUET => Ok(ListingTableInsertMode::AppendNewFiles),
FileType::AVRO => Ok(ListingTableInsertMode::AppendNewFiles),
FileType::JSON => Ok(ListingTableInsertMode::AppendToFile),
FileType::ARROW => Ok(ListingTableInsertMode::AppendNewFiles),
},
}?;

let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
.with_file_extension(file_extension)
.with_target_partitions(state.config().target_partitions())
.with_table_partition_cols(table_partition_cols)
.with_infinite_source(infinite_source)
.with_file_sort_order(cmd.order_exprs.clone());
.with_file_sort_order(cmd.order_exprs.clone())
.with_insert_mode(insert_mode);

let table_path = ListingTableUrl::parse(&cmd.location)?;
let resolved_schema = match provided_schema {
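For reference, when `insert_mode` is not given in OPTIONS, the factory change above selects a per-format default: append_to_file for CSV and JSON, and append_new_files for Parquet, Avro, and Arrow. A hedged sketch of overriding the CSV default explicitly, mirroring the new table.rs tests; the table name `csv_sink` and the path `/tmp/csv_sink` are hypothetical:

use datafusion::error::Result;
use datafusion::prelude::*;

/// Create a CSV-backed external table that writes new files on each INSERT
/// instead of using the CSV default of appending to an existing file.
async fn create_csv_table_appending_new_files(ctx: &SessionContext) -> Result<()> {
    ctx.sql(
        "CREATE EXTERNAL TABLE csv_sink(a VARCHAR, b VARCHAR, c INT) \
         STORED AS CSV \
         LOCATION '/tmp/csv_sink' \
         WITH HEADER ROW \
         OPTIONS (insert_mode 'append_new_files')",
    )
    .await?
    .collect()
    .await?;
    Ok(())
}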
5 changes: 0 additions & 5 deletions datafusion/sql/src/statement.rs
@@ -607,11 +607,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
options,
} = statement;

// semantic checks
if file_type == "PARQUET" && !columns.is_empty() {
plan_err!("Column definitions can not be specified for PARQUET files.")?;
}

if file_type != "CSV"
&& file_type != "JSON"
&& file_compression_type != CompressionTypeVariant::UNCOMPRESSED
14 changes: 9 additions & 5 deletions datafusion/sql/tests/sql_integration.rs
@@ -1796,11 +1796,15 @@ fn create_external_table_with_compression_type() {
#[test]
fn create_external_table_parquet() {
let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS PARQUET LOCATION 'foo.parquet'";
let err = logical_plan(sql).expect_err("query should have failed");
assert_eq!(
"Plan(\"Column definitions can not be specified for PARQUET files.\")",
format!("{err:?}")
);
let expected = "CreateExternalTable: Bare { table: \"t\" }";
quick_test(sql, expected);
}

#[test]
fn create_external_table_parquet_sort_order() {
let sql = "create external table foo(a varchar, b varchar, c timestamp) stored as parquet location '/tmp/foo' with order (c)";
let expected = "CreateExternalTable: Bare { table: \"foo\" }";
quick_test(sql, expected);
}

#[test]

