diff --git a/providers/google/src/airflow/providers/google/marketing_platform/operators/display_video.py b/providers/google/src/airflow/providers/google/marketing_platform/operators/display_video.py index f351fd90f6df9..46bdb6021d390 100644 --- a/providers/google/src/airflow/providers/google/marketing_platform/operators/display_video.py +++ b/providers/google/src/airflow/providers/google/marketing_platform/operators/display_video.py @@ -126,6 +126,8 @@ class GoogleDisplayVideo360SDFtoGCSOperator(BaseOperator): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). + :param unwrap_single: If True (default), returns a single URI string when there's only one file. + If False, always returns a list of URIs. Default will change to False in a future release. """ template_fields: Sequence[str] = ( @@ -145,6 +147,7 @@ def __init__( api_version: str = "v4", gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, + unwrap_single: bool | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -155,8 +158,20 @@ def __init__( self.api_version = api_version self.gcp_conn_id = gcp_conn_id self.impersonation_chain = impersonation_chain - - def execute(self, context: Context) -> str: + if unwrap_single is None: + self.unwrap_single = True + import warnings + + warnings.warn( + "The default value of unwrap_single will change from True to False in a future release. " + "Please set unwrap_single explicitly to avoid this warning.", + FutureWarning, + stacklevel=2, + ) + else: + self.unwrap_single = unwrap_single + + def execute(self, context: Context) -> str | list[str]: hook = GoogleDisplayVideo360Hook( gcp_conn_id=self.gcp_conn_id, api_version=self.api_version, @@ -194,4 +209,8 @@ def execute(self, context: Context) -> str: filename=os.path.join(tmp_dir, fname), gzip=False, ) - return f"{self.bucket_name}/{self.object_name}" + result = [f"gs://{self.bucket_name}/{self.object_name}"] + + if self.unwrap_single: + return result[0] + return result diff --git a/providers/google/tests/unit/google/marketing_platform/operators/test_display_video.py b/providers/google/tests/unit/google/marketing_platform/operators/test_display_video.py index 0612e8b22f234..28dab42c7b693 100644 --- a/providers/google/tests/unit/google/marketing_platform/operators/test_display_video.py +++ b/providers/google/tests/unit/google/marketing_platform/operators/test_display_video.py @@ -79,6 +79,7 @@ def test_execute(self, mock_open, mock_hook, gcs_hook_mock, temp_dir_mock, os_mo gcp_conn_id=GCP_CONN_ID, task_id="test_task", impersonation_chain=IMPERSONATION_CHAIN, + unwrap_single=True, ) result = op.execute(context=None) @@ -106,7 +107,78 @@ def test_execute(self, mock_open, mock_hook, gcs_hook_mock, temp_dir_mock, os_mo gzip=False, ) - assert result == f"{BUCKET_NAME}/{OBJECT_NAME}" + assert result == f"gs://{BUCKET_NAME}/{OBJECT_NAME}" + + @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.zipfile") + @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.os") + @mock.patch( + "airflow.providers.google.marketing_platform.operators.display_video.tempfile.TemporaryDirectory" + ) + @mock.patch("airflow.providers.google.marketing_platform.operators.display_video.GCSHook") + @mock.patch( + "airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook" + ) + @mock.patch( + "airflow.providers.google.marketing_platform.operators.display_video.open", + new_callable=mock.mock_open, + ) + def test_execute_with_unwrap_single_false( + self, mock_open, mock_hook, gcs_hook_mock, temp_dir_mock, os_mock, zipfile_mock + ): + operation = {"response": {"resourceName": RESOURCE_NAME}} + media = mock.Mock() + + mock_hook.return_value.get_sdf_download_operation.return_value = operation + mock_hook.return_value.download_media.return_value = media + + tmp_dir = "/tmp/mock_dir" + temp_dir_mock.return_value.__enter__.return_value = tmp_dir + + # Mock os behavior + os_mock.path.join.side_effect = lambda *args: "/".join(args) + os_mock.listdir.return_value = [FILENAME] + + # Mock zipfile behavior + zipfile_mock.ZipFile.return_value.__enter__.return_value.extractall.return_value = None + + op = GoogleDisplayVideo360SDFtoGCSOperator( + operation_name=OPERATION_NAME, + bucket_name=BUCKET_NAME, + object_name=OBJECT_NAME, + gzip=False, + api_version=API_VERSION, + gcp_conn_id=GCP_CONN_ID, + task_id="test_task", + impersonation_chain=IMPERSONATION_CHAIN, + unwrap_single=False, + ) + + result = op.execute(context=None) + + # Assertions + mock_hook.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + api_version=API_VERSION, + impersonation_chain=IMPERSONATION_CHAIN, + ) + mock_hook.return_value.get_sdf_download_operation.assert_called_once_with( + operation_name=OPERATION_NAME + ) + mock_hook.return_value.download_media.assert_called_once_with(resource_name=RESOURCE_NAME) + mock_hook.return_value.download_content_from_request.assert_called_once() + + gcs_hook_mock.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + gcs_hook_mock.return_value.upload.assert_called_once_with( + bucket_name=BUCKET_NAME, + object_name=OBJECT_NAME, + filename=f"{tmp_dir}/{FILENAME}", + gzip=False, + ) + + assert result == [f"gs://{BUCKET_NAME}/{OBJECT_NAME}"] class TestGoogleDisplayVideo360CreateSDFDownloadTaskOperator: