From cf594fc639bbcb15903afc170644cc36ed96431e Mon Sep 17 00:00:00 2001 From: Emery Finkelstein Date: Wed, 6 Mar 2024 09:39:51 +1100 Subject: [PATCH 1/2] Restore delegate_to for Google Transfer Operators retrieving from Google Cloud. --- .../providers/google/suite/transfers/gcs_to_gdrive.py | 6 ++++++ .../providers/google/suite/transfers/gcs_to_sheets.py | 5 ++++- .../google/suite/transfers/test_gcs_to_gdrive.py | 10 ++++++++++ .../google/suite/transfers/test_gcs_to_sheets.py | 3 +++ 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/airflow/providers/google/suite/transfers/gcs_to_gdrive.py b/airflow/providers/google/suite/transfers/gcs_to_gdrive.py index 09105320a5895..e9df2d4a065f0 100644 --- a/airflow/providers/google/suite/transfers/gcs_to_gdrive.py +++ b/airflow/providers/google/suite/transfers/gcs_to_gdrive.py @@ -79,6 +79,9 @@ class GCSToGoogleDriveOperator(BaseOperator): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). + :param delegate_to: (Optional) The account to impersonate using domain-wide delegation + of authority, if any. For this to work, the service account making the + request must have domain-wide delegation enabled. This only applies to the Google Drive connection. 
""" template_fields: Sequence[str] = ( @@ -99,6 +102,7 @@ def __init__( move_object: bool = False, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, + delegate_to: str | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -110,6 +114,7 @@ def __init__( self.move_object = move_object self.gcp_conn_id = gcp_conn_id self.impersonation_chain = impersonation_chain + self.delegate_to = delegate_to self.gcs_hook: GCSHook | None = None self.gdrive_hook: GoogleDriveHook | None = None @@ -121,6 +126,7 @@ def execute(self, context: Context): self.gdrive_hook = GoogleDriveHook( gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, + delegate_to=self.delegate_to, ) if WILDCARD in self.source_object: diff --git a/airflow/providers/google/suite/transfers/gcs_to_sheets.py b/airflow/providers/google/suite/transfers/gcs_to_sheets.py index dc0e9f3933770..0a49ad4144167 100644 --- a/airflow/providers/google/suite/transfers/gcs_to_sheets.py +++ b/airflow/providers/google/suite/transfers/gcs_to_sheets.py @@ -40,7 +40,7 @@ class GCSToGoogleSheetsOperator(BaseOperator): :param gcp_conn_id: The connection ID to use when fetching connection info. :param delegate_to: The account to impersonate using domain-wide delegation of authority, if any. For this to work, the service account making the request must have - domain-wide delegation enabled. + domain-wide delegation enabled. This only applies to the Google Sheets connection. :param impersonation_chain: Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. 
@@ -68,6 +68,7 @@ def __init__( spreadsheet_range: str = "Sheet1", gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, + delegate_to: str | None = None, **kwargs, ) -> None: super().__init__(**kwargs) @@ -78,11 +79,13 @@ def __init__( self.bucket_name = bucket_name self.object_name = object_name self.impersonation_chain = impersonation_chain + self.delegate_to = delegate_to def execute(self, context: Any) -> None: sheet_hook = GSheetsHook( gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, + delegate_to=self.delegate_to, ) gcs_hook = GCSHook( gcp_conn_id=self.gcp_conn_id, diff --git a/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py b/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py index 8e97be5b0e003..3e81e166bda58 100644 --- a/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py +++ b/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py @@ -26,6 +26,7 @@ MODULE = "airflow.providers.google.suite.transfers.gcs_to_gdrive" IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"] +DELEGATE_TO = "test_account@xxx.zzz" class TestGcsToGDriveOperator: @@ -41,6 +42,8 @@ def test_should_copy_single_file(self, mock_named_temporary_file, mock_gdrive, m source_bucket="data", source_object="sales/sales-2017/january.avro", destination_object="copied_sales/2017/january-backup.avro", + impersonation_chain=None, + delegate_to=DELEGATE_TO, ) task.execute(mock.MagicMock()) @@ -60,6 +63,7 @@ def test_should_copy_single_file(self, mock_named_temporary_file, mock_gdrive, m mock_gdrive.assert_has_calls( [ mock.call( + delegate_to=DELEGATE_TO, gcp_conn_id="google_cloud_default", impersonation_chain=None, ), @@ -84,6 +88,7 @@ def test_should_copy_single_file_with_folder(self, mock_named_temporary_file, mo source_object="sales/sales-2017/january.avro", destination_object="copied_sales/2017/january-backup.avro", destination_folder_id="aAopls6bE4tUllZVGJvRUU", + 
delegate_to=DELEGATE_TO, ) task.execute(mock.MagicMock()) @@ -104,6 +109,7 @@ def test_should_copy_single_file_with_folder(self, mock_named_temporary_file, mo [ mock.call( gcp_conn_id="google_cloud_default", + delegate_to=DELEGATE_TO, impersonation_chain=None, ), mock.call().upload_file( @@ -130,6 +136,7 @@ def test_should_copy_files(self, mock_named_temporary_file, mock_gdrive, mock_gc source_object="sales/sales-2017/*.avro", destination_object="copied_sales/2017/", impersonation_chain=IMPERSONATION_CHAIN, + delegate_to=DELEGATE_TO, ) task.execute(mock.MagicMock()) @@ -152,6 +159,7 @@ def test_should_copy_files(self, mock_named_temporary_file, mock_gdrive, mock_gc mock_gdrive.assert_has_calls( [ mock.call( + delegate_to=DELEGATE_TO, gcp_conn_id="google_cloud_default", impersonation_chain=IMPERSONATION_CHAIN, ), @@ -181,6 +189,7 @@ def test_should_move_files(self, mock_named_temporary_file, mock_gdrive, mock_gc source_object="sales/sales-2017/*.avro", move_object=True, impersonation_chain=IMPERSONATION_CHAIN, + delegate_to=DELEGATE_TO, ) task.execute(mock.MagicMock()) @@ -206,6 +215,7 @@ def test_should_move_files(self, mock_named_temporary_file, mock_gdrive, mock_gc mock_gdrive.assert_has_calls( [ mock.call( + delegate_to=DELEGATE_TO, gcp_conn_id="google_cloud_default", impersonation_chain=IMPERSONATION_CHAIN, ), diff --git a/tests/providers/google/suite/transfers/test_gcs_to_sheets.py b/tests/providers/google/suite/transfers/test_gcs_to_sheets.py index 30f48d5137372..0e624284e5e10 100644 --- a/tests/providers/google/suite/transfers/test_gcs_to_sheets.py +++ b/tests/providers/google/suite/transfers/test_gcs_to_sheets.py @@ -26,6 +26,7 @@ VALUES = [[1, 2, 3]] BUCKET = "destination_bucket" PATH = "path/to/reports" +DELEGATE_TO = "test_account@xxx.zzz" class TestGCSToGoogleSheets: @@ -47,11 +48,13 @@ def test_execute(self, mock_reader, mock_tempfile, mock_sheet_hook, mock_gcs_hoo object_name=PATH, gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN, + 
delegate_to=DELEGATE_TO, ) op.execute(None) mock_sheet_hook.assert_called_once_with( gcp_conn_id=GCP_CONN_ID, + delegate_to=DELEGATE_TO, impersonation_chain=IMPERSONATION_CHAIN, ) mock_gcs_hook.assert_called_once_with( From 4afaf1f5cc9f26b1bf29abbdca00eefa9ff7cf52 Mon Sep 17 00:00:00 2001 From: Emery Finkelstein Date: Fri, 8 Mar 2024 10:19:41 +1100 Subject: [PATCH 2/2] Add newsfragment --- newsfragments/37925.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 newsfragments/37925.bugfix diff --git a/newsfragments/37925.bugfix b/newsfragments/37925.bugfix new file mode 100644 index 0000000000000..b0511a3905dd3 --- /dev/null +++ b/newsfragments/37925.bugfix @@ -0,0 +1 @@ +Restore the `delegate_to` argument of the Google Cloud Storage to Google Drive and Google Sheets transfer operators; it is passed only to the Google Drive and Google Sheets hooks