From 236c86ad7af30f9a1e82a1787451a84e2a65fda2 Mon Sep 17 00:00:00 2001
From: "Constantin S. Pan"
Date: Sat, 9 Sep 2023 07:50:33 +0100
Subject: [PATCH] feat: allow multiple incremental commits in optimize

Currently "optimize" executes the whole plan in one commit, which might
fail. The larger the table, the more likely that commit is to fail and
the more expensive the failure is.

Add an option to OptimizeBuilder that allows specifying a minimum
commit interval. If it is provided, the plan executor periodically
commits the actions accumulated so far.
---
 rust/src/operations/optimize.rs | 196 +++++++++++++++++++-------------
 rust/tests/command_optimize.rs  |   2 +
 2 files changed, 120 insertions(+), 78 deletions(-)

diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs
index 6e7c240f58..c9df14212c 100644
--- a/rust/src/operations/optimize.rs
+++ b/rust/src/operations/optimize.rs
@@ -22,7 +22,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
 use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
 use arrow_array::RecordBatch;
@@ -172,6 +172,7 @@ pub struct OptimizeBuilder<'a> {
     max_spill_size: usize,
     /// Optimize type
     optimize_type: OptimizeType,
+    min_commit_interval: Option<Duration>,
 }
 
 impl<'a> OptimizeBuilder<'a> {
@@ -188,6 +189,7 @@ impl<'a> OptimizeBuilder<'a> {
             max_concurrent_tasks: num_cpus::get(),
             max_spill_size: 20 * 1024 * 1024 * 2014, // 20 GB.
             optimize_type: OptimizeType::Compact,
+            min_commit_interval: None,
         }
     }
 
@@ -241,6 +243,12 @@ impl<'a> OptimizeBuilder<'a> {
         self.max_spill_size = max_spill_size;
         self
     }
+
+    /// Min commit interval
+    pub fn with_min_commit_interval(mut self, min_commit_interval: Duration) -> Self {
+        self.min_commit_interval = Some(min_commit_interval);
+        self
+    }
 }
 
 impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
@@ -265,6 +273,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
                 writer_properties,
                 this.max_concurrent_tasks,
                 this.max_spill_size,
+                this.min_commit_interval,
             )?;
             let metrics = plan.execute(this.store.clone(), &this.snapshot).await?;
             let mut table = DeltaTable::new_with_state(this.store, this.snapshot);
@@ -341,12 +350,12 @@ pub struct MergePlan {
     task_parameters: Arc<MergeTaskParameters>,
     /// Version of the table at beginning of optimization. Used for conflict resolution.
     read_table_version: i64,
-    /// Whether to preserve insertion order within files
     /// Max number of concurrent tasks
     max_concurrent_tasks: usize,
     #[allow(dead_code)]
     /// Maximum number of bytes that are allowed to spill to disk
     max_spill_size: usize,
+    min_commit_interval: Option<Duration>,
 }
 
 /// Parameters passed to individual merge tasks
@@ -580,50 +589,50 @@ impl MergePlan {
         let mut actions = vec![];
 
         // Need to move metrics and operations out of self, so we can use self in the stream
-        let mut metrics = std::mem::take(&mut self.metrics);
+        let orig_metrics = std::mem::take(&mut self.metrics);
+        let mut metrics = orig_metrics.clone();
+        let mut total_metrics = orig_metrics.clone();
         let operations = std::mem::take(&mut self.operations);
 
-        match operations {
-            OptimizeOperations::Compact(bins) => {
-                futures::stream::iter(bins)
-                    .flat_map(|(partition, bins)| {
-                        futures::stream::iter(bins).map(move |bin| (partition.clone(), bin))
-                    })
-                    .map(|(partition, files)| {
-                        let object_store_ref = object_store.clone();
-                        let batch_stream = futures::stream::iter(files.clone())
-                            .then(move |file| {
-                                let object_store_ref = object_store_ref.clone();
-                                async move {
-                                    let file_reader =
-                                        ParquetObjectReader::new(object_store_ref, file);
-                                    ParquetRecordBatchStreamBuilder::new(file_reader)
-                                        .await?
-                                        .build()
-                                }
-                            })
-                            .try_flatten()
-                            .boxed();
-
-                        let rewrite_result = tokio::task::spawn(Self::rewrite_files(
-                            self.task_parameters.clone(),
-                            partition,
-                            files,
-                            object_store.clone(),
-                            futures::future::ready(Ok(batch_stream)),
-                        ));
-                        util::flatten_join_error(rewrite_result)
-                    })
-                    .buffer_unordered(self.max_concurrent_tasks)
-                    .try_for_each(|(partial_actions, partial_metrics)| {
-                        debug!("Recording metrics for a completed partition");
-                        actions.extend(partial_actions);
-                        metrics.add(&partial_metrics);
-                        async { Ok(()) }
-                    })
-                    .await?;
-            }
+        let stream = match operations {
+            OptimizeOperations::Compact(bins) => futures::stream::iter(bins)
+                .flat_map(|(partition, bins)| {
+                    futures::stream::iter(bins).map(move |bin| (partition.clone(), bin))
+                })
+                .map(|(partition, files)| {
+                    debug!(
+                        "merging a group of {} files in partition {:?}",
+                        files.len(),
+                        partition,
+                    );
+                    for file in files.iter() {
+                        debug!(" file {}", file.location);
+                    }
+                    let object_store_ref = object_store.clone();
+                    let batch_stream = futures::stream::iter(files.clone())
+                        .then(move |file| {
+                            let object_store_ref = object_store_ref.clone();
+                            async move {
+                                let file_reader = ParquetObjectReader::new(object_store_ref, file);
+                                ParquetRecordBatchStreamBuilder::new(file_reader)
+                                    .await?
+                                    .build()
+                            }
+                        })
+                        .try_flatten()
+                        .boxed();
+
+                    let rewrite_result = tokio::task::spawn(Self::rewrite_files(
+                        self.task_parameters.clone(),
+                        partition,
+                        files,
+                        object_store.clone(),
+                        futures::future::ready(Ok(batch_stream)),
+                    ));
+                    util::flatten_join_error(rewrite_result)
+                })
+                .boxed(),
             OptimizeOperations::ZOrder(zorder_columns, bins) => {
                 #[cfg(not(feature = "datafusion"))]
                 let exec_context = Arc::new(zorder::ZOrderExecContext::new(
@@ -641,14 +650,16 @@ impl MergePlan {
                     object_store.clone(),
                     self.max_spill_size,
                 )?);
+                let task_parameters = self.task_parameters.clone();
+                let object_store = object_store.clone();
 
                 futures::stream::iter(bins)
-                    .map(|(partition, files)| {
+                    .map(move |(partition, files)| {
                         let batch_stream = Self::read_zorder(files.clone(), exec_context.clone());
                         let object_store = object_store.clone();
 
                         let rewrite_result = tokio::task::spawn(Self::rewrite_files(
-                            self.task_parameters.clone(),
+                            task_parameters.clone(),
                            partition,
                            files,
                            object_store,
@@ -656,46 +667,73 @@ impl MergePlan {
                         ));
                         util::flatten_join_error(rewrite_result)
                     })
-                    .buffer_unordered(self.max_concurrent_tasks)
-                    .try_for_each(|(partial_actions, partial_metrics)| {
-                        debug!("Recording metrics for a completed partition");
-                        actions.extend(partial_actions);
-                        metrics.add(&partial_metrics);
-                        async { Ok(()) }
-                    })
-                    .await?;
+                    .boxed()
             }
-        }
+        };
 
-        metrics.preserve_insertion_order = true;
-        if metrics.num_files_added == 0 {
-            metrics.files_added.min = 0;
-        }
-        if metrics.num_files_removed == 0 {
-            metrics.files_removed.min = 0;
-        }
+        let mut stream = stream.buffer_unordered(self.max_concurrent_tasks);
 
-        // TODO: Check for remove actions on optimized partitions. If a
-        // optimized partition was updated then abort the commit. Requires (#593).
-        if !actions.is_empty() {
-            let mut metadata = Map::new();
-            metadata.insert("readVersion".to_owned(), self.read_table_version.into());
-            let maybe_map_metrics = serde_json::to_value(metrics.clone());
-            if let Ok(map) = maybe_map_metrics {
-                metadata.insert("operationMetrics".to_owned(), map);
+        let mut table = DeltaTable::new_with_state(object_store.clone(), snapshot.clone());
+
+        let mut last_commit = Instant::now();
+        loop {
+            let next = stream.next().await.transpose()?;
+
+            let end = next.is_none();
+
+            if let Some((partial_actions, partial_metrics)) = next {
+                debug!("Recording metrics for a completed partition");
+                actions.extend(partial_actions);
+                metrics.add(&partial_metrics);
+                total_metrics.add(&partial_metrics);
            }
 
-            commit(
-                object_store.as_ref(),
-                &actions,
-                self.task_parameters.input_parameters.clone().into(),
-                snapshot,
-                Some(metadata),
-            )
-            .await?;
+            let now = Instant::now();
+            let mature = match self.min_commit_interval {
+                None => false,
+                Some(i) => now.duration_since(last_commit) > i,
+            };
+            if !actions.is_empty() && (mature || end) {
+                let actions = std::mem::take(&mut actions);
+                last_commit = now;
+
+                metrics.preserve_insertion_order = true;
+                let mut metadata = Map::new();
+                metadata.insert("readVersion".to_owned(), self.read_table_version.into());
+                let maybe_map_metrics =
+                    serde_json::to_value(std::mem::replace(&mut metrics, orig_metrics.clone()));
+                if let Ok(map) = maybe_map_metrics {
+                    metadata.insert("operationMetrics".to_owned(), map);
+                }
+
+                table.update_incremental(None).await?;
+                debug!("committing {} actions", actions.len());
+                //// TODO: Check for remove actions on optimized partitions. If a
+                //// optimized partition was updated then abort the commit. Requires (#593).
+                commit(
+                    table.object_store().as_ref(),
+                    &actions,
+                    self.task_parameters.input_parameters.clone().into(),
+                    table.get_state(),
+                    Some(metadata),
+                )
+                .await?;
+            }
+
+            if end {
+                break;
+            }
+        }
+
+        total_metrics.preserve_insertion_order = true;
+        if total_metrics.num_files_added == 0 {
+            total_metrics.files_added.min = 0;
+        }
+        if total_metrics.num_files_removed == 0 {
+            total_metrics.files_removed.min = 0;
        }
 
-        Ok(metrics)
+        Ok(total_metrics)
     }
 }
@@ -729,6 +767,7 @@ pub fn create_merge_plan(
     writer_properties: WriterProperties,
     max_concurrent_tasks: usize,
     max_spill_size: usize,
+    min_commit_interval: Option<Duration>,
 ) -> Result<MergePlan, DeltaTableError> {
     let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size());
@@ -769,6 +808,7 @@ pub fn create_merge_plan(
         read_table_version: snapshot.version(),
         max_concurrent_tasks,
         max_spill_size,
+        min_commit_interval,
     })
 }
diff --git a/rust/tests/command_optimize.rs b/rust/tests/command_optimize.rs
index 78ec191704..046a9571e8 100644
--- a/rust/tests/command_optimize.rs
+++ b/rust/tests/command_optimize.rs
@@ -276,6 +276,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> {
         WriterProperties::builder().build(),
         1,
         20,
+        None,
     )?;
 
     let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
@@ -346,6 +347,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box<dyn Error>> {
         WriterProperties::builder().build(),
         1,
         20,
+        None,
     )?;
 
     let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
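
Usage sketch (not part of the patch): a minimal example of how the new
option might be exercised, assuming the builder is reached through the
crate's DeltaOps entry point as with the other operations; the table
URI, the tokio runtime setup, and the 10-minute interval are only
placeholders. With the default (None, which is also what the updated
tests pass), behaviour is unchanged and the whole plan is still
committed once at the end.

    // Sketch only: exercises the new `with_min_commit_interval` option.
    // The table URI and the chosen interval are placeholders.
    use std::error::Error;
    use std::time::Duration;

    use deltalake::DeltaOps;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error>> {
        // Ask the optimizer to commit the add/remove actions accumulated so
        // far roughly every 10 minutes instead of holding everything back
        // for a single commit at the end of the plan.
        let (table, metrics) = DeltaOps::try_from_uri("s3://bucket/table")
            .await?
            .optimize()
            .with_min_commit_interval(Duration::from_secs(10 * 60))
            .await?;

        println!("optimized table is now at version {}", table.version());
        println!(
            "files added: {}, files removed: {}",
            metrics.num_files_added, metrics.num_files_removed
        );
        Ok(())
    }

The incremental commits trade a single all-or-nothing transaction for
several smaller ones, so a failure late in a large compaction no longer
discards all of the work already done.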