From 8ef7d2236f2797c9e770d4cc0728690d72564c4c Mon Sep 17 00:00:00 2001
From: Ziyuan Qin
Date: Tue, 19 Mar 2024 14:59:42 -0700
Subject: [PATCH 1/4] handle SYNC command result

---
 .../labs/ucx/hive_metastore/table_migrate.py    |  7 ++++++-
 tests/unit/hive_metastore/test_table_migrate.py | 16 ++++++++++++++--
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py
index 9258243a28..84ae12235d 100644
--- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py
+++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py
@@ -82,7 +82,12 @@ def _migrate_external_table(self, src_table: Table, rule: Rule):
         target_table_key = rule.as_uc_table_key
         table_migrate_sql = src_table.sql_migrate_external(target_table_key)
         logger.debug(f"Migrating external table {src_table.key} to using SQL query: {table_migrate_sql}")
-        self._backend.execute(table_migrate_sql)
+        sync_result = next(self._backend.fetch(table_migrate_sql))
+        if sync_result.status_code != "SUCCESS":
+            logger.warning(
+                f"SYNC command failed to migrate {src_table.key} to {target_table_key}. Status code: {sync_result.status_code}. Description: {sync_result.description}"
+            )
+            return False
         self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
         return True
 
diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py
index 986c7ac384..fefe7fc919 100644
--- a/tests/unit/hive_metastore/test_table_migrate.py
+++ b/tests/unit/hive_metastore/test_table_migrate.py
@@ -34,7 +34,7 @@ def ws():
 
 def test_migrate_dbfs_root_tables_should_produce_proper_queries(ws):
     errors = {}
-    rows = {}
+    rows = {r"SYNC .*": MockBackend.rows("status_code", "description")[("SUCCESS", "test")]}
     backend = MockBackend(fails_on_first=errors, rows=rows)
     table_crawler = TablesCrawler(backend, "inventory_database")
     table_mapping = table_mapping_mock(["managed_dbfs", "managed_mnt", "managed_other"])
@@ -77,7 +77,7 @@ def test_migrate_dbfs_root_tables_should_be_skipped_when_upgrading_external(ws):
 
 def test_migrate_external_tables_should_produce_proper_queries(ws):
     errors = {}
-    rows = {}
+    rows = {r"SYNC .*": MockBackend.rows("status_code", "description")[("SUCCESS", "test")]}
     backend = MockBackend(fails_on_first=errors, rows=rows)
     table_crawler = TablesCrawler(backend, "inventory_database")
     table_mapping = table_mapping_mock(["external_src"])
@@ -95,6 +95,18 @@
     ]
 
 
+def test_migrate_external_table_failed_sync(ws, caplog):
+    errors = {}
+    rows = {r"SYNC .*": MockBackend.rows("status_code", "description")[("LOCATION_OVERLAP", "test")]}
+    backend = MockBackend(fails_on_first=errors, rows=rows)
+    table_crawler = TablesCrawler(backend, "inventory_database")
+    table_mapping = table_mapping_mock(["external_src"])
+    migration_status_refresher = MigrationStatusRefresher(ws, backend, "inventory_database", table_crawler)
+    table_migrate = TablesMigrate(table_crawler, ws, backend, table_mapping, migration_status_refresher)
+    table_migrate.migrate_tables()
+    assert "SYNC command failed to migrate" in caplog.text
+
+
 def test_migrate_already_upgraded_table_should_produce_no_queries(ws):
     errors = {}
     rows = {}

From 581db42698bfb2f6bb8366b14935fbddde30acd3 Mon Sep 17 00:00:00 2001
From: Ziyuan Qin
Date: Tue, 19 Mar 2024 15:39:14 -0700
Subject: [PATCH 2/4] add an integration test

---
 .../hive_metastore/test_migrate.py | 37 +++++++++++++++++++
 1 file changed, 37 insertions(+)

diff --git a/tests/integration/hive_metastore/test_migrate.py b/tests/integration/hive_metastore/test_migrate.py
index 6df85346f6..6de443d3d3 100644
--- a/tests/integration/hive_metastore/test_migrate.py
+++ b/tests/integration/hive_metastore/test_migrate.py
@@ -167,6 +167,43 @@ def test_migrate_external_table(  # pylint: disable=too-many-locals
     assert migration_status[0].dst_table == src_external_table.name
 
 
+@retried(on=[NotFound], timeout=timedelta(minutes=5))
+def test_migrate_external_table_failed_sync(
+    ws,
+    caplog,
+    sql_backend,
+    inventory_schema,
+    make_schema,
+    make_table,
+    env_or_skip,
+):
+    if not ws.config.is_azure:
+        pytest.skip("temporary: only works in azure test env")
+
+    src_schema = make_schema(catalog_name="hive_metastore")
+    existing_mounted_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/b/c'
+    src_external_table = make_table(schema_name=src_schema.name, external_csv=existing_mounted_location)
+    table_crawler = StaticTablesCrawler(sql_backend, inventory_schema, [src_external_table])
+    # create a mapping that will fail the SYNC because the target catalog and schema do not exist
+    rules = [
+        Rule(
+            "workspace",
+            "non_existed_catalog",
+            src_schema.name,
+            "existed_schema",
+            src_external_table.name,
+            src_external_table.name,
+        ),
+    ]
+    migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler)
+    table_migrate = TablesMigrate(
+        table_crawler, ws, sql_backend, StaticTableMapping(ws, sql_backend, rules=rules), migration_status_refresher
+    )
+
+    table_migrate.migrate_tables()
+    assert "SYNC command failed to migrate" in caplog.text
+
+
 @retried(on=[NotFound], timeout=timedelta(minutes=5))
 def test_revert_migrated_table(
     ws, sql_backend, inventory_schema, make_schema, make_table, make_catalog

From 8a4701b612560a9bb1b3db5cdff10194a2b0449a Mon Sep 17 00:00:00 2001
From: Ziyuan Qin
Date: Wed, 20 Mar 2024 09:46:19 -0700
Subject: [PATCH 3/4] add iter() to deal with different return types from sqlbackend.fetch

---
 src/databricks/labs/ucx/hive_metastore/table_migrate.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py
index 84ae12235d..e4882b77f8 100644
--- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py
+++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py
@@ -82,7 +82,8 @@ def _migrate_external_table(self, src_table: Table, rule: Rule):
         target_table_key = rule.as_uc_table_key
         table_migrate_sql = src_table.sql_migrate_external(target_table_key)
         logger.debug(f"Migrating external table {src_table.key} to using SQL query: {table_migrate_sql}")
-        sync_result = next(self._backend.fetch(table_migrate_sql))
+        # have to wrap the fetch result with iter() for now, because StatementExecutionBackend returns an iterator but RuntimeBackend returns a list.
+        sync_result = next(iter(self._backend.fetch(table_migrate_sql)))
         if sync_result.status_code != "SUCCESS":
             logger.warning(
                 f"SYNC command failed to migrate {src_table.key} to {target_table_key}. Status code: {sync_result.status_code}. Description: {sync_result.description}"

From 8cda79f275d4a3257ee252142324afa933e6c7dd Mon Sep 17 00:00:00 2001
From: Ziyuan Qin
Date: Wed, 20 Mar 2024 10:39:16 -0700
Subject: [PATCH 4/4] add retry on installation not found error for migrate table workflow integration test

---
 tests/integration/test_installation.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tests/integration/test_installation.py b/tests/integration/test_installation.py
index acdc394829..73afdad50d 100644
--- a/tests/integration/test_installation.py
+++ b/tests/integration/test_installation.py
@@ -466,6 +466,7 @@ def test_check_inventory_database_exists(ws, new_installation):
     assert err.value.args[0] == f"Inventory database '{inventory_database}' already exists in another installation"
 
 
+@retried(on=[NotFound], timeout=timedelta(minutes=10))
 def test_table_migration_job(  # pylint: disable=too-many-locals
     ws, new_installation, make_catalog, make_schema, make_table, env_or_skip, make_random, make_dbfs_data_copy
 ):
@@ -527,6 +528,7 @@
     assert cluster_spec.spark_conf["spark.sql.sources.parallelPartitionDiscovery.parallelism"] == "1000"
 
 
+@retried(on=[NotFound], timeout=timedelta(minutes=5))
 def test_table_migration_job_cluster_override(  # pylint: disable=too-many-locals
     ws, new_installation, make_catalog, make_schema, make_table, env_or_skip, make_random, make_dbfs_data_copy
 ):
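
Note on patches 1 and 3: the migration now fetches the output of the SYNC statement, takes the first result row, and treats any status_code other than "SUCCESS" as a failed migration; patch 3 adds iter() so that next() works whether the backend's fetch() hands back a list (RuntimeBackend, per the comment in patch 3) or an iterator (StatementExecutionBackend). The sketch below shows that pattern in isolation; it is not UCX code, and FakeRow, list_backend_fetch, iterator_backend_fetch, and sync_succeeded are hypothetical stand-ins.

from dataclasses import dataclass
from typing import Iterable, Iterator
import logging

logger = logging.getLogger(__name__)


@dataclass
class FakeRow:
    # Hypothetical stand-in for one result row of the SYNC command.
    status_code: str
    description: str


def list_backend_fetch(sql: str) -> list[FakeRow]:
    # Stand-in for a backend whose fetch() returns a list, as the patch comment says RuntimeBackend does.
    return [FakeRow("SUCCESS", "test")]


def iterator_backend_fetch(sql: str) -> Iterator[FakeRow]:
    # Stand-in for a backend whose fetch() returns an iterator, like StatementExecutionBackend.
    yield FakeRow("LOCATION_OVERLAP", "source and target locations overlap")


def sync_succeeded(fetched: Iterable[FakeRow], src_key: str, dst_key: str) -> bool:
    # iter() makes both return shapes consumable by next(), mirroring patch 3.
    sync_result = next(iter(fetched))
    if sync_result.status_code != "SUCCESS":
        logger.warning(
            f"SYNC command failed to migrate {src_key} to {dst_key}. "
            f"Status code: {sync_result.status_code}. Description: {sync_result.description}"
        )
        return False
    return True


if __name__ == "__main__":
    logging.basicConfig()
    print(sync_succeeded(list_backend_fetch("SYNC ..."), "hive_metastore.db.tbl", "catalog.db.tbl"))      # True
    print(sync_succeeded(iterator_backend_fetch("SYNC ..."), "hive_metastore.db.tbl", "catalog.db.tbl"))  # False

Because iter() returns its argument unchanged when given an iterator and builds a fresh iterator over a list, a single next() call yields the first SYNC result row in both cases.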
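
The unit test in patch 1 and the integration test in patch 2 both assert on the warning text through pytest's caplog fixture. A reduced sketch of that test shape, assuming only pytest and the standard logging module; migrate_stub is a hypothetical stand-in for the failure branch added in patch 1:

import logging

logger = logging.getLogger("sync_sketch")


def migrate_stub(src_key: str, dst_key: str, status_code: str, description: str) -> bool:
    # Hypothetical stand-in for the failure branch of _migrate_external_table.
    if status_code != "SUCCESS":
        logger.warning(
            f"SYNC command failed to migrate {src_key} to {dst_key}. "
            f"Status code: {status_code}. Description: {description}"
        )
        return False
    return True


def test_failed_sync_logs_warning(caplog):
    # caplog is pytest's built-in log-capture fixture; no import is needed when run under pytest.
    with caplog.at_level(logging.WARNING):
        assert migrate_stub("hive_metastore.db.tbl", "catalog.db.tbl", "LOCATION_OVERLAP", "test") is False
    assert "SYNC command failed to migrate" in caplog.text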