Let WorkflowLinter.refresh_report lint jobs from JobsCrawler #3732

Merged: 2 commits, Feb 25, 2025
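This PR changes WorkflowLinter so it no longer lists jobs itself through ws.jobs.list() with its own include_job_ids / debug_listing_upper_limit filtering; it now receives a JobsCrawler and lints whatever the crawler's snapshot() returns. A minimal sketch of the new call shape, using only names that appear in the diffs below (the surrounding wiring is illustrative and normally comes from the application context):

from databricks.labs.ucx.assessment.jobs import JobsCrawler
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
from databricks.labs.ucx.source_code.linters.jobs import WorkflowLinter

# Illustrative wiring; in the product these collaborators come from the application context.
linter = WorkflowLinter(
    ws,                    # WorkspaceClient
    jobs_crawler,          # JobsCrawler decides which jobs get linted
    dependency_resolver,
    path_lookup,
    TableMigrationIndex([]),
    directfs_crawler,
    used_tables_crawler,
)
linter.refresh_report(sql_backend, inventory_database)  # iterates jobs_crawler.snapshot()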
Changes from all commits
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/assessment/workflows.py
@@ -207,7 +207,7 @@ def assess_dashboards(self, ctx: RuntimeContext):
         """
         ctx.query_linter.refresh_report()

-    @job_task
+    @job_task(depends_on=[assess_jobs])
     def assess_workflows(self, ctx: RuntimeContext):
         """Scans all jobs for migration issues in notebooks jobs.

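The decorator change above makes assess_workflows run after assess_jobs, so the jobs inventory written by JobsCrawler exists before the linter reads it back via snapshot(). A rough sketch of the intended ordering, assuming the task body still delegates to the workflow linter as the docstring suggests:

@job_task(depends_on=[assess_jobs])
def assess_workflows(self, ctx: RuntimeContext):
    """Scans all jobs for migration issues in notebooks jobs."""
    # assess_jobs has already populated the jobs inventory, so refresh_report
    # can consume JobsCrawler.snapshot() instead of re-listing the workspace.
    ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_database)  # assumed body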
3 changes: 1 addition & 2 deletions src/databricks/labs/ucx/contexts/application.py
@@ -584,13 +584,12 @@ def dependency_resolver(self) -> DependencyResolver:
     def workflow_linter(self) -> WorkflowLinter:
         return WorkflowLinter(
             self.workspace_client,
+            self.jobs_crawler,
             self.dependency_resolver,
             self.path_lookup,
             TableMigrationIndex([]),  # TODO: bring back self.tables_migrator.index()
             self.directfs_access_crawler_for_paths,
             self.used_tables_crawler_for_paths,
-            self.config.include_job_ids,
-            self.config.debug_listing_upper_limit,
         )

     @cached_property
19 changes: 4 additions & 15 deletions src/databricks/labs/ucx/source_code/linters/jobs.py
@@ -12,6 +12,7 @@
 from databricks.sdk.errors import NotFound
 from databricks.sdk.service import jobs

+from databricks.labs.ucx.assessment.jobs import JobsCrawler
 from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
 from databricks.labs.ucx.source_code.base import (
     DirectFsAccess,
@@ -40,37 +41,25 @@ class WorkflowLinter:
     def __init__(
         self,
         ws: WorkspaceClient,
+        jobs_crawler: JobsCrawler,
         resolver: DependencyResolver,
         path_lookup: PathLookup,
         migration_index: TableMigrationIndex,
         directfs_crawler: DirectFsAccessCrawler,
         used_tables_crawler: UsedTablesCrawler,
-        include_job_ids: list[int] | None = None,
-        debug_listing_upper_limit: int | None = None,
     ):
         self._ws = ws
+        self._jobs_crawler = jobs_crawler
         self._resolver = resolver
         self._path_lookup = path_lookup
         self._migration_index = migration_index
         self._directfs_crawler = directfs_crawler
         self._used_tables_crawler = used_tables_crawler
-        self._include_job_ids = include_job_ids
-        self._debug_listing_upper_limit = debug_listing_upper_limit

     def refresh_report(self, sql_backend: SqlBackend, inventory_database: str) -> None:
         tasks = []
-        items_listed = 0
-        for job in self._ws.jobs.list():
-            if self._include_job_ids is not None and job.job_id not in self._include_job_ids:
-                logger.info(f"Skipping job_id={job.job_id}")
-                continue
-            if self._debug_listing_upper_limit is not None and items_listed >= self._debug_listing_upper_limit:
-                logger.warning(f"Debug listing limit reached: {self._debug_listing_upper_limit}")
-                break
-            if job.settings is not None and job.settings.name is not None:
-                logger.info(f"Found job_id={job.job_id}: {job.settings.name}")
+        for job in self._jobs_crawler.snapshot():
             tasks.append(functools.partial(self.lint_job, job.job_id))
-            items_listed += 1
         logger.info(f"Running {len(tasks)} linting tasks in parallel...")
         job_results, errors = Threads.gather('linting workflows', tasks)
         job_problems: list[JobProblem] = []

Review comment (Contributor), on the removed include_job_ids check: Where do we replicate the filtering capability with the Job Crawler?
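The review question above concerns where the include_job_ids / debug_listing_upper_limit filtering lives once refresh_report stops applying it. This diff does not show that wiring; the sketch below is only an assumption of how the filtering could be pushed into JobsCrawler, and the constructor parameters shown are hypothetical:

# Hypothetical wiring (not part of this diff): the crawler applies the
# config-driven job filtering before the snapshot is taken.
jobs_crawler = JobsCrawler(
    ws,
    sql_backend,
    inventory_database,
    include_job_ids=config.include_job_ids,                      # assumed parameter
    debug_listing_upper_limit=config.debug_listing_upper_limit,  # assumed parameter
)
for job in jobs_crawler.snapshot():  # already filtered at crawl time
    ...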
4 changes: 2 additions & 2 deletions tests/integration/assessment/test_workflows.py
@@ -47,5 +47,5 @@ def test_running_real_assessment_job(
     assert actual_tables == expected_tables

     query = f"SELECT * FROM {installation_ctx.inventory_database}.workflow_problems"
-    for row in sql_backend.fetch(query):
-        assert row['path'] != 'UNKNOWN'
+    workflow_problems_without_path = [problem for problem in sql_backend.fetch(query) if problem["path"] == "UNKNOWN"]
+    assert not workflow_problems_without_path
8 changes: 3 additions & 5 deletions tests/integration/source_code/test_directfs_access.py
@@ -78,8 +78,6 @@ def test_lakeview_query_dfsa_ownership(runtime_ctx) -> None:

 def test_path_dfsa_ownership(
     runtime_ctx,
-    make_notebook,
-    make_job,
     make_directory,
     inventory_schema,
     sql_backend,
@@ -88,18 +86,18 @@

     # A job with a notebook task that contains direct filesystem access.
     notebook_source = b"display(spark.read.csv('/mnt/things/e/f/g'))"
-    notebook = make_notebook(path=f"{make_directory()}/notebook.py", content=notebook_source)
-    job = make_job(notebook_path=notebook)
+    notebook = runtime_ctx.make_notebook(path=f"{make_directory()}/notebook.py", content=notebook_source)
+    runtime_ctx.make_job(notebook_path=notebook)

     # Produce a DFSA record for the job.
     linter = WorkflowLinter(
         runtime_ctx.workspace_client,
+        runtime_ctx.jobs_crawler,
         runtime_ctx.dependency_resolver,
         runtime_ctx.path_lookup,
         TableMigrationIndex([]),
         runtime_ctx.directfs_access_crawler_for_paths,
         runtime_ctx.used_tables_crawler_for_paths,
-        include_job_ids=[job.job_id],
     )
     linter.refresh_report(sql_backend, inventory_schema)

45 changes: 30 additions & 15 deletions tests/unit/source_code/test_jobs.py
@@ -5,20 +5,24 @@
 from unittest.mock import create_autospec

 import pytest
+from databricks.labs.blueprint.paths import DBFSPath, WorkspacePath
 from databricks.labs.lsql.backends import MockBackend
-from databricks.sdk.service.compute import LibraryInstallStatus
+from databricks.sdk import WorkspaceClient
+from databricks.sdk.errors import NotFound
+from databricks.sdk.service import compute, jobs
 from databricks.sdk.service.jobs import Job, SparkPythonTask
-from databricks.sdk.service.pipelines import NotebookLibrary, GetPipelineResponse, PipelineLibrary, FileLibrary
+from databricks.sdk.service.pipelines import (
+    GetPipelineResponse,
+    FileLibrary,
+    NotebookLibrary,
+    PipelineLibrary,
+    PipelineSpec,
+)
+from databricks.sdk.service.workspace import ExportFormat, Language, ObjectInfo

-from databricks.labs.blueprint.paths import DBFSPath, WorkspacePath
+from databricks.labs.ucx.assessment.jobs import JobsCrawler
 from databricks.labs.ucx.source_code.base import CurrentSessionState
 from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler
-from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver
-from databricks.sdk import WorkspaceClient
-from databricks.sdk.errors import NotFound
-from databricks.sdk.service import compute, jobs, pipelines
-from databricks.sdk.service.workspace import ExportFormat, ObjectInfo, Language
-
 from databricks.labs.ucx.source_code.files import FileLoader, ImportFileResolver
 from databricks.labs.ucx.source_code.graph import (
     Dependency,
@@ -27,7 +31,8 @@
 )
 from databricks.labs.ucx.source_code.jobs import JobProblem, WorkflowTaskContainer
 from databricks.labs.ucx.source_code.linters.jobs import WorkflowLinter
-from databricks.labs.ucx.source_code.notebooks.loaders import NotebookResolver, NotebookLoader
+from databricks.labs.ucx.source_code.notebooks.loaders import NotebookLoader, NotebookResolver
+from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver
 from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler


@@ -228,10 +233,17 @@ def test_workflow_linter_lint_job_logs_problems(dependency_resolver, mock_path_l
     expected_message = "Found job problems:\nUNKNOWN:-1 [library-install-failed] 'pip --disable-pip-version-check install unknown-library"

     ws = create_autospec(WorkspaceClient)
+    jobs_crawler = create_autospec(JobsCrawler)
     directfs_crawler = create_autospec(DirectFsAccessCrawler)
     used_tables_crawler = create_autospec(UsedTablesCrawler)
     linter = WorkflowLinter(
-        ws, dependency_resolver, mock_path_lookup, empty_index, directfs_crawler, used_tables_crawler
+        ws,
+        jobs_crawler,
+        dependency_resolver,
+        mock_path_lookup,
+        empty_index,
+        directfs_crawler,
+        used_tables_crawler,
     )

     libraries = [compute.Library(pypi=compute.PythonPyPiLibrary(package="unknown-library-name"))]
@@ -243,6 +255,7 @@
     with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.source_code.jobs"):
         linter.lint_job(1234)

+    jobs_crawler.assert_not_called()  # Only called through refresh_report
     directfs_crawler.assert_not_called()
     used_tables_crawler.assert_not_called()
     assert any(message.startswith(expected_message) for message in caplog.messages), caplog.messages
@@ -326,7 +339,7 @@ def test_workflow_task_container_with_existing_cluster_builds_dependency_graph_p
                 whl=None,
             ),
             messages=None,
-            status=LibraryInstallStatus.PENDING,
+            status=compute.LibraryInstallStatus.PENDING,
         )
     ]

@@ -446,7 +459,7 @@ def test_workflow_linter_dlt_pipeline_task(graph) -> None:
     ws.pipelines.get.return_value = GetPipelineResponse(
         pipeline_id=pipeline.pipeline_id,
         name="test-pipeline",
-        spec=pipelines.PipelineSpec(continuous=False),
+        spec=PipelineSpec(continuous=False),
     )

     workflow_task_container = WorkflowTaskContainer(ws, task, Job())
@@ -456,7 +469,7 @@ def test_workflow_linter_dlt_pipeline_task(graph) -> None:
     ws.pipelines.get.return_value = GetPipelineResponse(
         pipeline_id=pipeline.pipeline_id,
         name="test-pipeline",
-        spec=pipelines.PipelineSpec(
+        spec=PipelineSpec(
             libraries=[
                 PipelineLibrary(
                     jar="some.jar",
@@ -549,19 +562,21 @@ def test_workflow_linter_refresh_report(dependency_resolver, mock_path_lookup, m
     ws.jobs.get.return_value = Job(job_id=2, settings=settings)

     sql_backend = MockBackend()
+    jobs_crawler = create_autospec(JobsCrawler)
     directfs_crawler = DirectFsAccessCrawler.for_paths(sql_backend, "test")
     used_tables_crawler = UsedTablesCrawler.for_paths(sql_backend, "test")
     linter = WorkflowLinter(
         ws,
+        jobs_crawler,
         dependency_resolver,
         mock_path_lookup,
         migration_index,
         directfs_crawler,
         used_tables_crawler,
-        [1],
     )
     linter.refresh_report(sql_backend, 'test')

+    jobs_crawler.snapshot.assert_called_once()
     sql_backend.has_rows_written_for('test.workflow_problems')
     sql_backend.has_rows_written_for('hive_metastore.test.used_tables_in_paths')
     sql_backend.has_rows_written_for('hive_metastore.test.directfs_in_paths')
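Because jobs_crawler is an autospec mock, iterating its snapshot() yields no jobs by default, so refresh_report schedules zero linting tasks here. A natural extension, sketched as an assumption rather than part of the PR, would be a separate test that feeds the linter concrete jobs through the mock:

# Sketch (assumed extension, not part of this diff): make the mocked crawler
# return concrete jobs so the linting tasks have real job_ids to work on.
jobs_crawler.snapshot.return_value = [
    Job(job_id=1, settings=settings),
    Job(job_id=2, settings=settings),
]
linter.refresh_report(sql_backend, 'test')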