diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_sftp.py b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_sftp.py index 7aebbe1b6850c..ccb7e4e485f4e 100644 --- a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_sftp.py +++ b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_sftp.py @@ -80,7 +80,9 @@ class GCSToSFTPOperator(BaseOperator): :param destination_path: The sftp remote path. This is the specified directory path for uploading to the SFTP server. :param keep_directory_structure: (Optional) When set to False the path of the file - on the bucket is recreated within path passed in destination_path. + on the bucket is recreated within path passed in destination_path. + :param create_intermediate_dirs: (Optional) When set to True the intermediate directories + in the specified file path will be created. :param move_object: When move object is True, the object is moved instead of copied to the new location. This is the equivalent of a mv command as opposed to a cp command. @@ -112,6 +114,7 @@ def __init__( source_object: str, destination_path: str, keep_directory_structure: bool = True, + create_intermediate_dirs: bool = True, move_object: bool = False, gcp_conn_id: str = "google_cloud_default", sftp_conn_id: str = "ssh_default", @@ -124,6 +127,7 @@ def __init__( self.source_object = source_object self.destination_path = destination_path self.keep_directory_structure = keep_directory_structure + self.create_intermediate_dirs = create_intermediate_dirs self.move_object = move_object self.gcp_conn_id = gcp_conn_id self.sftp_conn_id = sftp_conn_id @@ -190,7 +194,9 @@ def _copy_single_object( ) dir_path = os.path.dirname(destination_path) - sftp_hook.create_directory(dir_path) + + if self.create_intermediate_dirs: + sftp_hook.create_directory(dir_path) with NamedTemporaryFile("w") as tmp: gcs_hook.download( diff --git a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_sftp.py b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_sftp.py index a880d3eead00f..2e8a7a0f24ccd 100644 --- a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_sftp.py +++ b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_sftp.py @@ -400,3 +400,49 @@ def test_get_openlineage_facets( assert result.inputs[0].name == expected_source assert result.outputs[0].namespace == "file://11.222.33.44:22" assert result.outputs[0].name == expected_destination + + @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_sftp.GCSHook") + @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_sftp.SFTPHook") + def test_create_intermediate_dirs_true(self, sftp_hook_mock, gcp_hook_mock): + task = GCSToSFTPOperator( + task_id=TASK_ID, + source_bucket=TEST_BUCKET, + source_object="folder/test_object.txt", # Hard-coding + destination_path=DESTINATION_SFTP, + keep_directory_structure=True, # Hard-coding + create_intermediate_dirs=True, + move_object=False, + gcp_conn_id=GCP_CONN_ID, + sftp_conn_id=SFTP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + + assert task.create_intermediate_dirs + + task.execute(None) + + sftp_hook_mock.return_value.create_directory.assert_called_once_with( + os.path.join(DESTINATION_SFTP, "folder") + ) + + @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_sftp.GCSHook") + @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_sftp.SFTPHook") + def test_create_intermediate_dirs_false(self, sftp_hook_mock, gcp_hook_mock): + task = GCSToSFTPOperator( + task_id=TASK_ID, + source_bucket=TEST_BUCKET, + source_object="folder/test_object.txt", # Hard-coding + destination_path=DESTINATION_SFTP, + keep_directory_structure=True, # Hard-coding + create_intermediate_dirs=False, + move_object=False, + gcp_conn_id=GCP_CONN_ID, + sftp_conn_id=SFTP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + + assert not task.create_intermediate_dirs + + task.execute(None) + + sftp_hook_mock.return_value.create_directory.assert_not_called()