Merge our internal changes and fix conflicts for DF branch 50 #13
@@ -539,7 +539,7 @@ config_namespace! {
     /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
     /// and `Binary/BinaryLarge` with `BinaryView`.
-    pub schema_force_view_types: bool, default = true
+    pub schema_force_view_types: bool, default = false

     /// (reading) If true, parquet reader will read columns of
     /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
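Since several test changes below follow from this new default, here is a minimal sketch of opting back into view types at the session level. It assumes the standard config key for this option and a hypothetical data.parquet file:

    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        // With this branch's default (schema_force_view_types = false), Parquet
        // string/binary columns are read as Utf8/Binary. Re-enable view types
        // explicitly if they are wanted:
        let config = SessionConfig::new()
            .set_bool("datafusion.execution.parquet.schema_force_view_types", true);
        let ctx = SessionContext::new_with_config(config);

        // String columns of this (hypothetical) file surface as Utf8View again.
        let df = ctx
            .read_parquet("data.parquet", ParquetReadOptions::default())
            .await?;
        df.show().await?;
        Ok(())
    }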
@@ -2521,6 +2521,10 @@ config_namespace! {
     // The input regex for Nulls when loading CSVs.
     pub null_regex: Option<String>, default = None
     pub comment: Option<u8>, default = None
+    // Whether to allow truncated rows when parsing.
+    // By default this is set to false and will error if the CSV rows have different lengths.
+    // When set to true, it will allow records with fewer than the expected number of columns.
+    pub truncated_rows: Option<bool>, default = None
     }
 }

Review comment: Our CSV truncated_rows support will be included in DF 51.0.0, but not in DF 50.0.0.
@@ -2613,6 +2617,15 @@ impl CsvOptions {
         self
     }

+    /// Whether to allow truncated rows when parsing.
+    /// By default this is set to false and will error if the CSV rows have different lengths.
+    /// When set to true, records with fewer than the expected number of columns are allowed, and the missing columns are filled with nulls.
+    /// If the record's schema is not nullable, an error is still returned.
+    pub fn with_truncated_rows(mut self, allow: bool) -> Self {
+        self.truncated_rows = Some(allow);
+        self
+    }
+
     /// The delimiter character.
     pub fn delimiter(&self) -> u8 {
         self.delimiter
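A short usage sketch of the new builder; the setup mirrors the tests further down, and the values are illustrative:

    use datafusion::config::CsvOptions;
    use datafusion::datasource::file_format::csv::CsvFormat;

    fn build_csv_format() -> CsvFormat {
        // Opt in to tolerant parsing: rows shorter than the schema are padded
        // with nulls during decoding instead of failing the read.
        let csv_options = CsvOptions::default().with_truncated_rows(true);
        CsvFormat::default()
            .with_has_header(true)
            .with_options(csv_options)
            .with_schema_infer_max_rec(10)
    }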
@@ -48,7 +48,7 @@ mod tests {
     use datafusion_physical_plan::{collect, ExecutionPlan};

     use arrow::array::{
-        BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray,
+        Array, BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray,
     };
     use arrow::compute::concat_batches;
     use arrow::csv::ReaderBuilder;
@@ -1256,4 +1256,181 @@ mod tests {
             .build_decoder();
         DecoderDeserializer::new(CsvDecoder::new(decoder))
     }
+
+    fn csv_deserializer_with_truncated(
+        batch_size: usize,
+        schema: &Arc<Schema>,
+    ) -> impl BatchDeserializer<Bytes> {
+        // Use Arrow's ReaderBuilder with truncated_rows enabled
+        let decoder = ReaderBuilder::new(schema.clone())
+            .with_batch_size(batch_size)
+            .with_truncated_rows(true) // <- enable runtime truncated_rows
+            .build_decoder();
+        DecoderDeserializer::new(CsvDecoder::new(decoder))
+    }

Review comment: same as above (truncated_rows lands in DF 51.0.0, not DF 50.0.0).

+    #[tokio::test]
+    async fn infer_schema_with_truncated_rows_true() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+
+        // CSV: header has 3 columns, but the first data row has only 2 columns, the second has 3
+        let csv_data = Bytes::from("a,b,c\n1,2\n3,4,5\n");
+        let variable_object_store = Arc::new(VariableStream::new(csv_data, 1));
+        let object_meta = ObjectMeta {
+            location: Path::parse("/")?,
+            last_modified: DateTime::default(),
+            size: u64::MAX,
+            e_tag: None,
+            version: None,
+        };
+
+        // Construct CsvFormat and enable truncated_rows via CsvOptions
+        let csv_options = CsvOptions::default().with_truncated_rows(true);
+        let csv_format = CsvFormat::default()
+            .with_has_header(true)
+            .with_options(csv_options)
+            .with_schema_infer_max_rec(10);
+
+        let inferred_schema = csv_format
+            .infer_schema(
+                &state,
+                &(variable_object_store.clone() as Arc<dyn ObjectStore>),
+                &[object_meta],
+            )
+            .await?;
+
+        // The header has 3 columns; the inferred schema should also have 3
+        assert_eq!(inferred_schema.fields().len(), 3);
+
+        // All inferred columns should be nullable
+        for f in inferred_schema.fields() {
+            assert!(f.is_nullable());
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_decoder_truncated_rows_runtime() -> Result<()> {
+        // Synchronous test: the Decoder API used here is synchronous
+        let schema = csv_schema(); // helper already defined in this file
+
+        // Construct a decoder that enables truncated_rows at runtime
+        let mut deserializer = csv_deserializer_with_truncated(10, &schema);
+
+        // Provide two rows: the first is complete, the second is missing the last column
+        let input = Bytes::from("0,0.0,true,0-string\n1,1.0,true\n");
+        deserializer.digest(input);
+
+        // Finish and collect the output
+        deserializer.finish();
+
+        let output = deserializer.next()?;
+        match output {
+            DeserializerOutput::RecordBatch(batch) => {
+                // Ensure at least two rows are present
+                assert!(batch.num_rows() >= 2);
+                // Column 4 (index 3) should be a StringArray where the second row is NULL
+                let col4 = batch
+                    .column(3)
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .expect("column 4 should be StringArray");
+
+                // First row present, second row should be null
+                assert!(!col4.is_null(0));
+                assert!(col4.is_null(1));
+            }
+            other => panic!("expected RecordBatch but got {other:?}"),
+        }
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn infer_schema_truncated_rows_false_error() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+
+        // CSV: header has 4 columns, the first data row has 3 -> truncated at the end
+        let csv_data = Bytes::from("id,a,b,c\n1,foo,bar\n2,foo,bar,baz\n");
+        let variable_object_store = Arc::new(VariableStream::new(csv_data, 1));
+        let object_meta = ObjectMeta {
+            location: Path::parse("/")?,
+            last_modified: DateTime::default(),
+            size: u64::MAX,
+            e_tag: None,
+            version: None,
+        };
+
+        // CsvFormat without enabling truncated_rows (default behavior = false)
+        let csv_format = CsvFormat::default()
+            .with_has_header(true)
+            .with_schema_infer_max_rec(10);
+
+        let res = csv_format
+            .infer_schema(
+                &state,
+                &(variable_object_store.clone() as Arc<dyn ObjectStore>),
+                &[object_meta],
+            )
+            .await;
+
+        // Expect an error due to unequal lengths / an incorrect number of fields
+        assert!(
+            res.is_err(),
+            "expected infer_schema to error on truncated rows when disabled"
+        );
+
+        // Optional: check that the message contains indicative text (two known possibilities)
+        if let Err(err) = res {
+            let msg = format!("{err}");
+            assert!(
+                msg.contains("Encountered unequal lengths")
+                    || msg.contains("incorrect number of fields"),
+                "unexpected error message: {msg}",
+            );
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_read_csv_truncated_rows_via_tempfile() -> Result<()> {
+        use std::io::Write;
+
+        // Create a SessionContext
+        let ctx = SessionContext::new();
+
+        // Create a temp file with a .csv suffix so the reader accepts it
+        let mut tmp = tempfile::Builder::new().suffix(".csv").tempfile()?;
+        // The CSV has header "a,b,c". The first data row is truncated (only "1,2"); the second is complete.
+        write!(tmp, "a,b,c\n1,2\n3,4,5\n")?;
+        let path = tmp.path().to_str().unwrap().to_string();
+
+        // Build CsvReadOptions: header present, truncated_rows enabled
+        let options = CsvReadOptions::default().truncated_rows(true);
+
+        println!("options: {}, path: {path}", options.truncated_rows);
+
+        // Call the API under test
+        let df = ctx.read_csv(&path, options).await?;
+
+        // Collect the results and combine batches so we can inspect columns
+        let batches = df.collect().await?;
+        let combined = concat_batches(&batches[0].schema(), &batches)?;
+
+        // Column 'c' is the 3rd column (index 2). The first data row was truncated -> should be NULL
+        let col_c = combined.column(2);
+        assert!(
+            col_c.is_null(0),
+            "expected first row column 'c' to be NULL due to a truncated row"
+        );
+
+        // Also ensure we read at least two rows
+        assert!(combined.num_rows() >= 2);
+
+        Ok(())
+    }
 }
@@ -91,6 +91,11 @@ pub struct CsvReadOptions<'a> {
     pub file_sort_order: Vec<Vec<SortExpr>>,
     /// Optional regex to match null values
    pub null_regex: Option<String>,
+    /// Whether to allow truncated rows when parsing.
+    /// By default this is set to false and will error if the CSV rows have different lengths.
+    /// When set to true, records with fewer than the expected number of columns are allowed, and the missing columns are filled with nulls.
+    /// If the record's schema is not nullable, an error is still returned.
+    pub truncated_rows: bool,
 }

Review comment: same as above.
@@ -117,6 +122,7 @@ impl<'a> CsvReadOptions<'a> {
             file_sort_order: vec![],
             comment: None,
             null_regex: None,
+            truncated_rows: false,
         }
     }
@@ -223,6 +229,15 @@ impl<'a> CsvReadOptions<'a> {
         self.null_regex = null_regex;
         self
     }
+
+    /// Configure whether to allow truncated rows when parsing.
+    /// By default this is set to false and will error if the CSV rows have different lengths.
+    /// When set to true, records with fewer than the expected number of columns are allowed, and the missing columns are filled with nulls.
+    /// If the record's schema is not nullable, an error is still returned.
+    pub fn truncated_rows(mut self, truncated_rows: bool) -> Self {
+        self.truncated_rows = truncated_rows;
+        self
+    }
 }

 /// Options that control the reading of Parquet files.
@@ -558,7 +573,8 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
             .with_newlines_in_values(self.newlines_in_values)
             .with_schema_infer_max_rec(self.schema_infer_max_records)
             .with_file_compression_type(self.file_compression_type.to_owned())
-            .with_null_regex(self.null_regex.clone());
+            .with_null_regex(self.null_regex.clone())
+            .with_truncated_rows(self.truncated_rows);

         ListingOptions::new(Arc::new(file_format))
             .with_file_extension(self.file_extension)
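End to end, the flag threads from CsvReadOptions through ListingOptions into the CSV format. A minimal sketch of the user-facing call, with a hypothetical file name and the same shape as the tempfile test above:

    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        let ctx = SessionContext::new();
        // Rows in ragged.csv that are missing trailing columns are padded with
        // nulls instead of aborting the scan.
        let options = CsvReadOptions::new().has_header(true).truncated_rows(true);
        let df = ctx.read_csv("ragged.csv", options).await?;
        df.show().await?;
        Ok(())
    }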
@@ -581,11 +581,11 @@ mod tests {
         assert_eq!(string_truncation_stats.null_count, Precision::Exact(2));
         assert_eq!(
             string_truncation_stats.max_value,
-            Precision::Inexact(ScalarValue::Utf8View(Some("b".repeat(63) + "c")))
+            Precision::Inexact(Utf8(Some("b".repeat(63) + "c")))
         );
         assert_eq!(
             string_truncation_stats.min_value,
-            Precision::Inexact(ScalarValue::Utf8View(Some("a".repeat(64))))
+            Precision::Inexact(Utf8(Some("a".repeat(64))))
         );

         Ok(())

Review comment: We default to utf8.
@@ -67,6 +67,9 @@ use datafusion_physical_expr::create_physical_expr;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_optimizer::optimizer::PhysicalOptimizer;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::node_id::{
+    annotate_node_id_for_execution_plan, NodeIdAnnotator,
+};
 use datafusion_physical_plan::ExecutionPlan;
 use datafusion_session::Session;
 use datafusion_sql::parser::{DFParserBuilder, Statement};

@@ -647,9 +650,12 @@ impl SessionState {
         logical_plan: &LogicalPlan,
     ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
         let logical_plan = self.optimize(logical_plan)?;
-        self.query_planner
-            .create_physical_plan(&logical_plan, self)
-            .await
+        let physical_plan = self
+            .query_planner
+            .create_physical_plan(&logical_plan, self)
+            .await?;
+        let mut id_annotator = NodeIdAnnotator::new();
+        annotate_node_id_for_execution_plan(&physical_plan, &mut id_annotator)
     }

     /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type

Review comment: Our internal node_id support.
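For readers outside the fork, a sketch of the call shape only; node_id is an internal module, so the names and semantics below are assumed from this diff rather than from a public API:

    use std::sync::Arc;
    use datafusion_physical_plan::node_id::{
        annotate_node_id_for_execution_plan, NodeIdAnnotator,
    };
    use datafusion_physical_plan::ExecutionPlan;

    // Stamp every node of a freshly planned physical plan with a stable id, so
    // downstream consumers can refer to operators deterministically.
    fn annotate(
        plan: Arc<dyn ExecutionPlan>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        let mut annotator = NodeIdAnnotator::new();
        annotate_node_id_for_execution_plan(&plan, &mut annotator)
    }

Hooking this into create_physical_plan means every plan the session produces carries ids, at the cost of one extra plan traversal per query.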
@@ -165,7 +165,9 @@ async fn page_index_filter_one_col() {

     // 5. create filter `date_string_col == "01/01/09"`;
     // Note this test doesn't apply type coercion, so the literal must match the actual view type
-    let filter = col("date_string_col").eq(lit(ScalarValue::new_utf8view("01/01/09")));
+    // xudong: use new_utf8, because schema_force_view_types now defaults to false.
+    // qi: when schema_force_view_types is set to true, we should change back to utf8view.
+    let filter = col("date_string_col").eq(lit(ScalarValue::new_utf8("01/01/09")));

     let batches = get_filter_results(&state, filter.clone(), false).await;
     assert_eq!(batches[0].num_rows(), 14);

Review comment: Default to utf8.