Exclude UCX jobs from crawling (#3733)
## Changes
Exclude UCX jobs from crawling to avoid confusing users when they
see UCX jobs in their assessment report.
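
For illustration, a minimal sketch of how the crawler can now be scoped (the wiring below is hypothetical; in UCX itself it happens in the application context, with the excluded ids taken from the install state):

```python
from databricks.labs.lsql.backends import StatementExecutionBackend
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.assessment.jobs import JobsCrawler

ws = WorkspaceClient()
sql_backend = StatementExecutionBackend(ws, "my-warehouse-id")  # assumed warehouse id
ucx_job_ids = [123, 456]  # ids of the jobs deployed by the UCX installer

crawler = JobsCrawler(ws, sql_backend, "ucx", exclude_job_ids=ucx_job_ids)
jobs = crawler.snapshot()  # UCX's own jobs no longer show up in the assessment
```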

### Linked issues

Fixes #3656
Resolves #3722
Follow up on #3732
Relates to #3731

### Functionality

- [x] modified `JobsCrawler`
- [x] modified existing workflow: `assessment`

### Tests

- [x] added unit tests
- [x] added integration tests

### PRs merged into this branch

> Merged the following PRs into this branch to let the CI pass. These
PRs contain fixes for the integration tests.

From #3767:

Scope linted dashboards on the mock runtime context. We should use
`make_dashboard` instead of calling the dashboard fixture `_make_dashboard`
directly. Also changed one dashboard to a `LakeviewDashboard` so that we
lint that too.

From #3759

Add a retry mechanism to wait for the grants to exist before crawling (see
the sketch below).
Resolves #3758
- [x] modified integration tests: `test_all_grant_types`
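
The retry follows the SDK's `retried` decorator pattern: re-check the crawled grants until the expected one shows up, raising a retryable error in the meantime. A condensed sketch (the timeout and condition are illustrative):

```python
import datetime as dt

from databricks.sdk.retries import retried


@retried(on=[ValueError], timeout=dt.timedelta(minutes=2))
def wait_for_any_file_grant(ctx, group_name: str) -> None:
    # Re-fetch on every attempt; the permissions API is eventually consistent.
    grants = ctx.grants_crawler.grants(any_file=True)
    if not any(g.principal == group_name and g.action_type == "SELECT" for g in grants):
        raise ValueError("grant not (yet) visible")
```
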
JCZuurmond authored Feb 27, 2025
1 parent ef45443 commit 5998451
Showing 9 changed files with 179 additions and 47 deletions.
42 changes: 37 additions & 5 deletions src/databricks/labs/ucx/assessment/jobs.py
@@ -8,6 +8,7 @@

from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import DatabricksError
from databricks.sdk.service import compute
from databricks.sdk.service.compute import ClusterDetails, ClusterSpec
from databricks.sdk.service.jobs import (
@@ -94,15 +95,49 @@ def _job_clusters(job: BaseJob) -> Iterable[tuple[BaseJob, ClusterSpec]]:


class JobsCrawler(CrawlerBase[JobInfo], JobsMixin, CheckClusterMixin):
"""Crawl jobs (workflows), assess them and store the result in the inventory.
Args:
ws (WorkspaceClient): The workspace client to crawl the jobs with.
sql_backend (SqlBackend): The SQL backend to store the results with.
schema (str): The schema to store the results in.
include_job_ids (list[int] | None): If provided, only include these job ids. Otherwise, include all jobs.
exclude_job_ids (list[int] | None): If provided, exclude these job ids. Otherwise, include all jobs. Note: we
prefer `include_job_ids` for stricter scoping, but sometimes it is easier to exclude a few jobs.
"""

def __init__(
self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, *, include_job_ids: list[int] | None = None
self,
ws: WorkspaceClient,
sql_backend: SqlBackend,
schema,
*,
include_job_ids: list[int] | None = None,
exclude_job_ids: list[int] | None = None,
):
super().__init__(sql_backend, "hive_metastore", schema, "jobs", JobInfo)
self._ws = ws
self._include_job_ids = include_job_ids
self._exclude_job_ids = exclude_job_ids

def _list_jobs(self) -> Iterable[BaseJob]:
"""List the jobs.
If provided, jobs with an id in `exclude_job_ids` are skipped.
If provided, only jobs with an id in `include_job_ids` are yielded.
If both are provided, `exclude_job_ids` takes precedence.
"""
try:
for job in self._ws.jobs.list(expand_tasks=True):
if self._exclude_job_ids is not None and job.job_id in self._exclude_job_ids:
continue
if self._include_job_ids is None or job.job_id in self._include_job_ids:
yield job
except DatabricksError as e:
logger.error("Cannot list jobs", exc_info=e)

def _crawl(self) -> Iterable[JobInfo]:
all_jobs = list(self._ws.jobs.list(expand_tasks=True))
all_jobs = list(self._list_jobs())
all_clusters = {c.cluster_id: c for c in self._ws.clusters.list() if c.cluster_id}
return self._assess_jobs(all_jobs, all_clusters)

@@ -112,9 +147,6 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> Iterable[
job_id = job.job_id
if not job_id:
continue
if self._include_job_ids is not None and job_id not in self._include_job_ids:
logger.info(f"Skipping job_id={job_id}")
continue
cluster_details = ClusterDetails.from_dict(cluster_config.as_dict())
cluster_failures = self._check_cluster_failures(cluster_details, "Job cluster")
cluster_failures.extend(self._check_jar_task(job.settings.tasks))
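
For reference, the include/exclude semantics above reduce to this predicate (a standalone sketch, not part of the change):

```python
def is_in_scope(job_id: int, include: list[int] | None, exclude: list[int] | None) -> bool:
    """Mirrors JobsCrawler._list_jobs: exclusion wins over inclusion."""
    if exclude is not None and job_id in exclude:
        return False
    return include is None or job_id in include


assert is_in_scope(1, include=None, exclude=None)       # no scoping: crawl everything
assert not is_in_scope(1, include=[], exclude=None)     # empty include list skips all jobs
assert not is_in_scope(2, include=[1, 2], exclude=[2])  # exclude takes precedence
```
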
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/contexts/application.py
@@ -283,6 +283,7 @@ def jobs_crawler(self) -> JobsCrawler:
self.sql_backend,
self.inventory_database,
include_job_ids=self.config.include_job_ids,
exclude_job_ids=list(self.install_state.jobs.values()),
)

@cached_property
11 changes: 1 addition & 10 deletions src/databricks/labs/ucx/contexts/workflow_task.py
@@ -16,7 +16,7 @@
PolicyInfo,
)
from databricks.labs.ucx.assessment.init_scripts import GlobalInitScriptCrawler
from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo, JobsCrawler, SubmitRunsCrawler
from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo, SubmitRunsCrawler
from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler, PipelineInfo, PipelineOwnership
from databricks.labs.ucx.assessment.sequencing import MigrationSequencer
from databricks.labs.ucx.config import WorkspaceConfig
@@ -68,15 +68,6 @@ def installation(self) -> Installation:
install_folder = self._config_path.parent.as_posix().removeprefix("/Workspace")
return Installation(self.workspace_client, "ucx", install_folder=install_folder)

@cached_property
def jobs_crawler(self) -> JobsCrawler:
return JobsCrawler(
self.workspace_client,
self.sql_backend,
self.inventory_database,
include_job_ids=self.config.include_job_ids,
)

@cached_property
def job_ownership(self) -> JobOwnership:
return JobOwnership(self.administrator_locator)
19 changes: 19 additions & 0 deletions tests/integration/assessment/test_jobs.py
@@ -32,6 +32,25 @@ def test_job_crawler(ws, make_job, inventory_schema, sql_backend):
assert int(results[0].job_id) == new_job.job_id


def test_job_crawler_excludes_job(ws, make_job, inventory_schema, sql_backend) -> None:
"""Test if the job crawler can exclude a job."""
new_job = make_job(spark_conf=_SPARK_CONF)
skip_job = make_job(spark_conf=_SPARK_CONF)
job_crawler = JobsCrawler(
ws,
sql_backend,
inventory_schema,
# Adding the skip job to `include_job_ids` scopes the crawler to these two jobs (instead of crawling all
# jobs) while still testing the exclude behaviour
include_job_ids=[new_job.job_id, skip_job.job_id],
exclude_job_ids=[skip_job.job_id],
)

jobs = job_crawler.snapshot()

assert not any(job.job_id == str(skip_job.job_id) for job in jobs)


@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5))
def test_job_run_crawler(ws, env_or_skip, inventory_schema, sql_backend):
cluster_id = env_or_skip("TEST_DEFAULT_CLUSTER_ID")
7 changes: 3 additions & 4 deletions tests/integration/conftest.py
@@ -631,10 +631,9 @@ def make_linting_resources(self) -> None:
self.make_job(content="spark.table('old.stuff')")
self.make_job(content="spark.read.parquet('dbfs://mnt/file/')", task_type=SparkPythonTask)
self.make_job(content="spark.table('some.table')", task_type=SparkPythonTask)
query_1 = self.make_query(sql_query='SELECT * from parquet.`dbfs://mnt/foo2/bar2`')
self._make_dashboard(query=query_1)
query_2 = self.make_query(sql_query='SELECT * from my_schema.my_table')
self._make_dashboard(query=query_2)
query_1 = self.make_query(sql_query="SELECT * from parquet.`dbfs://mnt/foo2/bar2`")
self.make_dashboard(query=query_1)
self.make_lakeview_dashboard(query="SELECT * from my_schema.my_table")

def add_table(self, table: TableInfo):
self._tables.append(table)
56 changes: 34 additions & 22 deletions tests/integration/hive_metastore/test_grant_detail.py
@@ -1,17 +1,19 @@
import json
import logging
import datetime as dt
from collections.abc import Callable, Iterable

import pytest
from databricks.labs.lsql.backends import StatementExecutionBackend
from databricks.sdk.errors import NotFound
from databricks.sdk.retries import retried

from databricks.labs.lsql.backends import StatementExecutionBackend
from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler
from databricks.labs.ucx.hive_metastore.grants import Grant, GrantsCrawler
from databricks.labs.ucx.install import deploy_schema

from ..conftest import MockRuntimeContext


logger = logging.getLogger(__name__)


@@ -23,42 +25,52 @@ def _deployed_schema(runtime_ctx) -> None:


@retried(on=[NotFound, TimeoutError], timeout=dt.timedelta(minutes=3))
def test_all_grant_types(
runtime_ctx: MockRuntimeContext, sql_backend: StatementExecutionBackend, _deployed_schema: None
):
"""Test that all types of grants are properly handled by the view when reporting the object type and identifier."""
def test_all_grant_types(runtime_ctx: MockRuntimeContext, _deployed_schema: None):
"""All types of grants should be reported by the grant_detail view."""

# Fixture: a group and schema to hold all the objects, the objects themselves and a grant on each to the group.
group = runtime_ctx.make_group()
schema = runtime_ctx.make_schema()
table = runtime_ctx.make_table(schema_name=schema.name)
view = runtime_ctx.make_table(schema_name=schema.name, view=True, ctas="select 1")
udf = runtime_ctx.make_udf(schema_name=schema.name)
sql_backend.execute(f"GRANT SELECT ON CATALOG {schema.catalog_name} TO `{group.display_name}`")
sql_backend.execute(f"GRANT SELECT ON SCHEMA {schema.full_name} TO `{group.display_name}`")
sql_backend.execute(f"GRANT SELECT ON TABLE {table.full_name} TO `{group.display_name}`")
sql_backend.execute(f"GRANT SELECT ON VIEW {view.full_name} TO `{group.display_name}`")
sql_backend.execute(f"GRANT SELECT ON FUNCTION {udf.full_name} TO `{group.display_name}`")
sql_backend.execute(f"GRANT SELECT ON ANY FILE TO `{group.display_name}`")
sql_backend.execute(f"GRANT SELECT ON ANONYMOUS FUNCTION TO `{group.display_name}`")
runtime_ctx.sql_backend.execute(f"GRANT SELECT ON CATALOG {schema.catalog_name} TO `{group.display_name}`")
runtime_ctx.sql_backend.execute(f"GRANT SELECT ON SCHEMA {schema.full_name} TO `{group.display_name}`")
runtime_ctx.sql_backend.execute(f"GRANT SELECT ON TABLE {table.full_name} TO `{group.display_name}`")
runtime_ctx.sql_backend.execute(f"GRANT SELECT ON VIEW {view.full_name} TO `{group.display_name}`")
runtime_ctx.sql_backend.execute(f"GRANT SELECT ON FUNCTION {udf.full_name} TO `{group.display_name}`")
runtime_ctx.sql_backend.execute(f"GRANT SELECT ON ANY FILE TO `{group.display_name}`")
runtime_ctx.sql_backend.execute(f"GRANT SELECT ON ANONYMOUS FUNCTION TO `{group.display_name}`")

@retried(on=[ValueError], timeout=dt.timedelta(minutes=2))
def wait_for_grants(condition: Callable[[Iterable[Grant]], bool], **kwargs) -> None:
"""Wait for grants to meet the condition.
The method retries the condition check to account for eventual consistency of the permission API.
"""
grants = runtime_ctx.grants_crawler.grants(**kwargs)
if not condition(grants):
raise ValueError("Grants do not meet condition")

# Ensure the view is populated (it's based on the crawled grants) and fetch the content.
GrantsCrawler(runtime_ctx.tables_crawler, runtime_ctx.udfs_crawler).snapshot()
def contains_select_on_any_file(grants: Iterable[Grant]) -> bool:
"""Check if the SELECT permission on ANY FILE is present in the grants."""
return any(g.principal == group.display_name and g.action_type == "SELECT" for g in grants)

# Wait for the grants to be available so that we can snapshot them.
# Only verifying the SELECT permission on ANY FILE as it takes a while to propagate.
wait_for_grants(contains_select_on_any_file, any_file=True)

rows = list(
sql_backend.fetch(
f"""
runtime_ctx.grants_crawler.snapshot()

grants_detail_query = f"""
SELECT object_type, object_id
FROM {runtime_ctx.inventory_database}.grant_detail
WHERE principal_type='group' AND principal='{group.display_name}' and action_type='SELECT'
"""
)
)
grants = {(row.object_type, row.object_id) for row in rows}
grants = {(row.object_type, row.object_id) for row in runtime_ctx.sql_backend.fetch(grants_detail_query)}

# TODO: The types of objects targeted by grants are misclassified; this needs to be fixed.

# Test the results.
expected_grants = {
("TABLE", table.full_name),
("VIEW", view.full_name),
9 changes: 8 additions & 1 deletion tests/unit/__init__.py
@@ -165,7 +165,7 @@ def mock_workspace_client(
ws.pipelines.get = _pipeline
ws.workspace.get_status = lambda _: ObjectInfo(object_id=123)
ws.get_workspace_id.return_value = 123
ws.jobs.list.return_value = _id_list(BaseJob, job_ids)
ws.jobs.list.return_value = iter(_id_list(BaseJob, job_ids))
ws.jobs.list_runs.return_value = _id_list(BaseRun, jobruns_ids)
ws.warehouses.get_workspace_warehouse_config().data_access_config = _load_list(EndpointConfPair, warehouse_config)
ws.workspace.export = _workspace_export
@@ -193,6 +193,13 @@ def mock_workspace_client(
{'workspace_id': 789, 'deployment_name': 'test3'},
]
),
'state.json': json.dumps(
{
'resources': {
'jobs': {'test': '123', 'assessment': '456'},
}
}
),
}
ws.workspace.download.side_effect = lambda file_name, *, format=None: io.StringIO(
download_yaml[os.path.basename(file_name)]
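
A note on the `iter(...)` wrapping above: `ws.jobs.list` yields a one-shot iterator in the real SDK, so the mock now does the same, presumably to catch code that tries to consume the listing twice. A minimal illustration (hypothetical mock, not part of the diff):

```python
from unittest.mock import create_autospec

from databricks.sdk import WorkspaceClient

ws = create_autospec(WorkspaceClient)
ws.jobs.list.return_value = iter([1, 2, 3])

assert list(ws.jobs.list()) == [1, 2, 3]
# The second call hands back the same, now exhausted, iterator:
assert list(ws.jobs.list()) == []
```
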
79 changes: 75 additions & 4 deletions tests/unit/assessment/test_jobs.py
@@ -67,10 +67,12 @@ def test_jobs_assessment_with_spn_cluster_no_job_tasks():

def test_job_crawler_creator():
ws = mock_workspace_client()
ws.jobs.list.return_value = (
BaseJob(job_id=1, settings=JobSettings(), creator_user_name=None),
BaseJob(job_id=2, settings=JobSettings(), creator_user_name=""),
BaseJob(job_id=3, settings=JobSettings(), creator_user_name="bob"),
ws.jobs.list.return_value = iter(
[
BaseJob(job_id=1, settings=JobSettings(), creator_user_name=None),
BaseJob(job_id=2, settings=JobSettings(), creator_user_name=""),
BaseJob(job_id=3, settings=JobSettings(), creator_user_name="bob"),
]
)
result = JobsCrawler(ws, MockBackend(), "ucx").snapshot(force_refresh=True)

@@ -80,6 +82,75 @@ def test_job_crawler_creator():
assert set(expected_creators) == set(crawled_creators)


def test_job_crawler_skips_all_jobs_with_empty_include_job_ids() -> None:
"""If `include_job_ids` is empty, all jobs should be skipped."""
ws = mock_workspace_client()
ws.jobs.list.return_value = iter(
[
BaseJob(job_id=1, settings=JobSettings(), creator_user_name=None),
BaseJob(job_id=2, settings=JobSettings(), creator_user_name=""),
BaseJob(job_id=3, settings=JobSettings(), creator_user_name="bob"),
]
)

result = JobsCrawler(ws, MockBackend(), "ucx", include_job_ids=[]).snapshot(force_refresh=True)

assert not result


def test_job_crawler_include_job_ids() -> None:
"""Only jobs with IDs in `include_job_ids` should be crawled."""

ws = mock_workspace_client()
ws.jobs.list.return_value = iter(
[
BaseJob(job_id=1, settings=JobSettings(), creator_user_name=None),
BaseJob(job_id=2, settings=JobSettings(), creator_user_name=""),
BaseJob(job_id=3, settings=JobSettings(), creator_user_name="bob"),
]
)

result = JobsCrawler(ws, MockBackend(), "ucx", include_job_ids=[1]).snapshot(force_refresh=True)

assert result == [JobInfo(job_id="1", success=1, failures="[]", job_name="Unknown")]


def test_job_crawler_exclude_job_ids() -> None:
"""The jobs with IDs in `exclude_job_ids` should be skipped."""

ws = mock_workspace_client()
ws.jobs.list.return_value = iter(
[
BaseJob(job_id=1, settings=JobSettings(), creator_user_name=None),
BaseJob(job_id=2, settings=JobSettings(), creator_user_name=""),
BaseJob(job_id=3, settings=JobSettings(), creator_user_name="bob"),
]
)

result = JobsCrawler(ws, MockBackend(), "ucx", exclude_job_ids=[2, 3]).snapshot(force_refresh=True)

assert result == [JobInfo(job_id="1", success=1, failures="[]", job_name="Unknown")]


def test_job_crawler_exclude_job_ids_takes_preference_over_include_job_ids() -> None:
"""The jobs with IDs in `exclude_job_ids` should be skipped, also when they are in include_job_ids."""

ws = mock_workspace_client()
ws.jobs.list.return_value = iter(
[
BaseJob(job_id=1, settings=JobSettings(), creator_user_name=None),
BaseJob(job_id=2, settings=JobSettings(), creator_user_name=""),
BaseJob(job_id=3, settings=JobSettings(), creator_user_name="bob"),
]
)

result = JobsCrawler(ws, MockBackend(), "ucx", include_job_ids=[1, 2], exclude_job_ids=[2, 3]).snapshot(
force_refresh=True
)

assert result == [JobInfo(job_id="1", success=1, failures="[]", job_name="Unknown")]


@pytest.mark.parametrize(
"jobruns_ids,cluster_ids,run_ids,failures",
[
2 changes: 1 addition & 1 deletion tests/unit/hive_metastore/test_pipeline_migrate.py
@@ -97,7 +97,7 @@ def test_migrate_pipelines_no_pipelines(ws) -> None:
pipelines_crawler = PipelinesCrawler(ws, sql_backend, "inventory_database")
jobs_crawler = JobsCrawler(ws, sql_backend, "inventory_database")
pipelines_migrator = PipelinesMigrator(ws, pipelines_crawler, jobs_crawler, "catalog_name")
ws.jobs.list.return_value = [BaseJob(job_id=536591785949415), BaseJob(), BaseJob(job_id=536591785949417)]
ws.jobs.list.return_value = iter([BaseJob(job_id=536591785949415), BaseJob(), BaseJob(job_id=536591785949417)])
pipelines_migrator.migrate_pipelines()


