From 570b9fbcea0d731bb87ec2088425b0b1cb445688 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 21 Apr 2024 11:21:17 +0200 Subject: [PATCH 01/16] make stats_cols and num_index accessible in python --- python/deltalake/_internal.pyi | 2 ++ python/src/lib.rs | 16 +++++++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-)
diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index d100cdb11f..d7a1465a99 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi
@@ -43,6 +43,8 @@ class RawDeltaTable: def table_uri(self) -> str: ... def version(self) -> int: ... def get_latest_version(self) -> int: ... + def get_num_index_cols(self) -> int: ... + def get_stats_columns(self) -> Optional[List[str]]: ... def metadata(self) -> RawDeltaTableMetaData: ... def protocol_versions(self) -> List[Any]: ... def load_version(self, version: int) -> None: ...
diff --git a/python/src/lib.rs b/python/src/lib.rs index a64a5efe84..24722ef9fc 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs
@@ -33,7 +33,7 @@ use deltalake::operations::filesystem_check::FileSystemCheckBuilder; use deltalake::operations::merge::MergeBuilder; use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType}; use deltalake::operations::restore::RestoreBuilder; -use deltalake::operations::transaction::{CommitBuilder, CommitProperties}; +use deltalake::operations::transaction::{CommitBuilder, CommitProperties, TableReference}; use deltalake::operations::update::UpdateBuilder; use deltalake::operations::vacuum::VacuumBuilder; use deltalake::parquet::basic::Compression;
@@ -187,6 +187,20 @@ impl RawDeltaTable { .map_err(PythonError::from)?) } + pub fn get_num_index_cols(&mut self) -> PyResult<i32> { + Ok(self._table.snapshot().map_err(PythonError::from)?.config().num_indexed_cols()) + } + + pub fn get_stats_columns(&mut self) -> PyResult<Option<Vec<String>>> { + Ok(self._table.snapshot().map_err(PythonError::from)? + .config() + .stats_columns() + .map(|v|{ + v.iter() + .map(|v|v.to_string()).collect::<Vec<String>>() + })) + } + pub fn load_with_datetime(&mut self, ds: &str) -> PyResult<()> { let datetime = DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(ds).map_err(
From 824170754799d48cb74d5584a85dff7be0356cec Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 21 Apr 2024 12:38:55 +0200 Subject: [PATCH 02/16] respect file stats column collection configs --- python/deltalake/writer.py | 42 ++++++++++++++++++++++++++++++++++++-- python/src/lib.rs | 17 +++++++++------ 2 files changed, 51 insertions(+), 8 deletions(-)
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 7924b072b9..921f66a615 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py
@@ -264,7 +264,30 @@ def write_deltalake( storage_options.update(storage_options or {}) table.update_incremental() + num_indexed_cols = table._table.get_num_index_cols() + stats_cols = table._table.get_stats_columns() + else: + DEFAULT_DATA_SKIPPING_NUM_INDEX_COLS = 32 + if configuration is not None: + num_indexed_cols = configuration.get( + "delta.dataSkippingNumIndexedCols", DEFAULT_DATA_SKIPPING_NUM_INDEX_COLS + ) + if num_indexed_cols is None: + num_indexed_cols = DEFAULT_DATA_SKIPPING_NUM_INDEX_COLS + else: + num_indexed_cols = int(num_indexed_cols) + stats_cols = configuration.get("delta.dataSkippingStatsColumns") + if isinstance(stats_cols, str): + stats_cols = stats_cols.split(",") + if stats_cols and isinstance(stats_cols, list): + if not all(isinstance(inner, str) for inner in stats_cols): + raise ValueError( + "dataSkippingStatsColumns needs to be a comma-separated list of column names in string format. Example: 'foo,bar,baz'" + ) + else: + num_indexed_cols = DEFAULT_DATA_SKIPPING_NUM_INDEX_COLS + stats_cols = None __enforce_append_only(table=table, configuration=configuration, mode=mode) if overwrite_schema: schema_mode = "overwrite"
@@ -404,7 +427,11 @@ def _large_to_normal_dtype(dtype: pa.DataType) -> pa.DataType: def visitor(written_file: Any) -> None: path, partition_values = get_partitions_from_path(written_file.path) - stats = get_file_stats_from_metadata(written_file.metadata) + stats = get_file_stats_from_metadata( + written_file.metadata, + num_indexed_cols=num_indexed_cols, + columns_to_collect_stats=stats_cols, + ) # PyArrow added support for written_file.size in 9.0.0 if PYARROW_MAJOR_VERSION >= 9:
@@ -701,6 +728,8 @@ def get_partitions_from_path(path: str) -> Tuple[str, Dict[str, Optional[str]]]: def get_file_stats_from_metadata( metadata: Any, + num_indexed_cols: int, + columns_to_collect_stats: Optional[List[str]], ) -> Dict[str, Union[int, Dict[str, Any]]]: stats = { "numRecords": metadata.num_rows,
@@ -714,8 +743,17 @@ def iter_groups(metadata: Any) -> Iterator[Any]: if metadata.row_group(i).num_rows > 0: yield metadata.row_group(i) - for column_idx in range(metadata.num_columns): + if columns_to_collect_stats is not None: + columns_to_iterate = metadata.num_columns + else: + columns_to_iterate = num_indexed_cols + + for column_idx in range(columns_to_iterate): name = metadata.row_group(0).column(column_idx).path_in_schema + + if columns_to_collect_stats is not None: + if name not in columns_to_collect_stats: + continue # If stats missing, then we can't know aggregate stats if all( group.column(column_idx).is_stats_set for group in iter_groups(metadata)
diff --git a/python/src/lib.rs b/python/src/lib.rs index 24722ef9fc..48f07933b4 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs
@@ -188,17 +188,22 @@ impl RawDeltaTable { } pub fn get_num_index_cols(&mut self) -> PyResult<i32> { - Ok(self._table.snapshot().map_err(PythonError::from)?.config().num_indexed_cols()) + Ok(self + ._table + .snapshot() + .map_err(PythonError::from)? + .config() + .num_indexed_cols()) } pub fn get_stats_columns(&mut self) -> PyResult<Option<Vec<String>>> { - Ok(self._table.snapshot().map_err(PythonError::from)? + Ok(self + ._table + .snapshot() + .map_err(PythonError::from)? .config() .stats_columns() - .map(|v|{ - v.iter() - .map(|v|v.to_string()).collect::<Vec<String>>() - })) + .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>())) } pub fn load_with_datetime(&mut self, ds: &str) -> PyResult<()> { let datetime = DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(ds).map_err(
From e10f8be8bc03d9256dce3dce2d98c6e8404bc96c Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 21 Apr 2024 16:43:31 +0200 Subject: [PATCH 03/16] Collect table stats according to config in rust writer --- crates/core/src/operations/delete.rs | 10 ++++ crates/core/src/operations/merge/mod.rs | 11 ++++- crates/core/src/operations/optimize.rs | 16 ++++++- crates/core/src/operations/update.rs | 14 +++++- crates/core/src/operations/write.rs | 63 ++++++++++++++++++++++++- crates/core/src/operations/writer.rs | 26 +++++++++- crates/core/src/table/config.rs | 3 ++ crates/core/src/writer/json.rs | 3 ++ crates/core/src/writer/record_batch.rs | 3 ++ crates/core/src/writer/stats.rs | 26 +++++++++- 10 files changed, 167 insertions(+), 8 deletions(-)
diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index e28633ae17..10998cf356 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs
@@ -35,6 +35,7 @@ use serde::Serialize; use super::datafusion_utils::Expression; use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; +use super::write::WriterStatsConfig; use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{ find_files, register_store, DataFusionMixins, DeltaScanBuilder, DeltaSessionContext,
@@ -154,6 +155,14 @@ async fn excute_non_empty_expr( let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?); + let writer_stats_config = WriterStatsConfig::new( + snapshot.table_config().num_indexed_cols(), + snapshot + .table_config() + .stats_columns() + .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()), + ); + let add_actions = write_execution_plan( Some(snapshot), state.clone(),
@@ -165,6 +174,7 @@ async fn excute_non_empty_expr( writer_properties, false, None, + writer_stats_config, ) .await?
.into_iter()
diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 206ee0e899..aed02d18d5 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs
@@ -74,7 +74,7 @@ use crate::kernel::Action; use crate::logstore::LogStoreRef; use crate::operations::merge::barrier::find_barrier_node; use crate::operations::transaction::CommitBuilder; -use crate::operations::write::write_execution_plan; +use crate::operations::write::{write_execution_plan, WriterStatsConfig}; use crate::protocol::{DeltaOperation, MergePredicate}; use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableError};
@@ -1367,6 +1367,14 @@ async fn execute( // write projected records let table_partition_cols = current_metadata.partition_columns.clone(); + let writer_stats_config = WriterStatsConfig::new( + snapshot.table_config().num_indexed_cols(), + snapshot + .table_config() + .stats_columns() + .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()), + ); + let rewrite_start = Instant::now(); let add_actions = write_execution_plan( Some(snapshot),
@@ -1379,6 +1387,7 @@ async fn execute( writer_properties, safe_cast, None, + writer_stats_config, ) .await?;
diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index babe17a6a0..97c99a390e 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs
@@ -422,6 +422,10 @@ pub struct MergeTaskParameters { file_schema: ArrowSchemaRef, /// Properties passed to parquet writer writer_properties: WriterProperties, + /// Num index cols to collect stats for + num_indexed_cols: i32, + /// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols + stats_columns: Option<Vec<String>>, } /// A stream of record batches, with a ParquetError on failure.
@@ -481,7 +485,12 @@ impl MergePlan { Some(task_parameters.input_parameters.target_size as usize), None, )?; - let mut writer = PartitionWriter::try_with_config(object_store, writer_config)?; + let mut writer = PartitionWriter::try_with_config( + object_store, + writer_config, + task_parameters.num_indexed_cols, + task_parameters.stats_columns.clone(), + )?; let mut read_stream = read_stream.await?;
@@ -839,6 +848,11 @@ pub fn create_merge_plan( input_parameters, file_schema, writer_properties, + num_indexed_cols: snapshot.table_config().num_indexed_cols(), + stats_columns: snapshot + .table_config() + .stats_columns() + .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()), }), read_table_version: snapshot.version(), })
diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 6f8b595029..d9e0f4655e 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs
@@ -42,12 +42,15 @@ use futures::future::BoxFuture; use parquet::file::properties::WriterProperties; use serde::Serialize; -use super::transaction::PROTOCOL; use super::write::write_execution_plan; use super::{ datafusion_utils::Expression, transaction::{CommitBuilder, CommitProperties}, }; +use super::{ + transaction::{TableReference, PROTOCOL}, + write::WriterStatsConfig, +}; use crate::delta_datafusion::{ expr::fmt_expr_to_sql, physical::MetricObserverExec, DataFusionMixins, DeltaColumn, DeltaSessionContext,
@@ -347,6 +350,14 @@ async fn execute( projection_update.clone(), )?); + let writer_stats_config = WriterStatsConfig::new( + snapshot.table_config().num_indexed_cols(), + snapshot + .table_config() + .stats_columns() + .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()), + ); + let add_actions = write_execution_plan( Some(snapshot), state.clone(),
@@ -358,6 +369,7 @@ async fn execute( writer_properties, safe_cast, None, + writer_stats_config, ) .await?;
diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 10b48a768c..bc3c08050a 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -25,6 +25,7 @@ //!
```` use std::collections::HashMap; +use std::hash::Hash; use std::str::FromStr; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use std::vec; use arrow_array::RecordBatch; use arrow_cast::can_cast_types; use arrow_schema::{ArrowError, DataType, Fields, SchemaRef as ArrowSchemaRef}; +use datafusion::execution::config; use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; use datafusion::physical_expr::create_physical_expr; use datafusion::physical_plan::filter::FilterExec;
@@ -52,11 +54,12 @@ use crate::delta_datafusion::expr::parse_predicate_expression; use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder}; use crate::delta_datafusion::{DataFusionMixins, DeltaDataChecker}; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::{Action, Add, Metadata, PartitionsExt, Remove, StructType}; +use crate::kernel::{Action, Add, Metadata, PartitionsExt, Remove, Snapshot, StructType}; use crate::logstore::LogStoreRef; use crate::operations::cast::{cast_record_batch, merge_schema}; use crate::protocol::{DeltaOperation, SaveMode}; use crate::storage::ObjectStoreRef; +use crate::table::config::DEFAULT_NUM_INDEX_COLS; use crate::table::state::DeltaTableState; use crate::table::Constraint as DeltaConstraint; use crate::writer::record_batch::divide_by_partition_values;
@@ -334,6 +337,24 @@ impl WriteBuilder { } } } +/// Configuration for the writer on how to collect stats +#[derive(Clone)] +pub struct WriterStatsConfig { + /// Number of columns to collect stats for, idx based + num_indexed_cols: i32, + /// Optional list of columns which to collect stats for, takes precedence over num_indexed_cols + stats_columns: Option<Vec<String>>, +} + +impl WriterStatsConfig { + /// Create new writer stats config + pub fn new(num_indexed_cols: i32, stats_columns: Option<Vec<String>>) -> Self { + Self { + num_indexed_cols, + stats_columns, + } + } +} #[allow(clippy::too_many_arguments)] async fn write_execution_plan_with_predicate(
@@ -348,6 +369,7 @@ writer_properties: Option<WriterProperties>, safe_cast: bool, schema_mode: Option<SchemaMode>, + writer_stats_config: WriterStatsConfig, ) -> DeltaResult<Vec<Action>> { let schema: ArrowSchemaRef = if schema_mode.is_some() { plan.schema() } else {
@@ -383,6 +405,8 @@ async fn write_execution_plan_with_predicate( writer_properties.clone(), target_file_size, write_batch_size, + writer_stats_config.num_indexed_cols, + writer_stats_config.stats_columns.clone(), ); let mut writer = DeltaWriter::new(object_store.clone(), config); let checker_stream = checker.clone();
@@ -435,6 +459,7 @@ pub(crate) async fn write_execution_plan( writer_properties: Option<WriterProperties>, safe_cast: bool, schema_mode: Option<SchemaMode>, + writer_stats_config: WriterStatsConfig, ) -> DeltaResult<Vec<Action>> { write_execution_plan_with_predicate( None,
@@ -448,6 +473,7 @@ pub(crate) async fn write_execution_plan( writer_properties, safe_cast, schema_mode, + writer_stats_config, ) .await } @@ -460,6 +486,7 @@ async fn execute_non_empty_expr( expression: &Expr, rewrite: &[Add], writer_properties: Option<WriterProperties>, + writer_stats_config: WriterStatsConfig, ) -> DeltaResult<Vec<Action>> { // For each identified file perform a parquet scan + filter + limit (1) + count. // If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file.
@@ -496,6 +523,7 @@ async fn execute_non_empty_expr( writer_properties, false, None, + writer_stats_config, ) .await?;
@@ -511,6 +539,7 @@ async fn prepare_predicate_actions( partition_columns: Vec<String>, writer_properties: Option<WriterProperties>, deletion_timestamp: i64, + writer_stats_config: WriterStatsConfig, ) -> DeltaResult<Vec<Action>> { let candidates = find_files(snapshot, log_store.clone(), &state, Some(predicate.clone())).await?;
@@ -526,6 +555,7 @@ async fn prepare_predicate_actions( &predicate, &candidates.candidates, writer_properties, + writer_stats_config, ) .await? };
@@ -723,6 +753,35 @@ impl std::future::IntoFuture for WriteBuilder { _ => (None, None), }; + let config = if let Some(snapshot) = &this.snapshot { + Some(snapshot.table_config()) + } else { + None + }; + + let (num_index_cols, stats_columns) = match &config { + Some(conf) => (conf.num_indexed_cols(), conf.stats_columns()), + _ => ( + this.configuration + .get("delta.dataSkippingNumIndexedCols") + .map(|v| v.clone().map(|v| v.parse::<i32>().unwrap())) + .flatten() + .unwrap_or(DEFAULT_NUM_INDEX_COLS), + this.configuration + .get("delta.dataSkippingStatsColumns") + .and_then(|v| v.as_ref().map(|v| v.split(',').collect::<Vec<&str>>())), + ), + }; + + dbg!(num_index_cols.clone()); + dbg!(stats_columns.clone()); + + let writer_stats_config = WriterStatsConfig { + num_indexed_cols: num_index_cols.clone(), + stats_columns: stats_columns + .clone() + .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()), + }; // Here we need to validate if the new data conforms to a predicate if one is provided let add_actions = write_execution_plan_with_predicate( predicate.clone(),
@@ -736,6 +795,7 @@ impl std::future::IntoFuture for WriteBuilder { this.writer_properties.clone(), this.safe_cast, this.schema_mode, + writer_stats_config.clone(), ) .await?; actions.extend(add_actions);
@@ -772,6 +832,7 @@ impl std::future::IntoFuture for WriteBuilder { partition_columns.clone(), this.writer_properties, deletion_timestamp, + writer_stats_config, ) .await?; if !predicate_actions.is_empty() {
diff --git a/crates/core/src/operations/writer.rs b/crates/core/src/operations/writer.rs index c778ddfad5..0b266f7827 100644 --- a/crates/core/src/operations/writer.rs +++ b/crates/core/src/operations/writer.rs @@ -79,6 +79,10 @@ pub struct WriterConfig { /// Row chunks passed to parquet writer. This and the internal parquet writer settings /// determine how fine granular we can track / control the size of resulting files.
write_batch_size: usize, + /// Num index cols to collect stats for + num_indexed_cols: i32, + /// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols + stats_columns: Option>, } impl WriterConfig { @@ -89,6 +93,8 @@ impl WriterConfig { writer_properties: Option, target_file_size: Option, write_batch_size: Option, + num_indexed_cols: i32, + stats_columns: Option>, ) -> Self { let writer_properties = writer_properties.unwrap_or_else(|| { WriterProperties::builder() @@ -104,6 +110,8 @@ impl WriterConfig { writer_properties, target_file_size, write_batch_size, + num_indexed_cols, + stats_columns, } } @@ -177,8 +185,12 @@ impl DeltaWriter { Some(self.config.target_file_size), Some(self.config.write_batch_size), )?; - let mut writer = - PartitionWriter::try_with_config(self.object_store.clone(), config)?; + let mut writer = PartitionWriter::try_with_config( + self.object_store.clone(), + config, + self.config.num_indexed_cols, + self.config.stats_columns.clone(), + )?; writer.write(&record_batch).await?; let _ = self.partition_writers.insert(partition_key, writer); } @@ -269,6 +281,10 @@ pub(crate) struct PartitionWriter { arrow_writer: ArrowWriter, part_counter: usize, files_written: Vec, + /// Num index cols to collect stats for + num_indexed_cols: i32, + /// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols + stats_columns: Option>, } impl PartitionWriter { @@ -276,6 +292,8 @@ impl PartitionWriter { pub fn try_with_config( object_store: ObjectStoreRef, config: PartitionWriterConfig, + num_indexed_cols: i32, + stats_columns: Option>, ) -> DeltaResult { let buffer = ShareableBuffer::default(); let arrow_writer = ArrowWriter::try_new( @@ -292,6 +310,8 @@ impl PartitionWriter { arrow_writer, part_counter: 0, files_written: Vec::new(), + num_indexed_cols, + stats_columns, }) } @@ -349,6 +369,8 @@ impl PartitionWriter { path.to_string(), file_size, &metadata, + self.num_indexed_cols, + &self.stats_columns, ) .map_err(|err| WriteError::CreateAdd { source: Box::new(err), diff --git a/crates/core/src/table/config.rs b/crates/core/src/table/config.rs index 28b06e8f79..05fb0c53ca 100644 --- a/crates/core/src/table/config.rs +++ b/crates/core/src/table/config.rs @@ -208,6 +208,9 @@ macro_rules! 
table_config { /// Well known delta table configuration pub struct TableConfig<'a>(pub(crate) &'a HashMap>); +/// Default num index cols +pub const DEFAULT_NUM_INDEX_COLS: i32 = 32; + impl<'a> TableConfig<'a> { table_config!( ( diff --git a/crates/core/src/writer/json.rs b/crates/core/src/writer/json.rs index 8cc908320e..72d6ffff42 100644 --- a/crates/core/src/writer/json.rs +++ b/crates/core/src/writer/json.rs @@ -27,6 +27,7 @@ use crate::errors::DeltaTableError; use crate::kernel::{Add, PartitionsExt, Scalar, StructType}; use crate::storage::ObjectStoreRetryExt; use crate::table::builder::DeltaTableBuilder; +use crate::table::config::DEFAULT_NUM_INDEX_COLS; use crate::writer::utils::ShareableBuffer; use crate::DeltaTable; @@ -368,6 +369,8 @@ impl DeltaWriter> for JsonWriter { path.to_string(), file_size, &metadata, + DEFAULT_NUM_INDEX_COLS, + &None, )?); } Ok(actions) diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs index 0effca3693..08a1331731 100644 --- a/crates/core/src/writer/record_batch.rs +++ b/crates/core/src/writer/record_batch.rs @@ -32,6 +32,7 @@ use crate::kernel::{Action, Add, PartitionsExt, Scalar, StructType}; use crate::operations::cast::merge_schema; use crate::storage::ObjectStoreRetryExt; use crate::table::builder::DeltaTableBuilder; +use crate::table::config::DEFAULT_NUM_INDEX_COLS; use crate::DeltaTable; /// Writes messages to a delta lake table. @@ -224,6 +225,8 @@ impl DeltaWriter for RecordBatchWriter { path.to_string(), file_size, &metadata, + DEFAULT_NUM_INDEX_COLS, + &None, )?); } Ok(actions) diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs index 312de6f9e3..9848322a46 100644 --- a/crates/core/src/writer/stats.rs +++ b/crates/core/src/writer/stats.rs @@ -3,6 +3,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use std::{collections::HashMap, ops::AddAssign}; use indexmap::IndexMap; +use parquet::column; use parquet::format::FileMetaData; use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor}; use parquet::{basic::LogicalType, errors::ParquetError}; @@ -21,8 +22,15 @@ pub fn create_add( path: String, size: i64, file_metadata: &FileMetaData, + num_indexed_cols: i32, + stats_columns: &Option>, ) -> Result { - let stats = stats_from_file_metadata(partition_values, file_metadata)?; + let stats = stats_from_file_metadata( + partition_values, + file_metadata, + num_indexed_cols, + stats_columns, + )?; let stats_string = serde_json::to_string(&stats)?; // Determine the modification timestamp to include in the add action - milliseconds since epoch @@ -61,6 +69,8 @@ pub fn create_add( fn stats_from_file_metadata( partition_values: &IndexMap, file_metadata: &FileMetaData, + num_indexed_cols: i32, + stats_columns: &Option>, ) -> Result { let type_ptr = parquet::schema::types::from_thrift(file_metadata.schema.as_slice()); let schema_descriptor = type_ptr.map(|type_| Arc::new(SchemaDescriptor::new(type_)))?; @@ -76,7 +86,13 @@ fn stats_from_file_metadata( .collect(); let row_group_metadata = row_group_metadata?; - for i in 0..schema_descriptor.num_columns() { + let number_to_iterate = if let Some(cols) = stats_columns { + schema_descriptor.num_columns() + } else { + num_indexed_cols as usize + }; + + for i in 0..number_to_iterate { let column_descr = schema_descriptor.column(i); let column_path = column_descr.path(); @@ -87,6 +103,12 @@ fn stats_from_file_metadata( continue; } + if let Some(cols) = stats_columns { + if !cols.contains(&column_descr.name().to_string()) { + continue; + } + } + let 
maybe_stats: Option = row_group_metadata .iter() .map(|g| { From 68c781771c440747f1a69b25c2110e88c151e441 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 21 Apr 2024 16:44:22 +0200 Subject: [PATCH 04/16] rm dbgs --- crates/core/src/operations/write.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index bc3c08050a..598b96449a 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -773,9 +773,6 @@ impl std::future::IntoFuture for WriteBuilder { ), }; - dbg!(num_index_cols.clone()); - dbg!(stats_columns.clone()); - let writer_stats_config = WriterStatsConfig { num_indexed_cols: num_index_cols.clone(), stats_columns: stats_columns From 82e4e404d6612e9ab765e4131ffb0aabaad1a04d Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 21 Apr 2024 16:52:31 +0200 Subject: [PATCH 05/16] clippy --- crates/core/src/operations/update.rs | 5 +---- crates/core/src/operations/write.rs | 20 +++++++++----------- crates/core/src/writer/stats.rs | 1 - 3 files changed, 10 insertions(+), 16 deletions(-) diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index d9e0f4655e..e60e9b73b3 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -47,10 +47,7 @@ use super::{ datafusion_utils::Expression, transaction::{CommitBuilder, CommitProperties}, }; -use super::{ - transaction::{TableReference, PROTOCOL}, - write::WriterStatsConfig, -}; +use super::{transaction::PROTOCOL, write::WriterStatsConfig}; use crate::delta_datafusion::{ expr::fmt_expr_to_sql, physical::MetricObserverExec, DataFusionMixins, DeltaColumn, DeltaSessionContext, diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 598b96449a..8a0d1832b8 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -25,7 +25,6 @@ //! 
```` use std::collections::HashMap; -use std::hash::Hash; use std::str::FromStr; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; @@ -34,7 +33,6 @@ use std::vec; use arrow_array::RecordBatch; use arrow_cast::can_cast_types; use arrow_schema::{ArrowError, DataType, Fields, SchemaRef as ArrowSchemaRef}; -use datafusion::execution::config; use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; use datafusion::physical_expr::create_physical_expr; use datafusion::physical_plan::filter::FilterExec; @@ -54,7 +52,7 @@ use crate::delta_datafusion::expr::parse_predicate_expression; use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder}; use crate::delta_datafusion::{DataFusionMixins, DeltaDataChecker}; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::{Action, Add, Metadata, PartitionsExt, Remove, Snapshot, StructType}; +use crate::kernel::{Action, Add, Metadata, PartitionsExt, Remove, StructType}; use crate::logstore::LogStoreRef; use crate::operations::cast::{cast_record_batch, merge_schema}; use crate::protocol::{DeltaOperation, SaveMode}; @@ -478,6 +476,7 @@ pub(crate) async fn write_execution_plan( .await } +#[allow(clippy::too_many_arguments)] async fn execute_non_empty_expr( snapshot: &DeltaTableState, log_store: LogStoreRef, @@ -531,6 +530,7 @@ async fn execute_non_empty_expr( } // This should only be called wth a valid predicate +#[allow(clippy::too_many_arguments)] async fn prepare_predicate_actions( predicate: Expr, log_store: LogStoreRef, @@ -753,19 +753,17 @@ impl std::future::IntoFuture for WriteBuilder { _ => (None, None), }; - let config = if let Some(snapshot) = &this.snapshot { - Some(snapshot.table_config()) - } else { - None - }; + let config = this + .snapshot + .as_ref() + .map(|snapshot| snapshot.table_config()); let (num_index_cols, stats_columns) = match &config { Some(conf) => (conf.num_indexed_cols(), conf.stats_columns()), _ => ( this.configuration .get("delta.dataSkippingNumIndexedCols") - .map(|v| v.clone().map(|v| v.parse::().unwrap())) - .flatten() + .and_then(|v| v.clone().map(|v| v.parse::().unwrap())) .unwrap_or(DEFAULT_NUM_INDEX_COLS), this.configuration .get("delta.dataSkippingStatsColumns") @@ -774,7 +772,7 @@ impl std::future::IntoFuture for WriteBuilder { }; let writer_stats_config = WriterStatsConfig { - num_indexed_cols: num_index_cols.clone(), + num_indexed_cols: num_index_cols, stats_columns: stats_columns .clone() .map(|v| v.iter().map(|v| v.to_string()).collect::>()), diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs index 9848322a46..35818a2fd3 100644 --- a/crates/core/src/writer/stats.rs +++ b/crates/core/src/writer/stats.rs @@ -3,7 +3,6 @@ use std::time::{SystemTime, UNIX_EPOCH}; use std::{collections::HashMap, ops::AddAssign}; use indexmap::IndexMap; -use parquet::column; use parquet::format::FileMetaData; use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor}; use parquet::{basic::LogicalType, errors::ParquetError}; From 2b04a13bbabd12fd25739691090b221117ccd886 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 21 Apr 2024 17:40:23 +0200 Subject: [PATCH 06/16] add tests --- crates/core/src/writer/stats.rs | 12 ++--- python/deltalake/writer.py | 2 +- python/tests/test_delete.py | 48 +++++++++++++++++++ python/tests/test_merge.py | 61 ++++++++++++++++++++++++ python/tests/test_update.py | 48 +++++++++++++++++++ python/tests/test_writer.py | 84 +++++++++++++++++++++++++++++++++ 6 files 
changed, 248 insertions(+), 7 deletions(-) diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs index 35818a2fd3..ce9d166a60 100644 --- a/crates/core/src/writer/stats.rs +++ b/crates/core/src/writer/stats.rs @@ -1,7 +1,3 @@ -use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; -use std::{collections::HashMap, ops::AddAssign}; - use indexmap::IndexMap; use parquet::format::FileMetaData; use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor}; @@ -10,6 +6,10 @@ use parquet::{ file::{metadata::RowGroupMetaData, statistics::Statistics}, format::TimeUnit, }; +use std::cmp::min; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use std::{collections::HashMap, ops::AddAssign}; use super::*; use crate::kernel::{Add, Scalar}; @@ -85,10 +85,10 @@ fn stats_from_file_metadata( .collect(); let row_group_metadata = row_group_metadata?; - let number_to_iterate = if let Some(cols) = stats_columns { + let number_to_iterate = if stats_columns.is_some() { schema_descriptor.num_columns() } else { - num_indexed_cols as usize + min(num_indexed_cols as usize, schema_descriptor.num_columns()) }; for i in 0..number_to_iterate { diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 921f66a615..941332d196 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -746,7 +746,7 @@ def iter_groups(metadata: Any) -> Iterator[Any]: if columns_to_collect_stats is not None: columns_to_iterate = metadata.num_columns else: - columns_to_iterate = num_indexed_cols + columns_to_iterate = min(num_indexed_cols, metadata.num_columns) for column_idx in range(columns_to_iterate): name = metadata.row_group(0).column(column_idx).path_in_schema diff --git a/python/tests/test_delete.py b/python/tests/test_delete.py index 519af0c935..65b5ebdec3 100644 --- a/python/tests/test_delete.py +++ b/python/tests/test_delete.py @@ -81,3 +81,51 @@ def test_delete_large_dtypes( table = dt.to_pyarrow_table() assert table.equals(expected_table) + + +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_delete_stats_columns_stats_provided(tmp_path: pathlib.Path, engine): + data = pa.table( + { + "foo": pa.array(["a", "b", None, None]), + "bar": pa.array([1, 2, 3, None]), + "baz": pa.array([1, 1, None, None]), + } + ) + write_deltalake( + tmp_path, + data, + mode="append", + engine=engine, + configuration={"delta.dataSkippingStatsColumns": "foo,baz"}, + ) + dt = DeltaTable(tmp_path) + add_actions_table = dt.get_add_actions(flatten=True) + stats = add_actions_table.to_pylist()[0] + + assert stats["null_count.foo"] == 2 + assert stats["min.foo"] == "a" + assert stats["max.foo"] == "b" + assert stats["null_count.bar"] is None + assert stats["min.bar"] is None + assert stats["max.bar"] is None + assert stats["null_count.baz"] == 2 + assert stats["min.baz"] == 1 + assert stats["max.baz"] == 1 + + dt.delete("bar == 3") + + dt = DeltaTable(tmp_path) + add_actions_table = dt.get_add_actions(flatten=True) + stats = add_actions_table.to_pylist()[0] + + assert dt.version() == 1 + assert stats["null_count.foo"] == 1 + assert stats["min.foo"] == "a" + assert stats["max.foo"] == "b" + assert stats["null_count.bar"] is None + assert stats["min.bar"] is None + assert stats["max.bar"] is None + assert stats["null_count.baz"] == 1 + assert stats["min.baz"] == 1 + assert stats["max.baz"] == 1 diff --git a/python/tests/test_merge.py b/python/tests/test_merge.py index 9628cad5f3..6c946afbb8 100644 --- a/python/tests/test_merge.py +++ b/python/tests/test_merge.py @@ 
-855,3 +855,64 @@ def test_merge_timestamps_partitioned_2344(tmp_path: pathlib.Path, timezone, pre assert last_action["operation"] == "MERGE" assert result == data assert last_action["operationParameters"].get("predicate") == predicate + + +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_merge_stats_columns_stats_provided(tmp_path: pathlib.Path, engine): + data = pa.table( + { + "foo": pa.array(["a", "b", None, None]), + "bar": pa.array([1, 2, 3, None]), + "baz": pa.array([1, 1, None, None]), + } + ) + write_deltalake( + tmp_path, + data, + mode="append", + engine=engine, + configuration={"delta.dataSkippingStatsColumns": "foo,baz"}, + ) + dt = DeltaTable(tmp_path) + add_actions_table = dt.get_add_actions(flatten=True) + stats = add_actions_table.to_pylist()[0] + + assert stats["null_count.foo"] == 2 + assert stats["min.foo"] == "a" + assert stats["max.foo"] == "b" + assert stats["null_count.bar"] is None + assert stats["min.bar"] is None + assert stats["max.bar"] is None + assert stats["null_count.baz"] == 2 + assert stats["min.baz"] == 1 + assert stats["max.baz"] == 1 + + data = pa.table( + { + "foo": pa.array(["a"]), + "bar": pa.array([10]), + "baz": pa.array([10]), + } + ) + + dt.merge( + data, + predicate="source.foo = target.foo", + source_alias="source", + target_alias="target", + ).when_matched_update_all().execute() + + dt = DeltaTable(tmp_path) + add_actions_table = dt.get_add_actions(flatten=True) + stats = add_actions_table.to_pylist()[0] + + assert dt.version() == 1 + assert stats["null_count.foo"] == 2 + assert stats["min.foo"] == "a" + assert stats["max.foo"] == "b" + assert stats["null_count.bar"] is None + assert stats["min.bar"] is None + assert stats["max.bar"] is None + assert stats["null_count.baz"] == 2 + assert stats["min.baz"] == 1 + assert stats["max.baz"] == 10 diff --git a/python/tests/test_update.py b/python/tests/test_update.py index fcc17cf027..74ae130224 100644 --- a/python/tests/test_update.py +++ b/python/tests/test_update.py @@ -234,3 +234,51 @@ def test_update_with_incorrect_updates_input( str(excinfo.value) == "Invalid datatype provided in new_values, only int, float, bool, list, str or datetime or accepted." 
) + + +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_update_stats_columns_stats_provided(tmp_path: pathlib.Path, engine): + data = pa.table( + { + "foo": pa.array(["a", "b", None, None]), + "bar": pa.array([1, 2, 3, None]), + "baz": pa.array([1, 1, None, None]), + } + ) + write_deltalake( + tmp_path, + data, + mode="append", + engine=engine, + configuration={"delta.dataSkippingStatsColumns": "foo,baz"}, + ) + dt = DeltaTable(tmp_path) + add_actions_table = dt.get_add_actions(flatten=True) + stats = add_actions_table.to_pylist()[0] + + assert stats["null_count.foo"] == 2 + assert stats["min.foo"] == "a" + assert stats["max.foo"] == "b" + assert stats["null_count.bar"] is None + assert stats["min.bar"] is None + assert stats["max.bar"] is None + assert stats["null_count.baz"] == 2 + assert stats["min.baz"] == 1 + assert stats["max.baz"] == 1 + + dt.update({"foo": "'hello world'"}) + + dt = DeltaTable(tmp_path) + add_actions_table = dt.get_add_actions(flatten=True) + stats = add_actions_table.to_pylist()[0] + + assert dt.version() == 1 + assert stats["null_count.foo"] == 0 + assert stats["min.foo"] == "hello world" + assert stats["max.foo"] == "hello world" + assert stats["null_count.bar"] is None + assert stats["min.bar"] is None + assert stats["max.bar"] is None + assert stats["null_count.baz"] == 2 + assert stats["min.baz"] == 1 + assert stats["max.baz"] == 1 diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index eb8244dbb3..4fa4fdb932 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -1528,3 +1528,87 @@ def test_rust_decimal_cast(tmp_path: pathlib.Path): write_deltalake( tmp_path, data, mode="append", schema_mode="merge", engine="rust" ) + + +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_write_stats_column_idx(tmp_path: pathlib.Path, engine): + def _check_stats(dt: DeltaTable): + add_actions_table = dt.get_add_actions(flatten=True) + stats = add_actions_table.to_pylist()[0] + + assert stats["null_count.foo"] == 2 + assert stats["min.foo"] == "a" + assert stats["max.foo"] == "b" + assert stats["null_count.bar"] == 1 + assert stats["min.bar"] == 1 + assert stats["max.bar"] == 3 + assert stats["null_count.baz"] is None + assert stats["min.baz"] is None + assert stats["max.baz"] is None + + data = pa.table( + { + "foo": pa.array(["a", "b", None, None]), + "bar": pa.array([1, 2, 3, None]), + "baz": pa.array([1, 1, None, None]), + } + ) + write_deltalake( + tmp_path, + data, + mode="append", + engine=engine, + configuration={"delta.dataSkippingNumIndexedCols": "2"}, + ) + + dt = DeltaTable(tmp_path) + _check_stats(dt) + + # Check if it properly takes skippingNumIndexCols from the config in the table + write_deltalake(tmp_path, data, mode="overwrite", engine=engine) + + dt = DeltaTable(tmp_path) + assert dt.version() == 1 + _check_stats(dt) + + +@pytest.mark.parametrize("engine", ["pyarrow", "rust"]) +def test_write_stats_columns_stats_provided(tmp_path: pathlib.Path, engine): + def _check_stats(dt: DeltaTable): + add_actions_table = dt.get_add_actions(flatten=True) + stats = add_actions_table.to_pylist()[0] + + assert stats["null_count.foo"] == 2 + assert stats["min.foo"] == "a" + assert stats["max.foo"] == "b" + assert stats["null_count.bar"] is None + assert stats["min.bar"] is None + assert stats["max.bar"] is None + assert stats["null_count.baz"] == 2 + assert stats["min.baz"] == 1 + assert stats["max.baz"] == 1 + + data = pa.table( + { + "foo": pa.array(["a", "b", None, None]), + "bar": pa.array([1, 
2, 3, None]), + "baz": pa.array([1, 1, None, None]), + } + ) + write_deltalake( + tmp_path, + data, + mode="append", + engine=engine, + configuration={"delta.dataSkippingStatsColumns": "foo,baz"}, + ) + + dt = DeltaTable(tmp_path) + _check_stats(dt) + + # Check if it properly takes skippingNumIndexCols from the config in the table + write_deltalake(tmp_path, data, mode="overwrite", engine=engine) + + dt = DeltaTable(tmp_path) + assert dt.version() == 1 + _check_stats(dt) From faa57f0c85694a05fe5a19b9de6a90d8ea673a72 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 21 Apr 2024 17:49:42 +0200 Subject: [PATCH 07/16] lint --- python/deltalake/writer.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 941332d196..3fe2fcdff7 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -269,17 +269,15 @@ def write_deltalake( else: DEFAULT_DATA_SKIPPING_NUM_INDEX_COLS = 32 if configuration is not None: - num_indexed_cols = configuration.get( - "delta.dataSkippingNumIndexedCols", DEFAULT_DATA_SKIPPING_NUM_INDEX_COLS - ) - if num_indexed_cols is None: - num_indexed_cols = DEFAULT_DATA_SKIPPING_NUM_INDEX_COLS + if "delta.dataSkippingNumIndexedCols" in configuration: + str_indexed_cols = configuration["delta.dataSkippingNumIndexedCols"] + if str_indexed_cols is not None: + num_indexed_cols = int(str_indexed_cols) else: - num_indexed_cols = int(num_indexed_cols) - - stats_cols = configuration.get("delta.dataSkippingStatsColumns") - if isinstance(stats_cols, str): - stats_cols = stats_cols.split(",") + num_indexed_cols = DEFAULT_DATA_SKIPPING_NUM_INDEX_COLS + string_stats_cols = configuration.get("delta.dataSkippingStatsColumns") + if isinstance(string_stats_cols, str): + stats_cols = string_stats_cols.split(",") if stats_cols and isinstance(stats_cols, list): if not all(isinstance(inner, str) for inner in stats_cols): raise ValueError( From 6c4908bc50f79c640ff2551cf86dacf6b4be44ae Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 21 Apr 2024 17:55:41 +0200 Subject: [PATCH 08/16] add missing params --- crates/core/src/operations/writer.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/core/src/operations/writer.rs b/crates/core/src/operations/writer.rs index 0b266f7827..6c22cf6828 100644 --- a/crates/core/src/operations/writer.rs +++ b/crates/core/src/operations/writer.rs @@ -422,6 +422,7 @@ impl PartitionWriter { mod tests { use super::*; use crate::storage::utils::flatten_list_stream as list; + use crate::table::config::DEFAULT_NUM_INDEX_COLS; use crate::writer::test_utils::*; use crate::DeltaTableBuilder; use arrow::array::{Int32Array, StringArray}; @@ -441,6 +442,8 @@ mod tests { writer_properties, target_file_size, write_batch_size, + DEFAULT_NUM_INDEX_COLS, + None, ); DeltaWriter::new(object_store, config) } @@ -460,7 +463,8 @@ mod tests { write_batch_size, ) .unwrap(); - PartitionWriter::try_with_config(object_store, config).unwrap() + PartitionWriter::try_with_config(object_store, config, DEFAULT_NUM_INDEX_COLS, None) + .unwrap() } #[tokio::test] From fec7f7e9edf86c0fa5e6c1ef173641bb152c5cc6 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 21 Apr 2024 18:13:05 +0200 Subject: [PATCH 09/16] handle -1 --- crates/core/src/writer/stats.rs | 10 +++++++++- python/deltalake/writer.py | 9 ++++++++- 2 files 
changed, 17 insertions(+), 2 deletions(-) diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs index ce9d166a60..f4349d9ac8 100644 --- a/crates/core/src/writer/stats.rs +++ b/crates/core/src/writer/stats.rs @@ -88,7 +88,15 @@ fn stats_from_file_metadata( let number_to_iterate = if stats_columns.is_some() { schema_descriptor.num_columns() } else { - min(num_indexed_cols as usize, schema_descriptor.num_columns()) + if num_indexed_cols == -1 { + schema_descriptor.num_columns() + } else if num_indexed_cols >= 0 { + min(num_indexed_cols as usize, schema_descriptor.num_columns()) + } else { + return Err(DeltaWriterError::DeltaTable(DeltaTableError::Generic( + "delta.dataSkippingNumIndexedCols valid values are >=-1".to_string(), + ))); + } }; for i in 0..number_to_iterate { diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 3fe2fcdff7..3b6c9000bc 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -283,6 +283,8 @@ def write_deltalake( raise ValueError( "dataSkippingStatsColumns needs to be a comma-separated list of column names in string format. Example: 'foo,bar,baz'" ) + else: + stats_cols = None else: num_indexed_cols = DEFAULT_DATA_SKIPPING_NUM_INDEX_COLS stats_cols = None @@ -744,7 +746,12 @@ def iter_groups(metadata: Any) -> Iterator[Any]: if columns_to_collect_stats is not None: columns_to_iterate = metadata.num_columns else: - columns_to_iterate = min(num_indexed_cols, metadata.num_columns) + if num_indexed_cols == -1: + columns_to_iterate = metadata.num_columns + elif num_indexed_cols >= 0: + columns_to_iterate = min(num_indexed_cols, metadata.num_columns) + else: + raise ValueError("delta.dataSkippingNumIndexedCols valid values are >=-1") for column_idx in range(columns_to_iterate): name = metadata.row_group(0).column(column_idx).path_in_schema From 11c5e3e2d404538b394f1887e634fc190afe5c80 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 2 May 2024 22:25:47 +0200 Subject: [PATCH 10/16] fmt and simplify --- python/deltalake/writer.py | 12 ++++-------- python/src/lib.rs | 4 +++- python/tests/test_merge.py | 1 + python/tests/test_writer.py | 1 + 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 30e9272900..a8c54a7f80 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -66,6 +66,7 @@ _has_pandas = True PYARROW_MAJOR_VERSION = int(pa.__version__.split(".", maxsplit=1)[0]) +DEFAULT_DATA_SKIPPING_NUM_INDEX_COLS = 32 @dataclass @@ -267,14 +268,13 @@ def write_deltalake( num_indexed_cols = table._table.get_num_index_cols() stats_cols = table._table.get_stats_columns() else: - DEFAULT_DATA_SKIPPING_NUM_INDEX_COLS = 32 + num_indexed_cols = DEFAULT_DATA_SKIPPING_NUM_INDEX_COLS + stats_cols = None if configuration is not None: if "delta.dataSkippingNumIndexedCols" in configuration: str_indexed_cols = configuration["delta.dataSkippingNumIndexedCols"] if str_indexed_cols is not None: num_indexed_cols = int(str_indexed_cols) - else: - num_indexed_cols = DEFAULT_DATA_SKIPPING_NUM_INDEX_COLS string_stats_cols = configuration.get("delta.dataSkippingStatsColumns") if isinstance(string_stats_cols, str): stats_cols = string_stats_cols.split(",") @@ -283,11 +283,7 @@ def write_deltalake( raise ValueError( "dataSkippingStatsColumns needs to be a comma-separated list of column names in string format. 
Example: 'foo,bar,baz'" ) - else: - stats_cols = None - else: - num_indexed_cols = DEFAULT_DATA_SKIPPING_NUM_INDEX_COLS - stats_cols = None + __enforce_append_only(table=table, configuration=configuration, mode=mode) if overwrite_schema: schema_mode = "overwrite" diff --git a/python/src/lib.rs b/python/src/lib.rs index 347cce7aaf..822d6088ee 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -37,7 +37,9 @@ use deltalake::operations::load_cdf::CdfLoadBuilder; use deltalake::operations::merge::MergeBuilder; use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType}; use deltalake::operations::restore::RestoreBuilder; -use deltalake::operations::transaction::{CommitBuilder, CommitProperties, PROTOCOL, TableReference}; +use deltalake::operations::transaction::{ + CommitBuilder, CommitProperties, TableReference, PROTOCOL, +}; use deltalake::operations::update::UpdateBuilder; use deltalake::operations::vacuum::VacuumBuilder; use deltalake::parquet::basic::Compression; diff --git a/python/tests/test_merge.py b/python/tests/test_merge.py index ddf22a8e6d..2349e68963 100644 --- a/python/tests/test_merge.py +++ b/python/tests/test_merge.py @@ -917,6 +917,7 @@ def test_merge_stats_columns_stats_provided(tmp_path: pathlib.Path, engine): assert stats["min.baz"] == 1 assert stats["max.baz"] == 10 + def test_merge_field_special_characters_delete_2438(tmp_path: pathlib.Path): ## See issue: https://github.com/delta-io/delta-rs/issues/2438 data = pa.table({"x": [1, 2, 3], "y--1": [4, 5, 6]}) diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index c820bae397..c311b56117 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -1613,6 +1613,7 @@ def _check_stats(dt: DeltaTable): assert dt.version() == 1 _check_stats(dt) + @pytest.mark.parametrize( "array", [ From c943ab8dbc67832ed634a421446c42838447b69a Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Fri, 10 May 2024 15:08:21 +0200 Subject: [PATCH 11/16] create vec of idx first --- crates/core/src/writer/stats.rs | 43 ++++++++++++++++++--------------- python/deltalake/writer.py | 19 ++++++--------- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs index f4349d9ac8..80ee596c10 100644 --- a/crates/core/src/writer/stats.rs +++ b/crates/core/src/writer/stats.rs @@ -1,3 +1,4 @@ +use datafusion::catalog::schema; use indexmap::IndexMap; use parquet::format::FileMetaData; use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor}; @@ -6,6 +7,7 @@ use parquet::{ file::{metadata::RowGroupMetaData, statistics::Statistics}, format::TimeUnit, }; +use serde::de::value; use std::cmp::min; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; @@ -84,23 +86,30 @@ fn stats_from_file_metadata( .map(|rg| RowGroupMetaData::from_thrift(schema_descriptor.clone(), rg.clone())) .collect(); let row_group_metadata = row_group_metadata?; + let schema_cols = file_metadata + .schema + .iter() + .map(|v| &v.name) + .collect::>(); - let number_to_iterate = if stats_columns.is_some() { - schema_descriptor.num_columns() + let idx_to_iterate = if let Some(stats_cols) = stats_columns { + stats_cols + .iter() + .map(|col| schema_cols[1..].iter().position(|value| *value == col)) + .flatten() + .collect() + } else if num_indexed_cols == -1 { + (0..schema_descriptor.num_columns()).collect::>() + } else if num_indexed_cols >= 0 { + (0..min(num_indexed_cols as usize, 
schema_descriptor.num_columns())).collect::>() } else { - if num_indexed_cols == -1 { - schema_descriptor.num_columns() - } else if num_indexed_cols >= 0 { - min(num_indexed_cols as usize, schema_descriptor.num_columns()) - } else { - return Err(DeltaWriterError::DeltaTable(DeltaTableError::Generic( - "delta.dataSkippingNumIndexedCols valid values are >=-1".to_string(), - ))); - } + return Err(DeltaWriterError::DeltaTable(DeltaTableError::Generic( + "delta.dataSkippingNumIndexedCols valid values are >=-1".to_string(), + ))); }; - for i in 0..number_to_iterate { - let column_descr = schema_descriptor.column(i); + for idx in idx_to_iterate { + let column_descr = schema_descriptor.column(idx); let column_path = column_descr.path(); let column_path_parts = column_path.parts(); @@ -110,16 +119,10 @@ fn stats_from_file_metadata( continue; } - if let Some(cols) = stats_columns { - if !cols.contains(&column_descr.name().to_string()) { - continue; - } - } - let maybe_stats: Option = row_group_metadata .iter() .map(|g| { - g.column(i) + g.column(idx) .statistics() .map(|s| AggregatedStats::from((s, &column_descr.logical_type()))) }) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index a8c54a7f80..521f620381 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -741,22 +741,19 @@ def iter_groups(metadata: Any) -> Iterator[Any]: if metadata.row_group(i).num_rows > 0: yield metadata.row_group(i) + schema_columns = metadata.schema.names if columns_to_collect_stats is not None: - columns_to_iterate = metadata.num_columns + idx_to_iterate = [schema_columns.index(col) for col in columns_to_collect_stats] + elif num_indexed_cols == -1: + idx_to_iterate = list(range(metadata.num_columns)) + elif num_indexed_cols >= 0: + idx_to_iterate = list(range(min(num_indexed_cols, metadata.num_columns))) else: - if num_indexed_cols == -1: - columns_to_iterate = metadata.num_columns - elif num_indexed_cols >= 0: - columns_to_iterate = min(num_indexed_cols, metadata.num_columns) - else: - raise ValueError("delta.dataSkippingNumIndexedCols valid values are >=-1") + raise ValueError("delta.dataSkippingNumIndexedCols valid values are >=-1") - for column_idx in range(columns_to_iterate): + for column_idx in idx_to_iterate: name = metadata.row_group(0).column(column_idx).path_in_schema - if columns_to_collect_stats is not None: - if name not in columns_to_collect_stats: - continue # If stats missing, then we can't know aggregate stats if all( group.column(column_idx).is_stats_set for group in iter_groups(metadata) From 71d9da55f6f1f31fe5a13fcc96d9a54116839f62 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Fri, 10 May 2024 15:11:52 +0200 Subject: [PATCH 12/16] fmt --- crates/core/src/writer/stats.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs index 80ee596c10..849179a973 100644 --- a/crates/core/src/writer/stats.rs +++ b/crates/core/src/writer/stats.rs @@ -1,4 +1,8 @@ -use datafusion::catalog::schema; +use std::cmp::min; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use std::{collections::HashMap, ops::AddAssign}; + use indexmap::IndexMap; use parquet::format::FileMetaData; use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor}; @@ -7,11 +11,6 @@ use parquet::{ file::{metadata::RowGroupMetaData, statistics::Statistics}, format::TimeUnit, }; -use serde::de::value; -use std::cmp::min; -use std::sync::Arc; -use 
std::time::{SystemTime, UNIX_EPOCH}; -use std::{collections::HashMap, ops::AddAssign}; use super::*; use crate::kernel::{Add, Scalar}; From d2586bb0cd454b70b312522355fc531b9a42d723 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Fri, 10 May 2024 15:54:44 +0200 Subject: [PATCH 13/16] rewrite config grabbing --- crates/core/src/operations/write.rs | 48 +++++++++++++++++++---------- python/deltalake/_internal.pyi | 3 ++ python/deltalake/writer.py | 38 ++++++++--------------- python/src/lib.rs | 24 +++++++++++++++ 4 files changed, 71 insertions(+), 42 deletions(-) diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index d536f38f27..1b5f27c95f 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -753,29 +753,16 @@ impl std::future::IntoFuture for WriteBuilder { _ => (None, None), }; - let config = this + let config: Option> = this .snapshot .as_ref() .map(|snapshot| snapshot.table_config()); - let (num_index_cols, stats_columns) = match &config { - Some(conf) => (conf.num_indexed_cols(), conf.stats_columns()), - _ => ( - this.configuration - .get("delta.dataSkippingNumIndexedCols") - .and_then(|v| v.clone().map(|v| v.parse::().unwrap())) - .unwrap_or(DEFAULT_NUM_INDEX_COLS), - this.configuration - .get("delta.dataSkippingStatsColumns") - .and_then(|v| v.as_ref().map(|v| v.split(',').collect::>())), - ), - }; + let (num_indexed_cols, stats_columns) = get_num_idx_cols_and_stats_columns(config, this.configuration); let writer_stats_config = WriterStatsConfig { - num_indexed_cols: num_index_cols, - stats_columns: stats_columns - .clone() - .map(|v| v.iter().map(|v| v.to_string()).collect::>()), + num_indexed_cols, + stats_columns, }; // Here we need to validate if the new data conforms to a predicate if one is provided let add_actions = write_execution_plan_with_predicate( @@ -934,6 +921,33 @@ fn try_cast_batch(from_fields: &Fields, to_fields: &Fields) -> Result<(), ArrowE Ok(()) } +/// Get the num_idx_columns and stats_columns from the table configuration in the state +/// If table_config does not exist (only can occur in the first write action) it takes +/// the configuration that was passed to the writerBuilder. +pub fn get_num_idx_cols_and_stats_columns( + config: Option>, + configuration: HashMap>, +) -> (i32, Option>) { + let (num_index_cols, stats_columns) = match &config { + Some(conf) => (conf.num_indexed_cols(), conf.stats_columns()), + _ => ( + configuration + .get("delta.dataSkippingNumIndexedCols") + .and_then(|v| v.clone().map(|v| v.parse::().unwrap())) + .unwrap_or(DEFAULT_NUM_INDEX_COLS), + configuration + .get("delta.dataSkippingStatsColumns") + .and_then(|v| v.as_ref().map(|v| v.split(',').collect::>())), + ), + }; + ( + num_index_cols, + stats_columns + .clone() + .map(|v| v.iter().map(|v| v.to_string()).collect::>()), + ) +} + #[cfg(test)] mod tests { use super::*; diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index e580243f87..9e338e17b0 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -215,6 +215,9 @@ def create_deltalake( custom_metadata: Optional[Dict[str, str]], ) -> None: ... def batch_distinct(batch: pyarrow.RecordBatch) -> pyarrow.RecordBatch: ... +def get_num_idx_cols_and_stats_columns( + table: Optional[RawDeltaTable], configuration: Optional[Mapping[str, Optional[str]]] +) -> Tuple[int, Optional[List[str]]]: ... 
# Can't implement inheritance (see note in src/schema.rs), so this is next # best thing. diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 521f620381..f18aa6002a 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -41,6 +41,9 @@ from ._internal import DeltaDataChecker as _DeltaDataChecker from ._internal import batch_distinct from ._internal import convert_to_deltalake as _convert_to_deltalake +from ._internal import ( + get_num_idx_cols_and_stats_columns as get_num_idx_cols_and_stats_columns, +) from ._internal import write_new_deltalake as write_deltalake_pyarrow from ._internal import write_to_deltalake as write_deltalake_rust from .exceptions import DeltaProtocolError, TableNotFoundError @@ -260,30 +263,10 @@ def write_deltalake( custom_metadata: Custom metadata to add to the commitInfo. """ table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options) - if table is not None: - storage_options = table._storage_options or {} - storage_options.update(storage_options or {}) - - table.update_incremental() - num_indexed_cols = table._table.get_num_index_cols() - stats_cols = table._table.get_stats_columns() - else: - num_indexed_cols = DEFAULT_DATA_SKIPPING_NUM_INDEX_COLS - stats_cols = None - if configuration is not None: - if "delta.dataSkippingNumIndexedCols" in configuration: - str_indexed_cols = configuration["delta.dataSkippingNumIndexedCols"] - if str_indexed_cols is not None: - num_indexed_cols = int(str_indexed_cols) - string_stats_cols = configuration.get("delta.dataSkippingStatsColumns") - if isinstance(string_stats_cols, str): - stats_cols = string_stats_cols.split(",") - if stats_cols and isinstance(stats_cols, list): - if not all(isinstance(inner, str) for inner in stats_cols): - raise ValueError( - "dataSkippingStatsColumns needs to be a comma-separated list of column names in string format. Example: 'foo,bar,baz'" - ) - + num_indexed_cols, stats_cols = get_num_idx_cols_and_stats_columns( + table._table if table is not None else None, configuration + ) + print(num_indexed_cols, stats_cols) __enforce_append_only(table=table, configuration=configuration, mode=mode) if overwrite_schema: schema_mode = "overwrite" @@ -743,7 +726,12 @@ def iter_groups(metadata: Any) -> Iterator[Any]: schema_columns = metadata.schema.names if columns_to_collect_stats is not None: - idx_to_iterate = [schema_columns.index(col) for col in columns_to_collect_stats] + idx_to_iterate = [] + for col in columns_to_collect_stats: + try: + idx_to_iterate.append(schema_columns.index(col)) + except ValueError: + pass elif num_indexed_cols == -1: idx_to_iterate = list(range(metadata.num_columns)) elif num_indexed_cols >= 0: diff --git a/python/src/lib.rs b/python/src/lib.rs index 822d6088ee..faea1f236b 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1720,6 +1720,26 @@ fn convert_to_deltalake( Ok(()) } +#[pyfunction] +fn get_num_idx_cols_and_stats_columns( + table: Option<&RawDeltaTable>, + configuration: Option>>, +) -> PyResult<(i32, Option>)> { + let config = table + .as_ref() + .map(|table| table._table.snapshot()) + .transpose() + .map_err(PythonError::from)? 
+ .map(|snapshot| snapshot.table_config()); + + Ok( + deltalake::operations::write::get_num_idx_cols_and_stats_columns( + config, + configuration.unwrap_or_default(), + ), + ) +} + #[pyclass(name = "DeltaDataChecker", module = "deltalake._internal")] struct PyDeltaDataChecker { inner: DeltaDataChecker, @@ -1776,6 +1796,10 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> { m.add_function(pyo3::wrap_pyfunction!(write_to_deltalake, m)?)?; m.add_function(pyo3::wrap_pyfunction!(convert_to_deltalake, m)?)?; m.add_function(pyo3::wrap_pyfunction!(batch_distinct, m)?)?; + m.add_function(pyo3::wrap_pyfunction!( + get_num_idx_cols_and_stats_columns, + m + )?)?; m.add_class::()?; m.add_class::()?; m.add_class::()?; From f2662d0746bfbaa261c651ce71ad71884f48eab7 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Fri, 10 May 2024 15:55:51 +0200 Subject: [PATCH 14/16] fmt --- crates/core/src/operations/write.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 1b5f27c95f..f87037fa16 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -758,7 +758,8 @@ impl std::future::IntoFuture for WriteBuilder { .as_ref() .map(|snapshot| snapshot.table_config()); - let (num_indexed_cols, stats_columns) = get_num_idx_cols_and_stats_columns(config, this.configuration); + let (num_indexed_cols, stats_columns) = + get_num_idx_cols_and_stats_columns(config, this.configuration); let writer_stats_config = WriterStatsConfig { num_indexed_cols, @@ -922,7 +923,7 @@ fn try_cast_batch(from_fields: &Fields, to_fields: &Fields) -> Result<(), ArrowE } /// Get the num_idx_columns and stats_columns from the table configuration in the state -/// If table_config does not exist (only can occur in the first write action) it takes +/// If table_config does not exist (only can occur in the first write action) it takes /// the configuration that was passed to the writerBuilder. pub fn get_num_idx_cols_and_stats_columns( config: Option>, From dc41b20091467f7bccfed2adc5b9c14e2bbcf9c8 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Fri, 10 May 2024 16:02:38 +0200 Subject: [PATCH 15/16] rm print --- python/deltalake/writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index f18aa6002a..91e7920e7d 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -266,7 +266,7 @@ def write_deltalake( num_indexed_cols, stats_cols = get_num_idx_cols_and_stats_columns( table._table if table is not None else None, configuration ) - print(num_indexed_cols, stats_cols) + __enforce_append_only(table=table, configuration=configuration, mode=mode) if overwrite_schema: schema_mode = "overwrite" From 56b0a3e6ab159989fc5aaf1c047c761d5cea755c Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Fri, 10 May 2024 16:50:39 +0200 Subject: [PATCH 16/16] add by accident removed lines back --- python/deltalake/writer.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 91e7920e7d..0fb1eeedc0 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -263,9 +263,10 @@ def write_deltalake( custom_metadata: Custom metadata to add to the commitInfo. 
""" table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options) - num_indexed_cols, stats_cols = get_num_idx_cols_and_stats_columns( - table._table if table is not None else None, configuration - ) + if table is not None: + storage_options = table._storage_options or {} + storage_options.update(storage_options or {}) + table.update_incremental() __enforce_append_only(table=table, configuration=configuration, mode=mode) if overwrite_schema: @@ -343,6 +344,10 @@ def write_deltalake( # We need to write against the latest table version filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options)) + num_indexed_cols, stats_cols = get_num_idx_cols_and_stats_columns( + table._table if table is not None else None, configuration + ) + def sort_arrow_schema(schema: pa.schema) -> pa.schema: sorted_cols = sorted(iter(schema), key=lambda x: (x.name, str(x.type))) return pa.schema(sorted_cols)