diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index edabaa91c0..c826e57bde 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -185,7 +185,7 @@ pub struct OptimizeBuilder<'a> { optimize_type: OptimizeType, min_commit_interval: Option, /// Indicates whether the writes should be committed - commit_writes: bool + commit_writes: bool, } impl<'a> OptimizeBuilder<'a> { @@ -273,17 +273,15 @@ impl<'a> OptimizeBuilder<'a> { impl<'a> OptimizeBuilder<'a> { /// Commit writes after processing - pub async fn commit_writes( - self, - commit_info: CommitContext - ) -> DeltaResult { + pub async fn commit_writes(self, commit_info: CommitContext) -> DeltaResult { commit( - self.log_store.as_ref(), - &commit_info.actions, - commit_info.operation.clone().into(), - commit_info.snapshot.as_ref(), - commit_info.app_metadata.clone() - ).await?; + self.log_store.as_ref(), + &commit_info.actions, + commit_info.operation.clone().into(), + commit_info.snapshot.as_ref(), + commit_info.app_metadata.clone(), + ) + .await?; let mut table = DeltaTable::new_with_state(self.log_store, self.snapshot); @@ -324,7 +322,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { this.max_spill_size, this.min_commit_interval, this.app_metadata, - this.commit_writes + this.commit_writes, ) .await?; @@ -657,9 +655,8 @@ impl MergePlan { max_spill_size: usize, min_commit_interval: Option, app_metadata: Option>, - commit_writes: bool + commit_writes: bool, ) -> Result<(Metrics, Option), DeltaTableError> { - let operations = std::mem::take(&mut self.operations); let stream = match operations { OptimizeOperations::Compact(bins) => futures::stream::iter(bins) @@ -794,7 +791,6 @@ impl MergePlan { table.update().await?; if commit_writes { - debug!("committing {} actions", actions.len()); //// TODO: Check for remove actions on optimized partitions. If a //// optimized partition was updated then abort the commit. Requires (#593). @@ -806,12 +802,10 @@ impl MergePlan { Some(app_metadata.clone()), ) .await?; - } else { // Save the actions buffer so the client can commit later total_actions.extend(actions) } - } if end { @@ -820,12 +814,12 @@ impl MergePlan { } let commit_context = if !commit_writes { - Some(CommitContext{ - actions: total_actions, - app_metadata, - snapshot: Some(snapshot.clone()), // using original table snapshot as all commits are added at once - operation: self.task_parameters.input_parameters.clone() - }) + Some(CommitContext { + actions: total_actions, + app_metadata, + snapshot: Some(snapshot.clone()), // using original table snapshot as all commits are added at once + operation: self.task_parameters.input_parameters.clone(), + }) } else { None }; @@ -838,7 +832,6 @@ impl MergePlan { total_metrics.files_removed.min = 0; } - Ok((total_metrics, commit_context)) } } diff --git a/crates/core/tests/command_optimize.rs b/crates/core/tests/command_optimize.rs index 1df93d2227..36b911d71b 100644 --- a/crates/core/tests/command_optimize.rs +++ b/crates/core/tests/command_optimize.rs @@ -240,7 +240,10 @@ async fn test_optimize_non_partitioned_table_deferred_commit() -> Result<(), Box let version = dt.version(); assert_eq!(dt.get_files_count(), 5); - let optimize = DeltaOps(dt).optimize().with_target_size(2_000_000).with_commit_writes(false); + let optimize = DeltaOps(dt) + .optimize() + .with_target_size(2_000_000) + .with_commit_writes(false); let (dt, metrics, commit_context) = optimize.await?; // Still have same version, and file count, @@ -255,9 +258,10 @@ async fn test_optimize_non_partitioned_table_deferred_commit() -> Result<(), Box let commit_info = dt.history(None).await?; assert_eq!(commit_info.len(), 6); - let dt_commit = DeltaOps(dt).optimize().commit_writes( - commit_context.unwrap() - ).await?; + let dt_commit = DeltaOps(dt) + .optimize() + .commit_writes(commit_context.unwrap()) + .await?; assert_eq!(version + 1, dt_commit.version()); @@ -309,7 +313,10 @@ async fn test_optimize_with_partitions_deferred_commit() -> Result<(), Box Result<(), Box Result<(), Box> { 20, Some(Duration::from_secs(0)), // this will cause as many commits as num_files_added None, - true + true, ) .await?; assert_eq!(metrics.num_files_added, 2);