Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): post commit hook (v2), create checkpoint hook #2391

Merged
merged 9 commits into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 125 additions & 8 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,33 @@
//! │
//! ▼
//! ┌───────────────────────────────┐
//! │ Post Commit │
//! │ │
//! │ Commit that was materialized │
//! │ to storage with post commit │
//! │ hooks to be executed │
//! └──────────────┬────────────────┘
//! │
//! ▼
//! ┌───────────────────────────────┐
//! │ Finalized Commit │
//! │ │
//! │ Commit that was materialized │
//! │ to storage │
//! │ │
//! └───────────────────────────────┘
//! └───────────────────────────────┘
//!</pre>

use std::collections::HashMap;

use chrono::Utc;
use conflict_checker::ConflictChecker;
use futures::future::BoxFuture;
use object_store::path::Path;
use object_store::{Error as ObjectStoreError, ObjectStore};
use serde_json::Value;
use std::collections::HashMap;

use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommitSummary};
use crate::checkpoints::create_checkpoint_for;
use crate::errors::DeltaTableError;
use crate::kernel::{
Action, CommitInfo, EagerSnapshot, Metadata, Protocol, ReaderFeatures, WriterFeatures,
Expand Down Expand Up @@ -293,19 +302,27 @@ impl CommitData {
}
}

/// Properties for post commit hook.
///
/// At present the only setting carried is whether a checkpoint should be
/// created once the commit has been written.
#[derive(Debug, Clone, Copy)]
pub struct PostCommitHookProperties {
    /// Whether the post commit hook should attempt to create a checkpoint.
    create_checkpoint: bool,
}

Comment on lines +305 to +310
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove this struct and just keep the config in the commit builder and commit properties for now.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was planning to add support for log retention cleanup, auto compact and auto vacuum afterwards. So perhaps still good to keep this then?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good in that case.

#[derive(Clone, Debug)]
/// End user facing interface to be used by operations on the table.
/// Enables controlling commit behaviour and modifying metadata that is written during a commit.
pub struct CommitProperties {
/// Caller-supplied key/value metadata.
/// NOTE(review): presumably merged into the commit info written to the log; confirm in the commit path.
pub(crate) app_metadata: HashMap<String, Value>,
/// Retry budget for the commit; defaults to `DEFAULT_RETRIES`.
/// NOTE(review): presumably the number of attempts made on version conflicts; confirm against the commit loop.
max_retries: usize,
/// Whether a checkpoint should be created when the checkpoint interval condition is met; defaults to `true`.
create_checkpoint: bool,
}

impl Default for CommitProperties {
fn default() -> Self {
Self {
app_metadata: Default::default(),
max_retries: DEFAULT_RETRIES,
create_checkpoint: true,
}
}
}
Expand All @@ -319,13 +336,23 @@ impl CommitProperties {
self.app_metadata = HashMap::from_iter(metadata);
self
}

/// Specify if it should create a checkpoint when the commit interval condition is met
pub fn with_create_checkpoint(mut self, create_checkpoint: bool) -> Self {
self.create_checkpoint = create_checkpoint;
self
}
}

impl From<CommitProperties> for CommitBuilder {
/// Builds a `CommitBuilder` from user facing commit properties.
///
/// The retry limit and application metadata are carried over as-is, the
/// checkpoint flag is wrapped into post commit hook properties, and every
/// remaining builder field takes its default value.
fn from(value: CommitProperties) -> Self {
    let hook = PostCommitHookProperties {
        create_checkpoint: value.create_checkpoint,
    };
    CommitBuilder {
        app_metadata: value.app_metadata,
        max_retries: value.max_retries,
        post_commit_hook: Some(hook),
        ..Default::default()
    }
}
Expand All @@ -336,6 +363,7 @@ pub struct CommitBuilder {
actions: Vec<Action>,
app_metadata: HashMap<String, Value>,
max_retries: usize,
post_commit_hook: Option<PostCommitHookProperties>,
}

impl Default for CommitBuilder {
Expand All @@ -344,6 +372,7 @@ impl Default for CommitBuilder {
actions: Vec::new(),
app_metadata: HashMap::new(),
max_retries: DEFAULT_RETRIES,
post_commit_hook: None,
}
}
}
Expand All @@ -367,6 +396,12 @@ impl<'a> CommitBuilder {
self
}

/// Specify all the post commit hook properties
pub fn with_post_commit_hook(mut self, post_commit_hook: PostCommitHookProperties) -> Self {
self.post_commit_hook = post_commit_hook.into();
self
}

/// Prepare a Commit operation using the configured builder
pub fn build(
self,
Expand All @@ -380,6 +415,7 @@ impl<'a> CommitBuilder {
table_data,
max_retries: self.max_retries,
data,
post_commit_hook: self.post_commit_hook,
})
}
}
Expand All @@ -390,6 +426,7 @@ pub struct PreCommit<'a> {
table_data: Option<&'a dyn TableReference>,
data: CommitData,
max_retries: usize,
post_commit_hook: Option<PostCommitHookProperties>,
}

impl<'a> std::future::IntoFuture for PreCommit<'a> {
Expand All @@ -399,7 +436,7 @@ impl<'a> std::future::IntoFuture for PreCommit<'a> {
fn into_future(self) -> Self::IntoFuture {
let this = self;

Box::pin(async move { this.into_prepared_commit_future().await?.await })
Box::pin(async move { this.into_prepared_commit_future().await?.await?.await })
}
}

Expand Down Expand Up @@ -429,6 +466,7 @@ impl<'a> PreCommit<'a> {
table_data: this.table_data,
max_retries: this.max_retries,
data: this.data,
post_commit: this.post_commit_hook,
})
})
}
Expand All @@ -441,6 +479,7 @@ pub struct PreparedCommit<'a> {
data: CommitData,
table_data: Option<&'a dyn TableReference>,
max_retries: usize,
post_commit: Option<PostCommitHookProperties>,
}

impl<'a> PreparedCommit<'a> {
Expand All @@ -451,7 +490,7 @@ impl<'a> PreparedCommit<'a> {
}

impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
type Output = DeltaResult<FinalizedCommit>;
type Output = DeltaResult<PostCommit<'a>>;
type IntoFuture = BoxFuture<'a, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
Expand All @@ -462,9 +501,12 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {

if this.table_data.is_none() {
this.log_store.write_commit_entry(0, tmp_commit).await?;
return Ok(FinalizedCommit {
return Ok(PostCommit {
version: 0,
data: this.data,
create_checkpoint: false,
log_store: this.log_store,
table_data: this.table_data,
});
}

Expand All @@ -483,10 +525,16 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
let version = read_snapshot.version() + attempt_number as i64;
match this.log_store.write_commit_entry(version, tmp_commit).await {
Ok(()) => {
return Ok(FinalizedCommit {
return Ok(PostCommit {
version,
data: this.data,
})
create_checkpoint: this
.post_commit
.map(|v| v.create_checkpoint)
.unwrap_or_default(),
log_store: this.log_store,
table_data: this.table_data,
});
}
Err(TransactionError::VersionAlreadyExists(version)) => {
let summary = WinningCommitSummary::try_new(
Expand Down Expand Up @@ -534,6 +582,54 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
}
}

/// Represents items for the post commit hook
pub struct PostCommit<'a> {
/// The winning version number of the commit
pub version: i64,
/// The data that was committed to the log store
pub data: CommitData,
/// Whether the post commit hook should attempt to create a checkpoint
create_checkpoint: bool,
/// Log store handle handed to checkpoint creation
log_store: LogStoreRef,
/// Reference to the table the commit was made against, if any;
/// checkpoint creation is skipped when this is `None`
table_data: Option<&'a dyn TableReference>,
}

impl<'a> PostCommit<'a> {
    /// Runs the post commit activities.
    ///
    /// The only activity at the moment is checkpoint creation, which is
    /// attempted when `create_checkpoint` is enabled for this commit.
    ///
    /// # Errors
    /// Returns any error raised while creating the checkpoint.
    async fn run_post_commit_hook(
        &self,
        version: i64,
        commit_data: &CommitData,
    ) -> DeltaResult<()> {
        if self.create_checkpoint {
            self.create_checkpoint(&self.table_data, &self.log_store, version, commit_data)
                .await?;
        }
        Ok(())
    }

    /// Creates a checkpoint for `version` when it falls on the table's checkpoint interval.
    ///
    /// Nothing happens when `table` is `None`, or when `version + 1` is not a
    /// multiple of the configured checkpoint interval. An interval of zero can
    /// never be met and is treated as "no checkpoint" rather than causing a
    /// remainder-by-zero panic.
    ///
    /// # Errors
    /// Returns an error when advancing the snapshot with `commit_data` fails or
    /// when writing the checkpoint fails.
    ///
    /// # Panics
    /// Panics when the table reference has no eager snapshot.
    async fn create_checkpoint(
        &self,
        table: &Option<&'a dyn TableReference>,
        log_store: &LogStoreRef,
        version: i64,
        commit_data: &CommitData,
    ) -> DeltaResult<()> {
        if let Some(table) = table {
            let checkpoint_interval = table.config().checkpoint_interval() as i64;
            // `checked_rem` yields `None` for a zero divisor instead of panicking.
            let interval_reached = (version + 1).checked_rem(checkpoint_interval) == Some(0);
            if interval_reached {
                // We have to advance the snapshot otherwise we can't create a checkpoint
                // NOTE(review): this `unwrap` panics when the table reference carries no
                // eager snapshot; consider returning an error once the appropriate
                // error variant is confirmed.
                let mut snapshot = table.eager_snapshot().unwrap().clone();
                snapshot.advance(vec![commit_data])?;
                let state = DeltaTableState {
                    app_transaction_version: HashMap::new(),
                    snapshot,
                };
                create_checkpoint_for(version, &state, log_store.as_ref()).await?;
            }
        }
        Ok(())
    }
}

/// A commit that successfully completed
pub struct FinalizedCommit {
/// The winning version number of the commit
Expand All @@ -554,6 +650,27 @@ impl FinalizedCommit {
}
}

impl<'a> std::future::IntoFuture for PostCommit<'a> {
    type Output = DeltaResult<FinalizedCommit>;
    type IntoFuture = BoxFuture<'a, Self::Output>;

    /// Runs the post commit hook and, when it succeeds, yields a
    /// `FinalizedCommit` carrying the same version and commit data.
    /// An error from the hook is returned unchanged.
    fn into_future(self) -> Self::IntoFuture {
        let post_commit = self;

        Box::pin(async move {
            let hook_outcome = post_commit
                .run_post_commit_hook(post_commit.version, &post_commit.data)
                .await;
            hook_outcome.map(|_| FinalizedCommit {
                version: post_commit.version,
                data: post_commit.data,
            })
        })
    }
}

#[cfg(test)]
mod tests {
use std::{collections::HashMap, sync::Arc};
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use crate::logstore::LogStore;
use crate::table::state::DeltaTableState;
use crate::table::{get_partition_col_data_types, CheckPoint, CheckPointBuilder};
use crate::{open_table_with_version, DeltaTable};

type SchemaPath = Vec<String>;

/// Error returned when there is an error during creating a checkpoint.
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/table/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ impl<'a> TableConfig<'a> {
DeltaConfigKey::CheckpointInterval,
checkpoint_interval,
i32,
10
100
),
);

Expand Down Expand Up @@ -591,7 +591,7 @@ mod tests {
fn get_long_from_metadata_test() {
let md = dummy_metadata();
let config = TableConfig(&md.configuration);
assert_eq!(config.checkpoint_interval(), 10,)
assert_eq!(config.checkpoint_interval(), 100,)
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/table/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{DeltaResult, DeltaTableError};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DeltaTableState {
app_transaction_version: HashMap<String, i64>,
pub(crate) app_transaction_version: HashMap<String, i64>,
pub(crate) snapshot: EagerSnapshot,
}

Expand Down
Loading
Loading