Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove support for catalog_name in table identifier string #963

Merged
merged 6 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 4 additions & 42 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@
RecursiveDict,
)
from pyiceberg.utils.config import Config, merge_config
from pyiceberg.utils.deprecated import deprecated, deprecation_message
from pyiceberg.utils.deprecated import deprecated as deprecated
from pyiceberg.utils.deprecated import deprecation_message

if TYPE_CHECKING:
import pyarrow as pa
Expand Down Expand Up @@ -630,44 +631,6 @@ def drop_view(self, identifier: Union[str, Identifier]) -> None:
NoSuchViewError: If a view with the given name does not exist.
"""

@deprecated(
deprecated_in="0.8.0",
removed_in="0.9.0",
help_message="Support for parsing catalog level identifier in Catalog identifiers is deprecated. Please refer to the table using only its namespace and its table name.",
)
def identifier_to_tuple_without_catalog(self, identifier: Union[str, Identifier]) -> Identifier:
"""Convert an identifier to a tuple and drop this catalog's name from the first element.

Args:
identifier (str | Identifier): Table identifier.

Returns:
Identifier: a tuple of strings with this catalog's name removed
"""
identifier_tuple = Catalog.identifier_to_tuple(identifier)
if len(identifier_tuple) >= 3 and identifier_tuple[0] == self.name:
identifier_tuple = identifier_tuple[1:]
return identifier_tuple

def _identifier_to_tuple_without_catalog(self, identifier: Union[str, Identifier]) -> Identifier:
"""Convert an identifier to a tuple and drop this catalog's name from the first element.

Args:
identifier (str | Identifier): Table identifier.

Returns:
Identifier: a tuple of strings with this catalog's name removed
"""
identifier_tuple = Catalog.identifier_to_tuple(identifier)
if len(identifier_tuple) >= 3 and identifier_tuple[0] == self.name:
deprecation_message(
deprecated_in="0.8.0",
removed_in="0.9.0",
help_message="Support for parsing catalog level identifier in Catalog identifiers is deprecated. Please refer to the table using only its namespace and its table name.",
)
identifier_tuple = identifier_tuple[1:]
return identifier_tuple

@staticmethod
def identifier_to_tuple(identifier: Union[str, Identifier]) -> Identifier:
"""Parse an identifier to a tuple.
Expand Down Expand Up @@ -809,9 +772,8 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool:
return False

def purge_table(self, identifier: Union[str, Identifier]) -> None:
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
table = self.load_table(identifier_tuple)
self.drop_table(identifier_tuple)
table = self.load_table(identifier)
self.drop_table(identifier)
io = load_file_io(self.properties, table.metadata_location)
metadata = table.metadata
manifest_lists_to_delete = set()
Expand Down
11 changes: 4 additions & 7 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
dynamo_table_item = self._get_iceberg_table_item(database_name=database_name, table_name=table_name)
return self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=dynamo_table_item)

Expand All @@ -258,8 +257,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)

try:
self._delete_dynamo_item(
Expand Down Expand Up @@ -289,8 +287,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchPropertyException: When from table miss some required properties.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)

from_table_item = self._get_iceberg_table_item(database_name=from_database_name, table_name=from_table_name)
Expand Down Expand Up @@ -321,7 +318,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
raise TableAlreadyExistsError(f"Table {to_database_name}.{to_table_name} already exists") from e

try:
self.drop_table(from_identifier_tuple)
self.drop_table(from_identifier)
except (NoSuchTableError, GenericDynamoDbError) as e:
log_message = f"Failed to drop old table {from_database_name}.{from_table_name}. "

Expand Down
9 changes: 3 additions & 6 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)

return self._convert_glue_to_iceberg(self._get_glue_table(database_name=database_name, table_name=table_name))

Expand All @@ -570,8 +569,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
try:
self.glue.delete_table(DatabaseName=database_name, Name=table_name)
except self.glue.exceptions.EntityNotFoundException as e:
Expand All @@ -596,8 +594,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchPropertyException: When from table miss some required properties.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
try:
get_table_response = self.glue.get_table(DatabaseName=from_database_name, Name=from_table_name)
Expand Down
9 changes: 3 additions & 6 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)

with self._client as open_client:
hive_table = self._get_hive_table(open_client, database_name, table_name)
Expand All @@ -548,8 +547,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
try:
with self._client as open_client:
open_client.drop_table(dbname=database_name, name=table_name, deleteData=False)
Expand All @@ -576,8 +574,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchTableError: When a table with the name does not exist.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
try:
with self._client as open_client:
Expand Down
27 changes: 7 additions & 20 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response:

def _response_to_staged_table(self, identifier_tuple: Tuple[str, ...], table_response: TableResponse) -> StagedTable:
return StagedTable(
identifier=identifier_tuple if self.name else identifier_tuple,
identifier=identifier_tuple,
metadata_location=table_response.metadata_location, # type: ignore
metadata=table_response.metadata,
io=self._load_file_io(
Expand Down Expand Up @@ -578,7 +578,6 @@ def _create_table(
fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema)
fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema)

identifier = self._identifier_to_tuple_without_catalog(identifier)
namespace_and_table = self._split_identifier_for_path(identifier)
if location:
location = location.rstrip("/")
Expand Down Expand Up @@ -659,7 +658,6 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
Raises:
TableAlreadyExistsError: If the table already exists
"""
identifier = self._identifier_to_tuple_without_catalog(identifier)
namespace_and_table = self._split_identifier_for_path(identifier)
request = RegisterTableRequest(
name=namespace_and_table["table"],
Expand Down Expand Up @@ -691,25 +689,19 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:

@retry(**_RETRY_ARGS)
def load_table(self, identifier: Union[str, Identifier]) -> Table:
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
response = self._session.get(
self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier_tuple))
)
response = self._session.get(self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier)))
try:
response.raise_for_status()
except HTTPError as exc:
self._handle_non_200_response(exc, {404: NoSuchTableError})

table_response = TableResponse(**response.json())
return self._response_to_table(identifier_tuple, table_response)
return self._response_to_table(self.identifier_to_tuple(identifier), table_response)

@retry(**_RETRY_ARGS)
def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = False) -> None:
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
response = self._session.delete(
self.url(
Endpoints.drop_table, prefixed=True, purge=purge_requested, **self._split_identifier_for_path(identifier_tuple)
),
self.url(Endpoints.drop_table, prefixed=True, purge=purge_requested, **self._split_identifier_for_path(identifier)),
)
try:
response.raise_for_status()
Expand All @@ -722,9 +714,8 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:

@retry(**_RETRY_ARGS)
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
payload = {
"source": self._split_identifier_for_json(from_identifier_tuple),
"source": self._split_identifier_for_json(from_identifier),
"destination": self._split_identifier_for_json(to_identifier),
}
response = self._session.post(self.url(Endpoints.rename_table), json=payload)
Expand Down Expand Up @@ -899,9 +890,8 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool:
Returns:
bool: True if the table exists, False otherwise.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
response = self._session.head(
self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier_tuple))
self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier))
)

if response.status_code == 404:
Expand All @@ -918,11 +908,8 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool:

@retry(**_RETRY_ARGS)
def drop_view(self, identifier: Union[str]) -> None:
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
response = self._session.delete(
self.url(
Endpoints.drop_view, prefixed=True, **self._split_identifier_for_path(identifier_tuple, IdentifierKind.VIEW)
),
self.url(Endpoints.drop_view, prefixed=True, **self._split_identifier_for_path(identifier, IdentifierKind.VIEW)),
)
try:
response.raise_for_status()
Expand Down
30 changes: 12 additions & 18 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,8 @@ def create_table(
"""
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore

identifier_nocatalog = self._identifier_to_tuple_without_catalog(identifier)
namespace_identifier = Catalog.namespace_from(identifier_nocatalog)
table_name = Catalog.table_name_from(identifier_nocatalog)
namespace_identifier = Catalog.namespace_from(identifier)
table_name = Catalog.table_name_from(identifier)
if not self._namespace_exists(namespace_identifier):
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace_identifier}")

Expand Down Expand Up @@ -246,10 +245,9 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
TableAlreadyExistsError: If the table already exists
NoSuchNamespaceError: If namespace does not exist
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
namespace_tuple = Catalog.namespace_from(identifier_tuple)
namespace_tuple = Catalog.namespace_from(identifier)
namespace = Catalog.namespace_to_string(namespace_tuple)
table_name = Catalog.table_name_from(identifier_tuple)
table_name = Catalog.table_name_from(identifier)
if not self._namespace_exists(namespace):
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")

Expand Down Expand Up @@ -285,10 +283,9 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
namespace_tuple = Catalog.namespace_from(identifier_tuple)
namespace_tuple = Catalog.namespace_from(identifier)
namespace = Catalog.namespace_to_string(namespace_tuple)
table_name = Catalog.table_name_from(identifier_tuple)
table_name = Catalog.table_name_from(identifier)
with Session(self.engine) as session:
stmt = select(IcebergTables).where(
IcebergTables.catalog_name == self.name,
Expand All @@ -309,10 +306,9 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
namespace_tuple = Catalog.namespace_from(identifier_tuple)
namespace_tuple = Catalog.namespace_from(identifier)
namespace = Catalog.namespace_to_string(namespace_tuple)
table_name = Catalog.table_name_from(identifier_tuple)
table_name = Catalog.table_name_from(identifier)
with Session(self.engine) as session:
if self.engine.dialect.supports_sane_rowcount:
res = session.execute(
Expand Down Expand Up @@ -356,14 +352,12 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
TableAlreadyExistsError: If a table with the new name already exist.
NoSuchNamespaceError: If the target namespace does not exist.
"""
from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
to_identifier_tuple = self._identifier_to_tuple_without_catalog(to_identifier)
from_namespace_tuple = Catalog.namespace_from(from_identifier_tuple)
from_namespace_tuple = Catalog.namespace_from(from_identifier)
from_namespace = Catalog.namespace_to_string(from_namespace_tuple)
from_table_name = Catalog.table_name_from(from_identifier_tuple)
to_namespace_tuple = Catalog.namespace_from(to_identifier_tuple)
from_table_name = Catalog.table_name_from(from_identifier)
to_namespace_tuple = Catalog.namespace_from(to_identifier)
to_namespace = Catalog.namespace_to_string(to_namespace_tuple)
to_table_name = Catalog.table_name_from(to_identifier_tuple)
to_table_name = Catalog.table_name_from(to_identifier)
if not self._namespace_exists(to_namespace):
raise NoSuchNamespaceError(f"Namespace does not exist: {to_namespace}")
with Session(self.engine) as session:
Expand Down
17 changes: 2 additions & 15 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@
)
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.config import Config
from pyiceberg.utils.deprecated import deprecated, deprecation_message
from pyiceberg.utils.deprecated import deprecated
from pyiceberg.utils.deprecated import deprecation_message as deprecation_message
from pyiceberg.utils.properties import property_as_bool

if TYPE_CHECKING:
Expand Down Expand Up @@ -787,20 +788,6 @@ def refresh(self) -> Table:
self.metadata_location = fresh.metadata_location
return self

@property
def identifier(self) -> Identifier:
"""Return the identifier of this table.

Returns:
An Identifier tuple of the table name
"""
deprecation_message(
deprecated_in="0.8.0",
removed_in="0.9.0",
help_message="Table.identifier property is deprecated. Please use Table.name() function instead.",
)
return (self.catalog.name,) + self._identifier

def name(self) -> Identifier:
"""Return the identifier of this table.

Expand Down
Loading