From a8da45de1cc4d9c0a7d3bc68122692e3ca24f3a5 Mon Sep 17 00:00:00 2001 From: Adrian Ehrsam Date: Wed, 20 Mar 2024 07:21:01 +0100 Subject: [PATCH 1/6] WIP on schema_merge --- crates/core/src/kernel/arrow/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/core/src/kernel/arrow/mod.rs b/crates/core/src/kernel/arrow/mod.rs index d27bc6463b..de24068825 100644 --- a/crates/core/src/kernel/arrow/mod.rs +++ b/crates/core/src/kernel/arrow/mod.rs @@ -260,7 +260,8 @@ impl TryFrom<&ArrowDataType> for DataType { } else { panic!("DataType::Map should contain a struct field child"); } - } + }, + ArrowDataType::Dictionary(_, value_type) => Ok(value_type.as_ref().try_into()?), s => Err(ArrowError::SchemaError(format!( "Invalid data type for Delta Lake: {s}" ))), From d4811c32337624bb9d5f518a1836a73e2e4d3ed9 Mon Sep 17 00:00:00 2001 From: Adrian Ehrsam Date: Wed, 20 Mar 2024 08:22:06 +0100 Subject: [PATCH 2/6] Fixes #2298 --- crates/core/src/operations/cast.rs | 44 ++++++++++++++++++++++++------ 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/crates/core/src/operations/cast.rs b/crates/core/src/operations/cast.rs index 33155dedd8..d02544f70d 100644 --- a/crates/core/src/operations/cast.rs +++ b/crates/core/src/operations/cast.rs @@ -1,5 +1,6 @@ //! Provide common cast functionality for callers //! +use crate::kernel::DataType as DeltaDataType; use arrow::datatypes::DataType::Dictionary; use arrow_array::{new_null_array, Array, ArrayRef, RecordBatch, StructArray}; use arrow_cast::{cast_with_options, CastOptions}; @@ -12,18 +13,25 @@ use std::sync::Arc; use crate::DeltaResult; pub(crate) fn merge_field(left: &ArrowField, right: &ArrowField) -> Result { - if let Dictionary(_, value_type) = right.data_type() { - if value_type.equals_datatype(left.data_type()) { - return Ok(left.clone()); - } + let left_type = left.data_type(); + let right_type = right.data_type(); + if left_type.equals_datatype(right_type) { + return Ok(left.clone()); } - if let Dictionary(_, value_type) = left.data_type() { - if value_type.equals_datatype(right.data_type()) { - return Ok(right.clone()); + let left_delta_type: DeltaDataType = left_type.try_into()?; + let right_delta_type: DeltaDataType = right_type.try_into()?; + if left_delta_type == right_delta_type { + // The types are same in delta, so we just pick the left one as a base + // As we only store the delta type in metadata, that should be fine + // However, we need to merge the nullable flag + if (left.is_nullable() || left.is_nullable() == right.is_nullable()) { + return Ok(left.clone()); } + let mut new_field = left.clone(); + return Ok(new_field.with_nullable(left.is_nullable() || right.is_nullable())); } let mut new_field = left.clone(); - new_field.try_merge(right)?; + new_field.try_merge(right)?; // this is mostly used for structs / arrays etc Ok(new_field) } @@ -142,6 +150,7 @@ pub fn cast_record_batch( #[cfg(test)] mod tests { + use crate::kernel::DataType as DeltaDataType; use crate::operations::cast::{cast_record_batch, is_cast_required}; use arrow::array::ArrayData; use arrow_array::{Array, ArrayRef, ListArray, RecordBatch}; @@ -149,6 +158,25 @@ mod tests { use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; use std::sync::Arc; + #[test] + fn test_merge_schema_with_dict() { + let left_schema = Schema::new(vec![Field::new( + "f", + DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(DataType::Utf8), + ), + false, + )]); + let right_schema = Schema::new(vec![Field::new("f", DataType::LargeUtf8, true)]); + + let result = super::merge_schema(left_schema, right_schema).unwrap(); + assert_eq!(result.fields().len(), 1); + let delta_type: DeltaDataType = result.fields()[0].data_type().try_into().unwrap(); + assert_eq!(delta_type, DeltaDataType::STRING); + assert_eq!(result.fields()[0].is_nullable(), true); + } + #[test] fn test_cast_record_batch_with_list_non_default_item() { let array = Arc::new(make_list_array()) as ArrayRef; From f437d3dfd02578c28e68df043fc453332afa4cb6 Mon Sep 17 00:00:00 2001 From: Adrian Ehrsam Date: Wed, 20 Mar 2024 08:32:03 +0100 Subject: [PATCH 3/6] oops, forgot fmt :) --- crates/core/src/kernel/arrow/mod.rs | 2 +- crates/core/src/operations/cast.rs | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/crates/core/src/kernel/arrow/mod.rs b/crates/core/src/kernel/arrow/mod.rs index de24068825..45d6432e1d 100644 --- a/crates/core/src/kernel/arrow/mod.rs +++ b/crates/core/src/kernel/arrow/mod.rs @@ -260,7 +260,7 @@ impl TryFrom<&ArrowDataType> for DataType { } else { panic!("DataType::Map should contain a struct field child"); } - }, + } ArrowDataType::Dictionary(_, value_type) => Ok(value_type.as_ref().try_into()?), s => Err(ArrowError::SchemaError(format!( "Invalid data type for Delta Lake: {s}" diff --git a/crates/core/src/operations/cast.rs b/crates/core/src/operations/cast.rs index d02544f70d..52137b8021 100644 --- a/crates/core/src/operations/cast.rs +++ b/crates/core/src/operations/cast.rs @@ -162,10 +162,7 @@ mod tests { fn test_merge_schema_with_dict() { let left_schema = Schema::new(vec![Field::new( "f", - DataType::Dictionary( - Box::new(DataType::Int32), - Box::new(DataType::Utf8), - ), + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), false, )]); let right_schema = Schema::new(vec![Field::new("f", DataType::LargeUtf8, true)]); From 88164a3599f7182df68bf554231b60dddc0ee200 Mon Sep 17 00:00:00 2001 From: Adrian Ehrsam Date: Wed, 20 Mar 2024 10:54:22 +0100 Subject: [PATCH 4/6] switch to merging delta schema --- crates/core/src/operations/cast.rs | 136 +++++++++++++++++-------- crates/core/src/operations/write.rs | 6 +- crates/core/src/writer/record_batch.rs | 8 +- 3 files changed, 97 insertions(+), 53 deletions(-) diff --git a/crates/core/src/operations/cast.rs b/crates/core/src/operations/cast.rs index 52137b8021..e85fd44525 100644 --- a/crates/core/src/operations/cast.rs +++ b/crates/core/src/operations/cast.rs @@ -1,61 +1,38 @@ //! Provide common cast functionality for callers //! -use crate::kernel::DataType as DeltaDataType; -use arrow::datatypes::DataType::Dictionary; +use crate::kernel::{ArrayType, DataType as DeltaDataType, MapType, StructField, StructType}; use arrow_array::{new_null_array, Array, ArrayRef, RecordBatch, StructArray}; use arrow_cast::{cast_with_options, CastOptions}; -use arrow_schema::{ - ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema, - SchemaRef as ArrowSchemaRef, -}; +use arrow_schema::{ArrowError, DataType, Fields, SchemaRef as ArrowSchemaRef}; use std::sync::Arc; use crate::DeltaResult; -pub(crate) fn merge_field(left: &ArrowField, right: &ArrowField) -> Result { - let left_type = left.data_type(); - let right_type = right.data_type(); - if left_type.equals_datatype(right_type) { - return Ok(left.clone()); - } - let left_delta_type: DeltaDataType = left_type.try_into()?; - let right_delta_type: DeltaDataType = right_type.try_into()?; - if left_delta_type == right_delta_type { - // The types are same in delta, so we just pick the left one as a base - // As we only store the delta type in metadata, that should be fine - // However, we need to merge the nullable flag - if (left.is_nullable() || left.is_nullable() == right.is_nullable()) { - return Ok(left.clone()); - } - let mut new_field = left.clone(); - return Ok(new_field.with_nullable(left.is_nullable() || right.is_nullable())); - } - let mut new_field = left.clone(); - new_field.try_merge(right)?; // this is mostly used for structs / arrays etc - Ok(new_field) -} - -pub(crate) fn merge_schema( - left: ArrowSchema, - right: ArrowSchema, -) -> Result { +pub(crate) fn merge_struct( + left: &StructType, + right: &StructType, +) -> Result { let mut errors = Vec::with_capacity(left.fields().len()); - let merged_fields: Result, ArrowError> = left + let merged_fields: Result, ArrowError> = left .fields() .iter() .map(|field| { let right_field = right.field_with_name(field.name()); if let Ok(right_field) = right_field { - let field_or_not = merge_field(field.as_ref(), right_field); - match field_or_not { + let type_or_not = merge_type(field.data_type(), right_field.data_type()); + match type_or_not { Err(e) => { errors.push(e.to_string()); Err(e) } - Ok(f) => Ok(f), + Ok(f) => Ok(StructField::new( + field.name(), + f, + field.is_nullable() || right_field.is_nullable(), + )), } } else { - Ok(field.as_ref().clone()) + Ok(field.clone()) } }) .collect(); @@ -63,11 +40,11 @@ pub(crate) fn merge_schema( Ok(mut fields) => { for field in right.fields() { if !left.field_with_name(field.name()).is_ok() { - fields.push(field.as_ref().clone()); + fields.push(field.clone()); } } - Ok(ArrowSchema::new(fields)) + Ok(StructType::new(fields)) } Err(e) => { errors.push(e.to_string()); @@ -76,6 +53,51 @@ pub(crate) fn merge_schema( } } +pub(crate) fn merge_type( + left: &DeltaDataType, + right: &DeltaDataType, +) -> Result { + if left == right { + return Ok(left.clone()); + } + match (left, right) { + (DeltaDataType::Array(a), DeltaDataType::Array(b)) => { + let merged = merge_type(&a.element_type, &b.element_type)?; + Ok(DeltaDataType::Array(Box::new(ArrayType::new( + merged, + a.contains_null() || b.contains_null(), + )))) + } + (DeltaDataType::Map(a), DeltaDataType::Map(b)) => { + let merged_key = merge_type(&a.key_type, &b.key_type)?; + let merged_value = merge_type(&a.value_type, &b.value_type)?; + Ok(DeltaDataType::Map(Box::new(MapType::new( + merged_key, + merged_value, + a.value_contains_null() || b.value_contains_null(), + )))) + } + (DeltaDataType::Struct(a), DeltaDataType::Struct(b)) => { + let merged = merge_struct(a, b)?; + Ok(DeltaDataType::Struct(Box::new(merged))) + } + (a, b) => Err(ArrowError::SchemaError(format!( + "Cannot merge types {} and {}", + a, b + ))), + } +} + +pub(crate) fn merge_schema( + left: ArrowSchemaRef, + right: ArrowSchemaRef, +) -> Result { + let left_delta: StructType = left.try_into()?; + let right_delta: StructType = right.try_into()?; + let merged: StructType = merge_struct(&left_delta, &right_delta)?; + Ok(Arc::new((&merged).try_into()?)) +} + fn cast_struct( struct_array: &StructArray, fields: &Fields, @@ -150,7 +172,7 @@ pub fn cast_record_batch( #[cfg(test)] mod tests { - use crate::kernel::DataType as DeltaDataType; + use crate::kernel::{ArrayType as DeltaArrayType, DataType as DeltaDataType}; use crate::operations::cast::{cast_record_batch, is_cast_required}; use arrow::array::ArrayData; use arrow_array::{Array, ArrayRef, ListArray, RecordBatch}; @@ -160,12 +182,16 @@ mod tests { #[test] fn test_merge_schema_with_dict() { - let left_schema = Schema::new(vec![Field::new( + let left_schema = Arc::new(Schema::new(vec![Field::new( "f", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), false, - )]); - let right_schema = Schema::new(vec![Field::new("f", DataType::LargeUtf8, true)]); + )])); + let right_schema = Arc::new(Schema::new(vec![Field::new( + "f", + DataType::LargeUtf8, + true, + )])); let result = super::merge_schema(left_schema, right_schema).unwrap(); assert_eq!(result.fields().len(), 1); @@ -173,6 +199,28 @@ mod tests { assert_eq!(delta_type, DeltaDataType::STRING); assert_eq!(result.fields()[0].is_nullable(), true); } + #[test] + fn test_merge_schema_with_nested() { + let left_schema = Arc::new(Schema::new(vec![Field::new( + "f", + DataType::LargeList(Arc::new(Field::new("item", DataType::Utf8, false))), + false, + )])); + let right_schema = Arc::new(Schema::new(vec![Field::new( + "f", + DataType::List(Arc::new(Field::new("item", DataType::LargeUtf8, false))), + true, + )])); + + let result = super::merge_schema(left_schema, right_schema).unwrap(); + assert_eq!(result.fields().len(), 1); + let delta_type: DeltaDataType = result.fields()[0].data_type().try_into().unwrap(); + assert_eq!( + delta_type, + DeltaDataType::Array(Box::new(DeltaArrayType::new(DeltaDataType::STRING, false))) + ); + assert_eq!(result.fields()[0].is_nullable(), true); + } #[test] fn test_cast_record_batch_with_list_non_default_item() { diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index d2751a6b1f..ec0a6c80d1 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -623,10 +623,8 @@ impl std::future::IntoFuture for WriteBuilder { if this.mode == SaveMode::Overwrite && this.schema_mode.is_some() { new_schema = None // we overwrite anyway, so no need to cast } else if this.schema_mode == Some(SchemaMode::Merge) { - new_schema = Some(Arc::new(merge_schema( - table_schema.as_ref().clone(), - schema.as_ref().clone(), - )?)); + new_schema = + Some(merge_schema(table_schema.clone(), schema.clone())?); } else { return Err(schema_err.into()); } diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs index 5c8fb57509..8b43f35242 100644 --- a/crates/core/src/writer/record_batch.rs +++ b/crates/core/src/writer/record_batch.rs @@ -306,11 +306,9 @@ impl PartitionWriter { WriteMode::MergeSchema => { debug!("The writer and record batch schemas do not match, merging"); - let merged = merge_schema( - self.arrow_schema.as_ref().clone(), - record_batch.schema().as_ref().clone(), - )?; - self.arrow_schema = Arc::new(merged); + let merged = + merge_schema(self.arrow_schema.clone(), record_batch.schema().clone())?; + self.arrow_schema = merged; let mut cols = vec![]; for field in self.arrow_schema.fields() { From 4053a01d142aca829afd50b631209bc2d85b0079 Mon Sep 17 00:00:00 2001 From: Adrian Ehrsam Date: Wed, 20 Mar 2024 13:45:43 +0100 Subject: [PATCH 5/6] fixes test? --- python/tests/test_writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index dfd124a73d..96903f0824 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -259,7 +259,7 @@ def test_update_schema_rust_writer_append(existing_table: DeltaTable): ) with pytest.raises( SchemaMismatchError, - match="Schema error: Fail to merge schema field 'utf8' because the from data_type = Int64 does not equal Utf8", + match="Schema error: Cannot merge types string and long", ): write_deltalake( existing_table, From 8ab4ab7d4e574e3c3a8c6478f67ab574fca5ed51 Mon Sep 17 00:00:00 2001 From: Adrian Ehrsam Date: Wed, 20 Mar 2024 21:20:28 +0100 Subject: [PATCH 6/6] handle metadata and test --- crates/core/src/operations/cast.rs | 77 +++++++++++++++++++++++++++--- 1 file changed, 70 insertions(+), 7 deletions(-) diff --git a/crates/core/src/operations/cast.rs b/crates/core/src/operations/cast.rs index e85fd44525..f92b3c646e 100644 --- a/crates/core/src/operations/cast.rs +++ b/crates/core/src/operations/cast.rs @@ -1,13 +1,35 @@ //! Provide common cast functionality for callers //! -use crate::kernel::{ArrayType, DataType as DeltaDataType, MapType, StructField, StructType}; +use crate::kernel::{ + ArrayType, DataType as DeltaDataType, MapType, MetadataValue, StructField, StructType, +}; use arrow_array::{new_null_array, Array, ArrayRef, RecordBatch, StructArray}; use arrow_cast::{cast_with_options, CastOptions}; use arrow_schema::{ArrowError, DataType, Fields, SchemaRef as ArrowSchemaRef}; +use std::collections::HashMap; use std::sync::Arc; use crate::DeltaResult; +fn try_merge_metadata( + left: &mut HashMap, + right: &HashMap, +) -> Result<(), ArrowError> { + for (k, v) in right { + if let Some(vl) = left.get(k) { + if vl != v { + return Err(ArrowError::SchemaError(format!( + "Cannot merge metadata with different values for key {}", + k + ))); + } + } else { + left.insert(k.clone(), v.clone()); + } + } + Ok(()) +} + pub(crate) fn merge_struct( left: &StructType, right: &StructType, @@ -25,11 +47,17 @@ pub(crate) fn merge_struct( errors.push(e.to_string()); Err(e) } - Ok(f) => Ok(StructField::new( - field.name(), - f, - field.is_nullable() || right_field.is_nullable(), - )), + Ok(f) => { + let mut new_field = StructField::new( + field.name(), + f, + field.is_nullable() || right_field.is_nullable(), + ); + + new_field.metadata = field.metadata.clone(); + try_merge_metadata(&mut new_field.metadata, &right_field.metadata)?; + Ok(new_field) + } } } else { Ok(field.clone()) @@ -172,12 +200,17 @@ pub fn cast_record_batch( #[cfg(test)] mod tests { - use crate::kernel::{ArrayType as DeltaArrayType, DataType as DeltaDataType}; + use crate::kernel::{ + ArrayType as DeltaArrayType, DataType as DeltaDataType, StructField as DeltaStructField, + StructType as DeltaStructType, + }; + use crate::operations::cast::MetadataValue; use crate::operations::cast::{cast_record_batch, is_cast_required}; use arrow::array::ArrayData; use arrow_array::{Array, ArrayRef, ListArray, RecordBatch}; use arrow_buffer::Buffer; use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; + use std::collections::HashMap; use std::sync::Arc; #[test] @@ -199,6 +232,36 @@ mod tests { assert_eq!(delta_type, DeltaDataType::STRING); assert_eq!(result.fields()[0].is_nullable(), true); } + + #[test] + fn test_merge_schema_with_meta() { + let mut left_meta = HashMap::new(); + left_meta.insert("a".to_string(), "a1".to_string()); + let left_schema = DeltaStructType::new(vec![DeltaStructField::new( + "f", + DeltaDataType::STRING, + false, + ) + .with_metadata(left_meta)]); + let mut right_meta = HashMap::new(); + right_meta.insert("b".to_string(), "b2".to_string()); + let right_schema = DeltaStructType::new(vec![DeltaStructField::new( + "f", + DeltaDataType::STRING, + true, + ) + .with_metadata(right_meta)]); + + let result = super::merge_struct(&left_schema, &right_schema).unwrap(); + assert_eq!(result.fields().len(), 1); + let delta_type = result.fields()[0].data_type(); + assert_eq!(delta_type, &DeltaDataType::STRING); + let mut expected_meta = HashMap::new(); + expected_meta.insert("a".to_string(), MetadataValue::String("a1".to_string())); + expected_meta.insert("b".to_string(), MetadataValue::String("b2".to_string())); + assert_eq!(result.fields()[0].metadata(), &expected_meta); + } + #[test] fn test_merge_schema_with_nested() { let left_schema = Arc::new(Schema::new(vec![Field::new(