Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Table Requirements Validation #200

Merged
merged 5 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyiceberg/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class GenericDynamoDbError(DynamoDbError):
pass


class CommitFailedException(RESTError):
class CommitFailedException(Exception):
"""Commit failed, refresh and try again."""


Expand Down
80 changes: 78 additions & 2 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from sortedcontainers import SortedList
from typing_extensions import Annotated

from pyiceberg.exceptions import ResolveError, ValidationError
from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError
from pyiceberg.expressions import (
AlwaysTrue,
And,
Expand Down Expand Up @@ -540,18 +540,40 @@ def update_table_metadata(base_metadata: TableMetadata, updates: Tuple[TableUpda
class TableRequirement(IcebergBaseModel):
type: str

@abstractmethod
def validate(self, base_metadata: Optional[TableMetadata]) -> None:
"""Validate the requirement against the base metadata.

Args:
base_metadata: The base metadata to be validated against.

Raises:
CommitFailedException: When the requirement is not met.
"""
...


class AssertCreate(TableRequirement):
"""The table must not already exist; used for create transactions."""

type: Literal["assert-create"] = Field(default="assert-create")

def validate(self, base_metadata: Optional[TableMetadata]) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update the signature of the parent class as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion! I updated the signature and also add None-checks for other requirements, which will raise exception when the current table metadata is None.

if base_metadata is not None:
raise CommitFailedException("Table already exists")


class AssertTableUUID(TableRequirement):
"""The table UUID must match the requirement's `uuid`."""

type: Literal["assert-table-uuid"] = Field(default="assert-table-uuid")
uuid: str
uuid: uuid.UUID

def validate(self, base_metadata: Optional[TableMetadata]) -> None:
if base_metadata is None:
raise CommitFailedException("Requirement failed: current table metadata is missing")
elif self.uuid != base_metadata.table_uuid:
raise CommitFailedException(f"Table UUID does not match: {self.uuid} != {base_metadata.table_uuid}")


class AssertRefSnapshotId(TableRequirement):
Expand All @@ -564,41 +586,95 @@ class AssertRefSnapshotId(TableRequirement):
ref: str
snapshot_id: Optional[int] = Field(default=None, alias="snapshot-id")

def validate(self, base_metadata: Optional[TableMetadata]) -> None:
if base_metadata is None:
raise CommitFailedException("Requirement failed: current table metadata is missing")
elif snapshot_ref := base_metadata.refs.get(self.ref):
ref_type = snapshot_ref.snapshot_ref_type
if self.snapshot_id is None:
raise CommitFailedException(f"Requirement failed: {ref_type} {self.ref} was created concurrently")
elif self.snapshot_id != snapshot_ref.snapshot_id:
raise CommitFailedException(
f"Requirement failed: {ref_type} {self.ref} has changed: expected id {self.snapshot_id}, found {snapshot_ref.snapshot_id}"
)
elif self.snapshot_id is not None:
raise CommitFailedException(f"Requirement failed: branch or tag {self.ref} is missing, expected {self.snapshot_id}")


class AssertLastAssignedFieldId(TableRequirement):
"""The table's last assigned column id must match the requirement's `last-assigned-field-id`."""

type: Literal["assert-last-assigned-field-id"] = Field(default="assert-last-assigned-field-id")
last_assigned_field_id: int = Field(..., alias="last-assigned-field-id")

def validate(self, base_metadata: Optional[TableMetadata]) -> None:
if base_metadata is None:
raise CommitFailedException("Requirement failed: current table metadata is missing")
elif base_metadata.last_column_id != self.last_assigned_field_id:
raise CommitFailedException(
f"Requirement failed: last assigned field id has changed: expected {self.last_assigned_field_id}, found {base_metadata.last_column_id}"
)


class AssertCurrentSchemaId(TableRequirement):
"""The table's current schema id must match the requirement's `current-schema-id`."""

type: Literal["assert-current-schema-id"] = Field(default="assert-current-schema-id")
current_schema_id: int = Field(..., alias="current-schema-id")

def validate(self, base_metadata: Optional[TableMetadata]) -> None:
if base_metadata is None:
raise CommitFailedException("Requirement failed: current table metadata is missing")
elif self.current_schema_id != base_metadata.current_schema_id:
raise CommitFailedException(
f"Requirement failed: current schema id has changed: expected {self.current_schema_id}, found {base_metadata.current_schema_id}"
)


class AssertLastAssignedPartitionId(TableRequirement):
"""The table's last assigned partition id must match the requirement's `last-assigned-partition-id`."""

type: Literal["assert-last-assigned-partition-id"] = Field(default="assert-last-assigned-partition-id")
last_assigned_partition_id: int = Field(..., alias="last-assigned-partition-id")

def validate(self, base_metadata: Optional[TableMetadata]) -> None:
if base_metadata is None:
raise CommitFailedException("Requirement failed: current table metadata is missing")
elif base_metadata.last_partition_id != self.last_assigned_partition_id:
raise CommitFailedException(
f"Requirement failed: last assigned partition id has changed: expected {self.last_assigned_partition_id}, found {base_metadata.last_partition_id}"
)


class AssertDefaultSpecId(TableRequirement):
"""The table's default spec id must match the requirement's `default-spec-id`."""

type: Literal["assert-default-spec-id"] = Field(default="assert-default-spec-id")
default_spec_id: int = Field(..., alias="default-spec-id")

def validate(self, base_metadata: Optional[TableMetadata]) -> None:
if base_metadata is None:
raise CommitFailedException("Requirement failed: current table metadata is missing")
elif self.default_spec_id != base_metadata.default_spec_id:
raise CommitFailedException(
f"Requirement failed: default spec id has changed: expected {self.default_spec_id}, found {base_metadata.default_spec_id}"
)


class AssertDefaultSortOrderId(TableRequirement):
"""The table's default sort order id must match the requirement's `default-sort-order-id`."""

type: Literal["assert-default-sort-order-id"] = Field(default="assert-default-sort-order-id")
default_sort_order_id: int = Field(..., alias="default-sort-order-id")

def validate(self, base_metadata: Optional[TableMetadata]) -> None:
if base_metadata is None:
raise CommitFailedException("Requirement failed: current table metadata is missing")
elif self.default_sort_order_id != base_metadata.default_sort_order_id:
raise CommitFailedException(
f"Requirement failed: default sort order id has changed: expected {self.default_sort_order_id}, found {base_metadata.default_sort_order_id}"
)


class Namespace(IcebergRootModel[List[str]]):
"""Reference to one or more levels of a namespace."""
Expand Down
4 changes: 4 additions & 0 deletions pyiceberg/table/refs.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ def __repr__(self) -> str:
"""Return the string representation of the SnapshotRefType class."""
return f"SnapshotRefType.{self.name}"

def __str__(self) -> str:
"""Return the string representation of the SnapshotRefType class."""
return self.value
Copy link
Contributor Author

@HonahX HonahX Dec 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we need this __str__. Otherwise,

>>> ref_type=SnapshotRefType.Branch
>>> f"Test: {ref_type}"

will show Test: SnapshotRefType.Branch in my local environment (Python 3.11) but Test: branch in CI's python 3.8. I think it's because Python 3.11 changes the default behavior of Enum's str: https://blog.pecar.me/python-enum



class SnapshotRef(IcebergBaseModel):
snapshot_id: int = Field(alias="snapshot-id")
Expand Down
127 changes: 127 additions & 0 deletions tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
# specific language governing permissions and limitations
# under the License.
# pylint:disable=redefined-outer-name
import uuid
from copy import copy
from typing import Dict

import pytest
from sortedcontainers import SortedList

from pyiceberg.exceptions import CommitFailedException
from pyiceberg.expressions import (
AlwaysTrue,
And,
Expand All @@ -39,6 +41,14 @@
from pyiceberg.schema import Schema
from pyiceberg.table import (
AddSnapshotUpdate,
AssertCreate,
AssertCurrentSchemaId,
AssertDefaultSortOrderId,
AssertDefaultSpecId,
AssertLastAssignedFieldId,
AssertLastAssignedPartitionId,
AssertRefSnapshotId,
AssertTableUUID,
SetPropertiesUpdate,
SetSnapshotRefUpdate,
SnapshotRef,
Expand Down Expand Up @@ -721,3 +731,120 @@ def test_metadata_isolation_from_illegal_updates(table_v1: Table) -> None:
def test_generate_snapshot_id(table_v2: Table) -> None:
assert isinstance(_generate_snapshot_id(), int)
assert isinstance(table_v2.new_snapshot_id(), int)


def test_assert_create(table_v2: Table) -> None:
AssertCreate().validate(None)

with pytest.raises(CommitFailedException, match="Table already exists"):
AssertCreate().validate(table_v2.metadata)


def test_assert_table_uuid(table_v2: Table) -> None:
base_metadata = table_v2.metadata
AssertTableUUID(uuid=base_metadata.table_uuid).validate(base_metadata)

with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"):
AssertTableUUID(uuid=uuid.UUID("9c12d441-03fe-4693-9a96-a0705ddf69c2")).validate(None)

with pytest.raises(
CommitFailedException,
match="Table UUID does not match: 9c12d441-03fe-4693-9a96-a0705ddf69c2 != 9c12d441-03fe-4693-9a96-a0705ddf69c1",
):
AssertTableUUID(uuid=uuid.UUID("9c12d441-03fe-4693-9a96-a0705ddf69c2")).validate(base_metadata)


def test_assert_ref_snapshot_id(table_v2: Table) -> None:
base_metadata = table_v2.metadata
AssertRefSnapshotId(ref="main", snapshot_id=base_metadata.current_snapshot_id).validate(base_metadata)

with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"):
AssertRefSnapshotId(ref="main", snapshot_id=1).validate(None)

with pytest.raises(
CommitFailedException,
match="Requirement failed: branch main was created concurrently",
):
AssertRefSnapshotId(ref="main", snapshot_id=None).validate(base_metadata)

with pytest.raises(
CommitFailedException,
match="Requirement failed: branch main has changed: expected id 1, found 3055729675574597004",
):
AssertRefSnapshotId(ref="main", snapshot_id=1).validate(base_metadata)

with pytest.raises(
CommitFailedException,
match="Requirement failed: branch or tag not_exist is missing, expected 1",
):
AssertRefSnapshotId(ref="not_exist", snapshot_id=1).validate(base_metadata)


def test_assert_last_assigned_field_id(table_v2: Table) -> None:
base_metadata = table_v2.metadata
AssertLastAssignedFieldId(last_assigned_field_id=base_metadata.last_column_id).validate(base_metadata)

with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"):
AssertLastAssignedFieldId(last_assigned_field_id=1).validate(None)

with pytest.raises(
CommitFailedException,
match="Requirement failed: last assigned field id has changed: expected 1, found 3",
):
AssertLastAssignedFieldId(last_assigned_field_id=1).validate(base_metadata)


def test_assert_current_schema_id(table_v2: Table) -> None:
base_metadata = table_v2.metadata
AssertCurrentSchemaId(current_schema_id=base_metadata.current_schema_id).validate(base_metadata)

with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"):
AssertCurrentSchemaId(current_schema_id=1).validate(None)

with pytest.raises(
CommitFailedException,
match="Requirement failed: current schema id has changed: expected 2, found 1",
):
AssertCurrentSchemaId(current_schema_id=2).validate(base_metadata)


def test_last_assigned_partition_id(table_v2: Table) -> None:
base_metadata = table_v2.metadata
AssertLastAssignedPartitionId(last_assigned_partition_id=base_metadata.last_partition_id).validate(base_metadata)

with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"):
AssertLastAssignedPartitionId(last_assigned_partition_id=1).validate(None)

with pytest.raises(
CommitFailedException,
match="Requirement failed: last assigned partition id has changed: expected 1, found 1000",
):
AssertLastAssignedPartitionId(last_assigned_partition_id=1).validate(base_metadata)


def test_assert_default_spec_id(table_v2: Table) -> None:
base_metadata = table_v2.metadata
AssertDefaultSpecId(default_spec_id=base_metadata.default_spec_id).validate(base_metadata)

with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"):
AssertDefaultSpecId(default_spec_id=1).validate(None)

with pytest.raises(
CommitFailedException,
match="Requirement failed: default spec id has changed: expected 1, found 0",
):
AssertDefaultSpecId(default_spec_id=1).validate(base_metadata)


def test_assert_default_sort_order_id(table_v2: Table) -> None:
base_metadata = table_v2.metadata
AssertDefaultSortOrderId(default_sort_order_id=base_metadata.default_sort_order_id).validate(base_metadata)

with pytest.raises(CommitFailedException, match="Requirement failed: current table metadata is missing"):
AssertDefaultSortOrderId(default_sort_order_id=1).validate(None)

with pytest.raises(
CommitFailedException,
match="Requirement failed: default sort order id has changed: expected 1, found 3",
):
AssertDefaultSortOrderId(default_sort_order_id=1).validate(base_metadata)