Skip to content

Commit

Permalink
remove catalog identifier
Browse files (browse the repository at this point in the history)
  • Loading branch information
sungwy committed Dec 19, 2024
1 parent 392c9ce commit 57093f4
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 234 deletions.
43 changes: 2 additions & 41 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,44 +630,6 @@ def drop_view(self, identifier: Union[str, Identifier]) -> None:
NoSuchViewError: If a view with the given name does not exist.
"""

@deprecated(
deprecated_in="0.8.0",
removed_in="0.9.0",
help_message="Support for parsing catalog level identifier in Catalog identifiers is deprecated. Please refer to the table using only its namespace and its table name.",
)
def identifier_to_tuple_without_catalog(self, identifier: Union[str, Identifier]) -> Identifier:
"""Convert an identifier to a tuple and drop this catalog's name from the first element.
Args:
identifier (str | Identifier): Table identifier.
Returns:
Identifier: a tuple of strings with this catalog's name removed
"""
identifier_tuple = Catalog.identifier_to_tuple(identifier)
if len(identifier_tuple) >= 3 and identifier_tuple[0] == self.name:
identifier_tuple = identifier_tuple[1:]
return identifier_tuple

def _identifier_to_tuple_without_catalog(self, identifier: Union[str, Identifier]) -> Identifier:
"""Convert an identifier to a tuple and drop this catalog's name from the first element.
Args:
identifier (str | Identifier): Table identifier.
Returns:
Identifier: a tuple of strings with this catalog's name removed
"""
identifier_tuple = Catalog.identifier_to_tuple(identifier)
if len(identifier_tuple) >= 3 and identifier_tuple[0] == self.name:
deprecation_message(
deprecated_in="0.8.0",
removed_in="0.9.0",
help_message="Support for parsing catalog level identifier in Catalog identifiers is deprecated. Please refer to the table using only its namespace and its table name.",
)
identifier_tuple = identifier_tuple[1:]
return identifier_tuple

@staticmethod
def identifier_to_tuple(identifier: Union[str, Identifier]) -> Identifier:
"""Parse an identifier to a tuple.
Expand Down Expand Up @@ -809,9 +771,8 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool:
return False

def purge_table(self, identifier: Union[str, Identifier]) -> None:
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
table = self.load_table(identifier_tuple)
self.drop_table(identifier_tuple)
table = self.load_table(identifier)
self.drop_table(identifier)
io = load_file_io(self.properties, table.metadata_location)
metadata = table.metadata
manifest_lists_to_delete = set()
Expand Down
11 changes: 4 additions & 7 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
dynamo_table_item = self._get_iceberg_table_item(database_name=database_name, table_name=table_name)
return self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=dynamo_table_item)

Expand All @@ -258,8 +257,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)

try:
self._delete_dynamo_item(
Expand Down Expand Up @@ -289,8 +287,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchPropertyException: When from table miss some required properties.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)

from_table_item = self._get_iceberg_table_item(database_name=from_database_name, table_name=from_table_name)
Expand Down Expand Up @@ -321,7 +318,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
raise TableAlreadyExistsError(f"Table {to_database_name}.{to_table_name} already exists") from e

try:
self.drop_table(from_identifier_tuple)
self.drop_table(from_identifier)
except (NoSuchTableError, GenericDynamoDbError) as e:
log_message = f"Failed to drop old table {from_database_name}.{from_table_name}. "

Expand Down
9 changes: 3 additions & 6 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)

return self._convert_glue_to_iceberg(self._get_glue_table(database_name=database_name, table_name=table_name))

Expand All @@ -570,8 +569,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
try:
self.glue.delete_table(DatabaseName=database_name, Name=table_name)
except self.glue.exceptions.EntityNotFoundException as e:
Expand All @@ -596,8 +594,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchPropertyException: When from table miss some required properties.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
try:
get_table_response = self.glue.get_table(DatabaseName=from_database_name, Name=from_table_name)
Expand Down
9 changes: 3 additions & 6 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)

with self._client as open_client:
hive_table = self._get_hive_table(open_client, database_name, table_name)
Expand All @@ -548,8 +547,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
try:
with self._client as open_client:
open_client.drop_table(dbname=database_name, name=table_name, deleteData=False)
Expand All @@ -576,8 +574,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchTableError: When a table with the name does not exist.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
try:
with self._client as open_client:
Expand Down
27 changes: 7 additions & 20 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response:

def _response_to_staged_table(self, identifier_tuple: Tuple[str, ...], table_response: TableResponse) -> StagedTable:
return StagedTable(
identifier=identifier_tuple if self.name else identifier_tuple,
identifier=identifier_tuple,
metadata_location=table_response.metadata_location, # type: ignore
metadata=table_response.metadata,
io=self._load_file_io(
Expand Down Expand Up @@ -578,7 +578,6 @@ def _create_table(
fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema)
fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema)

identifier = self._identifier_to_tuple_without_catalog(identifier)
namespace_and_table = self._split_identifier_for_path(identifier)
if location:
location = location.rstrip("/")
Expand Down Expand Up @@ -659,7 +658,6 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
Raises:
TableAlreadyExistsError: If the table already exists
"""
identifier = self._identifier_to_tuple_without_catalog(identifier)
namespace_and_table = self._split_identifier_for_path(identifier)
request = RegisterTableRequest(
name=namespace_and_table["table"],
Expand Down Expand Up @@ -691,25 +689,19 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:

@retry(**_RETRY_ARGS)
def load_table(self, identifier: Union[str, Identifier]) -> Table:
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
response = self._session.get(
self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier_tuple))
)
response = self._session.get(self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier)))
try:
response.raise_for_status()
except HTTPError as exc:
self._handle_non_200_response(exc, {404: NoSuchTableError})

table_response = TableResponse(**response.json())
return self._response_to_table(identifier_tuple, table_response)
return self._response_to_table(self.identifier_to_tuple(identifier), table_response)

@retry(**_RETRY_ARGS)
def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = False) -> None:
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
response = self._session.delete(
self.url(
Endpoints.drop_table, prefixed=True, purge=purge_requested, **self._split_identifier_for_path(identifier_tuple)
),
self.url(Endpoints.drop_table, prefixed=True, purge=purge_requested, **self._split_identifier_for_path(identifier)),
)
try:
response.raise_for_status()
Expand All @@ -722,9 +714,8 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:

@retry(**_RETRY_ARGS)
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
payload = {
"source": self._split_identifier_for_json(from_identifier_tuple),
"source": self._split_identifier_for_json(from_identifier),
"destination": self._split_identifier_for_json(to_identifier),
}
response = self._session.post(self.url(Endpoints.rename_table), json=payload)
Expand Down Expand Up @@ -899,9 +890,8 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool:
Returns:
bool: True if the table exists, False otherwise.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
response = self._session.head(
self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier_tuple))
self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier))
)

if response.status_code == 404:
Expand All @@ -918,11 +908,8 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool:

@retry(**_RETRY_ARGS)
def drop_view(self, identifier: Union[str]) -> None:
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
response = self._session.delete(
self.url(
Endpoints.drop_view, prefixed=True, **self._split_identifier_for_path(identifier_tuple, IdentifierKind.VIEW)
),
self.url(Endpoints.drop_view, prefixed=True, **self._split_identifier_for_path(identifier, IdentifierKind.VIEW)),
)
try:
response.raise_for_status()
Expand Down
30 changes: 12 additions & 18 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,8 @@ def create_table(
"""
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore

identifier_nocatalog = self._identifier_to_tuple_without_catalog(identifier)
namespace_identifier = Catalog.namespace_from(identifier_nocatalog)
table_name = Catalog.table_name_from(identifier_nocatalog)
namespace_identifier = Catalog.namespace_from(identifier)
table_name = Catalog.table_name_from(identifier)
if not self._namespace_exists(namespace_identifier):
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace_identifier}")

Expand Down Expand Up @@ -246,10 +245,9 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
TableAlreadyExistsError: If the table already exists
NoSuchNamespaceError: If namespace does not exist
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
namespace_tuple = Catalog.namespace_from(identifier_tuple)
namespace_tuple = Catalog.namespace_from(identifier)
namespace = Catalog.namespace_to_string(namespace_tuple)
table_name = Catalog.table_name_from(identifier_tuple)
table_name = Catalog.table_name_from(identifier)
if not self._namespace_exists(namespace):
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")

Expand Down Expand Up @@ -285,10 +283,9 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
namespace_tuple = Catalog.namespace_from(identifier_tuple)
namespace_tuple = Catalog.namespace_from(identifier)
namespace = Catalog.namespace_to_string(namespace_tuple)
table_name = Catalog.table_name_from(identifier_tuple)
table_name = Catalog.table_name_from(identifier)
with Session(self.engine) as session:
stmt = select(IcebergTables).where(
IcebergTables.catalog_name == self.name,
Expand All @@ -309,10 +306,9 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
namespace_tuple = Catalog.namespace_from(identifier_tuple)
namespace_tuple = Catalog.namespace_from(identifier)
namespace = Catalog.namespace_to_string(namespace_tuple)
table_name = Catalog.table_name_from(identifier_tuple)
table_name = Catalog.table_name_from(identifier)
with Session(self.engine) as session:
if self.engine.dialect.supports_sane_rowcount:
res = session.execute(
Expand Down Expand Up @@ -356,14 +352,12 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
TableAlreadyExistsError: If a table with the new name already exist.
NoSuchNamespaceError: If the target namespace does not exist.
"""
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
to_identifier_tuple = self._identifier_to_tuple_without_catalog(to_identifier)
from_namespace_tuple = Catalog.namespace_from(from_identifier_tuple)
from_namespace_tuple = Catalog.namespace_from(from_identifier)
from_namespace = Catalog.namespace_to_string(from_namespace_tuple)
from_table_name = Catalog.table_name_from(from_identifier_tuple)
to_namespace_tuple = Catalog.namespace_from(to_identifier_tuple)
from_table_name = Catalog.table_name_from(from_identifier)
to_namespace_tuple = Catalog.namespace_from(to_identifier)
to_namespace = Catalog.namespace_to_string(to_namespace_tuple)
to_table_name = Catalog.table_name_from(to_identifier_tuple)
to_table_name = Catalog.table_name_from(to_identifier)
if not self._namespace_exists(to_namespace):
raise NoSuchNamespaceError(f"Namespace does not exist: {to_namespace}")
with Session(self.engine) as session:
Expand Down
6 changes: 3 additions & 3 deletions tests/catalog/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,15 @@ def commit_table(
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
try:
identifier_tuple = Catalog.identifier_to_tuple(identifier)
return self.__tables[identifier_tuple]
except KeyError as error:
raise NoSuchTableError(f"Table does not exist: {identifier_tuple}") from error

def drop_table(self, identifier: Union[str, Identifier]) -> None:
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
try:
identifier_tuple = Catalog.identifier_to_tuple(identifier)
self.__tables.pop(identifier_tuple)
except KeyError as error:
raise NoSuchTableError(f"Table does not exist: {identifier_tuple}") from error
Expand All @@ -173,8 +173,8 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
self.drop_table(identifier)

def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
try:
identifier_tuple = Catalog.identifier_to_tuple(from_identifier)
table = self.__tables.pop(identifier_tuple)
except KeyError as error:
raise NoSuchTableError(f"Table does not exist: {identifier_tuple}") from error
Expand Down
Loading

0 comments on commit 57093f4

Please sign in to comment.