diff --git a/crates/sqlbuiltins/src/functions/table/object_store.rs b/crates/sqlbuiltins/src/functions/table/object_store.rs
index 360c78ba4..ad465a9f7 100644
--- a/crates/sqlbuiltins/src/functions/table/object_store.rs
+++ b/crates/sqlbuiltins/src/functions/table/object_store.rs
@@ -3,7 +3,7 @@ use std::marker::PhantomData;
 use std::{sync::Arc, vec};
 
 use async_trait::async_trait;
-use datafusion::arrow::datatypes::{DataType, Field};
+use datafusion::arrow::datatypes::{DataType, Field, Fields};
 use datafusion::datasource::file_format::csv::CsvFormat;
 use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
 use datafusion::datasource::file_format::json::JsonFormat;
@@ -11,7 +11,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::TableProvider;
 use datafusion::execution::object_store::ObjectStoreUrl;
-use datafusion::logical_expr::{Signature, Volatility};
+use datafusion::logical_expr::{Signature, TypeSignature, Volatility};
 
 use datafusion_ext::errors::{ExtensionError, Result};
 use datafusion_ext::functions::{FuncParamValue, IdentValue, TableFuncContextProvider};
@@ -37,6 +37,8 @@ pub struct ParquetOptionsReader;
 impl OptionReader for ParquetOptionsReader {
     type Format = ParquetFormat;
 
+    const OPTIONS: &'static [(&'static str, DataType)] = &[];
+
     fn read_options(_opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
         Ok(ParquetFormat::default())
     }
@@ -56,6 +58,13 @@ pub struct CsvOptionReader;
 impl OptionReader for CsvOptionReader {
     type Format = CsvFormat;
 
+    const OPTIONS: &'static [(&'static str, DataType)] = &[
+        // Specify delimiter between fields. Default: ','
+        ("delimiter", DataType::Utf8),
+        // Try to read a header. Default: true
+        ("has_header", DataType::Boolean),
+    ];
+
     fn read_options(opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
         let mut format = CsvFormat::default().with_schema_infer_max_rec(Some(20480));
 
@@ -71,6 +80,11 @@ impl OptionReader for CsvOptionReader {
             format = format.with_delimiter(delimiter);
         }
 
+        if let Some(header) = opts.get("has_header") {
+            let has_header: bool = header.clone().try_into()?;
+            format = format.with_has_header(has_header);
+        }
+
         Ok(format)
     }
 }
@@ -89,6 +103,8 @@ pub struct JsonOptionsReader;
 impl OptionReader for JsonOptionsReader {
     type Format = JsonFormat;
 
+    const OPTIONS: &'static [(&'static str, DataType)] = &[];
+
     fn read_options(_opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
         Ok(JsonFormat::default())
     }
@@ -105,6 +121,9 @@ pub const READ_JSON: ObjScanTableFunc = ObjScanTableFunc {
 pub trait OptionReader: Sync + Send + Sized {
     type Format: FileFormat + WithCompression + 'static;
 
+    /// List of options and their expected data types.
+    const OPTIONS: &'static [(&'static str, DataType)];
+
     /// Read user provided options, and construct a file format using those options.
     fn read_options(opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format>;
 }
@@ -174,11 +193,24 @@ impl BuiltinFunction for ObjScanTableFunc {
     }
 
     fn signature(&self) -> Option<Signature> {
-        Some(Signature::uniform(
-            1,
+        let opts: Fields = Opts::OPTIONS
+            .iter()
+            .map(|opt| Field::new(opt.0, opt.1.clone(), false))
+            .collect();
+
+        Some(Signature::one_of(
             vec![
-                DataType::Utf8,
-                DataType::List(Arc::new(Field::new("item", DataType::Utf8, false))),
+                // read_csv('path')
+                TypeSignature::Exact(vec![DataType::Utf8]),
+                // read_csv('path', ...options)
+                TypeSignature::Exact(vec![DataType::Utf8, DataType::Struct(opts.clone())]),
+                // read_csv(['path1', 'path2'])
+                TypeSignature::Exact(vec![DataType::new_list(DataType::Utf8, false)]),
+                // read_csv(['path1', 'path2'], options)
+                TypeSignature::Exact(vec![
+                    DataType::new_list(DataType::Utf8, false),
+                    DataType::Struct(opts),
+                ]),
             ],
             Volatility::Stable,
        ))
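
A small standalone sketch (not part of the patch) of how the OPTIONS slice above feeds the new signature: each (name, type) pair becomes a non-nullable struct field, and that struct is accepted as an optional trailing argument alongside the path(s). The same field list is what the slt changes below assert in the glare_catalog.functions parameters column. The CSV_OPTIONS constant and main function here are illustrative assumptions.

use datafusion::arrow::datatypes::{DataType, Field, Fields};

// Mirrors CsvOptionReader::OPTIONS from the diff above.
const CSV_OPTIONS: &[(&str, DataType)] = &[
    ("delimiter", DataType::Utf8),
    ("has_header", DataType::Boolean),
];

fn main() {
    // Same construction as in `signature()`: every (name, type) pair becomes a
    // non-nullable struct field.
    let opts: Fields = CSV_OPTIONS
        .iter()
        .map(|opt| Field::new(opt.0, opt.1.clone(), false))
        .collect();

    // The assembled struct type is the optional options argument in the one_of
    // signature: read_csv('path', <options>) or read_csv([...], <options>).
    let options_arg = DataType::Struct(opts);
    println!("{options_arg:?}");
}
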
diff --git a/testdata/csv/headerless.csv b/testdata/csv/headerless.csv
new file mode 100644
index 000000000..db32d7d74
--- /dev/null
+++ b/testdata/csv/headerless.csv
@@ -0,0 +1,2 @@
+1,hello,world,3.9
+2,HELLO,WORLD,4.9
diff --git a/testdata/sqllogictests/catalog/functions.slt b/testdata/sqllogictests/catalog/functions.slt
index ce8a55206..f2ecdd95b 100644
--- a/testdata/sqllogictests/catalog/functions.slt
+++ b/testdata/sqllogictests/catalog/functions.slt
@@ -53,7 +53,7 @@ select
 from glare_catalog.functions
 where function_name = 'read_parquet';
 ----
-read_parquet table Utf8/List t SELECT * FROM read_parquet('./my_data.parquet') Returns a table by scanning the given Parquet file(s).
+read_parquet table Utf8,Utf8, ,List,List, t SELECT * FROM read_parquet('./my_data.parquet') Returns a table by scanning the given Parquet file(s).
 
 # Assert an arbitrary glaredb table function exists (using an alias).
 query TTTTTT
@@ -67,7 +67,7 @@ select
 from glare_catalog.functions
 where function_name = 'parquet_scan';
 ----
-parquet_scan table Utf8/List t SELECT * FROM read_parquet('./my_data.parquet') Returns a table by scanning the given Parquet file(s).
+parquet_scan table Utf8,Utf8, ,List,List, t SELECT * FROM read_parquet('./my_data.parquet') Returns a table by scanning the given Parquet file(s).
 
 # 'array_to_string' is a tricky one since we're aliasing 'array_to_string' to
 # 'pg_catalog.array_to_string'. A more correct implementation would return two
diff --git a/testdata/sqllogictests/functions/read_csv.slt b/testdata/sqllogictests/functions/read_csv.slt
index d26e6c5c6..2fcf28542 100644
--- a/testdata/sqllogictests/functions/read_csv.slt
+++ b/testdata/sqllogictests/functions/read_csv.slt
@@ -1,5 +1,11 @@
 # Tests `read_csv`
 
+# Assert options show up in catalog
+query T
+select parameters from glare_catalog.functions where function_name = 'read_csv';
+----
+Utf8,Utf8, delimiter: Utf8, has_header: Boolean,List,List, delimiter: Utf8, has_header: Boolean
+
 # Absolute path
 query I
 select count(*) from read_csv('file://${PWD}/testdata/sqllogictests_datasources_common/data/bikeshare_stations.csv')
@@ -65,7 +71,7 @@ select * from read_csv(
 
 # Alternative delimiters
 
-query ITR
+query ITR rowsort
 select * from read_csv('./testdata/csv/delimiter.csv', delimiter => ';');
 ----
 1 hello, world 3.9
@@ -74,3 +80,26 @@ select * from read_csv('./testdata/csv/delimiter.csv', delimiter => ';');
 # Invalid delimiter (longer than one byte)
 statement error delimiters for CSV must fit in one byte \(e.g. ','\)
 select * from read_csv('./testdata/csv/delimiter.csv', delimiter => ';;');
+
+# Headers
+
+query ITTR rowsort
+select * from read_csv('./testdata/csv/headerless.csv', has_header => false);
+----
+1 hello world 3.9
+2 HELLO WORLD 4.9
+
+query ITTR rowsort
+select * from read_csv('./testdata/csv/headerless.csv', has_header => 'false');
+----
+1 hello world 3.9
+2 HELLO WORLD 4.9
+
+statement error Invalid parameter value hello, expected a boolean
+select * from read_csv('./testdata/csv/headerless.csv', has_header => 'hello');
+
+query I
+select count(*) from read_csv('./testdata/sqllogictests_datasources_common/data/bikeshare_stations.csv', has_header => true)
+----
+102
+
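
The has_header tests above pin down the option's coercion: a native boolean or the string 'false' is accepted (presumably 'true' as well, by symmetry), and other strings are rejected with "Invalid parameter value ..., expected a boolean". The hypothetical helper below summarizes that behavior for illustration only; it is not GlareDB's actual FuncParamValue conversion.

// Hypothetical helper mirroring the behavior the tests assert.
fn coerce_bool_param(raw: &str) -> Result<bool, String> {
    match raw {
        "true" => Ok(true),
        "false" => Ok(false),
        other => Err(format!("Invalid parameter value {other}, expected a boolean")),
    }
}

fn main() {
    assert_eq!(coerce_bool_param("false"), Ok(false));
    assert!(coerce_bool_param("hello").is_err());
}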