@@ -532,23 +532,31 @@ def stop_pipeline(
         instance_url: str,
         namespace: str = "default",
         pipeline_type: DataFusionPipelineType = DataFusionPipelineType.BATCH,
+        run_id: str | None = None,
     ) -> None:
         """
         Stop a Cloud Data Fusion pipeline. Works for both batch and stream pipelines.

         :param pipeline_name: Your pipeline name.
         :param instance_url: Endpoint on which the REST APIs is accessible for the instance.
-        :param namespace: f your pipeline belongs to a Basic edition instance, the namespace ID
+        :param namespace: If your pipeline belongs to a Basic edition instance, the namespace ID
             is always default. If your pipeline belongs to an Enterprise edition instance, you
             can create a namespace.
+        :param pipeline_type: Can be either BATCH or STREAM.
+        :param run_id: The specific run_id to stop execution if available; when absent it will stop all runs under pipeline_name.
         """
-        url = os.path.join(
+        base_stop_url = os.path.join(
             self._base_url(instance_url, namespace),
             quote(pipeline_name),
             self.cdap_program_type(pipeline_type=pipeline_type),
             self.cdap_program_id(pipeline_type=pipeline_type),
-            "stop",
         )
+
+        if run_id:
+            url = os.path.join(base_stop_url, "runs", quote(str(run_id)), "stop")
+        else:
+            url = os.path.join(base_stop_url, "stop")
+
         response = self._cdap_request(url=url, method="POST")
         self._check_response_status_and_data(
             response, f"Stopping a pipeline failed with code {response.status}"
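
For orientation, a minimal sketch (not part of the change) of the two CDAP stop URLs the hook can now build, assuming the default BATCH pipeline type and the v3 REST layout asserted in the tests further down; the instance endpoint, pipeline name, and run id are placeholders:

import os
from urllib.parse import quote

instance_api = "https://instance.example.com/api"  # placeholder instance endpoint
base_stop_url = os.path.join(
    f"{instance_api}/v3/namespaces/default/apps",
    quote("example_pipeline"),  # placeholder pipeline name
    "workflows",                # BATCH maps to workflows/DataPipelineWorkflow per the tests below
    "DataPipelineWorkflow",
)

# Without run_id: a single request that stops all active runs of the pipeline.
print(os.path.join(base_stop_url, "stop"))

# With run_id: only the named run is stopped.
print(os.path.join(base_stop_url, "runs", quote("eaf-2fr-4rf"), "stop"))
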
@@ -902,6 +902,7 @@ class CloudDataFusionStopPipelineOperator(GoogleCloudBaseOperator):

     :param pipeline_name: Your pipeline name.
     :param instance_name: The name of the instance.
+    :param pipeline_type: Can be either BATCH or STREAM.
     :param location: The Cloud Data Fusion location in which to handle the request.
     :param namespace: If your pipeline belongs to a Basic edition instance, the namespace ID
         is always default. If your pipeline belongs to an Enterprise edition instance, you
@@ -916,26 +917,25 @@ class CloudDataFusionStopPipelineOperator(GoogleCloudBaseOperator):
         If set as a sequence, the identities from the list must grant
         Service Account Token Creator IAM role to the directly preceding identity, with first
         account from the list granting this role to the originating account (templated).
+    :param run_id: The specific run_id to stop execution if available; when absent it will stop all runs under pipeline_name.
     """

-    template_fields: Sequence[str] = (
-        "instance_name",
-        "pipeline_name",
-        "impersonation_chain",
-    )
+    template_fields: Sequence[str] = ("instance_name", "pipeline_name", "impersonation_chain", "run_id")
     operator_extra_links = (DataFusionPipelineLink(),)

     def __init__(
         self,
         *,
         pipeline_name: str,
         instance_name: str,
+        pipeline_type: DataFusionPipelineType = DataFusionPipelineType.BATCH,
         location: str,
         namespace: str = "default",
         project_id: str = PROVIDE_PROJECT_ID,
         api_version: str = "v1beta1",
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
+        run_id: str | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -947,6 +947,8 @@ def __init__(
         self.api_version = api_version
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
+        self.run_id = run_id
+        self.pipeline_type = pipeline_type

     def execute(self, context: Context) -> None:
         hook = DataFusionHook(
@@ -970,7 +972,23 @@ def execute(self, context: Context) -> None:
         )
         hook.stop_pipeline(
             pipeline_name=self.pipeline_name,
+            pipeline_type=self.pipeline_type,
             instance_url=api_url,
             namespace=self.namespace,
-        )
-        self.log.info("Pipeline stopped")
+            run_id=self.run_id,
+        )
+        if self.run_id:
+            self.log.info(
+                "Stopped Cloud Data Fusion pipeline '%s' (namespace: '%s') on instance '%s'. Terminated run id: '%s'.",
+                self.pipeline_name,
+                self.namespace,
+                self.instance_name,
+                self.run_id,
+            )
+        else:
+            self.log.info(
+                "Stopped Cloud Data Fusion pipeline '%s' (namespace: '%s') on instance '%s'.",
+                self.pipeline_name,
+                self.namespace,
+                self.instance_name,
+            )
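
A minimal usage sketch of the updated operator, assuming placeholder names for the DAG, pipeline, instance, location, and project; only run_id (and, optionally, pipeline_type) is new compared to the existing operator arguments:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.datafusion import CloudDataFusionStopPipelineOperator

with DAG(dag_id="example_stop_datafusion_run", start_date=datetime(2024, 1, 1), schedule=None, catchup=False):
    stop_single_run = CloudDataFusionStopPipelineOperator(
        task_id="stop_single_run",
        pipeline_name="example_pipeline",  # placeholder
        instance_name="example-instance",  # placeholder
        location="us-central1",            # placeholder
        project_id="example-project",      # placeholder
        namespace="default",
        run_id="eaf-2fr-4rf",  # placeholder; omit to stop all runs of the pipeline
    )

Since "run_id" is now listed in template_fields, it can also be templated, for example pulled from XCom with run_id="{{ ti.xcom_pull(task_ids='start_pipeline') }}", assuming an upstream task pushes the run id.
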
@@ -465,6 +465,16 @@ def test_stop_pipeline(self, mock_request, hook):
             method="POST",
         )

+    @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
+    def test_stop_pipeline_with_run_id(self, mock_request, hook):
+        mock_request.return_value.status = 200
+        hook.stop_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, run_id="eaf-2fr-4rf")
+        mock_request.assert_called_once_with(
+            url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/"
+            f"workflows/DataPipelineWorkflow/runs/eaf-2fr-4rf/stop",
+            method="POST",
+        )
+
     @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
     def test_stop_pipeline_should_fail_if_empty_data_response(self, mock_request, hook):
         mock_request.return_value.status = 200
@@ -481,6 +491,27 @@ def test_stop_pipeline_should_fail_if_empty_data_response(self, mock_request, hook):
             method="POST",
         )

+    @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
+    def test_stop_pipeline_should_fail_if_empty_data_response_with_run_id(self, mock_request, hook):
+        mock_request.return_value.status = 200
+        mock_request.return_value.data = None
+        with pytest.raises(
+            ValueError,
+            match=r"Empty response received. Please, check for possible root causes "
+            r"of this behavior either in DAG code or on Cloud DataFusion side",
+        ):
+            hook.stop_pipeline(
+                pipeline_name=PIPELINE_NAME,
+                instance_url=INSTANCE_URL,
+                pipeline_type=DataFusionPipelineType.STREAM,
+                run_id="eaf-2fr-4rf",
+            )
+        mock_request.assert_called_once_with(
+            url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/"
+            f"spark/DataStreamsSparkStreaming/runs/eaf-2fr-4rf/stop",
+            method="POST",
+        )
+
     @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
     def test_stop_pipeline_should_fail_if_status_not_200(self, mock_request, hook):
         mock_request.return_value.status = 404
@@ -494,7 +494,39 @@ def test_execute_check_hook_call_should_execute_successfully(self, mock_hook):
         )

         mock_hook.return_value.stop_pipeline.assert_called_once_with(
-            instance_url=INSTANCE_URL, pipeline_name=PIPELINE_NAME, namespace=NAMESPACE
+            instance_url=INSTANCE_URL,
+            pipeline_name=PIPELINE_NAME,
+            namespace=NAMESPACE,
+            pipeline_type=DataFusionPipelineType.BATCH,
+            run_id=None,
         )

+    @mock.patch(HOOK_STR)
+    def test_execute_check_hook_call_should_execute_successfully_with_runId(self, mock_hook):
+        mock_hook.return_value.get_instance.return_value = {
+            "apiEndpoint": INSTANCE_URL,
+            "serviceEndpoint": INSTANCE_URL,
+        }
+        op = CloudDataFusionStopPipelineOperator(
+            task_id="test_tasks",
+            pipeline_name=PIPELINE_NAME,
+            instance_name=INSTANCE_NAME,
+            namespace=NAMESPACE,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            run_id="sample-run-id",
+        )
+        op.execute(context=mock.MagicMock())
+        mock_hook.return_value.get_instance.assert_called_once_with(
+            instance_name=INSTANCE_NAME, location=LOCATION, project_id=PROJECT_ID
+        )
+
+        mock_hook.return_value.stop_pipeline.assert_called_once_with(
+            instance_url=INSTANCE_URL,
+            pipeline_name=PIPELINE_NAME,
+            pipeline_type=DataFusionPipelineType.BATCH,
+            namespace=NAMESPACE,
+            run_id="sample-run-id",
+        )