From 776002006aab7bc9f3e213e3dd6acb5f67b5e3e6 Mon Sep 17 00:00:00 2001 From: Cole MacKenzie Date: Fri, 8 Sep 2023 12:10:09 -0700 Subject: [PATCH] fix: change map nullable value to false This value was true, but arrow defines it as always false https://github.com/apache/arrow-rs/blob/master/arrow-schema/src/field.rs#L230. This is also described in apache/arrow-rs#1697. This also replaces `key_value` as the struct name with `entries` to remain consistent with https://github.com/apache/arrow-rs/blob/878217b9e330b4f1ed13e798a214ea11fbeb2bbb/arrow-schema/src/datatype.rs#L247-L250 --- rust/src/delta_arrow.rs | 82 +++++++++++++++++++++------- rust/tests/integration_datafusion.rs | 51 +++++++++++++++++ 2 files changed, 114 insertions(+), 19 deletions(-) diff --git a/rust/src/delta_arrow.rs b/rust/src/delta_arrow.rs index 7f4fa7a154..aaecc0a0a5 100644 --- a/rust/src/delta_arrow.rs +++ b/rust/src/delta_arrow.rs @@ -63,20 +63,17 @@ impl TryFrom<&schema::SchemaTypeMap> for ArrowField { type Error = ArrowError; fn try_from(a: &schema::SchemaTypeMap) -> Result<Self, ArrowError> { - Ok(ArrowField::new( + Ok(ArrowField::new_map( + "entries", "entries", - ArrowDataType::Struct( - vec![ - ArrowField::new("key", ArrowDataType::try_from(a.get_key_type())?, false), - ArrowField::new( - "value", - ArrowDataType::try_from(a.get_value_type())?, - a.get_value_contains_null(), - ), - ] - .into(), + ArrowField::new("key", ArrowDataType::try_from(a.get_key_type())?, false), + ArrowField::new( + "value", + ArrowDataType::try_from(a.get_value_type())?, + a.get_value_contains_null(), ), - false, // always non-null + false, + false, )) } } @@ -167,7 +164,7 @@ impl TryFrom<&schema::SchemaDataType> for ArrowDataType { ] .into(), ), - true, + false, )), false, )), @@ -305,7 +302,7 @@ macro_rules!
arrow_map { stringify!($fieldname), ArrowDataType::Map( Arc::new(ArrowField::new( - "key_value", + "entries", ArrowDataType::Struct( vec![ ArrowField::new("key", ArrowDataType::Utf8, false), @@ -325,11 +322,11 @@ macro_rules! arrow_map { stringify!($fieldname), ArrowDataType::Map( Arc::new(ArrowField::new( - "key_value", + "entries", ArrowDataType::Struct( vec![ ArrowField::new("key", ArrowDataType::Utf8, false), - ArrowField::new("value", ArrowDataType::Utf8, true), + ArrowField::new("value", ArrowDataType::Utf8, false), ] .into(), ), @@ -637,6 +634,13 @@ fn null_count_schema_for_fields(dest: &mut Vec<ArrowField>, f: &ArrowField) { #[cfg(test)] mod tests { + use arrow::array::ArrayData; + use arrow::datatypes::DataType; + use arrow_array::Array; + use arrow_array::{make_array, ArrayRef, MapArray, StringArray, StructArray}; + use arrow_buffer::{Buffer, ToByteSlice}; + use arrow_schema::Field; + use super::*; use std::collections::HashMap; use std::sync::Arc; @@ -840,7 +844,7 @@ mod tests { fn test_delta_from_arrow_map_type() { let arrow_map = ArrowDataType::Map( Arc::new(ArrowField::new( - "key_value", + "entries", ArrowDataType::Struct( vec![ ArrowField::new("key", ArrowDataType::Int8, false), @@ -848,7 +852,7 @@ ] .into(), ), - true, + false, )), false, ); @@ -877,7 +881,47 @@ let entry_offsets = vec![0u32, 1, 1, 4, 5, 5]; let num_rows = keys.len(); - let map_array = arrow::array::MapArray::new_from_strings( + // Copied the function `new_from_strings` with the patched code from https://github.com/apache/arrow-rs/pull/4808 + // This should be reverted back to [`MapArray::new_from_strings`] once arrow is upgraded in this project. 
+ fn new_from_strings<'a>( + keys: impl Iterator<Item = &'a str>, + values: &dyn Array, + entry_offsets: &[u32], + ) -> Result<MapArray, ArrowError> { + let entry_offsets_buffer = Buffer::from(entry_offsets.to_byte_slice()); + let keys_data = StringArray::from_iter_values(keys); + + let keys_field = Arc::new(Field::new("keys", DataType::Utf8, false)); + let values_field = Arc::new(Field::new( + "values", + values.data_type().clone(), + values.null_count() > 0, + )); + + let entry_struct = StructArray::from(vec![ + (keys_field, Arc::new(keys_data) as ArrayRef), + (values_field, make_array(values.to_data())), + ]); + + let map_data_type = DataType::Map( + Arc::new(Field::new( + "entries", + entry_struct.data_type().clone(), + false, + )), + false, + ); + + let map_data = ArrayData::builder(map_data_type) + .len(entry_offsets.len() - 1) + .add_buffer(entry_offsets_buffer) + .add_child_data(entry_struct.into_data()) + .build()?; + + Ok(MapArray::from(map_data)) + } + + let map_array = new_from_strings( keys.into_iter(), &arrow::array::BinaryArray::from(values), entry_offsets.as_slice(), diff --git a/rust/tests/integration_datafusion.rs b/rust/tests/integration_datafusion.rs index beab2561f4..2489ab9371 100644 --- a/rust/tests/integration_datafusion.rs +++ b/rust/tests/integration_datafusion.rs @@ -45,6 +45,8 @@ use std::error::Error; mod common; mod local { + use deltalake::{writer::JsonWriter, SchemaTypeMap}; + use super::*; #[tokio::test] #[serial] @@ -933,6 +935,55 @@ mod local { Ok(()) } + + #[tokio::test] + async fn test_issue_1619_parquet_panic_using_map_type() -> Result<()> { + let _ = tokio::fs::remove_dir_all("./tests/data/issue-1619").await; + let fields: Vec<SchemaField> = vec![SchemaField::new( + "metadata".to_string(), + SchemaDataType::map(SchemaTypeMap::new( + Box::new(SchemaDataType::primitive("string".to_string())), + Box::new(SchemaDataType::primitive("string".to_string())), + true, + )), + true, + HashMap::new(), + )]; + let schema = deltalake::Schema::new(fields); + let table =
deltalake::DeltaTableBuilder::from_uri("./tests/data/issue-1619").build()?; + let _ = DeltaOps::from(table) + .create() + .with_columns(schema.get_fields().to_owned()) + .await?; + + let mut table = deltalake::open_table("./tests/data/issue-1619").await?; + + let mut writer = JsonWriter::for_table(&table).unwrap(); + let _ = writer + .write(vec![ + serde_json::json!({"metadata": {"hello": "world", "something": null}}), + ]) + .await + .unwrap(); + writer.flush_and_commit(&mut table).await.unwrap(); + + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::new(table))?; + + let batches = ctx.sql(r#"SELECT * FROM t"#).await?.collect().await?; + + let expected = vec![ + "+-----------------------------+", + "| metadata |", + "+-----------------------------+", + "| {hello: world, something: } |", // unclear why it doesn't say `null` for something... + "+-----------------------------+", + ]; + + assert_batches_sorted_eq!(&expected, &batches); + + Ok(()) + } } #[cfg(any(feature = "s3", feature = "s3-native-tls"))]