Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved reliability of table migration status refresher #1623

Merged
merged 5 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ def get_seen_tables(self) -> dict[str, str]:
seen_tables: dict[str, str] = {}
for schema in self._iter_schemas():
try:
tables = self._ws.tables.list(catalog_name=schema.catalog_name, schema_name=schema.name)
# ws.tables.list returns Iterator[TableInfo], so we need to convert it to a list in order to catch the exception
tables = list(self._ws.tables.list(catalog_name=schema.catalog_name, schema_name=schema.name))
JCZuurmond marked this conversation as resolved.
Show resolved Hide resolved
except NotFound:
logger.warning(
f"Schema {schema.catalog_name}.{schema.name} no longer exists. Skipping checking its migration status."
Expand Down
19 changes: 10 additions & 9 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def __init__(
backend: SqlBackend,
table_mapping: TableMapping,
group_manager: GroupManager,
migration_status_refresher: 'MigrationStatusRefresher',
migration_status_refresher: MigrationStatusRefresher,
principal_grants: PrincipalACL,
):
self._tc = table_crawler
Expand All @@ -57,7 +57,11 @@ def __init__(
self._principal_grants = principal_grants

def index(self):
# TODO: remove this method
return self._migration_status_refresher.index()

def _index_with_reset(self):
# when we want the latest up-to-date status, e.g. to determine whether views dependencies have been migrated
self._migration_status_refresher.reset()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't reset it here. we use .index in CLI and it has to be executed fast.

return self._migration_status_refresher.index()

def migrate_tables(
Expand Down Expand Up @@ -116,9 +120,8 @@ def _migrate_tables(

def _migrate_views(self, acl_strategy, all_grants_to_migrate, all_migrated_groups, all_principal_grants):
tables_to_migrate = self._tm.get_tables_to_migrate(self._tc)
self._migration_status_refresher.reset()
all_tasks = []
sequencer = ViewsMigrationSequencer(tables_to_migrate, self.index())
sequencer = ViewsMigrationSequencer(tables_to_migrate, self._index_with_reset())
batches = sequencer.sequence_batches()
for batch in batches:
tasks = []
Expand All @@ -134,9 +137,7 @@ def _migrate_views(self, acl_strategy, all_grants_to_migrate, all_migrated_group
)
)
Threads.strict("migrate views", tasks)
self._migration_status_refresher.reset()
all_tasks.extend(tasks)
self._migration_status_refresher.reset()
return all_tasks

def _compute_grants(
Expand Down Expand Up @@ -201,7 +202,7 @@ def _migrate_view(
def _view_can_be_migrated(self, view: ViewToMigrate):
# dependencies have already been computed, therefore an empty dict is good enough
for table in view.dependencies:
if not self.index().get(table.schema, table.name):
if not self._index_with_reset().get(table.schema, table.name):
logger.info(f"View {view.src.key} cannot be migrated because {table.key} is not migrated yet")
return False
return True
Expand All @@ -219,8 +220,8 @@ def _sql_migrate_view(self, src_view: ViewToMigrate) -> str:
# CREATE VIEW x.y (col1, col2) AS SELECT * FROM w.t
create_statement = self._backend.fetch(f"SHOW CREATE TABLE {src_view.src.safe_sql_key}")
src_view.src.view_text = next(iter(create_statement))["createtab_stmt"]
migration_index = self._migration_status_refresher.index()
return src_view.sql_migrate_view(migration_index)
# this does not require the index to be refreshed because the dependencies have already been validated
return src_view.sql_migrate_view(self.index())

def _migrate_external_table(self, src_table: Table, rule: Rule, grants: list[Grant] | None = None):
target_table_key = rule.as_uc_table_key
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/hive_metastore/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ def test_table_migration_job_refreshes_migration_status(ws, installation_ctx, pr
)
migration_status = list(ctx.sql_backend.fetch(query_migration_status))
assert_message_postfix = f" found for {table.table_type} {table.full_name}"
assert len(migration_status) == 1, "No migration status found" + assert_message_postfix
assert len(migration_status) == 1, "No migration status" + assert_message_postfix
assert migration_status[0].dst_schema is not None, "No destination schema" + assert_message_postfix
assert migration_status[0].dst_table is not None, "No destination table" + assert_message_postfix