Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement update for remove-snapshots action #1561

Merged
merged 15 commits into from
Feb 17, 2025
Merged
56 changes: 56 additions & 0 deletions pyiceberg/table/update/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import itertools
import uuid
from abc import ABC, abstractmethod
from datetime import datetime
Expand Down Expand Up @@ -455,6 +456,61 @@ def _(update: SetSnapshotRefUpdate, base_metadata: TableMetadata, context: _Tabl
return base_metadata.model_copy(update=metadata_updates)


@_apply_table_update.register(RemoveSnapshotsUpdate)
def _(update: RemoveSnapshotsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for remove_snapshot_id in update.snapshot_ids:
if remove_snapshot_id == base_metadata.current_snapshot_id:
raise ValueError(f"Can't remove current snapshot id {remove_snapshot_id}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we block the current snapshot?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not an expert in iceberg spec, but it's not clear what should happen if you try to remove the current snapshot.

I'm also not sure if I should update parent_snapshot_id in every snapshot that was referencing removed snapshots

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided to set parent_snapshot_id to None if the parent is gone

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not clear what should happen if you try to remove the current snapshot.

I'm looking at the Java implementation for answers. I think you can just remove the current snapshot, because you can have an empty table with no snapshots.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a separate pr for remove-snapshot-ref and added a unit test there #1598

if not any(s.snapshot_id == remove_snapshot_id for s in base_metadata.snapshots):
raise ValueError(f"Snapshot with snapshot id {remove_snapshot_id} does not exist: {base_metadata.snapshots}")

snapshots = [
(s.model_copy(update={"parent_snapshot_id": None}) if s.parent_snapshot_id in update.snapshot_ids else s)
for s in base_metadata.snapshots
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this a little more verbose:

Suggested change
for s in base_metadata.snapshots
for snapshot in base_metadata.snapshots

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure fb8f350

if s.snapshot_id not in update.snapshot_ids
]
snapshot_log = [
snapshot_log_entry
for snapshot_log_entry in base_metadata.snapshot_log
if snapshot_log_entry.snapshot_id not in update.snapshot_ids
]

remove_ref_updates = (
RemoveSnapshotRefUpdate(ref_name=ref_name)
for ref_name, ref in base_metadata.refs.items()
if ref.snapshot_id in update.snapshot_ids
)
remove_statistics_updates = (
RemoveStatisticsUpdate(statistics_file.snapshot_id)
for statistics_file in base_metadata.statistics
if statistics_file.snapshot_id in update.snapshot_ids
)
updates = itertools.chain(remove_ref_updates, remove_statistics_updates)
new_metadata = base_metadata
for upd in updates:
new_metadata = _apply_table_update(upd, new_metadata, context)

context.add_update(update)
return new_metadata.model_copy(update={"snapshots": snapshots, "snapshot_log": snapshot_log})


@_apply_table_update.register(RemoveSnapshotRefUpdate)
def _(update: RemoveSnapshotRefUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Apply a remove-snapshot-ref update and return the resulting table metadata.

    A ref name that is not present in ``base_metadata.refs`` is a no-op: the
    metadata is returned unchanged and the update is not recorded on the context.

    Raises:
        ValueError: If the ref points at a snapshot that ``snapshot_by_id``
            cannot find, or if the ref being removed is the main branch.
    """
    ref_name = update.ref_name
    existing_ref = base_metadata.refs.get(ref_name)
    if existing_ref is None:
        return base_metadata

    # The unknown-snapshot check runs before the main-branch check, so a main
    # ref with an unknown snapshot reports the unknown-snapshot error.
    if base_metadata.snapshot_by_id(existing_ref.snapshot_id) is None:
        raise ValueError(f"Cannot remove {update.ref_name} ref with unknown snapshot {existing_ref.snapshot_id}")

    if ref_name == MAIN_BRANCH:
        raise ValueError("Cannot remove main branch")

    # Build the surviving refs as a new dict; the base metadata's refs are left untouched.
    remaining_refs = {name: ref for name, ref in base_metadata.refs.items() if name != ref_name}
    context.add_update(update)
    return base_metadata.model_copy(update={"refs": remaining_refs})


@_apply_table_update.register(AddSortOrderUpdate)
def _(update: AddSortOrderUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
context.add_update(update)
Expand Down
45 changes: 45 additions & 0 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
from pyiceberg.table.update import (
AddSnapshotUpdate,
AssertRefSnapshotId,
RemoveSnapshotRefUpdate,
SetSnapshotRefUpdate,
TableRequirement,
TableUpdate,
Expand Down Expand Up @@ -749,6 +750,28 @@ def _commit(self) -> UpdatesAndRequirements:
"""Apply the pending changes and commit."""
return self._updates, self._requirements

def _remove_ref_snapshot(self, ref_name: str) -> ManageSnapshots:
    """Stage the removal of a snapshot ref (branch or tag).

    Adds a ``RemoveSnapshotRefUpdate`` for ``ref_name`` to the pending updates,
    together with an ``AssertRefSnapshotId`` requirement carrying the snapshot
    id the ref has in the transaction's table metadata (``None`` when the ref
    is not present there).

    Args:
        ref_name: Branch / tag name to remove.

    Returns:
        This instance, for method chaining.
    """
    # Build both objects before touching instance state, so nothing is staged
    # if either constructor raises.
    staged_update = RemoveSnapshotRefUpdate(ref_name=ref_name)
    known_ref = self._transaction.table_metadata.refs.get(ref_name)
    expected_snapshot_id = None if known_ref is None else known_ref.snapshot_id
    staged_requirement = AssertRefSnapshotId(snapshot_id=expected_snapshot_id, ref=ref_name)

    self._updates += (staged_update,)
    self._requirements += (staged_requirement,)
    return self

def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[int] = None) -> ManageSnapshots:
"""
Create a new tag pointing to the given snapshot id.
Expand All @@ -771,6 +794,17 @@ def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[i
self._requirements += requirement
return self

def remove_tag(self, tag_name: str) -> ManageSnapshots:
    """
    Stage the removal of a tag.

    Delegates to ``_remove_ref_snapshot`` with ``tag_name`` as the ref name.

    Args:
        tag_name (str): Name of the tag to remove.

    Returns:
        Whatever ``_remove_ref_snapshot`` returns, so calls can be chained.
    """
    return self._remove_ref_snapshot(ref_name=tag_name)

def create_branch(
self,
snapshot_id: int,
Expand Down Expand Up @@ -802,3 +836,14 @@ def create_branch(
self._updates += update
self._requirements += requirement
return self

def remove_branch(self, branch_name: str) -> ManageSnapshots:
    """
    Stage the removal of a branch.

    Delegates to ``_remove_ref_snapshot`` with ``branch_name`` as the ref name.

    Args:
        branch_name (str): Name of the branch to remove.

    Returns:
        Whatever ``_remove_ref_snapshot`` returns, so calls can be chained.
    """
    return self._remove_ref_snapshot(ref_name=branch_name)
32 changes: 32 additions & 0 deletions tests/integration/test_snapshot_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,35 @@ def test_create_branch(catalog: Catalog) -> None:
branch_snapshot_id = tbl.history()[-2].snapshot_id
tbl.manage_snapshots().create_branch(snapshot_id=branch_snapshot_id, branch_name="branch123").commit()
assert tbl.metadata.refs["branch123"] == SnapshotRef(snapshot_id=branch_snapshot_id, snapshot_ref_type="branch")


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_remove_tag(catalog: Catalog) -> None:
    """Create a tag on an older snapshot, remove it, and check the ref is gone."""
    tbl = catalog.load_table("default.test_table_snapshot_operations")
    history = tbl.history()
    assert len(history) > 3

    # first, create the tag to remove
    tag_name = "tag_to_remove"
    tag_snapshot_id = history[-3].snapshot_id
    tbl.manage_snapshots().create_tag(snapshot_id=tag_snapshot_id, tag_name=tag_name).commit()
    expected_ref = SnapshotRef(snapshot_id=tag_snapshot_id, snapshot_ref_type="tag")
    assert tbl.metadata.refs[tag_name] == expected_ref

    # now, remove the tag
    tbl.manage_snapshots().remove_tag(tag_name=tag_name).commit()
    assert tbl.metadata.refs.get(tag_name) is None


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_remove_branch(catalog: Catalog) -> None:
    """Create a branch on the previous snapshot, remove it, and check the ref is gone."""
    tbl = catalog.load_table("default.test_table_snapshot_operations")
    history = tbl.history()
    assert len(history) > 2

    # first, create the branch to remove
    branch_name = "branch_to_remove"
    branch_snapshot_id = history[-2].snapshot_id
    tbl.manage_snapshots().create_branch(snapshot_id=branch_snapshot_id, branch_name=branch_name).commit()
    expected_ref = SnapshotRef(snapshot_id=branch_snapshot_id, snapshot_ref_type="branch")
    assert tbl.metadata.refs[branch_name] == expected_ref

    # now, remove the branch
    tbl.manage_snapshots().remove_branch(branch_name=branch_name).commit()
    assert tbl.metadata.refs.get(branch_name) is None
35 changes: 35 additions & 0 deletions tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
AssertRefSnapshotId,
AssertTableUUID,
RemovePropertiesUpdate,
RemoveSnapshotsUpdate,
RemoveStatisticsUpdate,
SetDefaultSortOrderUpdate,
SetPropertiesUpdate,
Expand Down Expand Up @@ -793,6 +794,40 @@ def test_update_metadata_set_snapshot_ref(table_v2: Table) -> None:
)


def test_update_remove_snapshots(table_v2: Table) -> None:
# assert fixture data to easily understand the test assumptions
assert len(table_v2.metadata.snapshots) == 2
assert len(table_v2.metadata.snapshot_log) == 2
assert len(table_v2.metadata.refs) == 2
update = RemoveSnapshotsUpdate(snapshot_ids=[3051729675574597004])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you make 3051729675574597004 a constant for readability?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added constants REMOVE_SNAPSHOT and KEEP_SNAPSHOT

new_metadata = update_table_metadata(table_v2.metadata, (update,))
assert len(new_metadata.snapshots) == 1
assert new_metadata.snapshots[0].snapshot_id == 3055729675574597004
assert new_metadata.snapshots[0].parent_snapshot_id is None
assert new_metadata.current_snapshot_id == 3055729675574597004
assert new_metadata.last_updated_ms > table_v2.metadata.last_updated_ms
assert len(new_metadata.snapshot_log) == 1
assert new_metadata.snapshot_log[0].snapshot_id == 3055729675574597004
assert len(new_metadata.refs) == 1
assert new_metadata.refs["main"].snapshot_id == 3055729675574597004


def test_update_remove_snapshots_doesnt_exist(table_v2: Table) -> None:
    """Removing a snapshot id that is absent from the metadata raises ``ValueError``."""
    missing_snapshot_update = RemoveSnapshotsUpdate(snapshot_ids=[123])
    with pytest.raises(ValueError, match="Snapshot with snapshot id 123 does not exist"):
        update_table_metadata(table_v2.metadata, (missing_snapshot_update,))


def test_update_remove_snapshots_cant_remove_current_snapshot_id(table_v2: Table) -> None:
    """Removing the table's current snapshot id raises ``ValueError``."""
    current_snapshot_update = RemoveSnapshotsUpdate(snapshot_ids=[3055729675574597004])
    with pytest.raises(ValueError, match="Can't remove current snapshot id 3055729675574597004"):
        update_table_metadata(table_v2.metadata, (current_snapshot_update,))


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also add some tests for RemoveSnapshotRefUpdate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a separate pr for remove-snapshot-ref and added a unit test there #1598

def test_update_metadata_add_update_sort_order(table_v2: Table) -> None:
new_sort_order = SortOrder(order_id=table_v2.sort_order().order_id + 1)
new_metadata = update_table_metadata(
Expand Down