Checking pipeline cluster config and cluster policy in 'crawl_pipelines' task (#864)
prajin-29 authored Feb 9, 2024
1 parent 0d83215 commit 6496b3b
Showing 3 changed files with 70 additions and 28 deletions.
10 changes: 10 additions & 0 deletions src/databricks/labs/ucx/assessment/pipelines.py
@@ -50,6 +50,16 @@ def _assess_pipelines(self, all_pipelines) -> Iterable[PipelineInfo]:
             pipeline_config = pipeline_response.spec.configuration
             if pipeline_config:
                 failures.extend(self.check_spark_conf(pipeline_config, "pipeline"))
+            pipeline_cluster = pipeline_response.spec.clusters
+            if pipeline_cluster:
+                for cluster in pipeline_cluster:
+                    if cluster.spark_conf:
+                        failures.extend(self.check_spark_conf(cluster.spark_conf, "pipeline cluster"))
+                    # Checking if cluster config is present in cluster policies
+                    if cluster.policy_id:
+                        failures.extend(self._check_cluster_policy(cluster.policy_id, "pipeline cluster"))
+                    if cluster.init_scripts:
+                        failures.extend(self._check_cluster_init_script(cluster.init_scripts, "pipeline cluster"))
 
             pipeline_info.failures = json.dumps(failures)
             if len(failures) > 0:
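Note for reviewers: the new block above folds three independent checks into the existing failures list for every cluster attached to a DLT pipeline spec. The sketch below is a minimal, self-contained model of that flow; check_spark_conf and the two underscore helpers are simplified stand-ins, since their real bodies live elsewhere in the assessment module and are not part of this diff.

from dataclasses import dataclass

@dataclass
class PipelineCluster:
    # Only the three fields the new hunk inspects; the real SDK object has more.
    spark_conf: dict | None = None
    policy_id: str | None = None
    init_scripts: list | None = None

def check_spark_conf(conf: dict, source: str) -> list[str]:
    # Stand-in: flag Azure storage credentials configured directly in Spark conf.
    return [f"{source}: SPN config present: {key}" for key in conf if "fs.azure.account" in key]

def assess_pipeline_clusters(clusters: list[PipelineCluster]) -> list[str]:
    failures: list[str] = []
    for cluster in clusters:
        if cluster.spark_conf:
            failures.extend(check_spark_conf(cluster.spark_conf, "pipeline cluster"))
        if cluster.policy_id:
            # Stand-in for _check_cluster_policy, which inspects the referenced
            # policy definition for embedded service-principal settings.
            failures.append(f"pipeline cluster: policy {cluster.policy_id} needs review")
        if cluster.init_scripts:
            # Stand-in for _check_cluster_init_script, which fetches and scans
            # each script's contents.
            failures.append("pipeline cluster: init scripts need review")
    return failures

print(assess_pipeline_clusters([PipelineCluster(spark_conf={"fs.azure.account.key.x.dfs.core.windows.net": "secret"})]))

Running the sketch against a cluster whose spark_conf embeds a storage credential reports one failure, mirroring how the crawler accumulates findings per pipeline.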
37 changes: 36 additions & 1 deletion tests/unit/assessment/pipelines/spec-with-spn.json
@@ -1,6 +1,41 @@
 {
   "spec": {
-    "clusters": [],
+    "clusters": [{
+      "autoscale": {
+        "max_workers": 6,
+        "min_workers": 1
+      },
+      "custom_tags": {
+        "cluster_type": "default"
+      },
+      "label": "default",
+      "init_scripts": [
+        {
+          "dbfs": {
+            "destination": "dbfs:/users/test@test.com/init_scripts/test.sh"
+          }
+        }
+      ],
+      "node_type_id": "Standard_F4s",
+      "num_workers": 1,
+      "policy_id": "single-user-with-spn",
+      "spark_conf": {
+        "spark.databricks.delta.preview.enabled": "true"
+      }
+    },
+    {
+      "autoscale": {
+        "max_workers": 6,
+        "min_workers": 1
+      },
+      "custom_tags": {
+        "cluster_type": "default"
+      },
+      "label": "default",
+      "init_scripts": [],
+      "node_type_id": "Standard_F4s",
+      "num_workers": 1
+    }],
     "configuration": {
       "spark.hadoop.fs.azure.account.oauth2.client.id.newstorageacct.dfs.core.windows.net": "pipeline_dummy_application_id",
       "spark.hadoop.fs.azure.account.oauth2.client.endpoint.newstorageacct.dfs.core.windows.net": "https://login.microsoftonline.com/directory_12345/oauth2/token",
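The expanded fixture gives the crawler one cluster carrying a spark_conf, a policy_id, and a DBFS init script, so each of the three new checks fires, plus a second cluster with none of them. The tests below consume it through the workspace_client_mock helper imported from the package's __init__; that helper's definition is not shown in this diff, but a plausible minimal version, assuming the SDK's generated from_dict constructor, might look like this:

import json
import pathlib
from unittest.mock import Mock

from databricks.sdk.service.pipelines import GetPipelineResponse

def pipeline_client_mock(fixture: str = "spec-with-spn.json") -> Mock:
    # Hypothetical loader: read a JSON fixture from tests/unit/assessment/pipelines/
    # and serve it as the typed response of ws.pipelines.get().
    path = pathlib.Path(__file__).parent / "pipelines" / fixture
    ws = Mock()
    ws.pipelines.get.return_value = GetPipelineResponse.from_dict(json.loads(path.read_text()))
    return ws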
51 changes: 24 additions & 27 deletions tests/unit/assessment/test_pipelines.py
@@ -6,53 +6,51 @@
 from databricks.labs.ucx.assessment.pipelines import PipelineInfo, PipelinesCrawler
 
 from ..framework.mocks import MockBackend
+from . import workspace_client_mock
 
 
-def test_pipeline_assessment_with_config(mocker):
+def test_pipeline_assessment_with_config():
     sample_pipelines = [
         PipelineStateInfo(
             cluster_id=None,
             creator_user_name="abcde.defgh@databricks.com",
             latest_updates=None,
             name="New DLT Pipeline",
-            pipeline_id="0112eae7-9d11-4b40-a2b8-6c83cb3c7407",
+            pipeline_id="spec-with-spn",
             run_as_user_name="abcde.defgh@databricks.com",
             state=PipelineState.IDLE,
         )
     ]
 
-    ws = Mock()
-    config_dict = {
-        "spark.hadoop.fs.azure.account.auth.type.abcde.dfs.core.windows.net": "SAS",
-        "spark.hadoop.fs.azure.sas.token.provider.type.abcde.dfs."
-        "core.windows.net": "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider",
-        "spark.hadoop.fs.azure.sas.fixed.token.abcde.dfs.core.windows.net": "{{secrets/abcde_access/sasFixedToken}}",
-    }
-    ws.pipelines.get().spec.configuration = config_dict
+    ws = workspace_client_mock(clusters="job-source-cluster.json")
+    ws.workspace.export().content = "JXNoCmVjaG8gIj0="
+    ws.dbfs.read().data = "JXNoCmVjaG8gIj0="
 
-    crawler = PipelinesCrawler(ws, MockBackend(), "ucx")._assess_pipelines(sample_pipelines)
+    ws.pipelines.list_pipelines.return_value = sample_pipelines
+    crawler = PipelinesCrawler(ws, MockBackend(), "ucx").snapshot()
     result_set = list(crawler)
 
     assert len(result_set) == 1
     assert result_set[0].success == 0
 
 
-def test_pipeline_assessment_without_config(mocker):
+def test_pipeline_assessment_without_config():
     sample_pipelines = [
         PipelineStateInfo(
             cluster_id=None,
             creator_user_name="abcde.defgh@databricks.com",
             latest_updates=None,
             name="New DLT Pipeline",
-            pipeline_id="0112eae7-9d11-4b40-a2b8-6c83cb3c7497",
+            pipeline_id="empty-spec",
             run_as_user_name="abcde.defgh@databricks.com",
             state=PipelineState.IDLE,
         )
     ]
-    ws = Mock()
-    config_dict = {}
-    ws.pipelines.get().spec.configuration = config_dict
-    crawler = PipelinesCrawler(ws, MockBackend(), "ucx")._assess_pipelines(sample_pipelines)
+    ws = workspace_client_mock(clusters="job-source-cluster.json")
+    ws.workspace.export().content = "JXNoCmVjaG8gIj0="
+    ws.dbfs.read().data = "JXNoCmVjaG8gIj0="
+    ws.pipelines.list_pipelines.return_value = sample_pipelines
+    crawler = PipelinesCrawler(ws, MockBackend(), "ucx").snapshot()
     result_set = list(crawler)
 
     assert len(result_set) == 1
@@ -69,7 +67,7 @@ def test_pipeline_snapshot_with_config():
             failures="",
         )
     ]
-    mock_ws = Mock()
+    mock_ws = workspace_client_mock(clusters="job-source-cluster.json")
     crawler = PipelinesCrawler(mock_ws, MockBackend(), "ucx")
     crawler._try_fetch = Mock(return_value=[])
    crawler._crawl = Mock(return_value=sample_pipelines)
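This test pins down the caching contract the other tests now rely on: snapshot() is the public entry point, and _try_fetch/_crawl are stubbed to isolate it. Assuming the usual UCX CrawlerBase semantics, the behaviour under test is roughly the following sketch:

from collections.abc import Iterable

class CrawlerSketch:
    # Illustrative snapshot contract: reuse previously persisted rows when the
    # fetch returns any, otherwise crawl the workspace and persist the results.
    def _try_fetch(self) -> Iterable:
        raise NotImplementedError  # overridden or mocked in tests

    def _crawl(self) -> Iterable:
        raise NotImplementedError  # overridden or mocked in tests

    def _append_records(self, items) -> None:
        pass  # stand-in for writing rows to the inventory table

    def snapshot(self) -> list:
        cached = list(self._try_fetch())
        if cached:
            return cached
        loaded = list(self._crawl())
        self._append_records(loaded)
        return loaded

Because _try_fetch is mocked to return an empty list, snapshot() falls through to _crawl and returns the injected sample_pipelines unchanged.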
@@ -85,16 +83,14 @@ def test_pipeline_list_with_no_config():
         PipelineInfo(
             creator_name="abcde.defgh@databricks.com",
             pipeline_name="New DLT Pipeline",
-            pipeline_id="0112eae7-9d11-4b40-a2b8-6c83cb3c7497",
+            pipeline_id="empty-spec",
             success=1,
             failures="",
         )
     ]
-    mock_ws = Mock()
+    mock_ws = workspace_client_mock(clusters="no-spark-conf.json")
     mock_ws.pipelines.list_pipelines.return_value = sample_pipelines
-    config_dict = {"spark.hadoop.fs.azure1.account.oauth2.client.id.abcde.dfs.core.windows.net": "wewewerty"}
-    mock_ws.pipelines.get().spec.configuration = config_dict
-    crawler = AzureServicePrincipalCrawler(mock_ws, MockBackend(), "ucx")._list_all_pipeline_with_spn_in_spark_conf()
+    crawler = AzureServicePrincipalCrawler(mock_ws, MockBackend(), "ucx").snapshot()
 
     assert len(crawler) == 0

@@ -106,22 +102,23 @@ def test_pipeline_without_owners_should_have_empty_creator_name():
             creator_user_name=None,
             latest_updates=None,
             name="New DLT Pipeline",
-            pipeline_id="0112eae7-9d11-4b40-a2b8-6c83cb3c7407",
+            pipeline_id="empty-spec",
             run_as_user_name="abcde.defgh@databricks.com",
             state=PipelineState.IDLE,
         )
     ]
 
-    ws = Mock()
+    ws = workspace_client_mock(clusters="no-spark-conf.json")
     ws.pipelines.list_pipelines.return_value = sample_pipelines
-    ws.pipelines.get().spec.configuration = {}
+    ws.workspace.export().content = "JXNoCmVjaG8gIj0="
+    ws.dbfs.read().data = "JXNoCmVjaG8gIj0="
     mockbackend = MockBackend()
     PipelinesCrawler(ws, mockbackend, "ucx").snapshot()
     result = mockbackend.rows_written_for("hive_metastore.ucx.pipelines", "append")
 
     assert result == [
         PipelineInfo(
-            pipeline_id="0112eae7-9d11-4b40-a2b8-6c83cb3c7407",
+            pipeline_id="empty-spec",
             pipeline_name="New DLT Pipeline",
             creator_name=None,
             success=1,
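One last note on the opaque string these tests stub into ws.workspace.export().content and ws.dbfs.read().data: it is base64 (the encoding the workspace export API returns), and it decodes to the first fragment of a shell init script, which is presumably what the init-script check scans:

import base64

payload = "JXNoCmVjaG8gIj0="
print(base64.b64decode(payload).decode("utf-8"))
# Output:
# %sh
# echo "=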