Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support null values in Avro string columns #6307

Merged
merged 6 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,13 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
} else if let Value::Array(n) = value {
n.iter()
.map(resolve_string)
.collect::<ArrowResult<Vec<String>>>()?
.collect::<ArrowResult<Vec<Option<String>>>>()?
.into_iter()
.map(Some)
.collect::<Vec<Option<String>>>()
} else if let Value::Null = value {
vec![None]
} else if !matches!(value, Value::Record(_)) {
vec![Some(resolve_string(value)?)]
vec![resolve_string(value)?]
} else {
return Err(SchemaError(
"Only scalars are currently supported in Avro arrays".to_string(),
Expand Down Expand Up @@ -351,7 +350,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
self.build_string_dictionary_builder(rows.len());
for row in rows {
if let Some(value) = self.field_lookup(col_name, row) {
if let Ok(str_v) = resolve_string(value) {
if let Ok(Some(str_v)) = resolve_string(value) {
builder.append(str_v).map(drop)?
} else {
builder.append_null()
Expand Down Expand Up @@ -692,7 +691,10 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
rows.iter()
.map(|row| {
let maybe_value = self.field_lookup(field.name(), row);
maybe_value.map(resolve_string).transpose()
match maybe_value {
None => Ok(None),
Some(v) => resolve_string(v),
}
})
.collect::<ArrowResult<StringArray>>()?,
)
Expand Down Expand Up @@ -846,12 +848,12 @@ fn flatten_string_values(values: &[&Value]) -> Vec<Option<String>> {
if let Value::Array(values) = row {
values
.iter()
.map(|s| resolve_string(s).ok())
.map(|s| resolve_string(s).ok().flatten())
.collect::<Vec<Option<_>>>()
} else if let Value::Null = row {
vec![]
} else {
vec![resolve_string(row).ok()]
vec![resolve_string(row).ok().flatten()]
}
})
.collect::<Vec<Option<_>>>()
Expand All @@ -860,13 +862,14 @@ fn flatten_string_values(values: &[&Value]) -> Vec<Option<String>> {
/// Reads an Avro value as a string, regardless of its type.
/// This is useful if the expected datatype is a string, in which case we preserve
/// all the values regardless of they type.
fn resolve_string(v: &Value) -> ArrowResult<String> {
fn resolve_string(v: &Value) -> ArrowResult<Option<String>> {
let v = if let Value::Union(_, b) = v { b } else { v };
match v {
Value::String(s) => Ok(s.clone()),
Value::Bytes(bytes) => {
String::from_utf8(bytes.to_vec()).map_err(AvroError::ConvertToUtf8)
}
Value::String(s) => Ok(Some(s.clone())),
Value::Bytes(bytes) => String::from_utf8(bytes.to_vec())
.map_err(AvroError::ConvertToUtf8)
.map(Some),
Value::Null => Ok(None),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks reasonable to me. 👍

other => Err(AvroError::GetString(other.into())),
}
.map_err(|e| SchemaError(format!("expected resolvable string : {e:?}")))
Expand Down
85 changes: 85 additions & 0 deletions datafusion/core/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ mod tests {
use crate::datasource::file_format::test_util::scan_format;
use crate::physical_plan::collect;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{as_string_array, Array};
use datafusion_common::cast::{
as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
as_int32_array, as_timestamp_microsecond_array,
Expand Down Expand Up @@ -221,6 +222,27 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn read_null_bool_alltypes_plain_avro() -> Result<()> {
let session_ctx = SessionContext::new();
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let projection = Some(vec![2]);
let exec =
get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?;

let batches = collect(exec, task_ctx).await?;
assert_eq!(batches.len(), 1);
assert_eq!(1, batches[0].num_columns());
assert_eq!(1, batches[0].num_rows());

let array = as_boolean_array(batches[0].column(0))?;

assert!(array.is_null(0));

Ok(())
}

#[tokio::test]
async fn read_i32_alltypes_plain_avro() -> Result<()> {
let session_ctx = SessionContext::new();
Expand All @@ -245,6 +267,27 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn read_null_i32_alltypes_plain_avro() -> Result<()> {
let session_ctx = SessionContext::new();
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let projection = Some(vec![1]);
let exec =
get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?;

let batches = collect(exec, task_ctx).await?;
assert_eq!(batches.len(), 1);
assert_eq!(1, batches[0].num_columns());
assert_eq!(1, batches[0].num_rows());

let array = as_int32_array(batches[0].column(0))?;

assert!(array.is_null(0));

Ok(())
}

#[tokio::test]
async fn read_i96_alltypes_plain_avro() -> Result<()> {
let session_ctx = SessionContext::new();
Expand Down Expand Up @@ -350,6 +393,48 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn read_null_binary_alltypes_plain_avro() -> Result<()> {
let session_ctx = SessionContext::new();
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let projection = Some(vec![6]);
let exec =
get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?;

let batches = collect(exec, task_ctx).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might also be worth checking out the https://docs.rs/datafusion/latest/datafusion/macro.assert_batches_eq.html macro to verify the rows / columns in a more easy to maintain wai

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that as well, however since we're explicitly checking for null values, the expected value would be something like

        let expected = vec![
            "+------------+",
            "| string_col |",
            "+------------+",
            "|            |",
            "+------------+",
        ];

... making it hard to differentiate between an empty string and null, so I opted to explicitly test via Array#is_null

assert_eq!(batches.len(), 1);
assert_eq!(1, batches[0].num_columns());
assert_eq!(1, batches[0].num_rows());

let array = as_binary_array(batches[0].column(0))?;

assert!(array.is_null(0));

Ok(())
}

#[tokio::test]
async fn read_null_string_alltypes_plain_avro() -> Result<()> {
let session_ctx = SessionContext::new();
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let projection = Some(vec![0]);
let exec =
get_exec(&state, "alltypes_nulls_plain.avro", projection, None).await?;

let batches = collect(exec, task_ctx).await?;
assert_eq!(batches.len(), 1);
assert_eq!(1, batches[0].num_columns());
assert_eq!(1, batches[0].num_rows());

let array = as_string_array(batches[0].column(0));

assert!(array.is_null(0));

Ok(())
}

async fn get_exec(
state: &SessionState,
file_name: &str,
Expand Down