From 30e4cebc737658a95cbee4676ac56c03c003b2bc Mon Sep 17 00:00:00 2001 From: Alexis BRENON Date: Fri, 25 Apr 2025 10:19:52 +0200 Subject: [PATCH 1/2] fix: return the list of copied files even in deferrable mode --- .../airflow/providers/google/cloud/transfers/s3_to_gcs.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py index ba11be3bd5b9c..10fddbc8558f2 100644 --- a/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py +++ b/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py @@ -17,7 +17,7 @@ # under the License. from __future__ import annotations -from collections.abc import Sequence +from collections.abc import Iterable, Sequence from datetime import datetime, timezone from tempfile import NamedTemporaryFile from typing import TYPE_CHECKING, Any @@ -281,6 +281,7 @@ def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3H poll_interval=self.poll_interval, ), method_name="execute_complete", + kwargs=dict(files=files), ) def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> list[str]: @@ -335,7 +336,7 @@ def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3H return job_names - def execute_complete(self, context: Context, event: dict[str, Any]) -> None: + def execute_complete(self, context: Context, event: dict[str, Any], files: Iterable[str]) -> list[str]: """ Return immediately and relies on trigger to throw a success event. Callback for the trigger. @@ -345,6 +346,7 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None: if event["status"] == "error": raise AirflowException(event["message"]) self.log.info("%s completed with response %s ", self.task_id, event["message"]) + return list(files) def get_transfer_hook(self): return CloudDataTransferServiceHook( From 30c74c742d089e52bf28ada0d92cd4f93533c030 Mon Sep 17 00:00:00 2001 From: Alexis BRENON Date: Fri, 11 Jul 2025 12:08:21 +0200 Subject: [PATCH 2/2] keep compatibility --- .../providers/google/cloud/transfers/s3_to_gcs.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py index 10fddbc8558f2..38bc35acfed8c 100644 --- a/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py +++ b/providers/google/src/airflow/providers/google/cloud/transfers/s3_to_gcs.py @@ -20,7 +20,7 @@ from collections.abc import Iterable, Sequence from datetime import datetime, timezone from tempfile import NamedTemporaryFile -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, overload from airflow.configuration import conf from airflow.exceptions import AirflowException @@ -336,7 +336,15 @@ def submit_transfer_jobs(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3H return job_names - def execute_complete(self, context: Context, event: dict[str, Any], files: Iterable[str]) -> list[str]: + @overload + def execute_complete(self, context: Context, event: dict[str, Any], files: None) -> None: ... + @overload + def execute_complete( + self, context: Context, event: dict[str, Any], files: Iterable[str] + ) -> list[str]: ... + def execute_complete( + self, context: Context, event: dict[str, Any], files: Iterable[str] | None = None + ) -> list[str] | None: """ Return immediately and relies on trigger to throw a success event. Callback for the trigger. @@ -346,7 +354,7 @@ def execute_complete(self, context: Context, event: dict[str, Any], files: Itera if event["status"] == "error": raise AirflowException(event["message"]) self.log.info("%s completed with response %s ", self.task_id, event["message"]) - return list(files) + return None if files is None else list(files) def get_transfer_hook(self): return CloudDataTransferServiceHook(