Skip to content

Commit

Permalink
feat: Implement TableRequirement checks (#689)
Browse files Browse the repository at this point in the history
* Implement TableRequirement check

* Address comments
  • Loading branch information
c-thiel authored Nov 11, 2024
1 parent 213f84e commit 52296eb
Show file tree
Hide file tree
Showing 2 changed files with 303 additions and 8 deletions.
309 changes: 302 additions & 7 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use typed_builder::TypedBuilder;
use uuid::Uuid;

use crate::spec::{
FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, TableMetadataBuilder,
UnboundPartitionSpec, ViewRepresentations,
FormatVersion, Schema, SchemaId, Snapshot, SnapshotReference, SortOrder, TableMetadata,
TableMetadataBuilder, UnboundPartitionSpec, ViewRepresentations,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
Expand Down Expand Up @@ -312,29 +312,29 @@ pub enum TableRequirement {
LastAssignedFieldIdMatch {
/// The last assigned field id of the table to assert.
#[serde(rename = "last-assigned-field-id")]
last_assigned_field_id: i64,
last_assigned_field_id: i32,
},
/// The table's current schema id must match the requirement.
#[serde(rename = "assert-current-schema-id")]
CurrentSchemaIdMatch {
/// Current schema id of the table to assert.
#[serde(rename = "current-schema-id")]
current_schema_id: i64,
current_schema_id: SchemaId,
},
/// The table's last assigned partition id must match the
/// requirement.
#[serde(rename = "assert-last-assigned-partition-id")]
LastAssignedPartitionIdMatch {
/// Last assigned partition id of the table to assert.
#[serde(rename = "last-assigned-partition-id")]
last_assigned_partition_id: i64,
last_assigned_partition_id: i32,
},
/// The table's default spec id must match the requirement.
#[serde(rename = "assert-default-spec-id")]
DefaultSpecIdMatch {
/// Default spec id of the table to assert.
#[serde(rename = "default-spec-id")]
default_spec_id: i64,
default_spec_id: i32,
},
/// The table's default sort order id must match the requirement.
#[serde(rename = "assert-default-sort-order-id")]
Expand Down Expand Up @@ -453,6 +453,140 @@ impl TableUpdate {
}
}

impl TableRequirement {
/// Check that the requirement is met by the table metadata.
/// If the requirement is not met, an appropriate error is returned.
///
/// Provide metadata as `None` if the table does not exist.
pub fn check(&self, metadata: Option<&TableMetadata>) -> Result<()> {
if let Some(metadata) = metadata {
match self {
TableRequirement::NotExist => {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Requirement failed: Table with id {} already exists",
metadata.uuid()
),
));
}
TableRequirement::UuidMatch { uuid } => {
if &metadata.uuid() != uuid {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Table UUID does not match",
)
.with_context("expected", *uuid)
.with_context("found", metadata.uuid()));
}
}
TableRequirement::CurrentSchemaIdMatch { current_schema_id } => {
// ToDo: Harmonize the types of current_schema_id
if metadata.current_schema_id != *current_schema_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Current schema id does not match",
)
.with_context("expected", current_schema_id.to_string())
.with_context("found", metadata.current_schema_id.to_string()));
}
}
TableRequirement::DefaultSortOrderIdMatch {
default_sort_order_id,
} => {
if metadata.default_sort_order().order_id != *default_sort_order_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Default sort order id does not match",
)
.with_context("expected", default_sort_order_id.to_string())
.with_context(
"found",
metadata.default_sort_order().order_id.to_string(),
));
}
}
TableRequirement::RefSnapshotIdMatch { r#ref, snapshot_id } => {
let snapshot_ref = metadata.snapshot_for_ref(r#ref);
if let Some(snapshot_id) = snapshot_id {
let snapshot_ref = snapshot_ref.ok_or(Error::new(
ErrorKind::DataInvalid,
format!("Requirement failed: Branch or tag `{}` not found", r#ref),
))?;
if snapshot_ref.snapshot_id() != *snapshot_id {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Requirement failed: Branch or tag `{}`'s snapshot has changed",
r#ref
),
)
.with_context("expected", snapshot_id.to_string())
.with_context("found", snapshot_ref.snapshot_id().to_string()));
}
} else if snapshot_ref.is_some() {
// a null snapshot ID means the ref should not exist already
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Requirement failed: Branch or tag `{}` already exists",
r#ref
),
));
}
}
TableRequirement::DefaultSpecIdMatch { default_spec_id } => {
// ToDo: Harmonize the types of default_spec_id
if metadata.default_partition_spec_id() != *default_spec_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Default partition spec id does not match",
)
.with_context("expected", default_spec_id.to_string())
.with_context("found", metadata.default_partition_spec_id().to_string()));
}
}
TableRequirement::LastAssignedPartitionIdMatch {
last_assigned_partition_id,
} => {
if metadata.last_partition_id != *last_assigned_partition_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Last assigned partition id does not match",
)
.with_context("expected", last_assigned_partition_id.to_string())
.with_context("found", metadata.last_partition_id.to_string()));
}
}
TableRequirement::LastAssignedFieldIdMatch {
last_assigned_field_id,
} => {
if &metadata.last_column_id != last_assigned_field_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Last assigned field id does not match",
)
.with_context("expected", last_assigned_field_id.to_string())
.with_context("found", metadata.last_column_id.to_string()));
}
}
};
} else {
match self {
TableRequirement::NotExist => {}
_ => {
return Err(Error::new(
ErrorKind::DataInvalid,
"Requirement failed: Table does not exist",
));
}
}
}

Ok(())
}
}

pub(super) mod _serde {
use serde::{Deserialize as _, Deserializer};

Expand Down Expand Up @@ -549,7 +683,7 @@ mod tests {
use crate::spec::{
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary,
TableMetadataBuilder, Transform, Type, UnboundPartitionSpec,
TableMetadata, TableMetadataBuilder, Transform, Type, UnboundPartitionSpec,
};
use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate};

Expand Down Expand Up @@ -593,6 +727,167 @@ mod tests {
);
}

/// Builds table metadata for the requirement tests: a table named `table`
/// at `/path/to/table` with an empty schema and the nil UUID.
fn metadata() -> TableMetadata {
    let empty_schema = Schema::builder().build().unwrap();
    let creation = TableCreation::builder()
        .name("table".to_string())
        .location("/path/to/table".to_string())
        .schema(empty_schema)
        .build();

    let builder = TableMetadataBuilder::from_table_creation(creation).unwrap();
    builder
        .assign_uuid(uuid::Uuid::nil())
        .unwrap()
        .build()
        .unwrap()
}

#[test]
fn test_check_requirement_not_exist() {
    let table_metadata = metadata();
    let must_not_exist = TableRequirement::NotExist;

    // An existing table violates the requirement, a missing table satisfies it.
    let with_table = must_not_exist.check(Some(&table_metadata));
    let without_table = must_not_exist.check(None);

    assert!(with_table.is_err());
    assert!(without_table.is_ok());
}

#[test]
fn test_check_table_uuid() {
    let table_metadata = metadata();
    let check_uuid =
        |uuid| TableRequirement::UuidMatch { uuid }.check(Some(&table_metadata));

    // A freshly generated UUID does not match the table's UUID.
    assert!(check_uuid(uuid::Uuid::now_v7()).is_err());

    // The nil UUID is the one assigned by the `metadata()` helper.
    assert!(check_uuid(uuid::Uuid::nil()).is_ok());
}

#[test]
fn test_check_ref_snapshot_id() {
    let mut table_metadata = metadata();
    let ref_requirement = |name: &str, snapshot_id| TableRequirement::RefSnapshotIdMatch {
        r#ref: name.to_string(),
        snapshot_id,
    };

    // Ref does not exist but should
    let missing_but_required = ref_requirement("my_branch", Some(1));
    assert!(missing_but_required.check(Some(&table_metadata)).is_err());

    // Ref does not exist and should not
    let missing_as_expected = ref_requirement("my_branch", None);
    assert!(missing_as_expected.check(Some(&table_metadata)).is_ok());

    // Add snapshot
    let record = r#"
{
"snapshot-id": 3051729675574597004,
"sequence-number": 10,
"timestamp-ms": 1515100955770,
"summary": {
"operation": "append"
},
"manifest-list": "s3://b/wh/.../s1.avro",
"schema-id": 0
}
"#;

    let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
    table_metadata.append_snapshot(snapshot);

    // Ref exists and matches
    let matching = ref_requirement("main", Some(3051729675574597004));
    assert!(matching.check(Some(&table_metadata)).is_ok());

    // Ref exists but does not match
    let mismatching = ref_requirement("main", Some(1));
    assert!(mismatching.check(Some(&table_metadata)).is_err());
}

#[test]
fn test_check_last_assigned_field_id() {
    let table_metadata = metadata();
    let check_field_id = |id| {
        TableRequirement::LastAssignedFieldIdMatch {
            last_assigned_field_id: id,
        }
        .check(Some(&table_metadata))
    };

    // Id 1 is rejected, id 0 is accepted for the metadata built by `metadata()`.
    assert!(check_field_id(1).is_err());
    assert!(check_field_id(0).is_ok());
}

#[test]
fn test_check_current_schema_id() {
    let table_metadata = metadata();

    // Schema id 1 must be rejected first, then schema id 0 must be accepted.
    let mismatching = TableRequirement::CurrentSchemaIdMatch {
        current_schema_id: 1,
    };
    let matching = TableRequirement::CurrentSchemaIdMatch {
        current_schema_id: 0,
    };

    let mismatch_result = mismatching.check(Some(&table_metadata));
    assert!(mismatch_result.is_err());

    let match_result = matching.check(Some(&table_metadata));
    assert!(match_result.is_ok());
}

#[test]
fn test_check_last_assigned_partition_id() {
    let table_metadata = metadata();
    let check_partition_id = |id| {
        TableRequirement::LastAssignedPartitionIdMatch {
            last_assigned_partition_id: id,
        }
        .check(Some(&table_metadata))
    };

    // Partition id 1 is rejected, partition id 0 is accepted.
    assert!(check_partition_id(1).is_err());
    assert!(check_partition_id(0).is_ok());
}

#[test]
fn test_check_default_spec_id() {
    let table_metadata = metadata();

    // Spec id 1 is rejected, spec id 0 is accepted.
    let mismatching = TableRequirement::DefaultSpecIdMatch { default_spec_id: 1 };
    let mismatch_result = mismatching.check(Some(&table_metadata));
    assert!(mismatch_result.is_err());

    let matching = TableRequirement::DefaultSpecIdMatch { default_spec_id: 0 };
    let match_result = matching.check(Some(&table_metadata));
    assert!(match_result.is_ok());
}

#[test]
fn test_check_default_sort_order_id() {
    let table_metadata = metadata();
    let check_sort_order_id = |id| {
        TableRequirement::DefaultSortOrderIdMatch {
            default_sort_order_id: id,
        }
        .check(Some(&table_metadata))
    };

    // Sort order id 1 is rejected, sort order id 0 is accepted.
    assert!(check_sort_order_id(1).is_err());
    assert!(check_sort_order_id(0).is_ok());
}

#[test]
fn test_table_uuid() {
test_serde_json(
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl<'a> ReplaceSortOrderAction<'a> {

let requirements = vec![
TableRequirement::CurrentSchemaIdMatch {
current_schema_id: self.tx.table.metadata().current_schema().schema_id() as i64,
current_schema_id: self.tx.table.metadata().current_schema().schema_id(),
},
TableRequirement::DefaultSortOrderIdMatch {
default_sort_order_id: self.tx.table.metadata().default_sort_order().order_id,
Expand Down

0 comments on commit 52296eb

Please sign in to comment.