Skip to content

Commit

Permalink
Support null values in Avro string columns (#6307)
Browse files Browse the repository at this point in the history
* Support null values in Avro string columns

* Temporarily change arrow-testing to fork

* Update testing submodule

* Switch back to apache/arrow-testing

* Fix arrow-testing URL
  • Loading branch information
nenorbot authored May 12, 2023
1 parent 7696aa5 commit b9e5c07
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 13 deletions.
27 changes: 15 additions & 12 deletions datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,13 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
} else if let Value::Array(n) = value {
n.iter()
.map(resolve_string)
.collect::<ArrowResult<Vec<String>>>()?
.collect::<ArrowResult<Vec<Option<String>>>>()?
.into_iter()
.map(Some)
.collect::<Vec<Option<String>>>()
} else if let Value::Null = value {
vec![None]
} else if !matches!(value, Value::Record(_)) {
vec![Some(resolve_string(value)?)]
vec![resolve_string(value)?]
} else {
return Err(SchemaError(
"Only scalars are currently supported in Avro arrays".to_string(),
Expand Down Expand Up @@ -351,7 +350,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
self.build_string_dictionary_builder(rows.len());
for row in rows {
if let Some(value) = self.field_lookup(col_name, row) {
if let Ok(str_v) = resolve_string(value) {
if let Ok(Some(str_v)) = resolve_string(value) {
builder.append(str_v).map(drop)?
} else {
builder.append_null()
Expand Down Expand Up @@ -689,7 +688,10 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
rows.iter()
.map(|row| {
let maybe_value = self.field_lookup(field.name(), row);
maybe_value.map(resolve_string).transpose()
match maybe_value {
None => Ok(None),
Some(v) => resolve_string(v),
}
})
.collect::<ArrowResult<StringArray>>()?,
)
Expand Down Expand Up @@ -841,12 +843,12 @@ fn flatten_string_values(values: &[&Value]) -> Vec<Option<String>> {
if let Value::Array(values) = row {
values
.iter()
.map(|s| resolve_string(s).ok())
.map(|s| resolve_string(s).ok().flatten())
.collect::<Vec<Option<_>>>()
} else if let Value::Null = row {
vec![]
} else {
vec![resolve_string(row).ok()]
vec![resolve_string(row).ok().flatten()]
}
})
.collect::<Vec<Option<_>>>()
Expand All @@ -855,13 +857,14 @@ fn flatten_string_values(values: &[&Value]) -> Vec<Option<String>> {
/// Reads an Avro value as a string, regardless of its type.
/// This is useful if the expected datatype is a string, in which case we preserve
/// all the values regardless of their type.
fn resolve_string(v: &Value) -> ArrowResult<String> {
fn resolve_string(v: &Value) -> ArrowResult<Option<String>> {
let v = if let Value::Union(_, b) = v { b } else { v };
match v {
Value::String(s) => Ok(s.clone()),
Value::Bytes(bytes) => {
String::from_utf8(bytes.to_vec()).map_err(AvroError::ConvertToUtf8)
}
Value::String(s) => Ok(Some(s.clone())),
Value::Bytes(bytes) => String::from_utf8(bytes.to_vec())
.map_err(AvroError::ConvertToUtf8)
.map(Some),
Value::Null => Ok(None),
other => Err(AvroError::GetString(other.into())),
}
.map_err(|e| SchemaError(format!("expected resolvable string : {e:?}")))
Expand Down
85 changes: 85 additions & 0 deletions datafusion/core/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ mod tests {
use crate::datasource::file_format::test_util::scan_format;
use crate::physical_plan::collect;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{as_string_array, Array};
use datafusion_common::cast::{
as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
as_int32_array, as_timestamp_microsecond_array,
Expand Down Expand Up @@ -221,6 +222,27 @@ mod tests {
Ok(())
}

/// Scans `alltypes_nulls_plain.avro` with only column index 2 projected,
/// reads that column as a boolean array, and checks that the single
/// returned row holds a null.
#[tokio::test]
async fn read_null_bool_alltypes_plain_avro() -> Result<()> {
    let ctx = SessionContext::new();
    let session_state = ctx.state();
    let task = session_state.task_ctx();
    let plan =
        get_exec(&session_state, "alltypes_nulls_plain.avro", Some(vec![2]), None).await?;

    // The file is expected to produce exactly one batch of shape 1x1.
    let results = collect(plan, task).await?;
    assert_eq!(results.len(), 1);
    let batch = &results[0];
    assert_eq!(1, batch.num_columns());
    assert_eq!(1, batch.num_rows());

    let column = as_boolean_array(batch.column(0))?;
    assert!(column.is_null(0));

    Ok(())
}

#[tokio::test]
async fn read_i32_alltypes_plain_avro() -> Result<()> {
let session_ctx = SessionContext::new();
Expand All @@ -245,6 +267,27 @@ mod tests {
Ok(())
}

/// Scans `alltypes_nulls_plain.avro` with only column index 1 projected,
/// reads that column as an `Int32` array, and checks that the single
/// returned row holds a null.
#[tokio::test]
async fn read_null_i32_alltypes_plain_avro() -> Result<()> {
    let ctx = SessionContext::new();
    let session_state = ctx.state();
    let task = session_state.task_ctx();
    let plan =
        get_exec(&session_state, "alltypes_nulls_plain.avro", Some(vec![1]), None).await?;

    // The file is expected to produce exactly one batch of shape 1x1.
    let results = collect(plan, task).await?;
    assert_eq!(results.len(), 1);
    let batch = &results[0];
    assert_eq!(1, batch.num_columns());
    assert_eq!(1, batch.num_rows());

    let column = as_int32_array(batch.column(0))?;
    assert!(column.is_null(0));

    Ok(())
}

#[tokio::test]
async fn read_i96_alltypes_plain_avro() -> Result<()> {
let session_ctx = SessionContext::new();
Expand Down Expand Up @@ -350,6 +393,48 @@ mod tests {
Ok(())
}

/// Scans `alltypes_nulls_plain.avro` with only column index 6 projected,
/// reads that column as a binary array, and checks that the single
/// returned row holds a null.
#[tokio::test]
async fn read_null_binary_alltypes_plain_avro() -> Result<()> {
    let ctx = SessionContext::new();
    let session_state = ctx.state();
    let task = session_state.task_ctx();
    let plan =
        get_exec(&session_state, "alltypes_nulls_plain.avro", Some(vec![6]), None).await?;

    // The file is expected to produce exactly one batch of shape 1x1.
    let results = collect(plan, task).await?;
    assert_eq!(results.len(), 1);
    let batch = &results[0];
    assert_eq!(1, batch.num_columns());
    assert_eq!(1, batch.num_rows());

    let column = as_binary_array(batch.column(0))?;
    assert!(column.is_null(0));

    Ok(())
}

/// Scans `alltypes_nulls_plain.avro` with only column index 0 projected,
/// reads that column as a string array, and checks that the single
/// returned row holds a null.
#[tokio::test]
async fn read_null_string_alltypes_plain_avro() -> Result<()> {
    let ctx = SessionContext::new();
    let session_state = ctx.state();
    let task = session_state.task_ctx();
    let plan =
        get_exec(&session_state, "alltypes_nulls_plain.avro", Some(vec![0]), None).await?;

    // The file is expected to produce exactly one batch of shape 1x1.
    let results = collect(plan, task).await?;
    assert_eq!(results.len(), 1);
    let batch = &results[0];
    assert_eq!(1, batch.num_columns());
    assert_eq!(1, batch.num_rows());

    // The `as_string_array` in scope here is used without `?`, unlike the
    // `datafusion_common::cast` helpers in the sibling tests.
    let column = as_string_array(batch.column(0));
    assert!(column.is_null(0));

    Ok(())
}

async fn get_exec(
state: &SessionState,
file_name: &str,
Expand Down
2 changes: 1 addition & 1 deletion testing

0 comments on commit b9e5c07

Please sign in to comment.