From 23906bce891490788ab602852dcec357e83e057c Mon Sep 17 00:00:00 2001 From: Srabasti Date: Sat, 31 Jan 2026 23:20:53 -0800 Subject: [PATCH 1/9] Adding changes for http_to_gcs operator --- .../google/cloud/transfers/http_to_gcs.py | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/http_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/http_to_gcs.py index d203b6187c1cb..6ce0452e28502 100644 --- a/providers/google/src/airflow/providers/google/cloud/transfers/http_to_gcs.py +++ b/providers/google/src/airflow/providers/google/cloud/transfers/http_to_gcs.py @@ -69,6 +69,8 @@ class HttpToGCSOperator(BaseOperator): the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account. + :param unwrap_single: If True (default), returns a single URI string when there's only one file. + If False, always returns a list of URIs. Default will change to False in a future release. :param bucket_name: The bucket to upload to. :param object_name: The object name to set when uploading the file. :param mime_type: The file mime type set when uploading the file. @@ -112,6 +114,7 @@ def __init__( tcp_keep_alive_interval: int = 30, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, + unwrap_single: bool | None = None, bucket_name: str, object_name: str, mime_type: str | None = None, @@ -140,6 +143,18 @@ def __init__( self.tcp_keep_alive_interval = tcp_keep_alive_interval self.gcp_conn_id = gcp_conn_id self.impersonation_chain = impersonation_chain + if unwrap_single is None: + self.unwrap_single = True + import warnings + + warnings.warn( + "The default value of unwrap_single will change from True to False in a future release." + "Please set unwrap_single explicitly to avoid this warning.", + FutureWarning, + stacklevel=2, + ) + else: + self.unwrap_single = unwrap_single self.bucket_name = bucket_name self.object_name = object_name self.mime_type = mime_type @@ -170,7 +185,7 @@ def gcs_hook(self) -> GCSHook: """Create and return an GCSHook.""" return GCSHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) - def execute(self, context: Context): + def execute(self, context: Context) -> str | list[str]: self.log.info("Calling HTTP method") response = self.http_hook.run( endpoint=self.endpoint, data=self.data, headers=self.headers, extra_options=self.extra_options @@ -191,3 +206,9 @@ def execute(self, context: Context): cache_control=self.cache_control, user_project=self.user_project, ) + + result = [f"gs://{self.bucket_name}/{self.object_name}"] + + if self.unwrap_single: + return result[0] + return result From f21c7ba98b6dc378dee7a61157dd6f4b39b6d50b Mon Sep 17 00:00:00 2001 From: Srabasti Date: Sun, 1 Feb 2026 13:46:14 -0800 Subject: [PATCH 2/9] Adding changes for http_to_gcs operator --- .../airflow/providers/google/cloud/transfers/http_to_gcs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/http_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/http_to_gcs.py index 6ce0452e28502..7dc01e3179d02 100644 --- a/providers/google/src/airflow/providers/google/cloud/transfers/http_to_gcs.py +++ b/providers/google/src/airflow/providers/google/cloud/transfers/http_to_gcs.py @@ -70,7 +70,7 @@ class HttpToGCSOperator(BaseOperator): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account. :param unwrap_single: If True (default), returns a single URI string when there's only one file. - If False, always returns a list of URIs. Default will change to False in a future release. + If False, always returns a list of URIs. Default will change to False in a future release. :param bucket_name: The bucket to upload to. :param object_name: The object name to set when uploading the file. :param mime_type: The file mime type set when uploading the file. @@ -185,7 +185,7 @@ def gcs_hook(self) -> GCSHook: """Create and return an GCSHook.""" return GCSHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) - def execute(self, context: Context) -> str | list[str]: + def execute(self, context: Context) -> str | list[str]: self.log.info("Calling HTTP method") response = self.http_hook.run( endpoint=self.endpoint, data=self.data, headers=self.headers, extra_options=self.extra_options From b2adcb46b6fdd2ed532b9dc228bc0f0b840e048d Mon Sep 17 00:00:00 2001 From: Srabasti Date: Mon, 2 Feb 2026 21:42:07 -0800 Subject: [PATCH 3/9] Adding tests --- .../cloud/transfers/test_http_to_gcs.py | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py index 9fe9c6b25cc45..7677a9a1c50a0 100644 --- a/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py +++ b/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py @@ -67,6 +67,56 @@ def test_execute_copy_single_file(self, http_hook, gcs_hook): bucket_name=TEST_BUCKET, gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN, + unwrap_single=True, + ) + task.execute(None) + + # GCS + gcs_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN) + task.gcs_hook.upload.assert_called_once_with( + bucket_name=TEST_BUCKET, + object_name=DESTINATION_PATH_FILE, + data=task.http_hook.run.return_value.content, + mime_type=None, + gzip=False, + encoding=task.http_hook.run.return_value.encoding, + chunk_size=None, + timeout=None, + num_max_attempts=NUM_MAX_ATTEMPTS, + metadata=None, + cache_control=None, + user_project=None, + ) + + # HTTP + http_hook.assert_called_once_with( + DEFAULT_HTTP_METHOD, + http_conn_id=HTTP_CONN_ID, + auth_type=None, + tcp_keep_alive=True, + tcp_keep_alive_idle=TCP_KEEP_ALIVE_IDLE, + tcp_keep_alive_count=TCP_KEEP_ALIVE_COUNT, + tcp_keep_alive_interval=TCP_KEEP_ALIVE_INTERVAL, + ) + task.http_hook.run.assert_called_once_with( + endpoint=ENDPOINT, headers=HEADERS, data=DATA, extra_options=EXTRA_OPTIONS + ) + + @mock.patch("airflow.providers.google.cloud.transfers.http_to_gcs.GCSHook") + @mock.patch("airflow.providers.google.cloud.transfers.http_to_gcs.HttpHook") + def test_execute_with_single_file_unwrap_false(self, http_hook, gcs_hook): + task = HttpToGCSOperator( + task_id="http_to_gcs_operator", + http_conn_id=HTTP_CONN_ID, + endpoint=ENDPOINT, + headers=HEADERS, + data=DATA, + extra_options=EXTRA_OPTIONS, + object_name=DESTINATION_PATH_FILE, + bucket_name=TEST_BUCKET, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + unwrap_single=False, ) task.execute(None) From 6d790eb479ffd5330976bd6cf44ac47a9279a6d4 Mon Sep 17 00:00:00 2001 From: Srabasti Date: Wed, 11 Feb 2026 21:09:53 -0800 Subject: [PATCH 4/9] Changes per review comments --- .../google/cloud/transfers/http_to_gcs.py | 24 +++---------------- 1 file changed, 3 insertions(+), 21 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/http_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/http_to_gcs.py index 7dc01e3179d02..39748d6815087 100644 --- a/providers/google/src/airflow/providers/google/cloud/transfers/http_to_gcs.py +++ b/providers/google/src/airflow/providers/google/cloud/transfers/http_to_gcs.py @@ -69,8 +69,6 @@ class HttpToGCSOperator(BaseOperator): the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account. - :param unwrap_single: If True (default), returns a single URI string when there's only one file. - If False, always returns a list of URIs. Default will change to False in a future release. :param bucket_name: The bucket to upload to. :param object_name: The object name to set when uploading the file. :param mime_type: The file mime type set when uploading the file. @@ -114,7 +112,6 @@ def __init__( tcp_keep_alive_interval: int = 30, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, - unwrap_single: bool | None = None, bucket_name: str, object_name: str, mime_type: str | None = None, @@ -143,18 +140,6 @@ def __init__( self.tcp_keep_alive_interval = tcp_keep_alive_interval self.gcp_conn_id = gcp_conn_id self.impersonation_chain = impersonation_chain - if unwrap_single is None: - self.unwrap_single = True - import warnings - - warnings.warn( - "The default value of unwrap_single will change from True to False in a future release." - "Please set unwrap_single explicitly to avoid this warning.", - FutureWarning, - stacklevel=2, - ) - else: - self.unwrap_single = unwrap_single self.bucket_name = bucket_name self.object_name = object_name self.mime_type = mime_type @@ -185,7 +170,8 @@ def gcs_hook(self) -> GCSHook: """Create and return an GCSHook.""" return GCSHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) - def execute(self, context: Context) -> str | list[str]: + def execute(self, context: Context) -> list[str]: + """Return List of destination URIs (gs://bucket_name/object_name) for uploaded file.""" self.log.info("Calling HTTP method") response = self.http_hook.run( endpoint=self.endpoint, data=self.data, headers=self.headers, extra_options=self.extra_options @@ -207,8 +193,4 @@ def execute(self, context: Context) -> str | list[str]: user_project=self.user_project, ) - result = [f"gs://{self.bucket_name}/{self.object_name}"] - - if self.unwrap_single: - return result[0] - return result + return [f"gs://{self.bucket_name}/{self.object_name}"] From b2c24bc351188a41bffe85280f181182319d619b Mon Sep 17 00:00:00 2001 From: Srabasti Date: Wed, 11 Feb 2026 22:21:47 -0800 Subject: [PATCH 5/9] Changes per review comments --- .../cloud/transfers/test_http_to_gcs.py | 81 ------------------- 1 file changed, 81 deletions(-) diff --git a/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py index 7677a9a1c50a0..e8fd9e52e064c 100644 --- a/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py +++ b/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py @@ -67,86 +67,5 @@ def test_execute_copy_single_file(self, http_hook, gcs_hook): bucket_name=TEST_BUCKET, gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN, - unwrap_single=True, ) task.execute(None) - - # GCS - gcs_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN) - task.gcs_hook.upload.assert_called_once_with( - bucket_name=TEST_BUCKET, - object_name=DESTINATION_PATH_FILE, - data=task.http_hook.run.return_value.content, - mime_type=None, - gzip=False, - encoding=task.http_hook.run.return_value.encoding, - chunk_size=None, - timeout=None, - num_max_attempts=NUM_MAX_ATTEMPTS, - metadata=None, - cache_control=None, - user_project=None, - ) - - # HTTP - http_hook.assert_called_once_with( - DEFAULT_HTTP_METHOD, - http_conn_id=HTTP_CONN_ID, - auth_type=None, - tcp_keep_alive=True, - tcp_keep_alive_idle=TCP_KEEP_ALIVE_IDLE, - tcp_keep_alive_count=TCP_KEEP_ALIVE_COUNT, - tcp_keep_alive_interval=TCP_KEEP_ALIVE_INTERVAL, - ) - task.http_hook.run.assert_called_once_with( - endpoint=ENDPOINT, headers=HEADERS, data=DATA, extra_options=EXTRA_OPTIONS - ) - - @mock.patch("airflow.providers.google.cloud.transfers.http_to_gcs.GCSHook") - @mock.patch("airflow.providers.google.cloud.transfers.http_to_gcs.HttpHook") - def test_execute_with_single_file_unwrap_false(self, http_hook, gcs_hook): - task = HttpToGCSOperator( - task_id="http_to_gcs_operator", - http_conn_id=HTTP_CONN_ID, - endpoint=ENDPOINT, - headers=HEADERS, - data=DATA, - extra_options=EXTRA_OPTIONS, - object_name=DESTINATION_PATH_FILE, - bucket_name=TEST_BUCKET, - gcp_conn_id=GCP_CONN_ID, - impersonation_chain=IMPERSONATION_CHAIN, - unwrap_single=False, - ) - task.execute(None) - - # GCS - gcs_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN) - task.gcs_hook.upload.assert_called_once_with( - bucket_name=TEST_BUCKET, - object_name=DESTINATION_PATH_FILE, - data=task.http_hook.run.return_value.content, - mime_type=None, - gzip=False, - encoding=task.http_hook.run.return_value.encoding, - chunk_size=None, - timeout=None, - num_max_attempts=NUM_MAX_ATTEMPTS, - metadata=None, - cache_control=None, - user_project=None, - ) - - # HTTP - http_hook.assert_called_once_with( - DEFAULT_HTTP_METHOD, - http_conn_id=HTTP_CONN_ID, - auth_type=None, - tcp_keep_alive=True, - tcp_keep_alive_idle=TCP_KEEP_ALIVE_IDLE, - tcp_keep_alive_count=TCP_KEEP_ALIVE_COUNT, - tcp_keep_alive_interval=TCP_KEEP_ALIVE_INTERVAL, - ) - task.http_hook.run.assert_called_once_with( - endpoint=ENDPOINT, headers=HEADERS, data=DATA, extra_options=EXTRA_OPTIONS - ) From a7b3ee1a400526a498429d7f6b72d0bd118885fd Mon Sep 17 00:00:00 2001 From: Srabasti Date: Wed, 11 Feb 2026 23:29:44 -0800 Subject: [PATCH 6/9] Changes reverted for tests --- .../cloud/transfers/test_http_to_gcs.py | 34 +++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py index e8fd9e52e064c..57bbb4ca5c0be 100644 --- a/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py +++ b/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py @@ -49,9 +49,6 @@ def test_init(self): bucket_name=TEST_BUCKET, ) assert operator.endpoint == ENDPOINT - assert operator.object_name == DESTINATION_PATH_FILE - assert operator.bucket_name == TEST_BUCKET - assert operator.http_conn_id == HTTP_CONN_ID @mock.patch("airflow.providers.google.cloud.transfers.http_to_gcs.GCSHook") @mock.patch("airflow.providers.google.cloud.transfers.http_to_gcs.HttpHook") @@ -69,3 +66,34 @@ def test_execute_copy_single_file(self, http_hook, gcs_hook): impersonation_chain=IMPERSONATION_CHAIN, ) task.execute(None) + + # GCS + gcs_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN) + task.gcs_hook.upload.assert_called_once_with( + bucket_name=TEST_BUCKET, + object_name=DESTINATION_PATH_FILE, + data=task.http_hook.run.return_value.content, + mime_type=None, + gzip=False, + encoding=task.http_hook.run.return_value.encoding, + chunk_size=None, + timeout=None, + num_max_attempts=NUM_MAX_ATTEMPTS, + metadata=None, + cache_control=None, + user_project=None, + ) + + # HTTP + http_hook.assert_called_once_with( + DEFAULT_HTTP_METHOD, + http_conn_id=HTTP_CONN_ID, + auth_type=None, + tcp_keep_alive=True, + tcp_keep_alive_idle=TCP_KEEP_ALIVE_IDLE, + tcp_keep_alive_count=TCP_KEEP_ALIVE_COUNT, + tcp_keep_alive_interval=TCP_KEEP_ALIVE_INTERVAL, + ) + task.http_hook.run.assert_called_once_with( + endpoint=ENDPOINT, headers=HEADERS, data=DATA, extra_options=EXTRA_OPTIONS + ) From b9dcf3422761866a97194a8e123c5bd7ab9ee844 Mon Sep 17 00:00:00 2001 From: Srabasti Date: Wed, 11 Feb 2026 23:35:45 -0800 Subject: [PATCH 7/9] Changes reverted for tests --- .../tests/unit/google/cloud/transfers/test_http_to_gcs.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py index 57bbb4ca5c0be..9fe9c6b25cc45 100644 --- a/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py +++ b/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py @@ -49,6 +49,9 @@ def test_init(self): bucket_name=TEST_BUCKET, ) assert operator.endpoint == ENDPOINT + assert operator.object_name == DESTINATION_PATH_FILE + assert operator.bucket_name == TEST_BUCKET + assert operator.http_conn_id == HTTP_CONN_ID @mock.patch("airflow.providers.google.cloud.transfers.http_to_gcs.GCSHook") @mock.patch("airflow.providers.google.cloud.transfers.http_to_gcs.HttpHook") From c4f69c08a9b5e5ac74e038a00c75a009d079d030 Mon Sep 17 00:00:00 2001 From: Srabasti Date: Sat, 14 Feb 2026 16:11:05 -0800 Subject: [PATCH 8/9] Adding tests for http_to_gcs operator --- .../cloud/transfers/test_http_to_gcs.py | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py index 9fe9c6b25cc45..a8f640d92b044 100644 --- a/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py +++ b/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py @@ -68,7 +68,7 @@ def test_execute_copy_single_file(self, http_hook, gcs_hook): gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN, ) - task.execute(None) + result = task.execute(None) # GCS gcs_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN) @@ -100,3 +100,25 @@ def test_execute_copy_single_file(self, http_hook, gcs_hook): task.http_hook.run.assert_called_once_with( endpoint=ENDPOINT, headers=HEADERS, data=DATA, extra_options=EXTRA_OPTIONS ) + + # Return value: list of destination GCS URIs (per issue #11323 / PR #61306) + expected_uri = f"gs://{TEST_BUCKET}/{DESTINATION_PATH_FILE}" + assert result == [expected_uri] + + @mock.patch("airflow.providers.google.cloud.transfers.http_to_gcs.GCSHook") + @mock.patch("airflow.providers.google.cloud.transfers.http_to_gcs.HttpHook") + def test_execute_returns_destination_uris(self, http_hook, gcs_hook): + """Test that execute() returns a list of destination GCS URIs (gs://bucket/object).""" + bucket_name = TEST_BUCKET + object_name = DESTINATION_PATH_FILE + task = HttpToGCSOperator( + task_id="http_to_gcs_operator", + http_conn_id=HTTP_CONN_ID, + endpoint=ENDPOINT, + object_name=DESTINATION_PATH_FILE, + bucket_name=TEST_BUCKET, + ) + result = task.execute(None) + + expected_uris = f"gs://{TEST_BUCKET}/{DESTINATION_PATH_FILE}" + assert result == [expected_uris] From 1cc96fe34092aa42220fce54bff9b2450ffa52ee Mon Sep 17 00:00:00 2001 From: Srabasti Date: Sat, 14 Feb 2026 16:37:33 -0800 Subject: [PATCH 9/9] Adding tests --- .../tests/unit/google/cloud/transfers/test_http_to_gcs.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py b/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py index a8f640d92b044..6976afe6ef669 100644 --- a/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py +++ b/providers/google/tests/unit/google/cloud/transfers/test_http_to_gcs.py @@ -109,14 +109,17 @@ def test_execute_copy_single_file(self, http_hook, gcs_hook): @mock.patch("airflow.providers.google.cloud.transfers.http_to_gcs.HttpHook") def test_execute_returns_destination_uris(self, http_hook, gcs_hook): """Test that execute() returns a list of destination GCS URIs (gs://bucket/object).""" - bucket_name = TEST_BUCKET - object_name = DESTINATION_PATH_FILE task = HttpToGCSOperator( task_id="http_to_gcs_operator", http_conn_id=HTTP_CONN_ID, endpoint=ENDPOINT, + headers=HEADERS, + data=DATA, + extra_options=EXTRA_OPTIONS, object_name=DESTINATION_PATH_FILE, bucket_name=TEST_BUCKET, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, ) result = task.execute(None)