Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ class GoogleDisplayVideo360SDFtoGCSOperator(BaseOperator):
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:param unwrap_single: If True (default), returns a single URI string when there's only one file.
If False, always returns a list of URIs. Default will change to False in a future release.
"""

template_fields: Sequence[str] = (
Expand All @@ -145,6 +147,7 @@ def __init__(
api_version: str = "v4",
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
unwrap_single: bool | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -155,8 +158,20 @@ def __init__(
self.api_version = api_version
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain

def execute(self, context: Context) -> str:
if unwrap_single is None:
self.unwrap_single = True
import warnings

warnings.warn(
"The default value of unwrap_single will change from True to False in a future release. "
"Please set unwrap_single explicitly to avoid this warning.",
FutureWarning,
stacklevel=2,
)
else:
self.unwrap_single = unwrap_single

def execute(self, context: Context) -> str | list[str]:
hook = GoogleDisplayVideo360Hook(
gcp_conn_id=self.gcp_conn_id,
api_version=self.api_version,
Expand Down Expand Up @@ -194,4 +209,8 @@ def execute(self, context: Context) -> str:
filename=os.path.join(tmp_dir, fname),
gzip=False,
)
return f"{self.bucket_name}/{self.object_name}"
result = [f"gs://{self.bucket_name}/{self.object_name}"]

if self.unwrap_single:
return result[0]
return result
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def test_execute(self, mock_open, mock_hook, gcs_hook_mock, temp_dir_mock, os_mo
gcp_conn_id=GCP_CONN_ID,
task_id="test_task",
impersonation_chain=IMPERSONATION_CHAIN,
unwrap_single=True,
)

result = op.execute(context=None)
Expand Down Expand Up @@ -106,7 +107,78 @@ def test_execute(self, mock_open, mock_hook, gcs_hook_mock, temp_dir_mock, os_mo
gzip=False,
)

assert result == f"{BUCKET_NAME}/{OBJECT_NAME}"
assert result == f"gs://{BUCKET_NAME}/{OBJECT_NAME}"

@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.zipfile")
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.os")
@mock.patch(
"airflow.providers.google.marketing_platform.operators.display_video.tempfile.TemporaryDirectory"
)
@mock.patch("airflow.providers.google.marketing_platform.operators.display_video.GCSHook")
@mock.patch(
"airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360Hook"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators.display_video.open",
new_callable=mock.mock_open,
)
def test_execute_with_unwrap_single_false(
self, mock_open, mock_hook, gcs_hook_mock, temp_dir_mock, os_mock, zipfile_mock
):
operation = {"response": {"resourceName": RESOURCE_NAME}}
media = mock.Mock()

mock_hook.return_value.get_sdf_download_operation.return_value = operation
mock_hook.return_value.download_media.return_value = media

tmp_dir = "/tmp/mock_dir"
temp_dir_mock.return_value.__enter__.return_value = tmp_dir

# Mock os behavior
os_mock.path.join.side_effect = lambda *args: "/".join(args)
os_mock.listdir.return_value = [FILENAME]

# Mock zipfile behavior
zipfile_mock.ZipFile.return_value.__enter__.return_value.extractall.return_value = None

op = GoogleDisplayVideo360SDFtoGCSOperator(
operation_name=OPERATION_NAME,
bucket_name=BUCKET_NAME,
object_name=OBJECT_NAME,
gzip=False,
api_version=API_VERSION,
gcp_conn_id=GCP_CONN_ID,
task_id="test_task",
impersonation_chain=IMPERSONATION_CHAIN,
unwrap_single=False,
)

result = op.execute(context=None)

# Assertions
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
api_version=API_VERSION,
impersonation_chain=IMPERSONATION_CHAIN,
)
mock_hook.return_value.get_sdf_download_operation.assert_called_once_with(
operation_name=OPERATION_NAME
)
mock_hook.return_value.download_media.assert_called_once_with(resource_name=RESOURCE_NAME)
mock_hook.return_value.download_content_from_request.assert_called_once()

gcs_hook_mock.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
)
gcs_hook_mock.return_value.upload.assert_called_once_with(
bucket_name=BUCKET_NAME,
object_name=OBJECT_NAME,
filename=f"{tmp_dir}/{FILENAME}",
gzip=False,
)

assert result == [f"gs://{BUCKET_NAME}/{OBJECT_NAME}"]


class TestGoogleDisplayVideo360CreateSDFDownloadTaskOperator:
Expand Down