Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle SYNC command failures #1073

Merged
merged 4 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/databricks/labs/ucx/hive_metastore/table_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,13 @@ def _migrate_external_table(self, src_table: Table, rule: Rule):
target_table_key = rule.as_uc_table_key
table_migrate_sql = src_table.sql_migrate_external(target_table_key)
logger.debug(f"Migrating external table {src_table.key} to using SQL query: {table_migrate_sql}")
self._backend.execute(table_migrate_sql)
# have to wrap the fetch result with iter() for now, because StatementExecutionBackend returns iterator but RuntimeBackend returns list.
sync_result = next(iter(self._backend.fetch(table_migrate_sql)))
if sync_result.status_code != "SUCCESS":
logger.warning(
f"SYNC command failed to migrate {src_table.key} to {target_table_key}. Status code: {sync_result.status_code}. Description: {sync_result.description}"
)
return False
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
return True

Expand Down
37 changes: 37 additions & 0 deletions tests/integration/hive_metastore/test_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,43 @@ def test_migrate_external_table( # pylint: disable=too-many-locals
assert migration_status[0].dst_table == src_external_table.name


@retried(on=[NotFound], timeout=timedelta(minutes=5))
def test_migrate_external_table_failed_sync(
    ws,
    caplog,
    sql_backend,
    inventory_schema,
    make_schema,
    make_table,
    env_or_skip,
):
    """A SYNC failure (target catalog missing) must be logged as a warning, not raised."""
    if not ws.config.is_azure:
        pytest.skip("temporary: only works in azure test env")

    schema = make_schema(catalog_name="hive_metastore")
    mount_path = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/b/c'
    external_table = make_table(schema_name=schema.name, external_csv=mount_path)
    crawler = StaticTablesCrawler(sql_backend, inventory_schema, [external_table])
    # Point the rule at a catalog that does not exist so the SYNC command is guaranteed to fail.
    bad_rule = Rule(
        "workspace",
        "non_existed_catalog",
        schema.name,
        "existed_schema",
        external_table.name,
        external_table.name,
    )
    refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, crawler)
    migrator = TablesMigrate(
        crawler,
        ws,
        sql_backend,
        StaticTableMapping(ws, sql_backend, rules=[bad_rule]),
        refresher,
    )

    migrator.migrate_tables()
    assert "SYNC command failed to migrate" in caplog.text


@retried(on=[NotFound], timeout=timedelta(minutes=5))
def test_revert_migrated_table(
ws, sql_backend, inventory_schema, make_schema, make_table, make_catalog
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/test_installation.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ def test_check_inventory_database_exists(ws, new_installation):
assert err.value.args[0] == f"Inventory database '{inventory_database}' already exists in another installation"


@retried(on=[NotFound], timeout=timedelta(minutes=10))
def test_table_migration_job( # pylint: disable=too-many-locals
ws, new_installation, make_catalog, make_schema, make_table, env_or_skip, make_random, make_dbfs_data_copy
):
Expand Down Expand Up @@ -527,6 +528,7 @@ def test_table_migration_job( # pylint: disable=too-many-locals
assert cluster_spec.spark_conf["spark.sql.sources.parallelPartitionDiscovery.parallelism"] == "1000"


@retried(on=[NotFound], timeout=timedelta(minutes=5))
def test_table_migration_job_cluster_override( # pylint: disable=too-many-locals
ws, new_installation, make_catalog, make_schema, make_table, env_or_skip, make_random, make_dbfs_data_copy
):
Expand Down
16 changes: 14 additions & 2 deletions tests/unit/hive_metastore/test_table_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def ws():

def test_migrate_dbfs_root_tables_should_produce_proper_queries(ws):
errors = {}
rows = {}
rows = {r"SYNC .*": MockBackend.rows("status_code", "description")[("SUCCESS", "test")]}
backend = MockBackend(fails_on_first=errors, rows=rows)
table_crawler = TablesCrawler(backend, "inventory_database")
table_mapping = table_mapping_mock(["managed_dbfs", "managed_mnt", "managed_other"])
Expand Down Expand Up @@ -77,7 +77,7 @@ def test_migrate_dbfs_root_tables_should_be_skipped_when_upgrading_external(ws):

def test_migrate_external_tables_should_produce_proper_queries(ws):
errors = {}
rows = {}
rows = {r"SYNC .*": MockBackend.rows("status_code", "description")[("SUCCESS", "test")]}
backend = MockBackend(fails_on_first=errors, rows=rows)
table_crawler = TablesCrawler(backend, "inventory_database")
table_mapping = table_mapping_mock(["external_src"])
Expand All @@ -95,6 +95,18 @@ def test_migrate_external_tables_should_produce_proper_queries(ws):
]


def test_migrate_external_table_failed_sync(ws, caplog):
    """A non-SUCCESS SYNC status must be surfaced as a warning instead of raising."""
    # Stub the backend so any SYNC statement reports a failing status code.
    failing_rows = {r"SYNC .*": MockBackend.rows("status_code", "description")[("LOCATION_OVERLAP", "test")]}
    mock_backend = MockBackend(fails_on_first={}, rows=failing_rows)
    crawler = TablesCrawler(mock_backend, "inventory_database")
    refresher = MigrationStatusRefresher(ws, mock_backend, "inventory_database", crawler)
    migrator = TablesMigrate(
        crawler,
        ws,
        mock_backend,
        table_mapping_mock(["external_src"]),
        refresher,
    )
    migrator.migrate_tables()
    assert "SYNC command failed to migrate" in caplog.text


def test_migrate_already_upgraded_table_should_produce_no_queries(ws):
errors = {}
rows = {}
Expand Down
Loading