Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent writing checkpoints with a version that does not exist in table state #1863

Merged
merged 1 commit into from
Nov 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions crates/deltalake-core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ enum CheckpointError {
#[error("Partition value {0} cannot be parsed from string.")]
PartitionValueNotParseable(String),

/// Caller attempt to create a checkpoint for a version which does not exist on the table state
#[error("Attempted to create a checkpoint for a version {0} that does not match the table state {1}")]
StaleTableVersion(i64, i64),

/// Error returned when the parquet writer fails while writing the checkpoint.
#[error("Failed to write parquet: {}", .source)]
Parquet {
Expand All @@ -60,6 +64,7 @@ impl From<CheckpointError> for ProtocolError {
match value {
CheckpointError::PartitionValueNotParseable(_) => Self::InvalidField(value.to_string()),
CheckpointError::Arrow { source } => Self::Arrow { source },
CheckpointError::StaleTableVersion(..) => Self::Generic(value.to_string()),
CheckpointError::Parquet { source } => Self::ParquetParseError { source },
}
}
Expand Down Expand Up @@ -117,6 +122,14 @@ pub async fn create_checkpoint_for(
state: &DeltaTableState,
log_store: &dyn LogStore,
) -> Result<(), ProtocolError> {
if version != state.version() {
error!(
"create_checkpoint_for called with version {version} but table state contains: {}. The table state may need to be reloaded",
state.version()
);
return Err(CheckpointError::StaleTableVersion(version, state.version()).into());
Comment on lines +126 to +130
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we just checkout the correct version?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wjones127 how do you mean "checkout?" I am less versed in the nuance of checkpoints. If the last version in the table is 100, is it valid to call this with _last_checkpoint of version: 5?

I honestly don't know whether we should even allow a user-specified version here at all, rather than creating a checkpoint with the version of the table state.

HOWEVER, if a writer has a stale table state creates _last_checkpoint after another writer has created a later checkpoint, this becomes confusing and out of sync anyways 😖

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess to expand, I like the failure option here because the caller is asking us to do something here that will result in incorrect state, and we don't know what they intend or what performance concerns they have. I.e. I don't think we should reload the state here, as an example, because that would be surprising and may actually result in a later version than the user specified too

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like (a) failing is the easiest solution that avoids any potential new bug introduction and (b) reloading correct version could have unexpected perf impact, esp for things like streaming workloads that are managing their own table state

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh one more thing since my thoughts are disjointed here 😆 I think we already have the path you describe covered with create_checkpoint() 😄

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay re-read the signature. Erroring makes sense to me.

I was thinking in the situation you were describing in the issue report, it sounds like the correct thing to do is to checkout the desired version first then create the checkpoint.

}

// TODO: checkpoints _can_ be multi-part... haven't actually found a good reference for
// an appropriate split point yet though so only writing a single part currently.
// See https://github.com/delta-io/delta-rs/issues/288
Expand Down Expand Up @@ -486,6 +499,72 @@ mod tests {
use lazy_static::lazy_static;
use serde_json::json;

use crate::operations::DeltaOps;
use crate::writer::test_utils::get_delta_schema;
use object_store::path::Path;

#[tokio::test]
async fn test_create_checkpoint_for() {
let table_schema = get_delta_schema();

let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().clone())
.with_save_mode(crate::protocol::SaveMode::Ignore)
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_metadata().unwrap().schema, table_schema);
let res = create_checkpoint_for(0, table.get_state(), table.log_store.as_ref()).await;
assert!(res.is_ok());

// Look at the "files" and verify that the _last_checkpoint has the right version
let path = Path::from("_delta_log/_last_checkpoint");
let last_checkpoint = table
.object_store()
.get(&path)
.await
.expect("Failed to get the _last_checkpoint")
.bytes()
.await
.expect("Failed to get bytes for _last_checkpoint");
let last_checkpoint: CheckPoint = serde_json::from_slice(&last_checkpoint).expect("Fail");
assert_eq!(last_checkpoint.version, 0);
}

#[tokio::test]
async fn test_create_checkpoint_for_invalid_version() {
let table_schema = get_delta_schema();

let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().clone())
.with_save_mode(crate::protocol::SaveMode::Ignore)
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_metadata().unwrap().schema, table_schema);
match create_checkpoint_for(1, table.get_state(), table.log_store.as_ref()).await {
Ok(_) => {
/*
* If a checkpoint is allowed to be created here, it will use the passed in
* version, but _last_checkpoint is generated from the table state will point to a
* version 0 checkpoint.
* E.g.
*
* Path { raw: "_delta_log/00000000000000000000.json" }
* Path { raw: "_delta_log/00000000000000000001.checkpoint.parquet" }
* Path { raw: "_delta_log/_last_checkpoint" }
*
*/
panic!(
"We should not allow creating a checkpoint for a version which doesn't exist!"
);
}
Err(_) => { /* We should expect an error in the "right" case */ }
}
}

#[test]
fn typed_partition_value_from_string_test() {
let string_value: Value = "Hello World!".into();
Expand Down
Loading