From 37475afae604d3a03d6daf676de15125cca6a719 Mon Sep 17 00:00:00 2001 From: Alex Wilcoxson Date: Wed, 7 Aug 2024 15:21:49 -0500 Subject: [PATCH 1/3] chore: update delta_kernel to 0.3.0 --- Cargo.toml | 3 +- crates/core/src/kernel/arrow/mod.rs | 40 +++++++++++++++++++++----- crates/core/src/kernel/snapshot/mod.rs | 20 ++++++++++++- crates/core/src/table/config.rs | 2 +- crates/core/src/table/state_arrow.rs | 2 +- 5 files changed, 56 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 21dfcb73d3..c5cdd86c74 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,8 @@ debug = true debug = "line-tables-only" [workspace.dependencies] -delta_kernel = { version = "0.2.0" } +# delta_kernel = { version = "0.2.0" } +delta_kernel = { version = "0.3.0" } # delta_kernel = { path = "../delta-kernel-rs/kernel" } # arrow diff --git a/crates/core/src/kernel/arrow/mod.rs b/crates/core/src/kernel/arrow/mod.rs index 1448f43428..0fb41379dd 100644 --- a/crates/core/src/kernel/arrow/mod.rs +++ b/crates/core/src/kernel/arrow/mod.rs @@ -362,7 +362,9 @@ mod tests { use std::collections::HashMap; use std::sync::Arc; - use arrow_array::{MapArray, RecordBatch}; + use arrow::array::ArrayData; + use arrow_array::{Array, BinaryArray, MapArray, RecordBatch, StringArray, StructArray}; + use arrow_buffer::{Buffer, ToByteSlice}; use delta_kernel::schema::{DataType, MapType, PrimitiveType, StructField, StructType}; use super::*; @@ -521,12 +523,36 @@ mod tests { let entry_offsets = vec![0u32, 1, 1, 4, 5, 5]; let num_rows = keys.len(); - let map_array = MapArray::new_from_strings( - keys.into_iter(), - &arrow::array::BinaryArray::from(values), - entry_offsets.as_slice(), - ) - .expect("Could not create a map array"); + let key_field = Arc::new(ArrowField::new(MAP_KEY_DEFAULT, ArrowDataType::Utf8, false)); + let value_field = Arc::new(ArrowField::new( + MAP_VALUE_DEFAULT, + ArrowDataType::Binary, + false, + )); + let key_value_field = ArrowField::new_struct( + MAP_ROOT_DEFAULT, + vec![key_field.clone(), value_field.clone()], + false, + ); + let key_value_array = StructArray::new( + vec![key_field, value_field].into(), + vec![ + Arc::new(StringArray::from(keys)), + Arc::new(BinaryArray::from(values)), + ], + None, + ); + let entry_offsets_buffer = Buffer::from(entry_offsets.as_slice().to_byte_slice()); + + let map_data_type = ArrowDataType::Map(Arc::new(key_value_field), false); + let map_data = ArrayData::builder(map_data_type) + .len(entry_offsets.len() - 1) + .add_buffer(entry_offsets_buffer) + .add_child_data(key_value_array.into_data()) + .build() + .unwrap(); + + let map_array = MapArray::from(map_data); let schema = >::try_from(&StructType::new(vec![ diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index d34b78fbed..2cc13e39b1 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -315,7 +315,7 @@ impl Snapshot { let stats_fields = if let Some(stats_cols) = self.table_config().stats_columns() { stats_cols .iter() - .map(|col| match schema.field(col) { + .map(|col| match lookup_stats_field(schema, col) { Some(field) => match field.data_type() { DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => { Err(DeltaTableError::Generic(format!( @@ -358,6 +358,24 @@ impl Snapshot { } } +fn lookup_stats_field<'a>(schema: &'a StructType, field_name: &str) -> Option<&'a StructField> { + if field_name.starts_with('`') && field_name.ends_with('`') { + let field_name = &field_name[1..field_name.len() - 1]; + schema.field(field_name) + } else { + match field_name.split_once('.') { + Some((parent, children)) => { + let parent_field = schema.fields.get(parent)?; + match parent_field.data_type() { + DataType::Struct(s) => lookup_stats_field(s.as_ref(), children), + _ => None, + } + } + None => schema.field(field_name), + } + } +} + /// A snapshot of a Delta table that has been eagerly loaded into memory. #[derive(Debug, Clone, PartialEq)] pub struct EagerSnapshot { diff --git a/crates/core/src/table/config.rs b/crates/core/src/table/config.rs index 3512e3abb5..47307cfecd 100644 --- a/crates/core/src/table/config.rs +++ b/crates/core/src/table/config.rs @@ -2,7 +2,7 @@ use std::time::Duration; use std::{collections::HashMap, str::FromStr}; -use delta_kernel::column_mapping::ColumnMappingMode; +use delta_kernel::features::ColumnMappingMode; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; diff --git a/crates/core/src/table/state_arrow.rs b/crates/core/src/table/state_arrow.rs index 24d4a474ff..197e8d7fd3 100644 --- a/crates/core/src/table/state_arrow.rs +++ b/crates/core/src/table/state_arrow.rs @@ -14,7 +14,7 @@ use arrow_array::{ StringArray, StructArray, TimestampMicrosecondArray, TimestampMillisecondArray, }; use arrow_schema::{DataType, Field, Fields, TimeUnit}; -use delta_kernel::column_mapping::ColumnMappingMode; +use delta_kernel::features::ColumnMappingMode; use itertools::Itertools; use super::state::DeltaTableState; From 83394eb477ca234201f23fdce3ccd9a4cec81c77 Mon Sep 17 00:00:00 2001 From: Alex Wilcoxson Date: Wed, 7 Aug 2024 15:25:25 -0500 Subject: [PATCH 2/3] chore: reverting in progress work --- crates/core/src/kernel/snapshot/mod.rs | 20 +------------------- 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 2cc13e39b1..d34b78fbed 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -315,7 +315,7 @@ impl Snapshot { let stats_fields = if let Some(stats_cols) = self.table_config().stats_columns() { stats_cols .iter() - .map(|col| match lookup_stats_field(schema, col) { + .map(|col| match schema.field(col) { Some(field) => match field.data_type() { DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => { Err(DeltaTableError::Generic(format!( @@ -358,24 +358,6 @@ impl Snapshot { } } -fn lookup_stats_field<'a>(schema: &'a StructType, field_name: &str) -> Option<&'a StructField> { - if field_name.starts_with('`') && field_name.ends_with('`') { - let field_name = &field_name[1..field_name.len() - 1]; - schema.field(field_name) - } else { - match field_name.split_once('.') { - Some((parent, children)) => { - let parent_field = schema.fields.get(parent)?; - match parent_field.data_type() { - DataType::Struct(s) => lookup_stats_field(s.as_ref(), children), - _ => None, - } - } - None => schema.field(field_name), - } - } -} - /// A snapshot of a Delta table that has been eagerly loaded into memory. #[derive(Debug, Clone, PartialEq)] pub struct EagerSnapshot { From 5a47b412082a9618b4278390d1cc883fc14927ff Mon Sep 17 00:00:00 2001 From: Alex Wilcoxson Date: Wed, 7 Aug 2024 15:26:20 -0500 Subject: [PATCH 3/3] chore: removing comment --- Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index c5cdd86c74..0892b0f12b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,6 @@ debug = true debug = "line-tables-only" [workspace.dependencies] -# delta_kernel = { version = "0.2.0" } delta_kernel = { version = "0.3.0" } # delta_kernel = { path = "../delta-kernel-rs/kernel" }