From 55182613348246b4f07ec0b65de25db4fc730076 Mon Sep 17 00:00:00 2001 From: Robert Pack <42610831+roeap@users.noreply.github.com> Date: Thu, 1 Feb 2024 21:34:56 +0100 Subject: [PATCH] fix: replace BTreeMap with IndexMap to preserve insertion order (#2150) # Description When switching to using `BTreeMap` for representing partition values, I introduced a bug, thinking `BTreeMap` would preserve insertion order. While it is ordered, it is ordered based on keys. This PR moves to using `IndexMap`, which actually preserves insertion order. --- crates/core/Cargo.toml | 1 + crates/core/src/kernel/snapshot/log_data.rs | 23 ++++++++++----------- crates/core/src/operations/optimize.rs | 19 +++++++++-------- crates/core/src/operations/writer.rs | 11 +++++----- crates/core/src/writer/json.rs | 15 +++++++------- crates/core/src/writer/record_batch.rs | 12 +++++------ crates/core/src/writer/stats.rs | 6 +++--- 7 files changed, 45 insertions(+), 42 deletions(-) diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 9773f82c46..4bd58a492c 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -77,6 +77,7 @@ errno = "0.3" either = "1.8" fix-hidden-lifetime-bug = "0.2" hyper = { version = "0.14", optional = true } +indexmap = "2.2.1" itertools = "0.12" lazy_static = "1" libc = ">=0.2.90, <1" diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index b874b53421..686a4110fe 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -1,9 +1,10 @@ use std::borrow::Cow; -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::sync::Arc; use arrow_array::{Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray}; use chrono::{NaiveDateTime, TimeZone, Utc}; +use indexmap::IndexMap; use object_store::path::Path; use object_store::ObjectMeta; use percent_encoding::percent_decode_str; @@ -19,37 +20,35 @@ const COL_MIN_VALUES: &str = "minValues"; const COL_MAX_VALUES: &str = "maxValues"; const COL_NULL_COUNT: &str = "nullCount"; -pub(crate) type PartitionFields<'a> = Arc>; -pub(crate) type PartitionValues<'a> = BTreeMap<&'a str, Scalar>; +pub(crate) type PartitionFields<'a> = Arc>; +pub(crate) type PartitionValues<'a> = IndexMap<&'a str, Scalar>; pub(crate) trait PartitionsExt { fn hive_partition_path(&self) -> String; } -impl PartitionsExt for BTreeMap<&str, Scalar> { +impl PartitionsExt for IndexMap<&str, Scalar> { fn hive_partition_path(&self) -> String { - let mut fields = self + let fields = self .iter() .map(|(k, v)| { let encoded = v.serialize_encoded(); format!("{k}={encoded}") }) .collect::>(); - fields.reverse(); fields.join("/") } } -impl PartitionsExt for BTreeMap { +impl PartitionsExt for IndexMap { fn hive_partition_path(&self) -> String { - let mut fields = self + let fields = self .iter() .map(|(k, v)| { let encoded = v.serialize_encoded(); format!("{k}={encoded}") }) .collect::>(); - fields.reverse(); fields.join("/") } } @@ -192,7 +191,7 @@ impl LogicalFile<'_> { /// The partition values for this logical file. 
pub fn partition_values(&self) -> DeltaResult> { if self.partition_fields.is_empty() { - return Ok(BTreeMap::new()); + return Ok(IndexMap::new()); } let map_value = self.partition_values.value(self.index); let keys = map_value @@ -237,7 +236,7 @@ impl LogicalFile<'_> { .unwrap_or(Scalar::Null(f.data_type.clone())); Ok((*k, val)) }) - .collect::>>() + .collect::>>() } /// Defines a deletion vector @@ -355,7 +354,7 @@ impl<'a> FileStatsAccessor<'a> { .partition_columns .iter() .map(|c| Ok((c.as_str(), schema.field_with_name(c.as_str())?))) - .collect::>>()?, + .collect::>>()?, ); let deletion_vector = extract_and_cast_opt::(data, "add.deletionVector"); let deletion_vector = deletion_vector.and_then(|dv| { diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index c67b31a71b..6b33939829 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -29,6 +29,7 @@ use arrow_array::RecordBatch; use futures::future::BoxFuture; use futures::stream::BoxStream; use futures::{Future, StreamExt, TryStreamExt}; +use indexmap::IndexMap; use itertools::Itertools; use num_cpus; use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder}; @@ -308,7 +309,7 @@ impl From for DeltaOperation { fn create_remove( path: &str, - partitions: &BTreeMap, + partitions: &IndexMap, size: i64, ) -> Result { // NOTE unwrap is safe since UNIX_EPOCH will always be earlier then now. @@ -353,11 +354,11 @@ enum OptimizeOperations { /// /// Bins are determined by the bin-packing algorithm to reach an optimal size. /// Files that are large enough already are skipped. Bins of size 1 are dropped. - Compact(HashMap, Vec)>), + Compact(HashMap, Vec)>), /// Plan to Z-order each partition ZOrder( Vec, - HashMap, MergeBin)>, + HashMap, MergeBin)>, ), // TODO: Sort } @@ -401,7 +402,7 @@ impl MergePlan { /// collected during the operation. async fn rewrite_files( task_parameters: Arc, - partition_values: BTreeMap, + partition_values: IndexMap, files: MergeBin, object_store: ObjectStoreRef, read_stream: F, @@ -849,7 +850,7 @@ fn build_compaction_plan( ) -> Result<(OptimizeOperations, Metrics), DeltaTableError> { let mut metrics = Metrics::default(); - let mut partition_files: HashMap, Vec)> = + let mut partition_files: HashMap, Vec)> = HashMap::new(); for add in snapshot.get_active_add_actions_by_partitions(filters)? { let add = add?; @@ -863,7 +864,7 @@ fn build_compaction_plan( .partition_values()? .into_iter() .map(|(k, v)| (k.to_string(), v)) - .collect::>(); + .collect::>(); partition_files .entry(add.partition_values()?.hive_partition_path()) @@ -877,7 +878,7 @@ fn build_compaction_plan( file.sort_by(|a, b| b.size.cmp(&a.size)); } - let mut operations: HashMap, Vec)> = HashMap::new(); + let mut operations: HashMap, Vec)> = HashMap::new(); for (part, (partition, files)) in partition_files { let mut merge_bins = vec![MergeBin::new()]; @@ -955,14 +956,14 @@ fn build_zorder_plan( // For now, just be naive and optimize all files in each selected partition. let mut metrics = Metrics::default(); - let mut partition_files: HashMap, MergeBin)> = HashMap::new(); + let mut partition_files: HashMap, MergeBin)> = HashMap::new(); for add in snapshot.get_active_add_actions_by_partitions(filters)? { let add = add?; let partition_values = add .partition_values()? 
.into_iter() .map(|(k, v)| (k.to_string(), v)) - .collect::>(); + .collect::>(); metrics.total_considered_files += 1; let object_meta = ObjectMeta::try_from(&add)?; diff --git a/crates/core/src/operations/writer.rs b/crates/core/src/operations/writer.rs index 5d8808fa3c..f92918653c 100644 --- a/crates/core/src/operations/writer.rs +++ b/crates/core/src/operations/writer.rs @@ -1,11 +1,12 @@ //! Abstractions and implementations for writing data to delta tables -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use arrow::datatypes::SchemaRef as ArrowSchemaRef; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; use bytes::Bytes; +use indexmap::IndexMap; use object_store::{path::Path, ObjectStore}; use parquet::arrow::ArrowWriter; use parquet::basic::Compression; @@ -155,7 +156,7 @@ impl DeltaWriter { pub async fn write_partition( &mut self, record_batch: RecordBatch, - partition_values: &BTreeMap, + partition_values: &IndexMap, ) -> DeltaResult<()> { let partition_key = Path::parse(partition_values.hive_partition_path())?; @@ -217,7 +218,7 @@ pub(crate) struct PartitionWriterConfig { /// Prefix applied to all paths prefix: Path, /// Values for all partition columns - partition_values: BTreeMap, + partition_values: IndexMap, /// Properties passed to underlying parquet writer writer_properties: WriterProperties, /// Size above which we will write a buffered parquet file to disk. @@ -230,7 +231,7 @@ pub(crate) struct PartitionWriterConfig { impl PartitionWriterConfig { pub fn try_new( file_schema: ArrowSchemaRef, - partition_values: BTreeMap, + partition_values: IndexMap, writer_properties: Option, target_file_size: Option, write_batch_size: Option, @@ -514,7 +515,7 @@ mod tests { ) -> PartitionWriter { let config = PartitionWriterConfig::try_new( batch.schema(), - BTreeMap::new(), + IndexMap::new(), writer_properties, target_file_size, write_batch_size, diff --git a/crates/core/src/writer/json.rs b/crates/core/src/writer/json.rs index a51dd86b58..a13d29ba4b 100644 --- a/crates/core/src/writer/json.rs +++ b/crates/core/src/writer/json.rs @@ -1,11 +1,12 @@ //! 
Main writer API to write json messages to delta table -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::convert::TryFrom; use std::sync::Arc; use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use arrow::record_batch::*; use bytes::Bytes; +use indexmap::IndexMap; use object_store::path::Path; use object_store::ObjectStore; use parquet::{ @@ -45,7 +46,7 @@ pub(crate) struct DataArrowWriter { writer_properties: WriterProperties, buffer: ShareableBuffer, arrow_writer: ArrowWriter, - partition_values: BTreeMap, + partition_values: IndexMap, buffered_record_batch_count: usize, } @@ -153,7 +154,7 @@ impl DataArrowWriter { writer_properties.clone(), )?; - let partition_values = BTreeMap::new(); + let partition_values = IndexMap::new(); let buffered_record_batch_count = 0; Ok(Self { @@ -397,8 +398,8 @@ fn quarantine_failed_parquet_rows( fn extract_partition_values( partition_cols: &[String], record_batch: &RecordBatch, -) -> Result, DeltaWriterError> { - let mut partition_values = BTreeMap::new(); +) -> Result, DeltaWriterError> { + let mut partition_values = IndexMap::new(); for col_name in partition_cols.iter() { let arrow_schema = record_batch.schema(); @@ -499,7 +500,7 @@ mod tests { &record_batch ) .unwrap(), - BTreeMap::from([ + IndexMap::from([ (String::from("col1"), Scalar::Integer(1)), (String::from("col2"), Scalar::Integer(2)), (String::from("col3"), Scalar::Null(DataType::INTEGER)), @@ -507,7 +508,7 @@ mod tests { ); assert_eq!( extract_partition_values(&[String::from("col1")], &record_batch).unwrap(), - BTreeMap::from([(String::from("col1"), Scalar::Integer(1)),]) + IndexMap::from([(String::from("col1"), Scalar::Integer(1)),]) ); assert!(extract_partition_values(&[String::from("col4")], &record_batch).is_err()) } diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs index 48525a3335..a9ff09cf1d 100644 --- a/crates/core/src/writer/record_batch.rs +++ b/crates/core/src/writer/record_batch.rs @@ -5,7 +5,6 @@ //! the writer. Once written, add actions are returned by the writer. It's the users responsibility //! to create the transaction using those actions. 
-use std::collections::BTreeMap; use std::{collections::HashMap, sync::Arc}; use arrow::array::{Array, UInt32Array}; @@ -15,6 +14,7 @@ use arrow_array::ArrayRef; use arrow_row::{RowConverter, SortField}; use arrow_schema::{ArrowError, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use bytes::Bytes; +use indexmap::IndexMap; use object_store::{path::Path, ObjectStore}; use parquet::{arrow::ArrowWriter, errors::ParquetError}; use parquet::{basic::Compression, file::properties::WriterProperties}; @@ -127,7 +127,7 @@ impl RecordBatchWriter { pub async fn write_partition( &mut self, record_batch: RecordBatch, - partition_values: &BTreeMap, + partition_values: &IndexMap, ) -> Result<(), DeltaTableError> { let arrow_schema = arrow_schema_without_partitions(&self.arrow_schema_ref, &self.partition_columns); @@ -212,7 +212,7 @@ impl DeltaWriter for RecordBatchWriter { #[derive(Clone, Debug)] pub struct PartitionResult { /// values found in partition columns - pub partition_values: BTreeMap, + pub partition_values: IndexMap, /// remaining dataset with partition column values removed pub record_batch: RecordBatch, } @@ -222,14 +222,14 @@ struct PartitionWriter { writer_properties: WriterProperties, pub(super) buffer: ShareableBuffer, pub(super) arrow_writer: ArrowWriter, - pub(super) partition_values: BTreeMap, + pub(super) partition_values: IndexMap, pub(super) buffered_record_batch_count: usize, } impl PartitionWriter { pub fn new( arrow_schema: Arc, - partition_values: BTreeMap, + partition_values: IndexMap, writer_properties: WriterProperties, ) -> Result { let buffer = ShareableBuffer::default(); @@ -302,7 +302,7 @@ pub(crate) fn divide_by_partition_values( if partition_columns.is_empty() { partitions.push(PartitionResult { - partition_values: BTreeMap::new(), + partition_values: IndexMap::new(), record_batch: values.clone(), }); return Ok(partitions); diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs index 4ba217cc1e..40e7b303cc 100644 --- a/crates/core/src/writer/stats.rs +++ b/crates/core/src/writer/stats.rs @@ -1,8 +1,8 @@ -use std::collections::BTreeMap; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use std::{collections::HashMap, ops::AddAssign}; +use indexmap::IndexMap; use parquet::format::FileMetaData; use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor}; use parquet::{basic::LogicalType, errors::ParquetError}; @@ -17,7 +17,7 @@ use crate::protocol::{ColumnValueStat, Stats}; /// Creates an [`Add`] log action struct. pub fn create_add( - partition_values: &BTreeMap, + partition_values: &IndexMap, path: String, size: i64, file_metadata: &FileMetaData, @@ -59,7 +59,7 @@ pub fn create_add( } fn stats_from_file_metadata( - partition_values: &BTreeMap, + partition_values: &IndexMap, file_metadata: &FileMetaData, ) -> Result { let type_ptr = parquet::schema::types::from_thrift(file_metadata.schema.as_slice());
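
---

To make the ordering issue described above concrete, here is a minimal standalone sketch (not part of the patch): it shows that `BTreeMap` iterates in key order while `IndexMap` iterates in insertion order, which is exactly what decides the segment order of a Hive-style partition path. It assumes `indexmap = "2"` and simplifies the real code by using plain `String` values instead of the crate's `Scalar` type and a free function instead of the `PartitionsExt` trait.

```rust
// Standalone sketch, not part of the patch. Simplified: String values instead of
// the crate's `Scalar`, and a free function instead of the `PartitionsExt` trait.
// Requires `indexmap = "2"` in Cargo.toml.

use std::collections::BTreeMap;

use indexmap::IndexMap;

/// Joins partition key/value pairs into a Hive-style path, in map iteration order.
fn hive_partition_path<'a, I>(entries: I) -> String
where
    I: IntoIterator<Item = (&'a String, &'a String)>,
{
    entries
        .into_iter()
        .map(|(k, v)| format!("{k}={v}"))
        .collect::<Vec<_>>()
        .join("/")
}

fn main() {
    // Table partitioned by `year` first, then `country` (declaration order matters).
    let declared = [("year", "2024"), ("country", "DE")];

    // BTreeMap iterates in sorted key order, so `country` jumps ahead of `year`.
    let btree: BTreeMap<String, String> = declared
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();
    assert_eq!(hive_partition_path(&btree), "country=DE/year=2024"); // wrong order

    // IndexMap iterates in insertion order, matching the declared partition columns.
    let index: IndexMap<String, String> = declared
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();
    assert_eq!(hive_partition_path(&index), "year=2024/country=DE"); // order preserved
}
```

With insertion order preserved by the map itself, the earlier `fields.reverse()` workaround in `hive_partition_path` is presumably no longer needed, which is why the patch drops it in the same change.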