Skip to content

Commit

Permalink
feat: Allow reading csv files without a header (#2380)
Browse files Browse the repository at this point in the history
```
> select * from read_csv('./testdata/csv/headerless.csv', header => 'false');
┌──────────┬──────────┬──────────┬──────────┐
│ column_1 │ column_2 │ column_3 │ column_4 │
│       ── │ ──       │ ──       │       ── │
│    Int64 │ Utf8     │ Utf8     │  Float64 │
╞══════════╪══════════╪══════════╪══════════╡
│        1 │ hello    │ world    │  3.90000 │
│        2 │ HELLO    │ WORLD    │  4.90000 │
└──────────┴──────────┴──────────┴──────────┘
```

Also reworks the table function signature so each format declares its supported options (e.g. `delimiter`, `has_header` for CSV), exposing them in the catalog. Closes
#2375

---------

Co-authored-by: universalmind303 <cory.grinstead@gmail.com>
  • Loading branch information
scsmithr and universalmind303 authored Jan 8, 2024
1 parent 62f8bb8 commit f9963d1
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 9 deletions.
44 changes: 38 additions & 6 deletions crates/sqlbuiltins/src/functions/table/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ use std::marker::PhantomData;
use std::{sync::Arc, vec};

use async_trait::async_trait;
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::arrow::datatypes::{DataType, Field, Fields};
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::file_format::json::JsonFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::TableProvider;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::logical_expr::{Signature, Volatility};
use datafusion::logical_expr::{Signature, TypeSignature, Volatility};
use datafusion_ext::errors::{ExtensionError, Result};
use datafusion_ext::functions::{FuncParamValue, IdentValue, TableFuncContextProvider};

Expand All @@ -37,6 +37,8 @@ pub struct ParquetOptionsReader;
impl OptionReader for ParquetOptionsReader {
type Format = ParquetFormat;

const OPTIONS: &'static [(&'static str, DataType)] = &[];

fn read_options(_opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
Ok(ParquetFormat::default())
}
Expand All @@ -56,6 +58,13 @@ pub struct CsvOptionReader;
impl OptionReader for CsvOptionReader {
type Format = CsvFormat;

const OPTIONS: &'static [(&'static str, DataType)] = &[
// Specify delimiter between fields. Default: ','
("delimiter", DataType::Utf8),
// Try to read a header. Default: true
("has_header", DataType::Boolean),
];

fn read_options(opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
let mut format = CsvFormat::default().with_schema_infer_max_rec(Some(20480));

Expand All @@ -71,6 +80,11 @@ impl OptionReader for CsvOptionReader {
format = format.with_delimiter(delimiter);
}

if let Some(header) = opts.get("has_header") {
let has_header: bool = header.clone().try_into()?;
format = format.with_has_header(has_header);
}

Ok(format)
}
}
Expand All @@ -89,6 +103,8 @@ pub struct JsonOptionsReader;
impl OptionReader for JsonOptionsReader {
type Format = JsonFormat;

const OPTIONS: &'static [(&'static str, DataType)] = &[];

fn read_options(_opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format> {
Ok(JsonFormat::default())
}
Expand All @@ -105,6 +121,9 @@ pub const READ_JSON: ObjScanTableFunc<JsonOptionsReader> = ObjScanTableFunc {
pub trait OptionReader: Sync + Send + Sized {
type Format: FileFormat + WithCompression + 'static;

/// List of options and their expected data types.
const OPTIONS: &'static [(&'static str, DataType)];

/// Read user provided options, and construct a file format using those options.
fn read_options(opts: &HashMap<String, FuncParamValue>) -> Result<Self::Format>;
}
Expand Down Expand Up @@ -174,11 +193,24 @@ impl<Opts: OptionReader> BuiltinFunction for ObjScanTableFunc<Opts> {
}

fn signature(&self) -> Option<Signature> {
Some(Signature::uniform(
1,
let opts: Fields = Opts::OPTIONS
.iter()
.map(|opt| Field::new(opt.0, opt.1.clone(), false))
.collect();

Some(Signature::one_of(
vec![
DataType::Utf8,
DataType::List(Arc::new(Field::new("item", DataType::Utf8, false))),
// read_csv('path')
TypeSignature::Exact(vec![DataType::Utf8]),
// read_csv('path', ...options)
TypeSignature::Exact(vec![DataType::Utf8, DataType::Struct(opts.clone())]),
// read_csv(['path1', 'path2'])
TypeSignature::Exact(vec![DataType::new_list(DataType::Utf8, false)]),
// read_csv(['path1', 'path2'], options)
TypeSignature::Exact(vec![
DataType::new_list(DataType::Utf8, false),
DataType::Struct(opts),
]),
],
Volatility::Stable,
))
Expand Down
2 changes: 2 additions & 0 deletions testdata/csv/headerless.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
1,hello,world,3.9
2,HELLO,WORLD,4.9
4 changes: 2 additions & 2 deletions testdata/sqllogictests/catalog/functions.slt
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ select
from glare_catalog.functions
where function_name = 'read_parquet';
----
read_parquet table Utf8/List<Utf8> t SELECT * FROM read_parquet('./my_data.parquet') Returns a table by scanning the given Parquet file(s).
read_parquet table Utf8,Utf8, ,List<Utf8>,List<Utf8>, t SELECT * FROM read_parquet('./my_data.parquet') Returns a table by scanning the given Parquet file(s).

# Assert an arbitrary glaredb table function exists (using an alias).
query TTTTTT
Expand All @@ -67,7 +67,7 @@ select
from glare_catalog.functions
where function_name = 'parquet_scan';
----
parquet_scan table Utf8/List<Utf8> t SELECT * FROM read_parquet('./my_data.parquet') Returns a table by scanning the given Parquet file(s).
parquet_scan table Utf8,Utf8, ,List<Utf8>,List<Utf8>, t SELECT * FROM read_parquet('./my_data.parquet') Returns a table by scanning the given Parquet file(s).

# 'array_to_string' is a tricky one since we're aliasing 'array_to_string' to
# 'pg_catalog.array_to_string'. A more correct implementation would return two
Expand Down
31 changes: 30 additions & 1 deletion testdata/sqllogictests/functions/read_csv.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Tests `read_csv`

# Assert options show up in catalog
query T
select parameters from glare_catalog.functions where function_name = 'read_csv';
----
Utf8,Utf8, delimiter: Utf8, has_header: Boolean,List<Utf8>,List<Utf8>, delimiter: Utf8, has_header: Boolean

# Absolute path
query I
select count(*) from read_csv('file://${PWD}/testdata/sqllogictests_datasources_common/data/bikeshare_stations.csv')
Expand Down Expand Up @@ -65,7 +71,7 @@ select * from read_csv(

# Alternative delimiters

query ITR
query ITR rowsort
select * from read_csv('./testdata/csv/delimiter.csv', delimiter => ';');
----
1 hello, world 3.9
Expand All @@ -74,3 +80,26 @@ select * from read_csv('./testdata/csv/delimiter.csv', delimiter => ';');
# Invalid delimiter (longer than one byte)
statement error delimiters for CSV must fit in one byte \(e.g. ','\)
select * from read_csv('./testdata/csv/delimiter.csv', delimiter => ';;');

# Headers

query ITTR rowsort
select * from read_csv('./testdata/csv/headerless.csv', has_header => false);
----
1 hello world 3.9
2 HELLO WORLD 4.9

query ITTR rowsort
select * from read_csv('./testdata/csv/headerless.csv', has_header => 'false');
----
1 hello world 3.9
2 HELLO WORLD 4.9

statement error Invalid parameter value hello, expected a boolean
select * from read_csv('./testdata/csv/headerless.csv', has_header => 'hello');

query I
select count(*) from read_csv('./testdata/sqllogictests_datasources_common/data/bikeshare_stations.csv', has_header => true)
----
102

0 comments on commit f9963d1

Please sign in to comment.