Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor refreshing of migration-status information for tables, eliminate another redundant refresh. #3270

Merged
merged 15 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/databricks/labs/ucx/contexts/workflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
from databricks.labs.ucx.contexts.application import GlobalContext
from databricks.labs.ucx.hive_metastore import TablesInMounts, TablesCrawler
from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table
from databricks.labs.ucx.hive_metastore.udfs import Udf
from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
from databricks.labs.ucx.progress.grants import GrantProgressEncoder
from databricks.labs.ucx.progress.grants import Grant, GrantProgressEncoder
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
from databricks.labs.ucx.progress.tables import TableProgressEncoder
Expand Down Expand Up @@ -189,7 +189,7 @@ def policies_progress(self) -> ProgressEncoder[PolicyInfo]:
)

@cached_property
def grants_progress(self) -> GrantProgressEncoder:
def grants_progress(self) -> ProgressEncoder[Grant]:
return GrantProgressEncoder(
self.sql_backend,
self.grant_ownership,
Expand Down Expand Up @@ -221,11 +221,11 @@ def pipelines_progress(self) -> ProgressEncoder[PipelineInfo]:
)

@cached_property
def tables_progress(self) -> TableProgressEncoder:
def tables_progress(self) -> ProgressEncoder[Table]:
return TableProgressEncoder(
self.sql_backend,
self.table_ownership,
self.migration_status_refresher.index(force_refresh=False),
self.migration_status_refresher,
self.parent_run_id,
self.workspace_id,
self.config.ucx_catalog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def key(self):


class TableMigrationIndex:
def __init__(self, tables: list[TableMigrationStatus]):
def __init__(self, tables: Iterable[TableMigrationStatus]):
    """Build a lookup of migration statuses keyed by (source schema, source table)."""
    index: dict[tuple[str, str], TableMigrationStatus] = {}
    for status in tables:
        # Later entries win on key collision, matching dict-comprehension semantics.
        index[(status.src_schema, status.src_table)] = status
    self._index = index

def is_migrated(self, schema: str, table: str) -> bool:
Expand Down
8 changes: 3 additions & 5 deletions src/databricks/labs/ucx/progress/history.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from __future__ import annotations
import dataclasses
import datetime as dt
import typing
import json
import logging
from enum import Enum, EnumMeta
from collections.abc import Iterable, Sequence
from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints, final
from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints

from databricks.labs.lsql.backends import SqlBackend

Expand Down Expand Up @@ -106,7 +105,7 @@ def _get_field_names_with_types(cls, klass: type[Record]) -> tuple[dict[str, typ
# are produced automatically in a __future__.__annotations__ context). Unfortunately the dataclass mechanism
# captures the type hints prior to resolution (which happens later in the class initialization process).
# As such, we rely on dataclasses.fields() for the set of field names, but not the types which we fetch directly.
klass_type_hints = typing.get_type_hints(klass)
klass_type_hints = get_type_hints(klass)
field_names = [field.name for field in dataclasses.fields(klass)]
field_names_with_types = {field_name: klass_type_hints[field_name] for field_name in field_names}
if "failures" not in field_names_with_types:
Expand Down Expand Up @@ -280,11 +279,10 @@ def __init__(
def full_name(self) -> str:
return f"{self._catalog}.{self._schema}.{self._table}"

@final
def append_inventory_snapshot(self, snapshot: Iterable[Record]) -> None:
    """Encode each record of `snapshot` as a historical record and append the batch to the history table."""
    history_records = [self._encode_record_as_historical(record) for record in snapshot]
    logger.debug(f"Appending {len(history_records)} {self._klass} record(s) to history.")
    # The mode is 'append'. This is documented as conflict-free.
    self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append")

def _encode_record_as_historical(self, record: Record) -> Historical:
Expand Down
28 changes: 21 additions & 7 deletions src/databricks/labs/ucx/progress/tables.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
import logging
from collections.abc import Iterable
from dataclasses import replace

from databricks.labs.lsql.backends import SqlBackend

from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus, TableMigrationIndex
from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
from databricks.labs.ucx.hive_metastore.ownership import TableOwnership
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.install import Historical


logger = logging.getLogger(__name__)


class TableProgressEncoder(ProgressEncoder[Table]):
"""Encoder class:Table to class:History.

Expand All @@ -21,7 +28,7 @@ def __init__(
self,
sql_backend: SqlBackend,
ownership: TableOwnership,
table_migration_index: TableMigrationIndex,
migration_status_refresher: CrawlerBase[TableMigrationStatus],
run_id: int,
workspace_id: int,
catalog: str,
Expand All @@ -38,17 +45,24 @@ def __init__(
schema,
table,
)
self._table_migration_index = table_migration_index
self._migration_status_refresher = migration_status_refresher

def append_inventory_snapshot(self, snapshot: Iterable[Table]) -> None:
    """Append the table snapshot to the history table, enriching each record with its migration status.

    The migration-status index is built once from the refresher's current snapshot, so every record in
    this batch is encoded against the same index.
    """
    migration_index = TableMigrationIndex(self._migration_status_refresher.snapshot())
    history_records = [self._encode_table_as_historical(record, migration_index) for record in snapshot]
    logger.debug(f"Appending {len(history_records)} {self._klass} table record(s) to history.")
    # The mode is 'append'. This is documented as conflict-free.
    self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append")

def _encode_table_as_historical(self, record: Table, migration_index: TableMigrationIndex) -> Historical:
    """Encode a table record, enriching with the migration status.

    A table failure means that the table is pending migration. Grants are purposefully left out, because a grant
    might not be mappable to UC, like `READ_METADATA`, thus possibly resulting in false "pending migration" failure
    for tables that are migrated to UC with their relevant grants also being migrated.
    """
    # Encode the shared fields first, then add the table-specific "pending migration" failure.
    historical = super()._encode_record_as_historical(record)
    failures = []
    if not migration_index.is_migrated(record.database, record.name):
        failures.append("Pending migration")
    # `replace` returns a new Historical; the original record's failures list is not mutated in place.
    return replace(historical, failures=historical.failures + failures)
32 changes: 17 additions & 15 deletions tests/unit/progress/test_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
from databricks.labs.ucx.hive_metastore.table_migration_status import (
TableMigrationStatusRefresher,
TableMigrationStatus,
)
from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.progress.grants import GrantProgressEncoder
from databricks.labs.ucx.progress.tables import TableProgressEncoder


Expand All @@ -19,21 +21,21 @@
def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None:
    """A migrated table (destination present in the migration status) should be recorded without failures."""
    ownership = create_autospec(Ownership)
    ownership.owner_of.return_value = "user"
    migration_status_crawler = create_autospec(TableMigrationStatusRefresher)
    # A destination catalog/schema/table is present: the table counts as migrated.
    migration_status_crawler.snapshot.return_value = (
        TableMigrationStatus(table.database, table.name, "main", "default", table.name, update_ts=None),
    )
    encoder = TableProgressEncoder(
        mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test"
    )

    encoder.append_inventory_snapshot([table])

    rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append")
    assert rows, f"No rows written for: {encoder.full_name}"
    assert len(rows[0].failures) == 0
    ownership.owner_of.assert_called_once()
    migration_status_crawler.snapshot.assert_called_once()


@pytest.mark.parametrize(
Expand All @@ -45,11 +47,12 @@ def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None:
def test_table_progress_encoder_pending_migration_failure(mock_backend, table: Table) -> None:
    """A table without a migration destination should be recorded with a 'Pending migration' failure."""
    ownership = create_autospec(Ownership)
    ownership.owner_of.return_value = "user"
    migration_status_crawler = create_autospec(TableMigrationStatusRefresher)
    migration_status_crawler.snapshot.return_value = (
        TableMigrationStatus(table.database, table.name),  # No destination: therefore not yet migrated.
    )
    encoder = TableProgressEncoder(
        mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test"
    )

    encoder.append_inventory_snapshot([table])

    rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append")
    assert len(rows) > 0, f"No rows written for: {encoder.full_name}"
    assert rows[0].failures == ["Pending migration"]
    ownership.owner_of.assert_called_once()
    migration_status_crawler.snapshot.assert_called_once()
42 changes: 37 additions & 5 deletions tests/unit/progress/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pytest
from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import CatalogInfo, MetastoreAssignment
Expand Down Expand Up @@ -44,8 +45,8 @@ def test_migration_progress_runtime_refresh(run_workflow, task, crawler, history
mock_history_log.append_inventory_snapshot.assert_called_once()


def test_migration_progress_runtime_tables_refresh(run_workflow) -> None:
"""Ensure that the split crawl and update-history-log tasks perform their part of the refresh process."""
def test_migration_progress_runtime_tables_refresh_crawl_tables(run_workflow) -> None:
"""Ensure that step 1 of the split crawl/update-history-log tasks performs its part of the refresh process."""
mock_tables_crawler = create_autospec(TablesCrawler)
mock_history_log = create_autospec(ProgressEncoder)
context_replacements = {
Expand All @@ -54,13 +55,44 @@ def test_migration_progress_runtime_tables_refresh(run_workflow) -> None:
"named_parameters": {"parent_run_id": 53},
}

# The first part of a 2-step update: the crawl without updating the history log.
# The first part of a 3-step update: the table crawl without updating the history log.
run_workflow(MigrationProgress.crawl_tables, **context_replacements)
mock_tables_crawler.snapshot.assert_called_once_with(force_refresh=True)
mock_history_log.append_inventory_snapshot.assert_not_called()

mock_tables_crawler.snapshot.reset_mock()
# The second part of the 2-step update: updating the history log (without a forced crawl).

def test_migration_progress_runtime_tables_refresh_migration_status(run_workflow) -> None:
    """Ensure that step 2 of the split crawl/update-history-log tasks performs its part of the refresh process."""
    status_refresher = create_autospec(TableMigrationStatusRefresher)
    history_log = create_autospec(ProgressEncoder)
    context_replacements = {
        "migration_status_refresher": status_refresher,
        "tables_progress": history_log,
        "named_parameters": {"parent_run_id": 53},
    }

    # Step 2 of 3: force-refresh the table migration status; the history log must remain untouched.
    task_dependencies = getattr(MigrationProgress.refresh_table_migration_status, "__task__").depends_on
    assert MigrationProgress.crawl_tables.__name__ in task_dependencies
    run_workflow(MigrationProgress.refresh_table_migration_status, **context_replacements)
    status_refresher.snapshot.assert_called_once_with(force_refresh=True)
    history_log.append_inventory_snapshot.assert_not_called()


def test_migration_progress_runtime_tables_refresh_update_history_log(run_workflow) -> None:
    """Ensure that step 3 of the split crawl/update-history-log tasks performs its part of the refresh process."""
    mock_tables_crawler = create_autospec(TablesCrawler)
    mock_history_log = create_autospec(ProgressEncoder)
    context_replacements = {
        "tables_crawler": mock_tables_crawler,
        "tables_progress": mock_history_log,
        "named_parameters": {"parent_run_id": 53},
    }

    # The final part of the 3-step update: updating the history log (without a forced crawl).
    task_dependencies = getattr(MigrationProgress.update_tables_history_log, "__task__").depends_on
    assert MigrationProgress.crawl_tables.__name__ in task_dependencies
    assert MigrationProgress.refresh_table_migration_status.__name__ in task_dependencies
    run_workflow(MigrationProgress.update_tables_history_log, **context_replacements)
    mock_tables_crawler.snapshot.assert_called_once_with()
    mock_history_log.append_inventory_snapshot.assert_called_once()
Expand Down