From 8024cf8682c06f4f8869448cc4ccc0dc030af05e Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 21 Apr 2024 23:22:09 +0200 Subject: [PATCH] always return state --- crates/core/src/operations/constraints.rs | 10 +-- crates/core/src/operations/delete.rs | 24 +++-- .../core/src/operations/drop_constraints.rs | 10 +-- crates/core/src/operations/merge/mod.rs | 28 +++--- crates/core/src/operations/transaction/mod.rs | 88 +++++++++---------- crates/core/src/operations/update.rs | 30 +++---- crates/core/src/operations/write.rs | 11 +-- 7 files changed, 89 insertions(+), 112 deletions(-) diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index abdaf89e7b..a2ffc5e145 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -196,12 +196,10 @@ impl std::future::IntoFuture for ConstraintBuilder { .build(Some(&this.snapshot), this.log_store.clone(), operation)? .await?; - let table = if let Some(new_snapshot) = commit.snapshot() { - DeltaTable::new_with_state(this.log_store, new_snapshot) - } else { - DeltaTable::new_with_state(this.log_store, this.snapshot) - }; - Ok(table) + Ok(DeltaTable::new_with_state( + this.log_store, + commit.snapshot(), + )) }) } } diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 2a95808b7e..525c98e05b 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -187,16 +187,16 @@ async fn excute_non_empty_expr( async fn execute( predicate: Option, log_store: LogStoreRef, - snapshot: &DeltaTableState, + snapshot: DeltaTableState, state: SessionState, writer_properties: Option, mut commit_properties: CommitProperties, -) -> DeltaResult<(Option, DeleteMetrics)> { +) -> DeltaResult<(DeltaTableState, DeleteMetrics)> { let exec_start = Instant::now(); let mut metrics = DeleteMetrics::default(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files(&snapshot, log_store.clone(), &state, predicate.clone()).await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis(); let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); @@ -206,7 +206,7 @@ async fn execute( } else { let write_start = Instant::now(); let add = excute_non_empty_expr( - snapshot, + &snapshot, log_store.clone(), &state, &predicate, @@ -259,12 +259,12 @@ async fn execute( predicate: Some(fmt_expr_to_sql(&predicate)?), }; if actions.is_empty() { - return Ok((None, metrics)); + return Ok((snapshot.clone(), metrics)); } let commit = CommitBuilder::from(commit_properties) .with_actions(actions) - .build(Some(snapshot), log_store, operation)? + .build(Some(&snapshot), log_store, operation)? .await?; Ok((commit.snapshot(), metrics)) } @@ -302,19 +302,17 @@ impl std::future::IntoFuture for DeleteBuilder { let (new_snapshot, metrics) = execute( predicate, this.log_store.clone(), - &this.snapshot, + this.snapshot, state, this.writer_properties, this.commit_properties, ) .await?; - let table = if let Some(new_snapshot) = new_snapshot { - DeltaTable::new_with_state(this.log_store, new_snapshot) - } else { - DeltaTable::new_with_state(this.log_store, this.snapshot) - }; - Ok((table, metrics)) + Ok(( + DeltaTable::new_with_state(this.log_store, new_snapshot), + metrics, + )) }) } } diff --git a/crates/core/src/operations/drop_constraints.rs b/crates/core/src/operations/drop_constraints.rs index f336c285ac..f332a52d63 100644 --- a/crates/core/src/operations/drop_constraints.rs +++ b/crates/core/src/operations/drop_constraints.rs @@ -89,12 +89,10 @@ impl std::future::IntoFuture for DropConstraintBuilder { .build(Some(&this.snapshot), this.log_store.clone(), operation)? .await?; - let table = if let Some(new_snapshot) = commit.snapshot() { - DeltaTable::new_with_state(this.log_store, new_snapshot) - } else { - DeltaTable::new_with_state(this.log_store, this.snapshot) - }; - Ok(table) + Ok(DeltaTable::new_with_state( + this.log_store, + commit.snapshot(), + )) }) } } diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index adb49667c5..4dd74cd257 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -929,7 +929,7 @@ async fn execute( predicate: Expression, source: DataFrame, log_store: LogStoreRef, - snapshot: &DeltaTableState, + snapshot: DeltaTableState, state: SessionState, writer_properties: Option, mut commit_properties: CommitProperties, @@ -939,7 +939,7 @@ async fn execute( match_operations: Vec, not_match_target_operations: Vec, not_match_source_operations: Vec, -) -> DeltaResult<(Option, MergeMetrics)> { +) -> DeltaResult<(DeltaTableState, MergeMetrics)> { let mut metrics = MergeMetrics::default(); let exec_start = Instant::now(); @@ -984,7 +984,7 @@ async fn execute( let scan_config = DeltaScanConfigBuilder::default() .with_file_column(true) .with_parquet_pushdown(false) - .build(snapshot)?; + .build(&snapshot)?; let target_provider = Arc::new(DeltaTableProvider::try_new( snapshot.clone(), @@ -1014,7 +1014,7 @@ async fn execute( } else { try_construct_early_filter( predicate.clone(), - snapshot, + &snapshot, &state, &source, &source_name, @@ -1369,7 +1369,7 @@ async fn execute( let rewrite_start = Instant::now(); let add_actions = write_execution_plan( - Some(snapshot), + Some(&snapshot), state.clone(), write, table_partition_cols.clone(), @@ -1448,12 +1448,12 @@ async fn execute( }; if actions.is_empty() { - return Ok((None, metrics)); + return Ok((snapshot, metrics)); } let commit = CommitBuilder::from(commit_properties) .with_actions(actions) - .build(Some(snapshot), log_store.clone(), operation)? + .build(Some(&snapshot), log_store.clone(), operation)? .await?; Ok((commit.snapshot(), metrics)) } @@ -1512,11 +1512,11 @@ impl std::future::IntoFuture for MergeBuilder { session.state() }); - let (new_snapshot, metrics) = execute( + let (snapshot, metrics) = execute( this.predicate, this.source, this.log_store.clone(), - &this.snapshot, + this.snapshot, state, this.writer_properties, this.commit_properties, @@ -1529,12 +1529,10 @@ impl std::future::IntoFuture for MergeBuilder { ) .await?; - let table = if let Some(new_snapshot) = new_snapshot { - DeltaTable::new_with_state(this.log_store, new_snapshot) - } else { - DeltaTable::new_with_state(this.log_store, this.snapshot) - }; - Ok((table, metrics)) + Ok(( + DeltaTable::new_with_state(this.log_store, snapshot), + metrics, + )) }) } } diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 4057ce362b..6606a8c339 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -207,7 +207,7 @@ pub trait TableReference: Send + Sync { fn metadata(&self) -> &Metadata; /// Try to cast this table reference to a `EagerSnapshot` - fn eager_snapshot(&self) -> Option<&EagerSnapshot>; + fn eager_snapshot(&self) -> &EagerSnapshot; } impl TableReference for EagerSnapshot { @@ -223,8 +223,8 @@ impl TableReference for EagerSnapshot { self.table_config() } - fn eager_snapshot(&self) -> Option<&EagerSnapshot> { - Some(self) + fn eager_snapshot(&self) -> &EagerSnapshot { + self } } @@ -241,8 +241,8 @@ impl TableReference for DeltaTableState { self.snapshot.metadata() } - fn eager_snapshot(&self) -> Option<&EagerSnapshot> { - Some(&self.snapshot) + fn eager_snapshot(&self) -> &EagerSnapshot { + &self.snapshot } } @@ -512,13 +512,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { // unwrap() is safe here due to the above check // TODO: refactor to only depend on TableReference Trait - let read_snapshot = - this.table_data - .unwrap() - .eager_snapshot() - .ok_or(DeltaTableError::Generic( - "Expected an instance of EagerSnapshot".to_owned(), - ))?; + let read_snapshot = this.table_data.unwrap().eager_snapshot(); let mut attempt_number = 1; while attempt_number <= this.max_retries { @@ -595,34 +589,38 @@ pub struct PostCommit<'a> { impl<'a> PostCommit<'a> { /// Runs the post commit activities - async fn run_post_commit_hook(&self) -> DeltaResult> { + async fn run_post_commit_hook(&self) -> DeltaResult { if let Some(table) = self.table_data { - if let Some(mut snapshot) = table.eager_snapshot().cloned() { - if self.version - snapshot.version() > 1 { - // This may only occur during concurrent write actions. We need to update the state first to - 1 - // then we can advance. - snapshot - .update(self.log_store.clone(), Some(self.version - 1)) - .await?; - snapshot.advance(vec![&self.data])?; - } else { - snapshot.advance(vec![&self.data])?; - } - let state = DeltaTableState { - app_transaction_version: HashMap::new(), - snapshot, - }; - // Execute each hook - if self.create_checkpoint { - self.create_checkpoint(&state, &self.log_store, self.version) - .await?; - } - return Ok(Some(state)); + let mut snapshot = table.eager_snapshot().clone(); + if self.version - snapshot.version() > 1 { + // This may only occur during concurrent write actions. We need to update the state first to - 1 + // then we can advance. + snapshot + .update(self.log_store.clone(), Some(self.version - 1)) + .await?; + snapshot.advance(vec![&self.data])?; } else { - return Ok(None); + snapshot.advance(vec![&self.data])?; } + let state = DeltaTableState { + app_transaction_version: HashMap::new(), + snapshot, + }; + // Execute each hook + if self.create_checkpoint { + self.create_checkpoint(&state, &self.log_store, self.version) + .await?; + } + Ok(state) } else { - return Ok(None); + let state = DeltaTableState::try_new( + &Path::default(), + self.log_store.object_store(), + Default::default(), + Some(self.version), + ) + .await?; + Ok(state) } } async fn create_checkpoint( @@ -642,7 +640,7 @@ impl<'a> PostCommit<'a> { /// A commit that successfully completed pub struct FinalizedCommit { /// The new table state after a commmit - pub snapshot: Option, + pub snapshot: DeltaTableState, /// Version of the finalized commit pub version: i64, @@ -650,7 +648,7 @@ pub struct FinalizedCommit { impl FinalizedCommit { /// The new table state after a commmit - pub fn snapshot(&self) -> Option { + pub fn snapshot(&self) -> DeltaTableState { self.snapshot.clone() } /// Version of the finalized commit @@ -668,14 +666,12 @@ impl<'a> std::future::IntoFuture for PostCommit<'a> { Box::pin(async move { match this.run_post_commit_hook().await { - Ok(snapshot) => { - return Ok(FinalizedCommit { - snapshot, - version: this.version, - }) - } - Err(err) => return Err(err), - }; + Ok(snapshot) => Ok(FinalizedCommit { + snapshot, + version: this.version, + }), + Err(err) => Err(err), + } }) } } diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 7f52dda6de..24032efa6d 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -167,12 +167,12 @@ async fn execute( predicate: Option, updates: HashMap, log_store: LogStoreRef, - snapshot: &DeltaTableState, + snapshot: DeltaTableState, state: SessionState, writer_properties: Option, mut commit_properties: CommitProperties, safe_cast: bool, -) -> DeltaResult<(Option, UpdateMetrics)> { +) -> DeltaResult<(DeltaTableState, UpdateMetrics)> { // Validate the predicate and update expressions. // // If the predicate is not set, then all files need to be updated. @@ -188,7 +188,7 @@ async fn execute( let version = snapshot.version(); if updates.is_empty() { - return Ok((None, metrics)); + return Ok((snapshot, metrics)); } let predicate = match predicate { @@ -213,11 +213,11 @@ async fn execute( let table_partition_cols = current_metadata.partition_columns.clone(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files(&snapshot, log_store.clone(), &state, predicate.clone()).await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64; if candidates.candidates.is_empty() { - return Ok((None, metrics)); + return Ok((snapshot, metrics)); } let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); @@ -225,7 +225,7 @@ async fn execute( let execution_props = state.execution_props(); // For each rewrite evaluate the predicate and then modify each expression // to either compute the new value or obtain the old one then write these batches - let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state) + let scan = DeltaScanBuilder::new(&snapshot, log_store.clone(), &state) .with_files(&candidates.candidates) .build() .await?; @@ -348,7 +348,7 @@ async fn execute( )?); let add_actions = write_execution_plan( - Some(snapshot), + Some(&snapshot), state.clone(), projection.clone(), table_partition_cols.clone(), @@ -414,7 +414,7 @@ async fn execute( let commit = CommitBuilder::from(commit_properties) .with_actions(actions) - .build(Some(snapshot), log_store, operation)? + .build(Some(&snapshot), log_store, operation)? .await?; Ok((commit.snapshot(), metrics)) @@ -440,11 +440,11 @@ impl std::future::IntoFuture for UpdateBuilder { session.state() }); - let (new_snapshot, metrics) = execute( + let (snapshot, metrics) = execute( this.predicate, this.updates, this.log_store.clone(), - &this.snapshot, + this.snapshot, state, this.writer_properties, this.commit_properties, @@ -452,12 +452,10 @@ impl std::future::IntoFuture for UpdateBuilder { ) .await?; - let table = if let Some(new_snapshot) = new_snapshot { - DeltaTable::new_with_state(this.log_store, new_snapshot) - } else { - DeltaTable::new_with_state(this.log_store, this.snapshot) - }; - Ok((table, metrics)) + Ok(( + DeltaTable::new_with_state(this.log_store, snapshot), + metrics, + )) }) } } diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 415d4a27b3..9563dd15bc 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -808,16 +808,7 @@ impl std::future::IntoFuture for WriteBuilder { )? .await?; - // TODO we do not have the table config available, but since we are merging only our newly - // created actions, it may be safe to assume, that we want to include all actions. - // then again, having only some tombstones may be misleading. - if let (Some(mut snapshot), Some(new_snapshot)) = (this.snapshot, commit.snapshot) { - Ok(DeltaTable::new_with_state(this.log_store, new_snapshot)) - } else { - let mut table = DeltaTable::new(this.log_store, Default::default()); - table.update().await?; - Ok(table) - } + Ok(DeltaTable::new_with_state(this.log_store, commit.snapshot)) }) } }