diff --git a/src/databricks/labs/ucx/assessment/jobs.py b/src/databricks/labs/ucx/assessment/jobs.py
index fe23e42fa0..844cb11785 100644
--- a/src/databricks/labs/ucx/assessment/jobs.py
+++ b/src/databricks/labs/ucx/assessment/jobs.py
@@ -94,9 +94,12 @@ def _job_clusters(job: BaseJob) -> Iterable[tuple[BaseJob, ClusterSpec]]:
 
 
 class JobsCrawler(CrawlerBase[JobInfo], JobsMixin, CheckClusterMixin):
-    def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema):
+    def __init__(
+        self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, *, include_job_ids: list[int] | None = None
+    ):
         super().__init__(sql_backend, "hive_metastore", schema, "jobs", JobInfo)
         self._ws = ws
+        self._include_job_ids = include_job_ids
 
     def _crawl(self) -> Iterable[JobInfo]:
         all_jobs = list(self._ws.jobs.list(expand_tasks=True))
@@ -109,6 +112,9 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> Iterable[
             job_id = job.job_id
             if not job_id:
                 continue
+            if self._include_job_ids is not None and job_id not in self._include_job_ids:
+                logger.info(f"Skipping job_id={job_id}")
+                continue
             cluster_details = ClusterDetails.from_dict(cluster_config.as_dict())
             cluster_failures = self._check_cluster_failures(cluster_details, "Job cluster")
             cluster_failures.extend(self._check_jar_task(job.settings.tasks))
diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py
index fb0549e5e7..20c4f81c42 100644
--- a/src/databricks/labs/ucx/contexts/application.py
+++ b/src/databricks/labs/ucx/contexts/application.py
@@ -278,7 +278,12 @@ def tables_crawler(self) -> TablesCrawler:
 
     @cached_property
     def jobs_crawler(self) -> JobsCrawler:
-        return JobsCrawler(self.workspace_client, self.sql_backend, self.inventory_database)
+        return JobsCrawler(
+            self.workspace_client,
+            self.sql_backend,
+            self.inventory_database,
+            include_job_ids=self.config.include_job_ids,
+        )
 
     @cached_property
     def table_ownership(self) -> TableOwnership:
diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py
index 298d3b0ffa..bcd0555b95 100644
--- a/src/databricks/labs/ucx/contexts/workflow_task.py
+++ b/src/databricks/labs/ucx/contexts/workflow_task.py
@@ -70,7 +70,12 @@ def installation(self) -> Installation:
 
     @cached_property
     def jobs_crawler(self) -> JobsCrawler:
-        return JobsCrawler(self.workspace_client, self.sql_backend, self.inventory_database)
+        return JobsCrawler(
+            self.workspace_client,
+            self.sql_backend,
+            self.inventory_database,
+            include_job_ids=self.config.include_job_ids,
+        )
 
     @cached_property
     def job_ownership(self) -> JobOwnership:
diff --git a/tests/integration/assessment/test_jobs.py b/tests/integration/assessment/test_jobs.py
index f2e48fd460..a3bfc9c59b 100644
--- a/tests/integration/assessment/test_jobs.py
+++ b/tests/integration/assessment/test_jobs.py
@@ -15,7 +15,9 @@
 @retried(on=[NotFound], timeout=timedelta(minutes=5))
 def test_job_crawler(ws, make_job, inventory_schema, sql_backend):
     new_job = make_job(spark_conf=_SPARK_CONF)
-    job_crawler = JobsCrawler(ws=ws, sql_backend=sql_backend, schema=inventory_schema)
+    skip_job = make_job(spark_conf=_SPARK_CONF)
+
+    job_crawler = JobsCrawler(ws=ws, sql_backend=sql_backend, schema=inventory_schema, include_job_ids=[new_job.job_id])
     jobs = job_crawler.snapshot()
     results = []
     for job in jobs:
@@ -23,6 +25,8 @@ def test_job_crawler(ws, make_job, inventory_schema, sql_backend):
             continue
         if int(job.job_id) == new_job.job_id:
            results.append(job)
+        if int(job.job_id) == skip_job.job_id:
+            assert False, "Job should have been skipped"
 
     assert len(results) >= 1
     assert int(results[0].job_id) == new_job.job_id
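
For context, a minimal sketch of how the new keyword-only include_job_ids filter is used, assuming an already configured WorkspaceClient (ws) and SqlBackend (sql_backend); the schema name and job IDs below are hypothetical.

    from databricks.labs.ucx.assessment.jobs import JobsCrawler

    # Only jobs whose job_id appears in include_job_ids are assessed; every
    # other job is logged and skipped. Passing include_job_ids=None (the
    # default) keeps the previous behaviour of crawling all jobs.
    crawler = JobsCrawler(ws, sql_backend, "ucx_inventory", include_job_ids=[123, 456])
    for job_info in crawler.snapshot():
        print(job_info.job_id, job_info.failures)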