Skip to content

Commit

Permalink
cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
gautham acharya authored and rtyler committed Mar 9, 2024
1 parent f82cbd3 commit 2786e92
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 33 deletions.
41 changes: 17 additions & 24 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ pub struct OptimizeBuilder<'a> {
optimize_type: OptimizeType,
min_commit_interval: Option<Duration>,
/// Indicates whether the writes should be committed
commit_writes: bool
commit_writes: bool,
}

impl<'a> OptimizeBuilder<'a> {
Expand Down Expand Up @@ -273,17 +273,15 @@ impl<'a> OptimizeBuilder<'a> {

impl<'a> OptimizeBuilder<'a> {
/// Commit writes after processing
pub async fn commit_writes(
self,
commit_info: CommitContext
) -> DeltaResult<DeltaTable> {
pub async fn commit_writes(self, commit_info: CommitContext) -> DeltaResult<DeltaTable> {
commit(
self.log_store.as_ref(),
&commit_info.actions,
commit_info.operation.clone().into(),
commit_info.snapshot.as_ref(),
commit_info.app_metadata.clone()
).await?;
self.log_store.as_ref(),
&commit_info.actions,
commit_info.operation.clone().into(),
commit_info.snapshot.as_ref(),
commit_info.app_metadata.clone(),
)
.await?;

let mut table = DeltaTable::new_with_state(self.log_store, self.snapshot);

Expand Down Expand Up @@ -324,7 +322,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
this.max_spill_size,
this.min_commit_interval,
this.app_metadata,
this.commit_writes
this.commit_writes,
)
.await?;

Expand Down Expand Up @@ -657,9 +655,8 @@ impl MergePlan {
max_spill_size: usize,
min_commit_interval: Option<Duration>,
app_metadata: Option<HashMap<String, serde_json::Value>>,
commit_writes: bool
commit_writes: bool,
) -> Result<(Metrics, Option<CommitContext>), DeltaTableError> {

let operations = std::mem::take(&mut self.operations);
let stream = match operations {
OptimizeOperations::Compact(bins) => futures::stream::iter(bins)
Expand Down Expand Up @@ -794,7 +791,6 @@ impl MergePlan {
table.update().await?;

if commit_writes {

debug!("committing {} actions", actions.len());
//// TODO: Check for remove actions on optimized partitions. If a
//// optimized partition was updated then abort the commit. Requires (#593).
Expand All @@ -806,12 +802,10 @@ impl MergePlan {
Some(app_metadata.clone()),
)
.await?;

} else {
// Save the actions buffer so the client can commit later
total_actions.extend(actions)
}

}

if end {
Expand All @@ -820,12 +814,12 @@ impl MergePlan {
}

let commit_context = if !commit_writes {
Some(CommitContext{
actions: total_actions,
app_metadata,
snapshot: Some(snapshot.clone()), // using original table snapshot as all commits are added at once
operation: self.task_parameters.input_parameters.clone()
})
Some(CommitContext {
actions: total_actions,
app_metadata,
snapshot: Some(snapshot.clone()), // using original table snapshot as all commits are added at once
operation: self.task_parameters.input_parameters.clone(),
})
} else {
None
};
Expand All @@ -838,7 +832,6 @@ impl MergePlan {
total_metrics.files_removed.min = 0;
}


Ok((total_metrics, commit_context))
}
}
Expand Down
26 changes: 17 additions & 9 deletions crates/core/tests/command_optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,10 @@ async fn test_optimize_non_partitioned_table_deferred_commit() -> Result<(), Box
let version = dt.version();
assert_eq!(dt.get_files_count(), 5);

let optimize = DeltaOps(dt).optimize().with_target_size(2_000_000).with_commit_writes(false);
let optimize = DeltaOps(dt)
.optimize()
.with_target_size(2_000_000)
.with_commit_writes(false);
let (dt, metrics, commit_context) = optimize.await?;

// Still have same version, and file count,
Expand All @@ -255,9 +258,10 @@ async fn test_optimize_non_partitioned_table_deferred_commit() -> Result<(), Box
let commit_info = dt.history(None).await?;
assert_eq!(commit_info.len(), 6);

let dt_commit = DeltaOps(dt).optimize().commit_writes(
commit_context.unwrap()
).await?;
let dt_commit = DeltaOps(dt)
.optimize()
.commit_writes(commit_context.unwrap())
.await?;

assert_eq!(version + 1, dt_commit.version());

Expand Down Expand Up @@ -309,7 +313,10 @@ async fn test_optimize_with_partitions_deferred_commit() -> Result<(), Box<dyn E
let version = dt.version();
let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?];

let optimize = DeltaOps(dt).optimize().with_filters(&filter).with_commit_writes(false);
let optimize = DeltaOps(dt)
.optimize()
.with_filters(&filter)
.with_commit_writes(false);
let (dt, metrics, commit_context) = optimize.await?;

// optimize is not committed, so we have 4 files.
Expand All @@ -321,9 +328,10 @@ async fn test_optimize_with_partitions_deferred_commit() -> Result<(), Box<dyn E
let commit_info = dt.history(None).await?;
assert_eq!(commit_info.len(), 5);

let dt_commit = DeltaOps(dt).optimize().commit_writes(
commit_context.unwrap()
).await?;
let dt_commit = DeltaOps(dt)
.optimize()
.commit_writes(commit_context.unwrap())
.await?;

assert_eq!(version + 1, dt_commit.version());

Expand Down Expand Up @@ -544,7 +552,7 @@ async fn test_commit_interval() -> Result<(), Box<dyn Error>> {
20,
Some(Duration::from_secs(0)), // this will cause as many commits as num_files_added
None,
true
true,
)
.await?;
assert_eq!(metrics.num_files_added, 2);
Expand Down

0 comments on commit 2786e92

Please sign in to comment.