
Commit 6a61304

dqkqd and alamb authored
fix: ignore DataType::Null in possible types during csv type inference (#17796)
* fix: ignore `DataType::Null` in possible types during csv type inference
* refactor: apply suggestion, simplify inferring csv data types
* docs: add comment to testcase
* chore: update test data type for empty table
* test: folder contains empty csv files (with header) and normal files
* Update datafusion/datasource-csv/src/file_format.rs
* fmt

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent: e0222f0
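For context, a sketch of the user-visible behavior this commit targets (illustrative application code, not part of the diff; it assumes the `some_empty_with_header` fixture directory added below): before the fix, a column that was all null in some chunks could be inferred as `Utf8`, because `{Null, Int64}` looked like a type conflict; after it, nulls are ignored during inference, so the column resolves to its concrete type, or stays `Null` when nothing concrete was ever seen.

```rust
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Directory holding an empty CSV (header only) next to populated ones,
    // mirroring datafusion/core/tests/data/empty_files/some_empty_with_header.
    let df = ctx
        .read_csv(
            "tests/data/empty_files/some_empty_with_header",
            CsvReadOptions::new().has_header(true),
        )
        .await?;

    // With this commit, c3 resolves to Int64 from the files that have
    // values, instead of falling back to Utf8 because of the null chunks.
    for field in df.schema().fields() {
        println!("{}: {}", field.name(), field.data_type());
    }
    Ok(())
}
```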

File tree

6 files changed: +124 −14 lines

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 104 additions & 8 deletions
@@ -60,6 +60,7 @@ mod tests {
     use futures::stream::BoxStream;
     use futures::StreamExt;
     use insta::assert_snapshot;
+    use object_store::chunked::ChunkedStore;
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
     use object_store::{
@@ -104,6 +105,14 @@ mod tests {
         }

         async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
+            self.get_opts(location, GetOptions::default()).await
+        }
+
+        async fn get_opts(
+            &self,
+            location: &Path,
+            _opts: GetOptions,
+        ) -> object_store::Result<GetResult> {
             let bytes = self.bytes_to_repeat.clone();
             let len = bytes.len() as u64;
             let range = 0..len * self.max_iterations;
@@ -130,14 +139,6 @@ mod tests {
             })
         }

-        async fn get_opts(
-            &self,
-            _location: &Path,
-            _opts: GetOptions,
-        ) -> object_store::Result<GetResult> {
-            unimplemented!()
-        }
-
         async fn get_ranges(
             &self,
             _location: &Path,
@@ -470,6 +471,59 @@ mod tests {
         Ok(())
     }

+    #[tokio::test]
+    async fn test_infer_schema_stream_null_chunks() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+
+        // a stream where each line is read as a separate chunk,
+        // data type for each chunk is inferred separately.
+        // +----+-----+----+
+        // | c1 | c2  | c3 |
+        // +----+-----+----+
+        // | 1  | 1.0 |    | type: Int64, Float64, Null
+        // |    |     |    | type: Null, Null, Null
+        // +----+-----+----+
+        let chunked_object_store = Arc::new(ChunkedStore::new(
+            Arc::new(VariableStream::new(
+                Bytes::from(
+                    r#"c1,c2,c3
+1,1.0,
+,,
+"#,
+                ),
+                1,
+            )),
+            1,
+        ));
+        let object_meta = ObjectMeta {
+            location: Path::parse("/")?,
+            last_modified: DateTime::default(),
+            size: u64::MAX,
+            e_tag: None,
+            version: None,
+        };
+
+        let csv_format = CsvFormat::default().with_has_header(true);
+        let inferred_schema = csv_format
+            .infer_schema(
+                &state,
+                &(chunked_object_store as Arc<dyn ObjectStore>),
+                &[object_meta],
+            )
+            .await?;
+
+        let actual_fields: Vec<_> = inferred_schema
+            .fields()
+            .iter()
+            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+            .collect();
+
+        // ensure null chunks don't skew type inference
+        assert_eq!(vec!["c1: Int64", "c2: Float64", "c3: Null"], actual_fields);
+        Ok(())
+    }
+
     #[rstest(
         file_compression_type,
         case(FileCompressionType::UNCOMPRESSED),
@@ -822,6 +876,48 @@ mod tests {
         Ok(())
     }

+    /// Read multiple csv files (some are empty) with header
+    ///
+    /// some_empty_with_header
+    /// ├── a_empty.csv
+    /// ├── b.csv
+    /// └── c_nulls_column.csv
+    ///
+    /// a_empty.csv:
+    /// c1,c2,c3
+    ///
+    /// b.csv:
+    /// c1,c2,c3
+    /// 1,1,1
+    /// 2,2,2
+    ///
+    /// c_nulls_column.csv:
+    /// c1,c2,c3
+    /// 3,3,
+    #[tokio::test]
+    async fn test_csv_some_empty_with_header() -> Result<()> {
+        let ctx = SessionContext::new();
+        ctx.register_csv(
+            "some_empty_with_header",
+            "tests/data/empty_files/some_empty_with_header",
+            CsvReadOptions::new().has_header(true),
+        )
+        .await?;
+
+        let query = "select sum(c3) from some_empty_with_header;";
+        let query_result = ctx.sql(query).await?.collect().await?;
+
+        assert_snapshot!(batches_to_string(&query_result), @r"
+        +--------------------------------+
+        | sum(some_empty_with_header.c3) |
+        +--------------------------------+
+        | 3                              |
+        +--------------------------------+
+        ");
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_csv_extension_compressed() -> Result<()> {
         // Write compressed CSV files
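The reason `Null` (rather than `Utf8`) is the right inferred type for header-only files is schema merging: a `Null` field can be promoted to the concrete type found in a sibling file, while `Utf8` vs. `Int64` is a hard conflict. A minimal sketch of that behavior, assuming the `arrow_schema` `Schema::try_merge` semantics this fix relies on:

```rust
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn main() -> Result<(), ArrowError> {
    // Schema inferred from a header-only file like a_empty.csv...
    let from_empty_file = Schema::new(vec![Field::new("c3", DataType::Null, true)]);
    // ...and from a populated file like b.csv.
    let from_typed_file = Schema::new(vec![Field::new("c3", DataType::Int64, true)]);

    // Null is promoted to Int64; had the empty file been inferred as Utf8,
    // this merge would fail with a type conflict.
    let merged = Schema::try_merge([from_empty_file, from_typed_file])?;
    assert_eq!(merged.field(0).data_type(), &DataType::Int64);
    Ok(())
}
```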
datafusion/core/tests/data/empty_files/some_empty_with_header/a_empty.csv

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+c1,c2,c3
datafusion/core/tests/data/empty_files/some_empty_with_header/b.csv

Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
+c1,c2,c3
+1,1,1
+2,2,2
datafusion/core/tests/data/empty_files/some_empty_with_header/c_nulls_column.csv

Lines changed: 2 additions & 0 deletions

@@ -0,0 +1,2 @@
+c1,c2,c3
+3,3,

datafusion/datasource-csv/src/file_format.rs

Lines changed: 11 additions & 3 deletions
@@ -582,20 +582,28 @@ impl CsvFormat {
             }
         }

-        let schema = build_schema_helper(column_names, &column_type_possibilities);
+        let schema = build_schema_helper(column_names, column_type_possibilities);
         Ok((schema, total_records_read))
     }
 }

-fn build_schema_helper(names: Vec<String>, types: &[HashSet<DataType>]) -> Schema {
+fn build_schema_helper(names: Vec<String>, types: Vec<HashSet<DataType>>) -> Schema {
     let fields = names
         .into_iter()
         .zip(types)
-        .map(|(field_name, data_type_possibilities)| {
+        .map(|(field_name, mut data_type_possibilities)| {
             // ripped from arrow::csv::reader::infer_reader_schema_with_csv_options
             // determine data type based on possible types
             // if there are incompatible types, use DataType::Utf8
+
+            // ignore nulls, to avoid conflicting datatypes (e.g. [nulls, int]) being inferred as Utf8.
+            data_type_possibilities.remove(&DataType::Null);
+
             match data_type_possibilities.len() {
+                // Return Null for columns with only nulls / empty files.
+                // This allows schema merging to work when reading folders
+                // containing such files along with normal files.
+                0 => Field::new(field_name, DataType::Null, true),
                 1 => Field::new(
                     field_name,
                     data_type_possibilities.iter().next().unwrap().clone(),
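Condensed, the per-column decision in `build_schema_helper` now behaves like the sketch below (a simplification: the real helper has further promotion rules before the `Utf8` fallback, e.g. `Int64` together with `Float64` resolving to `Float64`):

```rust
use std::collections::HashSet;

use arrow_schema::DataType;

fn pick_type(mut possibilities: HashSet<DataType>) -> DataType {
    // Nulls no longer count as a conflicting candidate.
    possibilities.remove(&DataType::Null);
    match possibilities.len() {
        // Only nulls (or an empty file): keep Null so the column can
        // merge with a concrete type inferred from another file.
        0 => DataType::Null,
        1 => possibilities.into_iter().next().unwrap(),
        // Genuinely conflicting types still fall back to Utf8.
        _ => DataType::Utf8,
    }
}

fn main() {
    // Before this commit, {Null, Int64} had two candidates and became Utf8.
    assert_eq!(
        pick_type(HashSet::from([DataType::Null, DataType::Int64])),
        DataType::Int64
    );
    // A column that is null in every chunk now stays Null.
    assert_eq!(pick_type(HashSet::from([DataType::Null])), DataType::Null);
}
```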

datafusion/sqllogictest/test_files/ddl.slt

Lines changed: 3 additions & 3 deletions
@@ -707,9 +707,9 @@ CREATE EXTERNAL TABLE empty STORED AS CSV LOCATION '../core/tests/data/empty.csv
 query TTI
 select column_name, data_type, ordinal_position from information_schema.columns where table_name='empty';;
 ----
-c1 Utf8 0
-c2 Utf8 1
-c3 Utf8 2
+c1 Null 0
+c2 Null 1
+c3 Null 2


 ## should allow any type of exprs as values
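The same result can be observed programmatically; a hedged sketch (assuming the `empty.csv` fixture path used by the statement above, with `information_schema` enabled):

```rust
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // information_schema must be enabled to query column metadata.
    let config = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::new_with_config(config);

    ctx.sql(
        "CREATE EXTERNAL TABLE empty STORED AS CSV \
         LOCATION '../core/tests/data/empty.csv'",
    )
    .await?;

    // After this commit, data_type reports Null instead of Utf8
    // for every column of the header-only file.
    ctx.sql(
        "select column_name, data_type, ordinal_position \
         from information_schema.columns where table_name = 'empty'",
    )
    .await?
    .show()
    .await?;

    Ok(())
}
```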
