Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow reading csv files without a header #2380

Merged
merged 5 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 38 additions & 6 deletions crates/sqlbuiltins/src/functions/table/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ use std::marker::PhantomData;
use std::{sync::Arc, vec};

use async_trait::async_trait;
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::arrow::datatypes::{DataType, Field, Fields};
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::file_format::json::JsonFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::TableProvider;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::logical_expr::{Signature, Volatility};
use datafusion::logical_expr::{Signature, TypeSignature, Volatility};
use datafusion_ext::errors::{ExtensionError, Result};
use datafusion_ext::functions::{FuncParamValue, IdentValue, TableFuncContextProvider};

Expand All @@ -37,6 +37,8 @@ pub struct ParquetOptionsReader;
impl OptionReader for ParquetOptionsReader {
type Format = ParquetFormat;

const OPTIONS: &'static [(&'static str, DataType)] = &[];

fn read_options(_opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
Ok(ParquetFormat::default())
}
Expand All @@ -56,6 +58,13 @@ pub struct CsvOptionReader;
impl OptionReader for CsvOptionReader {
type Format = CsvFormat;

const OPTIONS: &'static [(&'static str, DataType)] = &[
// Specify delimiter between fields. Default: ','
("delimiter", DataType::Utf8),
// Try to read a header. Default: true
("has_header", DataType::Boolean),
];

fn read_options(opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
let mut format = CsvFormat::default().with_schema_infer_max_rec(Some(20480));

Expand All @@ -71,6 +80,11 @@ impl OptionReader for CsvOptionReader {
format = format.with_delimiter(delimiter);
}

if let Some(header) = opts.get("has_header") {
let has_header: bool = header.clone().try_into()?;
format = format.with_has_header(has_header);
}

Ok(format)
}
}
Expand All @@ -89,6 +103,8 @@ pub struct JsonOptionsReader;
impl OptionReader for JsonOptionsReader {
type Format = JsonFormat;

const OPTIONS: &'static [(&'static str, DataType)] = &[];

fn read_options(_opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
Ok(JsonFormat::default())
}
Expand All @@ -105,6 +121,9 @@ pub const READ_JSON: ObjScanTableFunc<JsonOptionsReader> = ObjScanTableFunc {
pub trait OptionReader: Sync + Send + Sized {
type Format: FileFormat + WithCompression + 'static;

/// List of options and their expected data types.
const OPTIONS: &'static [(&'static str, DataType)];

/// Read user provided options, and construct a file format using those options.
fn read_options(opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format>;
}
Expand Down Expand Up @@ -174,11 +193,24 @@ impl<Opts: OptionReader> BuiltinFunction for ObjScanTableFunc<Opts> {
}

fn signature(&self) -> Option<Signature> {
Some(Signature::uniform(
1,
let opts: Fields = Opts::OPTIONS
.iter()
.map(|opt| Field::new(opt.0, opt.1.clone(), false))
.collect();

Some(Signature::one_of(
vec![
DataType::Utf8,
DataType::List(Arc::new(Field::new("item", DataType::Utf8, false))),
// read_csv('path')
TypeSignature::Exact(vec![DataType::Utf8]),
// read_csv('path', ...options)
TypeSignature::Exact(vec![DataType::Utf8, DataType::Struct(opts.clone())]),
// read_csv(['path1', 'path2'])
TypeSignature::Exact(vec![DataType::new_list(DataType::Utf8, false)]),
// read_csv(['path1', 'path2'], options)
TypeSignature::Exact(vec![
DataType::new_list(DataType::Utf8, false),
DataType::Struct(opts),
]),
],
Volatility::Stable,
))
Expand Down
2 changes: 2 additions & 0 deletions testdata/csv/headerless.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
1,hello,world,3.9
2,HELLO,WORLD,4.9
4 changes: 2 additions & 2 deletions testdata/sqllogictests/catalog/functions.slt
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ select
from glare_catalog.functions
where function_name = 'read_parquet';
----
read_parquet table Utf8/List<Utf8> t SELECT * FROM read_parquet('./my_data.parquet') Returns a table by scanning the given Parquet file(s).
read_parquet table Utf8,Utf8, ,List<Utf8>,List<Utf8>, t SELECT * FROM read_parquet('./my_data.parquet') Returns a table by scanning the given Parquet file(s).

# Assert an arbitrary glaredb table function exists (using an alias).
query TTTTTT
Expand All @@ -67,7 +67,7 @@ select
from glare_catalog.functions
where function_name = 'parquet_scan';
----
parquet_scan table Utf8/List<Utf8> t SELECT * FROM read_parquet('./my_data.parquet') Returns a table by scanning the given Parquet file(s).
parquet_scan table Utf8,Utf8, ,List<Utf8>,List<Utf8>, t SELECT * FROM read_parquet('./my_data.parquet') Returns a table by scanning the given Parquet file(s).

# 'array_to_string' is a tricky one since we're aliasing 'array_to_string' to
# 'pg_catalog.array_to_string'. A more correct implementation would return two
Expand Down
31 changes: 30 additions & 1 deletion testdata/sqllogictests/functions/read_csv.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Tests `read_csv`

# Assert options show up in catalog
query T
select parameters from glare_catalog.functions where function_name = 'read_csv';
----
Utf8,Utf8, delimiter: Utf8, has_header: Boolean,List<Utf8>,List<Utf8>, delimiter: Utf8, has_header: Boolean

# Absolute path
query I
select count(*) from read_csv('file://${PWD}/testdata/sqllogictests_datasources_common/data/bikeshare_stations.csv')
Expand Down Expand Up @@ -65,7 +71,7 @@ select * from read_csv(

# Alternative delimiters

query ITR
query ITR rowsort
select * from read_csv('./testdata/csv/delimiter.csv', delimiter => ';');
----
1 hello, world 3.9
Expand All @@ -74,3 +80,26 @@ select * from read_csv('./testdata/csv/delimiter.csv', delimiter => ';');
# Invalid delimiter (longer than one byte)
statement error delimiters for CSV must fit in one byte \(e.g. ','\)
select * from read_csv('./testdata/csv/delimiter.csv', delimiter => ';;');

# Headers

query ITTR rowsort
select * from read_csv('./testdata/csv/headerless.csv', has_header => false);
----
1 hello world 3.9
2 HELLO WORLD 4.9

query ITTR rowsort
select * from read_csv('./testdata/csv/headerless.csv', has_header => 'false');
----
1 hello world 3.9
2 HELLO WORLD 4.9

statement error Invalid parameter value hello, expected a boolean
select * from read_csv('./testdata/csv/headerless.csv', has_header => 'hello');

query I
select count(*) from read_csv('./testdata/sqllogictests_datasources_common/data/bikeshare_stations.csv', has_header => true)
----
102

Loading