Skip to content

Commit

Permalink
Refresh migration status at the end of the migrate_tables workflows (
Browse files Browse the repository at this point in the history
…#1599)

## Changes
Refresh migration status at the end of the `migrate_tables` task

### Linked issues
Resolves #1597

### Functionality 

- [ ] added relevant user documentation
- [ ] added new CLI command
- [ ] modified existing command: `databricks labs ucx ...`
- [ ] added a new workflow
- [x] modified existing workflow: `migrate-*-tables*`
- [ ] added a new table
- [ ] modified existing table: `...`

### Tests
<!-- How is this tested? Please see the checklist below and also
describe any other relevant tests -->

- [x] manually tested
- [ ] added unit tests
- [x] added integration tests
- [ ] verified on staging environment (screenshot attached)
  • Loading branch information
JCZuurmond authored May 2, 2024
1 parent da71371 commit f3c8cb0
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def _migrate_views(self, acl_strategy, all_grants_to_migrate, all_migrated_group
Threads.strict("migrate views", tasks)
self._migration_status_refresher.reset()
all_tasks.extend(tasks)
self._migration_status_refresher.reset()
return all_tasks

def _compute_grants(
Expand Down
28 changes: 24 additions & 4 deletions src/databricks/labs/ucx/hive_metastore/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW, acl_strategy=[AclMigrationWhat.LEGACY_TACL])

@job_task(dashboard="migration_main", depends_on=[migrate_views])
@job_task(job_cluster="table_migration", depends_on=[migrate_views])
def refresh_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.index()

@job_task(dashboard="migration_main", depends_on=[refresh_migration_status])
def migration_report(self, ctx: RuntimeContext):
"""Refreshes the migration dashboard after all previous tasks have been completed. Note that you can access the
dashboard _before_ all tasks have been completed, but then only already completed information is shown."""
Expand Down Expand Up @@ -66,7 +71,12 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW, acl_strategy=[AclMigrationWhat.LEGACY_TACL])

@job_task(dashboard="migration_main", depends_on=[migrate_views])
@job_task(job_cluster="table_migration", depends_on=[migrate_views])
def refresh_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.index()

@job_task(dashboard="migration_main", depends_on=[refresh_migration_status])
def migration_report(self, ctx: RuntimeContext):
"""Refreshes the migration dashboard after all previous tasks have been completed. Note that you can access the
dashboard _before_ all tasks have been completed, but then only already completed information is shown."""
Expand Down Expand Up @@ -104,7 +114,12 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW, acl_strategy=[AclMigrationWhat.LEGACY_TACL])

@job_task(dashboard="migration_main", depends_on=[migrate_views])
@job_task(job_cluster="table_migration", depends_on=[migrate_views])
def refresh_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.index()

@job_task(dashboard="migration_main", depends_on=[refresh_migration_status])
def migration_report(self, ctx: RuntimeContext):
"""Refreshes the migration dashboard after all previous tasks have been completed. Note that you can access the
dashboard _before_ all tasks have been completed, but then only already completed information is shown."""
Expand All @@ -121,7 +136,12 @@ def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext):
located under the assessment."""
ctx.tables_in_mounts.snapshot()

@job_task(dashboard="migration_main", depends_on=[scan_tables_in_mounts_experimental])
@job_task(job_cluster="table_migration", depends_on=[scan_tables_in_mounts_experimental])
def refresh_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.index()

@job_task(dashboard="migration_main", depends_on=[refresh_migration_status])
def migration_report(self, ctx: RuntimeContext):
"""Refreshes the migration dashboard after all previous tasks have been completed. Note that you can access the
dashboard _before_ all tasks have been completed, but then only already completed information is shown."""
40 changes: 40 additions & 0 deletions tests/integration/hive_metastore/test_workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from datetime import timedelta

import pytest
from databricks.sdk.errors import NotFound
from databricks.sdk.retries import retried


@retried(on=[NotFound], timeout=timedelta(minutes=5))
@pytest.mark.parametrize(
"prepare_tables_for_migration,workflow",
[
("regular", "migrate-tables"),
("hiveserde", "migrate-external-hiveserde-tables-in-place-experimental"),
("hiveserde", "migrate-external-tables-ctas"),
],
indirect=("prepare_tables_for_migration",),
)
def test_table_migration_job_refreshes_migration_status(ws, installation_ctx, prepare_tables_for_migration, workflow):
"""The migration status should be refreshed after the migration job."""
tables, _ = prepare_tables_for_migration
ctx = installation_ctx.replace(
extend_prompts={
r".*Do you want to update the existing installation?.*": 'yes',
},
)

ctx.workspace_installation.run()
ctx.deployed_workflows.run_workflow(workflow)

for table in tables.values():
# Avoiding MigrationStatusRefresh as it will refresh the status before fetching
query_migration_status = (
f"SELECT * FROM {ctx.config.inventory_database}.migration_status "
f"WHERE src_schema = '{table.schema_name}' AND src_table = '{table.name}'"
)
migration_status = list(ctx.sql_backend.fetch(query_migration_status))
assert_message_postfix = f" found for {table.table_type} {table.full_name}"
assert len(migration_status) == 1, "No migration status found" + assert_message_postfix
assert migration_status[0].dst_schema is not None, "No destination schema" + assert_message_postfix
assert migration_status[0].dst_table is not None, "No destination table" + assert_message_postfix

0 comments on commit f3c8cb0

Please sign in to comment.