Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement TableRequirement checks #689

Merged
merged 4 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 302 additions & 7 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use typed_builder::TypedBuilder;
use uuid::Uuid;

use crate::spec::{
FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, TableMetadataBuilder,
UnboundPartitionSpec, ViewRepresentations,
FormatVersion, Schema, SchemaId, Snapshot, SnapshotReference, SortOrder, TableMetadata,
TableMetadataBuilder, UnboundPartitionSpec, ViewRepresentations,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
Expand Down Expand Up @@ -312,29 +312,29 @@ pub enum TableRequirement {
LastAssignedFieldIdMatch {
/// The last assigned field id of the table to assert.
#[serde(rename = "last-assigned-field-id")]
last_assigned_field_id: i64,
last_assigned_field_id: i32,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introduced type changes to match TableMetadata and TableUpdate

},
/// The table's current schema id must match the requirement.
#[serde(rename = "assert-current-schema-id")]
CurrentSchemaIdMatch {
/// Current schema id of the table to assert.
#[serde(rename = "current-schema-id")]
current_schema_id: i64,
current_schema_id: SchemaId,
},
/// The table's last assigned partition id must match the
/// requirement.
#[serde(rename = "assert-last-assigned-partition-id")]
LastAssignedPartitionIdMatch {
/// Last assigned partition id of the table to assert.
#[serde(rename = "last-assigned-partition-id")]
last_assigned_partition_id: i64,
last_assigned_partition_id: i32,
},
/// The table's default spec id must match the requirement.
#[serde(rename = "assert-default-spec-id")]
DefaultSpecIdMatch {
/// Default spec id of the table to assert.
#[serde(rename = "default-spec-id")]
default_spec_id: i64,
default_spec_id: i32,
},
/// The table's default sort order id must match the requirement.
#[serde(rename = "assert-default-sort-order-id")]
Expand Down Expand Up @@ -453,6 +453,140 @@ impl TableUpdate {
}
}

impl TableRequirement {
/// Check that the requirement is met by the table metadata.
/// If the requirement is not met, an appropriate error is returned.
///
/// Provide metadata as `None` if the table does not exist.
pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
if let Some(metadata) = metadata {
match self {
TableRequirement::NotExist => {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Requirement failed: Table with id {} already exists",
metadata.uuid()
),
));
}
TableRequirement::UuidMatch { uuid } => {
if &metadata.uuid() != uuid {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Table UUID does not match",
)
.with_context("expected", *uuid)
.with_context("found", metadata.uuid()));
}
}
TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
// ToDo: Harmonize the types of current_schema_id
if metadata.current_schema_id != *current_schema_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Current schema id does not match",
)
.with_context("expected", current_schema_id.to_string())
.with_context("found", metadata.current_schema_id.to_string()));
}
}
TableRequirement::DefaultSortOrderIdMatch {
default_sort_order_id,
} => {
if metadata.default_sort_order().order_id != *default_sort_order_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Default sort order id does not match",
)
.with_context("expected", default_sort_order_id.to_string())
.with_context(
"found",
metadata.default_sort_order().order_id.to_string(),
));
}
}
TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
let snapshot_ref = metadata.snapshot_for_ref(r#ref);
if let Some(snapshot_id) = snapshot_id {
let snapshot_ref = snapshot_ref.ok_or(Error::new(
ErrorKind::DataInvalid,
format!("Requirement failed: Branch or tag `{}` not found", r#ref),
))?;
if snapshot_ref.snapshot_id() != *snapshot_id {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Requirement failed: Branch or tag `{}`'s snapshot has changed",
r#ref
),
)
.with_context("expected", snapshot_id.to_string())
.with_context("found", snapshot_ref.snapshot_id().to_string()));
}
} else if snapshot_ref.is_some() {
// a null snapshot ID means the ref should not exist already
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Requirement failed: Branch or tag `{}` already exists",
r#ref
),
));
}
}
TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
// ToDo: Harmonize the types of default_spec_id
if metadata.default_partition_spec_id() != *default_spec_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Default partition spec id does not match",
)
.with_context("expected", default_spec_id.to_string())
.with_context("found", metadata.default_partition_spec_id().to_string()));
}
}
TableRequirement::LastAssignedPartitionIdMatch {
last_assigned_partition_id,
} => {
if metadata.last_partition_id != *last_assigned_partition_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Last assigned partition id does not match",
)
.with_context("expected", last_assigned_partition_id.to_string())
.with_context("found", metadata.last_partition_id.to_string()));
}
}
TableRequirement::LastAssignedFieldIdMatch {
last_assigned_field_id,
} => {
if &metadata.last_column_id != last_assigned_field_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Last assigned field id does not match",
)
.with_context("expected", last_assigned_field_id.to_string())
.with_context("found", metadata.last_column_id.to_string()));
}
}
};
} else {
match self {
TableRequirement::NotExist => {}
_ => {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Table does not exist",
));
}
}
}

Ok(())
}
}

pub(super) mod _serde {
use serde::{Deserialize as _, Deserializer};

Expand Down Expand Up @@ -549,7 +683,7 @@ mod tests {
use crate::spec::{
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary,
TableMetadataBuilder, Transform, Type, UnboundPartitionSpec,
TableMetadata, TableMetadataBuilder, Transform, Type, UnboundPartitionSpec,
};
use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate};

Expand Down Expand Up @@ -593,6 +727,167 @@ mod tests {
);
}

/// Builds the table metadata fixture shared by the requirement tests.
///
/// The table is created from a `TableCreation` named `table` located at
/// `/path/to/table` with a schema built without any fields, and is assigned
/// the nil UUID so tests can match against a known value.
fn metadata() -> TableMetadata {
    let empty_schema = Schema::builder().build().unwrap();
    let creation = TableCreation::builder()
        .name("table".to_string())
        .location("/path/to/table".to_string())
        .schema(empty_schema)
        .build();

    let builder = TableMetadataBuilder::from_table_creation(creation).unwrap();
    let builder = builder.assign_uuid(uuid::Uuid::nil()).unwrap();
    builder.build().unwrap()
}

#[test]
fn test_check_requirement_not_exist() {
    let table_metadata = metadata();
    let must_not_exist = TableRequirement::NotExist;

    // Metadata is present, so the table exists and the requirement fails.
    assert!(must_not_exist.check(Some(&table_metadata)).is_err());
    // No metadata means no table, which is exactly what is required.
    assert!(must_not_exist.check(None).is_ok());
}

#[test]
fn test_check_table_uuid() {
    let table_metadata = metadata();

    // A freshly generated UUID cannot equal the fixture's nil UUID.
    let other_uuid = TableRequirement::UuidMatch {
        uuid: uuid::Uuid::now_v7(),
    };
    // The fixture is assigned the nil UUID.
    let same_uuid = TableRequirement::UuidMatch {
        uuid: uuid::Uuid::nil(),
    };

    assert!(other_uuid.check(Some(&table_metadata)).is_err());
    assert!(same_uuid.check(Some(&table_metadata)).is_ok());
}

#[test]
fn test_check_ref_snapshot_id() {
    let mut table_metadata = metadata();

    // Ref does not exist but should
    let missing_but_required = TableRequirement::RefSnapshotIdMatch {
        r#ref: "my_branch".to_string(),
        snapshot_id: Some(1),
    };
    assert!(missing_but_required.check(Some(&table_metadata)).is_err());

    // Ref does not exist and should not
    let missing_as_required = TableRequirement::RefSnapshotIdMatch {
        r#ref: "my_branch".to_string(),
        snapshot_id: None,
    };
    assert!(missing_as_required.check(Some(&table_metadata)).is_ok());

    // Add a snapshot to the table before checking the `main` ref.
    let record = r#"
{
"snapshot-id": 3051729675574597004,
"sequence-number": 10,
"timestamp-ms": 1515100955770,
"summary": {
"operation": "append"
},
"manifest-list": "s3://b/wh/.../s1.avro",
"schema-id": 0
}
"#;
    let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
    table_metadata.append_snapshot(snapshot);

    // Ref exists and matches
    let present_and_matching = TableRequirement::RefSnapshotIdMatch {
        r#ref: "main".to_string(),
        snapshot_id: Some(3051729675574597004),
    };
    assert!(present_and_matching.check(Some(&table_metadata)).is_ok());

    // Ref exists but does not match
    let present_but_different = TableRequirement::RefSnapshotIdMatch {
        r#ref: "main".to_string(),
        snapshot_id: Some(1),
    };
    assert!(present_but_different.check(Some(&table_metadata)).is_err());
}

#[test]
fn test_check_last_assigned_field_id() {
    let table_metadata = metadata();

    let mismatching = TableRequirement::LastAssignedFieldIdMatch {
        last_assigned_field_id: 1,
    };
    let matching = TableRequirement::LastAssignedFieldIdMatch {
        last_assigned_field_id: 0,
    };

    // The fixture is expected to report 0 as its last assigned field id.
    assert!(mismatching.check(Some(&table_metadata)).is_err());
    assert!(matching.check(Some(&table_metadata)).is_ok());
}

#[test]
fn test_check_current_schema_id() {
    let table_metadata = metadata();

    let mismatching = TableRequirement::CurrentSchemaIdMatch {
        current_schema_id: 1,
    };
    let matching = TableRequirement::CurrentSchemaIdMatch {
        current_schema_id: 0,
    };

    // The fixture is expected to report 0 as its current schema id.
    assert!(mismatching.check(Some(&table_metadata)).is_err());
    assert!(matching.check(Some(&table_metadata)).is_ok());
}

#[test]
fn test_check_last_assigned_partition_id() {
    let table_metadata = metadata();

    let mismatching = TableRequirement::LastAssignedPartitionIdMatch {
        last_assigned_partition_id: 1,
    };
    let matching = TableRequirement::LastAssignedPartitionIdMatch {
        last_assigned_partition_id: 0,
    };

    // The fixture is expected to report 0 as its last assigned partition id.
    assert!(mismatching.check(Some(&table_metadata)).is_err());
    assert!(matching.check(Some(&table_metadata)).is_ok());
}

#[test]
fn test_check_default_spec_id() {
    let table_metadata = metadata();

    let mismatching = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 };
    let matching = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 };

    // The fixture is expected to report 0 as its default partition spec id.
    assert!(mismatching.check(Some(&table_metadata)).is_err());
    assert!(matching.check(Some(&table_metadata)).is_ok());
}

#[test]
fn test_check_default_sort_order_id() {
    let table_metadata = metadata();

    let mismatching = TableRequirement::DefaultSortOrderIdMatch {
        default_sort_order_id: 1,
    };
    let matching = TableRequirement::DefaultSortOrderIdMatch {
        default_sort_order_id: 0,
    };

    // The fixture is expected to report 0 as its default sort order id.
    assert!(mismatching.check(Some(&table_metadata)).is_err());
    assert!(matching.check(Some(&table_metadata)).is_ok());
}

#[test]
fn test_table_uuid() {
test_serde_json(
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl<'a> ReplaceSortOrderAction<'a> {

let requirements = vec![
TableRequirement::CurrentSchemaIdMatch {
current_schema_id: self.tx.table.metadata().current_schema().schema_id() as i64,
current_schema_id: self.tx.table.metadata().current_schema().schema_id(),
},
TableRequirement::DefaultSortOrderIdMatch {
default_sort_order_id: self.tx.table.metadata().default_sort_order().order_id,
Expand Down
Loading