From dcd6c64bf8b6e72677c0e13388758e5da1dc960b Mon Sep 17 00:00:00 2001 From: Ronen Cohen Date: Sun, 9 Apr 2023 19:58:29 +0300 Subject: [PATCH 1/5] Support null values in Avro string columns --- .../src/avro_to_arrow/arrow_array_reader.rs | 27 +++--- .../core/src/datasource/file_format/avro.rs | 85 +++++++++++++++++++ 2 files changed, 100 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs index 1f06078e4627..a5c85d52b9e0 100644 --- a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs @@ -275,14 +275,13 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { } else if let Value::Array(n) = value { n.iter() .map(resolve_string) - .collect::>>()? + .collect::>>>()? .into_iter() - .map(Some) .collect::>>() } else if let Value::Null = value { vec![None] } else if !matches!(value, Value::Record(_)) { - vec![Some(resolve_string(value)?)] + vec![resolve_string(value)?] } else { return Err(SchemaError( "Only scalars are currently supported in Avro arrays".to_string(), @@ -351,7 +350,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { self.build_string_dictionary_builder(rows.len()); for row in rows { if let Some(value) = self.field_lookup(col_name, row) { - if let Ok(str_v) = resolve_string(value) { + if let Ok(Some(str_v)) = resolve_string(value) { builder.append(str_v).map(drop)? } else { builder.append_null() @@ -692,7 +691,10 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { rows.iter() .map(|row| { let maybe_value = self.field_lookup(field.name(), row); - maybe_value.map(resolve_string).transpose() + match maybe_value { + None => Ok(None), + Some(v) => resolve_string(v), + } }) .collect::>()?, ) @@ -846,12 +848,12 @@ fn flatten_string_values(values: &[&Value]) -> Vec> { if let Value::Array(values) = row { values .iter() - .map(|s| resolve_string(s).ok()) + .map(|s| resolve_string(s).ok().flatten()) .collect::>>() } else if let Value::Null = row { vec![] } else { - vec![resolve_string(row).ok()] + vec![resolve_string(row).ok().flatten()] } }) .collect::>>() @@ -860,13 +862,14 @@ fn flatten_string_values(values: &[&Value]) -> Vec> { /// Reads an Avro value as a string, regardless of its type. /// This is useful if the expected datatype is a string, in which case we preserve /// all the values regardless of they type. -fn resolve_string(v: &Value) -> ArrowResult { +fn resolve_string(v: &Value) -> ArrowResult> { let v = if let Value::Union(_, b) = v { b } else { v }; match v { - Value::String(s) => Ok(s.clone()), - Value::Bytes(bytes) => { - String::from_utf8(bytes.to_vec()).map_err(AvroError::ConvertToUtf8) - } + Value::String(s) => Ok(Some(s.clone())), + Value::Bytes(bytes) => String::from_utf8(bytes.to_vec()) + .map_err(AvroError::ConvertToUtf8) + .map(Some), + Value::Null => Ok(None), other => Err(AvroError::GetString(other.into())), } .map_err(|e| SchemaError(format!("expected resolvable string : {e:?}"))) diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs index 422733308996..374ef18970f0 100644 --- a/datafusion/core/src/datasource/file_format/avro.rs +++ b/datafusion/core/src/datasource/file_format/avro.rs @@ -96,6 +96,7 @@ mod tests { use crate::datasource::file_format::test_util::scan_format; use crate::physical_plan::collect; use crate::prelude::{SessionConfig, SessionContext}; + use arrow::array::{as_string_array, Array}; use datafusion_common::cast::{ as_binary_array, as_boolean_array, as_float32_array, as_float64_array, as_int32_array, as_timestamp_microsecond_array, @@ -221,6 +222,27 @@ mod tests { Ok(()) } + #[tokio::test] + async fn read_null_bool_alltypes_plain_avro() -> Result<()> { + let session_ctx = SessionContext::new(); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let projection = Some(vec![2]); + let exec = + get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?; + + let batches = collect(exec, task_ctx).await?; + assert_eq!(batches.len(), 1); + assert_eq!(1, batches[0].num_columns()); + assert_eq!(1, batches[0].num_rows()); + + let array = as_boolean_array(batches[0].column(0))?; + + assert!(array.is_null(0)); + + Ok(()) + } + #[tokio::test] async fn read_i32_alltypes_plain_avro() -> Result<()> { let session_ctx = SessionContext::new(); @@ -245,6 +267,27 @@ mod tests { Ok(()) } + #[tokio::test] + async fn read_null_i32_alltypes_plain_avro() -> Result<()> { + let session_ctx = SessionContext::new(); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let projection = Some(vec![1]); + let exec = + get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?; + + let batches = collect(exec, task_ctx).await?; + assert_eq!(batches.len(), 1); + assert_eq!(1, batches[0].num_columns()); + assert_eq!(1, batches[0].num_rows()); + + let array = as_int32_array(batches[0].column(0))?; + + assert!(array.is_null(0)); + + Ok(()) + } + #[tokio::test] async fn read_i96_alltypes_plain_avro() -> Result<()> { let session_ctx = SessionContext::new(); @@ -350,6 +393,48 @@ mod tests { Ok(()) } + #[tokio::test] + async fn read_null_binary_alltypes_plain_avro() -> Result<()> { + let session_ctx = SessionContext::new(); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let projection = Some(vec![6]); + let exec = + get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?; + + let batches = collect(exec, task_ctx).await?; + assert_eq!(batches.len(), 1); + assert_eq!(1, batches[0].num_columns()); + assert_eq!(1, batches[0].num_rows()); + + let array = as_binary_array(batches[0].column(0))?; + + assert!(array.is_null(0)); + + Ok(()) + } + + #[tokio::test] + async fn read_null_string_alltypes_plain_avro() -> Result<()> { + let session_ctx = SessionContext::new(); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let projection = Some(vec![0]); + let exec = + get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?; + + let batches = collect(exec, task_ctx).await?; + assert_eq!(batches.len(), 1); + assert_eq!(1, batches[0].num_columns()); + assert_eq!(1, batches[0].num_rows()); + + let array = as_string_array(batches[0].column(0)); + + assert!(array.is_null(0)); + + Ok(()) + } + async fn get_exec( state: &SessionState, file_name: &str, From c7e1cee345cc632b7a1338d0e5b5b117015252f5 Mon Sep 17 00:00:00 2001 From: Ronen Cohen Date: Wed, 10 May 2023 16:13:36 +0300 Subject: [PATCH 2/5] Temporarily change arrow-testing to fork --- .gitmodules | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitmodules b/.gitmodules index ec5d6208b8dd..ca1fa0c1c99d 100644 --- a/.gitmodules +++ b/.gitmodules @@ -3,4 +3,4 @@ url = https://github.com/apache/parquet-testing.git [submodule "testing"] path = testing - url = https://github.com/apache/arrow-testing + url = https://github.com/nenorbot/arrow-testing From 3488e2200e92adb52db10b10332f0a2d044d630c Mon Sep 17 00:00:00 2001 From: Ronen Cohen Date: Wed, 10 May 2023 18:18:35 +0300 Subject: [PATCH 3/5] Update testing submodule --- testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing b/testing index 5bab2f264a23..70ba4d8610cf 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88 +Subproject commit 70ba4d8610cf64ac2179ec6751677795eaba77f4 From bbf8d4bb76f2ae2f587502a306c54b746620946d Mon Sep 17 00:00:00 2001 From: Ronen Cohen Date: Wed, 10 May 2023 19:51:24 +0300 Subject: [PATCH 4/5] Switch back to apache/arrow-testing --- .gitmodules | 2 +- testing | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitmodules b/.gitmodules index ca1fa0c1c99d..38da00520f03 100644 --- a/.gitmodules +++ b/.gitmodules @@ -3,4 +3,4 @@ url = https://github.com/apache/parquet-testing.git [submodule "testing"] path = testing - url = https://github.com/nenorbot/arrow-testing + url = https://github.com/apache/arrow-testing.git diff --git a/testing b/testing index 70ba4d8610cf..e81d0c6de359 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit 70ba4d8610cf64ac2179ec6751677795eaba77f4 +Subproject commit e81d0c6de35948b3be7984af8e00413b314cde6e From 33d80989d573f784082a5c3a5e0b20f471e5149f Mon Sep 17 00:00:00 2001 From: Ronen Cohen Date: Wed, 10 May 2023 20:03:19 +0300 Subject: [PATCH 5/5] Fix arrow-testing URL --- .gitmodules | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitmodules b/.gitmodules index 38da00520f03..ec5d6208b8dd 100644 --- a/.gitmodules +++ b/.gitmodules @@ -3,4 +3,4 @@ url = https://github.com/apache/parquet-testing.git [submodule "testing"] path = testing - url = https://github.com/apache/arrow-testing.git + url = https://github.com/apache/arrow-testing