From 2ee24c369ce4a1b8f0eded4ebdca9aa5a4070b5c Mon Sep 17 00:00:00 2001 From: Chirodip Lodh Choudhury Date: Sat, 31 Jan 2026 19:47:16 +0530 Subject: [PATCH 01/13] feat: datafusion: stop_pipeline() to support targeted stopping program when runId is provided --- .../google/cloud/hooks/datafusion.py | 13 ++++++-- .../google/cloud/hooks/test_datafusion.py | 31 +++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py index b30da5271467a..abb4aad9e0a0e 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py @@ -532,6 +532,7 @@ def stop_pipeline( instance_url: str, namespace: str = "default", pipeline_type: DataFusionPipelineType = DataFusionPipelineType.BATCH, + run_id: str = None, ) -> None: """ Stop a Cloud Data Fusion pipeline. Works for both batch and stream pipelines. @@ -541,14 +542,22 @@ def stop_pipeline( :param namespace: f your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace. + :pipeline_type: Can be either BATCH or STREAM + :param run_id : The specific runId to stop execution if available , when absent It will stop all Runs under App: pipeline_name """ - url = os.path.join( + base_stop_url = os.path.join( self._base_url(instance_url, namespace), quote(pipeline_name), self.cdap_program_type(pipeline_type=pipeline_type), self.cdap_program_id(pipeline_type=pipeline_type), - "stop", ) + + if run_id: + url = os.path.join(base_stop_url, "runs", quote(str(run_id)), "stop") + + else: + url = os.path.join(base_stop_url, "stop") + response = self._cdap_request(url=url, method="POST") self._check_response_status_and_data( response, f"Stopping a pipeline failed with code {response.status}" diff --git a/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py b/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py index 734e9b595db1f..2998912b0cbdc 100644 --- a/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py +++ b/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py @@ -465,6 +465,16 @@ def test_stop_pipeline(self, mock_request, hook): method="POST", ) + @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request")) + def test_stop_pipeline_with_run_id(self, mock_request, hook): + mock_request.return_value.status = 200 + hook.stop_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, run_id="eaf-2fr-4rf") + mock_request.assert_called_once_with( + url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/" + f"workflows/DataPipelineWorkflow/runs/eaf-2fr-4rf/stop", + method="POST", + ) + @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request")) def test_stop_pipeline_should_fail_if_empty_data_response(self, mock_request, hook): mock_request.return_value.status = 200 @@ -481,6 +491,27 @@ def test_stop_pipeline_should_fail_if_empty_data_response(self, mock_request, ho method="POST", ) + @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request")) + def test_stop_pipeline_should_fail_if_empty_data_response_with_run_id(self, mock_request, hook): + mock_request.return_value.status = 200 + mock_request.return_value.data = None + with pytest.raises( + ValueError, + match=r"Empty response received. Please, check for possible root causes " + r"of this behavior either in DAG code or on Cloud DataFusion side", + ): + hook.stop_pipeline( + pipeline_name=PIPELINE_NAME, + instance_url=INSTANCE_URL, + pipeline_type=DataFusionPipelineType.STREAM, + run_id="eaf-2fr-4rf", + ) + mock_request.assert_called_once_with( + url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/" + f"spark/DataStreamsSparkStreaming//runs/eaf-2fr-4rf/stop", + method="POST", + ) + @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request")) def test_stop_pipeline_should_fail_if_status_not_200(self, mock_request, hook): mock_request.return_value.status = 404 From 884a2741e2b2ccc7bbe138bdea5e4fbe7125eafe Mon Sep 17 00:00:00 2001 From: Chirodip Lodh Choudhury Date: Sat, 31 Jan 2026 20:19:04 +0530 Subject: [PATCH 02/13] feat: added run_id field to CloudDataFusionStopPipelineOperator --- .../google/cloud/operators/datafusion.py | 32 ++++++++++---- .../cloud/datafusion/example_datafusion.py | 42 +++++++++++++++++-- .../google/tests/system/google/conftest.py | 2 +- .../google/cloud/operators/test_datafusion.py | 27 ++++++++++++ 4 files changed, 91 insertions(+), 12 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py b/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py index d59d214d7e362..dfeb7c54f2215 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py @@ -901,6 +901,7 @@ class CloudDataFusionStopPipelineOperator(GoogleCloudBaseOperator): :ref:`howto/operator:CloudDataFusionStopPipelineOperator` :param pipeline_name: Your pipeline name. + :param pipeline_type: Nature of Pipeline, by default BATCH (workflows). :param instance_name: The name of the instance. :param location: The Cloud Data Fusion location in which to handle the request. :param namespace: If your pipeline belongs to a Basic edition instance, the namespace ID @@ -916,13 +917,10 @@ class CloudDataFusionStopPipelineOperator(GoogleCloudBaseOperator): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). + :param run_id: To stop a particular run tagged with provided run_id """ - template_fields: Sequence[str] = ( - "instance_name", - "pipeline_name", - "impersonation_chain", - ) + template_fields: Sequence[str] = ("instance_name", "pipeline_name", "impersonation_chain", "run_id") operator_extra_links = (DataFusionPipelineLink(),) def __init__( @@ -930,12 +928,14 @@ def __init__( *, pipeline_name: str, instance_name: str, + pipeline_type: DataFusionPipelineType = DataFusionPipelineType.BATCH, location: str, namespace: str = "default", project_id: str = PROVIDE_PROJECT_ID, api_version: str = "v1beta1", gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, + run_id: str = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -947,6 +947,8 @@ def __init__( self.api_version = api_version self.gcp_conn_id = gcp_conn_id self.impersonation_chain = impersonation_chain + self.run_id = run_id + self.pipeline_type = pipeline_type def execute(self, context: Context) -> None: hook = DataFusionHook( @@ -970,7 +972,23 @@ def execute(self, context: Context) -> None: ) hook.stop_pipeline( pipeline_name=self.pipeline_name, + pipeline_type=self.pipeline_type, instance_url=api_url, namespace=self.namespace, - ) - self.log.info("Pipeline stopped") + run_id=self.run_id, + ) + if self.run_id: + self.log.info( + "Stopped Cloud Data Fusion pipeline '%s' (namespace: '%s') on instance '%s'. Terminated run id: '%s'.", + self.pipeline_name, + self.namespace, + self.instance_name, + self.run_id, + ) + else: + self.log.info( + "Stopped Cloud Data Fusion pipeline '%s' (namespace: '%s') on instance '%s'.", + self.pipeline_name, + self.namespace, + self.instance_name, + ) diff --git a/providers/google/tests/system/google/cloud/datafusion/example_datafusion.py b/providers/google/tests/system/google/cloud/datafusion/example_datafusion.py index 577a20728e55d..b56ae5d42037f 100644 --- a/providers/google/tests/system/google/cloud/datafusion/example_datafusion.py +++ b/providers/google/tests/system/google/cloud/datafusion/example_datafusion.py @@ -61,19 +61,19 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") LOCATION = "europe-north1" DAG_ID = "datafusion" -INSTANCE_NAME = f"df-{ENV_ID}".replace("_", "-") +INSTANCE_NAME = f"test-instance-{DAG_ID}".replace("_", "-") INSTANCE = { "type": "BASIC", "displayName": INSTANCE_NAME, "dataprocServiceAccount": SERVICE_ACCOUNT, } -BUCKET_NAME_1 = f"bucket1-{DAG_ID}-{ENV_ID}".replace("_", "-") -BUCKET_NAME_2 = f"bucket2-{DAG_ID}-{ENV_ID}".replace("_", "-") +BUCKET_NAME_1 = f"bucket1-{DAG_ID}".replace("_", "-") +BUCKET_NAME_2 = f"bucket2-{DAG_ID}".replace("_", "-") BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}" BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}" -PIPELINE_NAME = f"pipe-{ENV_ID}".replace("_", "-") +PIPELINE_NAME = f"pipe-{DAG_ID}-test".replace("_", "-") PIPELINE = { "artifact": { "name": "cdap-data-pipeline", @@ -271,6 +271,18 @@ def get_artifacts_versions(ti=None): ) # [END howto_cloud_data_fusion_start_pipeline_def] + # [START howto_cloud_data_fusion_start_pipeline_def_sensor] + start_pipeline_def_sensor = CloudDataFusionPipelineStateSensor( + task_id="pipeline_state_sensor_def", + pipeline_name=PIPELINE_NAME, + pipeline_id=start_pipeline_def.output, + expected_statuses=["COMPLETED"], + failure_statuses=["FAILED"], + instance_name=INSTANCE_NAME, + location=LOCATION, + ) + # [END howto_cloud_data_fusion_start_pipeline_def_sensor] + # [START howto_cloud_data_fusion_start_pipeline_async] start_pipeline_async = CloudDataFusionStartPipelineOperator( location=LOCATION, @@ -302,6 +314,25 @@ def get_artifacts_versions(ti=None): ) # [END howto_cloud_data_fusion_stop_pipeline] + # [START howto_cloud_data_fusion_start_pipeline] + start_pipeline_frunid = CloudDataFusionStartPipelineOperator( + location=LOCATION, + pipeline_name=PIPELINE_NAME, + instance_name=INSTANCE_NAME, + pipeline_timeout=1000, + task_id="start_pipeline_for_runid", + ) + + # [START howto_cloud_data_fusion_stop_pipeline_with_runid] + stop_pipeline_wrunid = CloudDataFusionStopPipelineOperator( + location=LOCATION, + pipeline_name=PIPELINE_NAME, + instance_name=INSTANCE_NAME, + task_id="stop_pipeline_wrun_id", + run_id=start_pipeline_frunid.output, + ) + # [END howto_cloud_data_fusion_stop_pipeline] + # [START howto_cloud_data_fusion_delete_pipeline] delete_pipeline = CloudDataFusionDeletePipelineOperator( location=LOCATION, @@ -339,7 +370,10 @@ def get_artifacts_versions(ti=None): >> update_instance >> create_pipeline >> list_pipelines + >> start_pipeline_frunid + >> stop_pipeline_wrunid >> start_pipeline_def + >> start_pipeline_def_sensor >> start_pipeline_async >> start_pipeline_sensor >> start_pipeline diff --git a/providers/google/tests/system/google/conftest.py b/providers/google/tests/system/google/conftest.py index 1c377b7a34880..6d1d5eb3185a0 100644 --- a/providers/google/tests/system/google/conftest.py +++ b/providers/google/tests/system/google/conftest.py @@ -18,7 +18,7 @@ import pytest -from system.openlineage.conftest import set_transport_variable # noqa: F401 +# from system.openlineage.conftest import set_transport_variable REQUIRED_ENV_VARS = ("SYSTEM_TESTS_GCP_PROJECT",) diff --git a/providers/google/tests/unit/google/cloud/operators/test_datafusion.py b/providers/google/tests/unit/google/cloud/operators/test_datafusion.py index 1d816eb395c5b..16b2f5e1b3540 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_datafusion.py +++ b/providers/google/tests/unit/google/cloud/operators/test_datafusion.py @@ -497,6 +497,33 @@ def test_execute_check_hook_call_should_execute_successfully(self, mock_hook): instance_url=INSTANCE_URL, pipeline_name=PIPELINE_NAME, namespace=NAMESPACE ) + @mock.patch(HOOK_STR) + def test_execute_check_hook_call_should_execute_successfully_with_runId(self, mock_hook): + mock_hook.return_value.get_instance.return_value = { + "apiEndpoint": INSTANCE_URL, + "serviceEndpoint": INSTANCE_URL, + } + op = CloudDataFusionStopPipelineOperator( + task_id="test_tasks", + pipeline_name=PIPELINE_NAME, + instance_name=INSTANCE_NAME, + namespace=NAMESPACE, + location=LOCATION, + project_id=PROJECT_ID, + run_id="sample-run-id", + ) + op.execute(context=mock.MagicMock()) + mock_hook.return_value.get_instance.assert_called_once_with( + instance_name=INSTANCE_NAME, location=LOCATION, project_id=PROJECT_ID + ) + + mock_hook.return_value.stop_pipeline.assert_called_once_with( + instance_url=INSTANCE_URL, + pipeline_name=PIPELINE_NAME, + namespace=NAMESPACE, + run_id="sample-run-id", + ) + class TestCloudDataFusionListPipelinesOperator: @mock.patch(HOOK_STR) From 5bf0f754331ffa38b594734da34c18e37a6bfcda Mon Sep 17 00:00:00 2001 From: Chirodip Lodh Choudhury Date: Sat, 31 Jan 2026 20:39:31 +0530 Subject: [PATCH 03/13] feat: added run_id field to CloudDataFusionStopPipelineOperator --- .../src/airflow/providers/google/cloud/operators/datafusion.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py b/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py index dfeb7c54f2215..2a5b0de0da2fd 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py @@ -935,7 +935,7 @@ def __init__( api_version: str = "v1beta1", gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, - run_id: str = None, + run_id: str | None = None, **kwargs, ) -> None: super().__init__(**kwargs) From 071c63fe5a7e82d122b9e8aed44c1d269793d3ce Mon Sep 17 00:00:00 2001 From: Chirodip Lodh Choudhury Date: Sat, 31 Jan 2026 20:44:43 +0530 Subject: [PATCH 04/13] feat: added run_id field to CloudDataFusionStopPipelineOperator --- .../src/airflow/providers/google/cloud/hooks/datafusion.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py index abb4aad9e0a0e..38a73faf423b9 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py @@ -532,7 +532,7 @@ def stop_pipeline( instance_url: str, namespace: str = "default", pipeline_type: DataFusionPipelineType = DataFusionPipelineType.BATCH, - run_id: str = None, + run_id: str | None = None, ) -> None: """ Stop a Cloud Data Fusion pipeline. Works for both batch and stream pipelines. From 48872800c145b7c313a4f7d8c5690bb19f55d370 Mon Sep 17 00:00:00 2001 From: Chirodip Lodh Choudhury Date: Sun, 1 Feb 2026 13:35:45 +0530 Subject: [PATCH 05/13] feat: addressed failing test cases --- .../system/google/cloud/datafusion/example_datafusion.py | 4 ++-- providers/google/tests/system/google/conftest.py | 2 -- .../tests/unit/google/cloud/operators/test_datafusion.py | 7 ++++++- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/providers/google/tests/system/google/cloud/datafusion/example_datafusion.py b/providers/google/tests/system/google/cloud/datafusion/example_datafusion.py index b56ae5d42037f..68de9849ad47b 100644 --- a/providers/google/tests/system/google/cloud/datafusion/example_datafusion.py +++ b/providers/google/tests/system/google/cloud/datafusion/example_datafusion.py @@ -83,8 +83,8 @@ "description": "Data Pipeline Application", "name": PIPELINE_NAME, "config": { - "resources": {"memoryMB": 2048, "virtualCores": 1}, - "driverResources": {"memoryMB": 2048, "virtualCores": 1}, + "resources": {"memoryMB": 1048, "virtualCores": 1}, + "driverResources": {"memoryMB": 1048, "virtualCores": 1}, "connections": [{"from": "GCS", "to": "GCS2"}], "comments": [], "postActions": [], diff --git a/providers/google/tests/system/google/conftest.py b/providers/google/tests/system/google/conftest.py index 6d1d5eb3185a0..cfd74f1387bae 100644 --- a/providers/google/tests/system/google/conftest.py +++ b/providers/google/tests/system/google/conftest.py @@ -18,8 +18,6 @@ import pytest -# from system.openlineage.conftest import set_transport_variable - REQUIRED_ENV_VARS = ("SYSTEM_TESTS_GCP_PROJECT",) diff --git a/providers/google/tests/unit/google/cloud/operators/test_datafusion.py b/providers/google/tests/unit/google/cloud/operators/test_datafusion.py index 16b2f5e1b3540..cf1a162a3ae0e 100644 --- a/providers/google/tests/unit/google/cloud/operators/test_datafusion.py +++ b/providers/google/tests/unit/google/cloud/operators/test_datafusion.py @@ -494,7 +494,11 @@ def test_execute_check_hook_call_should_execute_successfully(self, mock_hook): ) mock_hook.return_value.stop_pipeline.assert_called_once_with( - instance_url=INSTANCE_URL, pipeline_name=PIPELINE_NAME, namespace=NAMESPACE + instance_url=INSTANCE_URL, + pipeline_name=PIPELINE_NAME, + namespace=NAMESPACE, + pipeline_type=DataFusionPipelineType.BATCH, + run_id=None, ) @mock.patch(HOOK_STR) @@ -520,6 +524,7 @@ def test_execute_check_hook_call_should_execute_successfully_with_runId(self, mo mock_hook.return_value.stop_pipeline.assert_called_once_with( instance_url=INSTANCE_URL, pipeline_name=PIPELINE_NAME, + pipeline_type=DataFusionPipelineType.BATCH, namespace=NAMESPACE, run_id="sample-run-id", ) From 4c715f67815241f061385a0b2a5156fe3a4e4b43 Mon Sep 17 00:00:00 2001 From: Chirodip Lodh Choudhury Date: Sun, 1 Feb 2026 13:59:11 +0530 Subject: [PATCH 06/13] feat: imports restored --- providers/google/tests/system/google/conftest.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/providers/google/tests/system/google/conftest.py b/providers/google/tests/system/google/conftest.py index cfd74f1387bae..1c377b7a34880 100644 --- a/providers/google/tests/system/google/conftest.py +++ b/providers/google/tests/system/google/conftest.py @@ -18,6 +18,8 @@ import pytest +from system.openlineage.conftest import set_transport_variable # noqa: F401 + REQUIRED_ENV_VARS = ("SYSTEM_TESTS_GCP_PROJECT",) From 624d9233e30d34c9e31042df114e1a3b2231ff60 Mon Sep 17 00:00:00 2001 From: Chirodip Lodh Choudhury Date: Sun, 1 Feb 2026 14:07:09 +0530 Subject: [PATCH 07/13] feat: imports restored --- .../cloud/datafusion/example_datafusion.py | 22 +------------------ .../google/cloud/hooks/test_datafusion.py | 2 +- 2 files changed, 2 insertions(+), 22 deletions(-) diff --git a/providers/google/tests/system/google/cloud/datafusion/example_datafusion.py b/providers/google/tests/system/google/cloud/datafusion/example_datafusion.py index 68de9849ad47b..6133093a2e1e3 100644 --- a/providers/google/tests/system/google/cloud/datafusion/example_datafusion.py +++ b/providers/google/tests/system/google/cloud/datafusion/example_datafusion.py @@ -311,25 +311,7 @@ def get_artifacts_versions(ti=None): pipeline_name=PIPELINE_NAME, instance_name=INSTANCE_NAME, task_id="stop_pipeline", - ) - # [END howto_cloud_data_fusion_stop_pipeline] - - # [START howto_cloud_data_fusion_start_pipeline] - start_pipeline_frunid = CloudDataFusionStartPipelineOperator( - location=LOCATION, - pipeline_name=PIPELINE_NAME, - instance_name=INSTANCE_NAME, - pipeline_timeout=1000, - task_id="start_pipeline_for_runid", - ) - - # [START howto_cloud_data_fusion_stop_pipeline_with_runid] - stop_pipeline_wrunid = CloudDataFusionStopPipelineOperator( - location=LOCATION, - pipeline_name=PIPELINE_NAME, - instance_name=INSTANCE_NAME, - task_id="stop_pipeline_wrun_id", - run_id=start_pipeline_frunid.output, + run_id=start_pipeline.output, ) # [END howto_cloud_data_fusion_stop_pipeline] @@ -370,8 +352,6 @@ def get_artifacts_versions(ti=None): >> update_instance >> create_pipeline >> list_pipelines - >> start_pipeline_frunid - >> stop_pipeline_wrunid >> start_pipeline_def >> start_pipeline_def_sensor >> start_pipeline_async diff --git a/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py b/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py index 2998912b0cbdc..87d9f2d7b4665 100644 --- a/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py +++ b/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py @@ -508,7 +508,7 @@ def test_stop_pipeline_should_fail_if_empty_data_response_with_run_id(self, mock ) mock_request.assert_called_once_with( url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/" - f"spark/DataStreamsSparkStreaming//runs/eaf-2fr-4rf/stop", + f"spark/DataStreamsSparkStreaming/runs/eaf-2fr-4rf/stop", method="POST", ) From ea3342a53e66a279f42ec6099fffa2a5fb6bb9ca Mon Sep 17 00:00:00 2001 From: Chirodip Lodh Choudhury Date: Sun, 1 Feb 2026 14:36:26 +0530 Subject: [PATCH 08/13] feat: typo corrected --- .../src/airflow/providers/google/cloud/hooks/datafusion.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py index 38a73faf423b9..f5182b51133fc 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py @@ -539,11 +539,11 @@ def stop_pipeline( :param pipeline_name: Your pipeline name. :param instance_url: Endpoint on which the REST APIs is accessible for the instance. - :param namespace: f your pipeline belongs to a Basic edition instance, the namespace ID + :param namespace: if your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace. :pipeline_type: Can be either BATCH or STREAM - :param run_id : The specific runId to stop execution if available , when absent It will stop all Runs under App: pipeline_name + :param run_id : The specific `runId` to stop execution if available , when absent It will stop all Runs under App: pipeline_name """ base_stop_url = os.path.join( self._base_url(instance_url, namespace), From 2ef375102abf069516e8130aca3b649dff5e5530 Mon Sep 17 00:00:00 2001 From: Chirodip Lodh Choudhury Date: Sun, 1 Feb 2026 14:38:19 +0530 Subject: [PATCH 09/13] feat: typo corrected --- .../src/airflow/providers/google/cloud/hooks/datafusion.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py index f5182b51133fc..c6002a2513bbc 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py @@ -539,9 +539,7 @@ def stop_pipeline( :param pipeline_name: Your pipeline name. :param instance_url: Endpoint on which the REST APIs is accessible for the instance. - :param namespace: if your pipeline belongs to a Basic edition instance, the namespace ID - is always default. If your pipeline belongs to an Enterprise edition instance, you - can create a namespace. + :param namespace: if your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace. :pipeline_type: Can be either BATCH or STREAM :param run_id : The specific `runId` to stop execution if available , when absent It will stop all Runs under App: pipeline_name """ From 37f1884dfaa369aef9cbfdff1b045aee11d37b31 Mon Sep 17 00:00:00 2001 From: Chirodip Lodh Choudhury Date: Sun, 1 Feb 2026 15:05:18 +0530 Subject: [PATCH 10/13] feat: documentation error fix --- .../airflow/providers/google/cloud/hooks/datafusion.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py index c6002a2513bbc..f88655816ad72 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py @@ -539,9 +539,11 @@ def stop_pipeline( :param pipeline_name: Your pipeline name. :param instance_url: Endpoint on which the REST APIs is accessible for the instance. - :param namespace: if your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace. - :pipeline_type: Can be either BATCH or STREAM - :param run_id : The specific `runId` to stop execution if available , when absent It will stop all Runs under App: pipeline_name + :pipeline_type: Can be either BATCH or STREAM. + :param run_id : The specific run_id to stop execution if available, when absent It will stop all runs under pipeline_name. + :param namespace: if your pipeline belongs to a Basic edition instance, the namespace ID + is always default. If your pipeline belongs to an Enterprise edition instance, you + can create a namespace. """ base_stop_url = os.path.join( self._base_url(instance_url, namespace), From dc966123025165a43ac9b5f80f0660524299ed0f Mon Sep 17 00:00:00 2001 From: Chirodip Lodh Choudhury Date: Sun, 1 Feb 2026 15:46:00 +0530 Subject: [PATCH 11/13] feat: documentation error fix --- .../src/airflow/providers/google/cloud/hooks/datafusion.py | 2 +- .../src/airflow/providers/google/cloud/operators/datafusion.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py index f88655816ad72..de9b666331999 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py @@ -539,7 +539,7 @@ def stop_pipeline( :param pipeline_name: Your pipeline name. :param instance_url: Endpoint on which the REST APIs is accessible for the instance. - :pipeline_type: Can be either BATCH or STREAM. + :param pipeline_type: Can be either BATCH or STREAM. :param run_id : The specific run_id to stop execution if available, when absent It will stop all runs under pipeline_name. :param namespace: if your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you diff --git a/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py b/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py index 2a5b0de0da2fd..d88a03904e084 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py @@ -909,6 +909,7 @@ class CloudDataFusionStopPipelineOperator(GoogleCloudBaseOperator): can create a namespace. :param api_version: The version of the api that will be requested for example 'v3'. :param gcp_conn_id: The connection ID to use when fetching connection info. + :param run_id: To stop a particular run tagged with provided run_id. :param impersonation_chain: Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. @@ -917,7 +918,6 @@ class CloudDataFusionStopPipelineOperator(GoogleCloudBaseOperator): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). - :param run_id: To stop a particular run tagged with provided run_id """ template_fields: Sequence[str] = ("instance_name", "pipeline_name", "impersonation_chain", "run_id") From a8a53daaac25a24d09b9e40f35ea702adca349fa Mon Sep 17 00:00:00 2001 From: Chirodip Lodh Choudhury Date: Sun, 1 Feb 2026 16:19:44 +0530 Subject: [PATCH 12/13] fixed the documentation errors --- .../src/airflow/providers/google/cloud/hooks/datafusion.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py index de9b666331999..857adf43ad49e 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py @@ -540,8 +540,8 @@ def stop_pipeline( :param pipeline_name: Your pipeline name. :param instance_url: Endpoint on which the REST APIs is accessible for the instance. :param pipeline_type: Can be either BATCH or STREAM. - :param run_id : The specific run_id to stop execution if available, when absent It will stop all runs under pipeline_name. - :param namespace: if your pipeline belongs to a Basic edition instance, the namespace ID + :param run_id: The specific run_id to stop execution if available; when absent it will stop all runs under pipeline_name. + :param namespace: If your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace. """ From 3375d9cb05ab7e1c8c7b3df32d33a28f78cdb4b0 Mon Sep 17 00:00:00 2001 From: Chirodip Lodh Choudhury Date: Mon, 2 Feb 2026 00:04:33 +0530 Subject: [PATCH 13/13] feat: addressed review comments --- .../google/cloud/hooks/datafusion.py | 5 ++-- .../google/cloud/operators/datafusion.py | 4 +-- .../cloud/datafusion/example_datafusion.py | 26 +++++-------------- 3 files changed, 10 insertions(+), 25 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py index 857adf43ad49e..ff2f4e42c0ea5 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py @@ -539,11 +539,11 @@ def stop_pipeline( :param pipeline_name: Your pipeline name. :param instance_url: Endpoint on which the REST APIs is accessible for the instance. - :param pipeline_type: Can be either BATCH or STREAM. - :param run_id: The specific run_id to stop execution if available; when absent it will stop all runs under pipeline_name. :param namespace: If your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace. + :param pipeline_type: Can be either BATCH or STREAM. + :param run_id: The specific run_id to stop execution if available; when absent it will stop all runs under pipeline_name. """ base_stop_url = os.path.join( self._base_url(instance_url, namespace), @@ -554,7 +554,6 @@ def stop_pipeline( if run_id: url = os.path.join(base_stop_url, "runs", quote(str(run_id)), "stop") - else: url = os.path.join(base_stop_url, "stop") diff --git a/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py b/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py index d88a03904e084..8ce2daae579e5 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/datafusion.py @@ -901,15 +901,14 @@ class CloudDataFusionStopPipelineOperator(GoogleCloudBaseOperator): :ref:`howto/operator:CloudDataFusionStopPipelineOperator` :param pipeline_name: Your pipeline name. - :param pipeline_type: Nature of Pipeline, by default BATCH (workflows). :param instance_name: The name of the instance. + :param pipeline_type: Can be either BATCH or STREAM. :param location: The Cloud Data Fusion location in which to handle the request. :param namespace: If your pipeline belongs to a Basic edition instance, the namespace ID is always default. If your pipeline belongs to an Enterprise edition instance, you can create a namespace. :param api_version: The version of the api that will be requested for example 'v3'. :param gcp_conn_id: The connection ID to use when fetching connection info. - :param run_id: To stop a particular run tagged with provided run_id. :param impersonation_chain: Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. @@ -918,6 +917,7 @@ class CloudDataFusionStopPipelineOperator(GoogleCloudBaseOperator): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). + :param run_id: The specific run_id to stop execution if available; when absent it will stop all runs under pipeline_name. """ template_fields: Sequence[str] = ("instance_name", "pipeline_name", "impersonation_chain", "run_id") diff --git a/providers/google/tests/system/google/cloud/datafusion/example_datafusion.py b/providers/google/tests/system/google/cloud/datafusion/example_datafusion.py index 6133093a2e1e3..577a20728e55d 100644 --- a/providers/google/tests/system/google/cloud/datafusion/example_datafusion.py +++ b/providers/google/tests/system/google/cloud/datafusion/example_datafusion.py @@ -61,19 +61,19 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") LOCATION = "europe-north1" DAG_ID = "datafusion" -INSTANCE_NAME = f"test-instance-{DAG_ID}".replace("_", "-") +INSTANCE_NAME = f"df-{ENV_ID}".replace("_", "-") INSTANCE = { "type": "BASIC", "displayName": INSTANCE_NAME, "dataprocServiceAccount": SERVICE_ACCOUNT, } -BUCKET_NAME_1 = f"bucket1-{DAG_ID}".replace("_", "-") -BUCKET_NAME_2 = f"bucket2-{DAG_ID}".replace("_", "-") +BUCKET_NAME_1 = f"bucket1-{DAG_ID}-{ENV_ID}".replace("_", "-") +BUCKET_NAME_2 = f"bucket2-{DAG_ID}-{ENV_ID}".replace("_", "-") BUCKET_NAME_1_URI = f"gs://{BUCKET_NAME_1}" BUCKET_NAME_2_URI = f"gs://{BUCKET_NAME_2}" -PIPELINE_NAME = f"pipe-{DAG_ID}-test".replace("_", "-") +PIPELINE_NAME = f"pipe-{ENV_ID}".replace("_", "-") PIPELINE = { "artifact": { "name": "cdap-data-pipeline", @@ -83,8 +83,8 @@ "description": "Data Pipeline Application", "name": PIPELINE_NAME, "config": { - "resources": {"memoryMB": 1048, "virtualCores": 1}, - "driverResources": {"memoryMB": 1048, "virtualCores": 1}, + "resources": {"memoryMB": 2048, "virtualCores": 1}, + "driverResources": {"memoryMB": 2048, "virtualCores": 1}, "connections": [{"from": "GCS", "to": "GCS2"}], "comments": [], "postActions": [], @@ -271,18 +271,6 @@ def get_artifacts_versions(ti=None): ) # [END howto_cloud_data_fusion_start_pipeline_def] - # [START howto_cloud_data_fusion_start_pipeline_def_sensor] - start_pipeline_def_sensor = CloudDataFusionPipelineStateSensor( - task_id="pipeline_state_sensor_def", - pipeline_name=PIPELINE_NAME, - pipeline_id=start_pipeline_def.output, - expected_statuses=["COMPLETED"], - failure_statuses=["FAILED"], - instance_name=INSTANCE_NAME, - location=LOCATION, - ) - # [END howto_cloud_data_fusion_start_pipeline_def_sensor] - # [START howto_cloud_data_fusion_start_pipeline_async] start_pipeline_async = CloudDataFusionStartPipelineOperator( location=LOCATION, @@ -311,7 +299,6 @@ def get_artifacts_versions(ti=None): pipeline_name=PIPELINE_NAME, instance_name=INSTANCE_NAME, task_id="stop_pipeline", - run_id=start_pipeline.output, ) # [END howto_cloud_data_fusion_stop_pipeline] @@ -353,7 +340,6 @@ def get_artifacts_versions(ti=None): >> create_pipeline >> list_pipelines >> start_pipeline_def - >> start_pipeline_def_sensor >> start_pipeline_async >> start_pipeline_sensor >> start_pipeline