DBR 16 and later support (#3481)
## Changes

This PR adds DBR 16 compatibility for the code that (optionally) converts
HMS tables to external tables within the `migrate-tables` workflow.
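
In short, the change probes the Py4J-proxied table metadata with `dir()` (because `hasattr()` always reports `True` on Py4J proxies) and only forwards the new DBR 16 `entityStorageLocations` constructor argument when the accessor is actually present. A minimal sketch of the pattern, with illustrative names: `catalog_table_cls` and `old_table` stand in for the JVM proxies that the real code obtains via `self._spark._jvm` (see the diff below).

```python
def get_entity_storage_locations(table_metadata):
    """Return entityStorageLocations() if the Py4J proxy exposes it (DBR 16+), else None."""
    # hasattr() cannot be trusted on Py4J proxies: it reports True for any attribute name
    # and the phantom method only fails once invoked, so dir() is used to probe instead.
    if "entityStorageLocations" in dir(table_metadata):
        return table_metadata.entityStorageLocations()
    return None


def rebuild_catalog_table(catalog_table_cls, old_table, *unchanged_args):
    """Rebuild a CatalogTable, adding the DBR 16-only constructor argument when needed."""
    entity_storage_locations = get_entity_storage_locations(old_table)
    # DBR 16 added entityStorageLocations (Seq[EntityStorageLocation]) to the constructor;
    # earlier runtimes reject the extra argument, so it is spread in conditionally.
    return catalog_table_cls(
        *unchanged_args,
        *([entity_storage_locations] if entity_storage_locations is not None else []),
    )
```

The real implementation passes every existing `CatalogTable` constructor argument explicitly; the sketch collapses them into `*unchanged_args` for brevity.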

### Linked issues

Follows #3459
Resolves #3460

### Functionality

- modified existing workflow: `migrate-tables`

### Tests

- manually tested (for DBR 16)
- existing integration tests (for DBR 15)
- verified on staging environment (with DBR 16, see screenshot below)


![image](https://github.com/user-attachments/assets/ae75c4fb-2d6e-4bad-907c-2f9b8b3abff0)
asnare authored Jan 9, 2025
1 parent db29a63 commit 368b3a5
Showing 3 changed files with 27 additions and 19 deletions.
26 changes: 20 additions & 6 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
@@ -294,18 +294,28 @@ def _catalog_type(self):
def _catalog_table(self):
return self._spark._jvm.org.apache.spark.sql.catalyst.catalog.CatalogTable # pylint: disable=protected-access

def _convert_hms_table_to_external(self, src_table: Table) -> bool:
"""Converts a Hive metastore table to external using Spark JVM methods.
@staticmethod
def _get_entity_storage_locations(table_metadata):
"""Obtain the entityStorageLocations property for table metadata, if the property is present."""
# This is needed because:
# - DBR 16.0 introduced entityStorageLocations as a property on table metadata, and this is required
#   as a constructor parameter for CatalogTable.
# - We need to be compatible with earlier versions of DBR.
# - The normal hasattr() check does not work with Py4J-based objects: it always returns True and non-existent
#   methods will be automatically created on the proxy but fail when invoked.
# Instead the only approach is to use dir() to check if the method exists _prior_ to trying to access it.
# (After trying to access it, dir() will also include it even though it doesn't exist.)
return table_metadata.entityStorageLocations() if 'entityStorageLocations' in dir(table_metadata) else None

TODO:
This method fails for Databricks runtime 16.0, probably due to the JDK update (https://docs.databricks.com/en/release-notes/runtime/16.0.html#breaking-change-jdk-17-is-now-the-default).
"""
def _convert_hms_table_to_external(self, src_table: Table) -> bool:
"""Converts a Hive metastore table to external using Spark JVM methods."""
logger.info(f"Changing HMS managed table {src_table.name} to External Table type.")
inventory_table = self._tables_crawler.full_name
try:
database = self._spark._jvm.scala.Some(src_table.database) # pylint: disable=protected-access
table_identifier = self._table_identifier(src_table.name, database)
old_table = self._catalog.getTableMetadata(table_identifier)
entity_storage_locations = self._get_entity_storage_locations(old_table)
new_table = self._catalog_table(
old_table.identifier(),
self._catalog_type('EXTERNAL'),
@@ -327,13 +337,17 @@ def _convert_hms_table_to_external(self, src_table: Table) -> bool:
old_table.schemaPreservesCase(),
old_table.ignoredProperties(),
old_table.viewOriginalText(),
# From DBR 16, there's a new constructor argument: entityStorageLocations (Seq[EntityStorageLocation])
# (We can't detect whether the argument is needed by the constructor, but assume that if the accessor
# is present on the source table then the argument is needed.)
*([entity_storage_locations] if entity_storage_locations is not None else []),
)
self._catalog.alterTable(new_table)
self._update_table_status(src_table, inventory_table)
logger.info(f"Converted {src_table.name} to External Table type.")
except Exception as e: # pylint: disable=broad-exception-caught
logger.warning(f"Error converting HMS table {src_table.name} to external: {e}", exc_info=True)
return False
logger.info(f"Converted {src_table.name} to External Table type.")
return True

def _update_table_status(self, src_table: Table, inventory_table: str):
2 changes: 1 addition & 1 deletion tests/integration/assessment/test_workflows.py
@@ -36,7 +36,7 @@ def test_running_real_assessment_job(
installation_ctx.workspace_installation.run()

workflow = "assessment"
installation_ctx.deployed_workflows.run_workflow(workflow)
installation_ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=True)
assert installation_ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}"

after = installation_ctx.generic_permissions_support.load_as_dict("cluster-policies", cluster_policy.policy_id)
18 changes: 6 additions & 12 deletions tests/integration/hive_metastore/test_workflows.py
@@ -99,12 +99,7 @@ def test_table_migration_job_refreshes_migration_status(


def test_table_migration_convert_manged_to_external(installation_ctx, make_table_migration_context) -> None:
"""Convert managed tables to external before migrating.
Note:
This test fails from Databricks runtime 16.0 (https://docs.databricks.com/en/release-notes/runtime/16.0.html),
probably due to the JDK update (https://docs.databricks.com/en/release-notes/runtime/16.0.html#breaking-change-jdk-17-is-now-the-default).
"""
"""Convert managed tables to external before migrating."""
tables, dst_schema = make_table_migration_context("managed", installation_ctx)
ctx = installation_ctx.replace(
config_transform=lambda wc: dataclasses.replace(
@@ -121,11 +116,11 @@ def test_table_migration_convert_manged_to_external(installation_ctx, make_table

# The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete
# before we can test the migration workflow.
ctx.deployed_workflows.run_workflow("assessment")
ctx.deployed_workflows.run_workflow("assessment", skip_job_wait=True)
assert ctx.deployed_workflows.validate_step("assessment"), "Workflow failed: assessment"

# The workflow under test.
ctx.deployed_workflows.run_workflow("migrate-tables")
ctx.deployed_workflows.run_workflow("migrate-tables", skip_job_wait=True)
assert ctx.deployed_workflows.validate_step("migrate-tables")

missing_tables = set[str]()
@@ -163,7 +158,7 @@ def test_hiveserde_table_in_place_migration_job(installation_ctx, make_table_mig

# The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete
# before we can test the migration workflow.
ctx.deployed_workflows.run_workflow("assessment")
ctx.deployed_workflows.run_workflow("assessment", skip_job_wait=True)
assert ctx.deployed_workflows.validate_step("assessment"), "Workflow failed: assessment"

# The workflow under test.
@@ -191,12 +186,11 @@ def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, make_table_mig

# The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete
# before we can test the migration workflow.
ctx.deployed_workflows.run_workflow("assessment")
ctx.deployed_workflows.run_workflow("assessment", skip_job_wait=True)
assert ctx.deployed_workflows.validate_step("assessment"), "Workflow failed: assessment"

# The workflow under test.
ctx.deployed_workflows.run_workflow("migrate-external-tables-ctas")
# assert the workflow is successful
ctx.deployed_workflows.run_workflow("migrate-external-tables-ctas", skip_job_wait=True)
assert ctx.deployed_workflows.validate_step("migrate-external-tables-ctas")
# assert the tables are migrated
missing_tables = set[str]()